summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-06 02:04:12 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-06 02:04:12 +0200
commitd1ae501ce79260ed932a38e6385c51c340b927e0 (patch)
tree8c845dc638c7ea2c9c55e579101c24a089aff359
parentactually populate forward hub UUID (diff)
handling for forwarder hubs works now
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go11
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go150
2 files changed, 156 insertions, 5 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 64dd5a7..a59166f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -318,8 +318,8 @@ func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error {
func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) {
du := NewDataUpdateDb(update)
- hubUuid := update.SourceHubUuid
- if du.SourceHubId, err = st.insertNewHub(tx, hubUuid); err != nil {
+ srcUuid := update.SourceHubUuid
+ if du.SourceHubId, err = st.insertNewHub(tx, srcUuid); err != nil {
return
}
if du.SourceId, err = st.insertNewSource(tx, NewSourceDb(update)); err != nil {
@@ -332,8 +332,11 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er
return
}
- if hubUuid != "" {
- err = st.setLastUpdateForUuid(tx, hubUuid, du.SourceHubDataUpdateId)
+ if srcUuid != "" {
+ err = st.setLastUpdateForUuid(tx, srcUuid, du.SourceHubDataUpdateId)
+ }
+ if fwdUuid := update.ForwardHubUuid; fwdUuid != "" {
+ err = st.setLastUpdateForUuid(tx, fwdUuid, update.ForwardHubDataUpdateId)
}
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index e74e3d7..5fcae5e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -45,7 +45,10 @@ import (
)
var (
- testBoltPath = "/run/s5hub_testing_db.bolt"
+ testBoltPath = "/run/s5hub_testing_db.bolt"
+ testBoltPath2 = "/run/s5hub_testing_db2.bolt"
+ testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt"
+ testBoltPathFinal = "/run/s5hub_testing_db_final.bolt"
streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"}
sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData}
@@ -108,6 +111,9 @@ func TestMain(m *testing.M) {
os.Exit(-1)
}
testBoltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid)
+ testBoltPath2 = fmt.Sprintf("/run/user/%s/s5hub_testing_db2.bolt", u.Uid)
+ testBoltPathFwd = fmt.Sprintf("/run/user/%s/s5hub_testing_db_fwd.bolt", u.Uid)
+ testBoltPathFinal = fmt.Sprintf("/run/user/%s/s5hub_testing_db_final.bolt", u.Uid)
os.Exit(m.Run())
}
@@ -581,7 +587,149 @@ func TestForwardedDataUpdates(t *testing.T) {
if lastId != 13 {
t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId)
}
+}
+
+func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) {
+ lastId, err := fwdStore.GetLastUpdateForUuid(src1Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != fwdSrc1Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id)
+ }
+ lastId, err = fwdStore.GetLastUpdateForUuid(src2Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != fwdSrc2Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(src1Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalSrc1Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(src2Store.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalSrc2Id {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id)
+ }
+ lastId, err = finalStore.GetLastUpdateForUuid(fwdStore.GetStoreId())
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if lastId != finalFwdId {
+ t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalFwdId)
+ }
+}
+
+func TestForwardedDataUpdates2(t *testing.T) {
+ // prepare 4 new stores
+ os.Remove(testBoltPath)
+ src1Store, err := NewStore(testBoltPath, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer src1Store.Close()
+
+ os.Remove(testBoltPath2)
+ src2Store, err := NewStore(testBoltPath2, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer src2Store.Close()
+
+ os.Remove(testBoltPathFwd)
+ fwdStore, err := NewStore(testBoltPathFwd, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer fwdStore.Close()
+
+ os.Remove(testBoltPathFinal)
+ finalStore, err := NewStore(testBoltPathFinal, false)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ defer finalStore.Close()
+
+ // generate/append some updates to src
+ data, _ := generateTestData(10)
+ if err := src1Store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ data, _ = generateTestData(7)
+ if err := src2Store.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0)
+
+ // forward 5 updates from src1 to fwd
+ if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0)
+
+ // forward 3 updates from src2 to fwd
+ if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0)
+
+ // forward 7 updates from fwd to final
+ if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := finalStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7)
+
+ // forward remaining updates from src1 and src2 to fwd
+ if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if data, err = src2Store.GetUpdatesAfter(3, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := fwdStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7)
+
+ // forward remainging from fwd to final
+ if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if err := finalStore.AppendMany(data); err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ // check last updates so far
+ checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17)
}
func TestGetSources(t *testing.T) {