diff options
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 11 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 150 |
2 files changed, 156 insertions, 5 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 64dd5a7..a59166f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -318,8 +318,8 @@ func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error { func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) { du := NewDataUpdateDb(update) - hubUuid := update.SourceHubUuid - if du.SourceHubId, err = st.insertNewHub(tx, hubUuid); err != nil { + srcUuid := update.SourceHubUuid + if du.SourceHubId, err = st.insertNewHub(tx, srcUuid); err != nil { return } if du.SourceId, err = st.insertNewSource(tx, NewSourceDb(update)); err != nil { @@ -332,8 +332,11 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er return } - if hubUuid != "" { - err = st.setLastUpdateForUuid(tx, hubUuid, du.SourceHubDataUpdateId) + if srcUuid != "" { + err = st.setLastUpdateForUuid(tx, srcUuid, du.SourceHubDataUpdateId) + } + if fwdUuid := update.ForwardHubUuid; fwdUuid != "" { + err = st.setLastUpdateForUuid(tx, fwdUuid, update.ForwardHubDataUpdateId) } return } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index e74e3d7..5fcae5e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -45,7 +45,10 @@ import ( ) var ( - testBoltPath = "/run/s5hub_testing_db.bolt" + testBoltPath = "/run/s5hub_testing_db.bolt" + testBoltPath2 = "/run/s5hub_testing_db2.bolt" + testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt" + testBoltPathFinal = "/run/s5hub_testing_db_final.bolt" streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"} sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData} @@ -108,6 +111,9 @@ func TestMain(m *testing.M) { os.Exit(-1) } testBoltPath = fmt.Sprintf("/run/user/%s/s5hub_testing_db.bolt", u.Uid) + testBoltPath2 = fmt.Sprintf("/run/user/%s/s5hub_testing_db2.bolt", u.Uid) + testBoltPathFwd = fmt.Sprintf("/run/user/%s/s5hub_testing_db_fwd.bolt", u.Uid) + testBoltPathFinal = fmt.Sprintf("/run/user/%s/s5hub_testing_db_final.bolt", u.Uid) os.Exit(m.Run()) } @@ -581,7 +587,149 @@ func TestForwardedDataUpdates(t *testing.T) { if lastId != 13 { t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) } +} + +func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { + lastId, err := fwdStore.GetLastUpdateForUuid(src1Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != fwdSrc1Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id) + } + lastId, err = fwdStore.GetLastUpdateForUuid(src2Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != fwdSrc2Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(src1Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalSrc1Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(src2Store.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalSrc2Id { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id) + } + lastId, err = finalStore.GetLastUpdateForUuid(fwdStore.GetStoreId()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if lastId != finalFwdId { + t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalFwdId) + } +} + +func TestForwardedDataUpdates2(t *testing.T) { + // prepare 4 new stores + os.Remove(testBoltPath) + src1Store, err := NewStore(testBoltPath, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer src1Store.Close() + + os.Remove(testBoltPath2) + src2Store, err := NewStore(testBoltPath2, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer src2Store.Close() + + os.Remove(testBoltPathFwd) + fwdStore, err := NewStore(testBoltPathFwd, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer fwdStore.Close() + + os.Remove(testBoltPathFinal) + finalStore, err := NewStore(testBoltPathFinal, false) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer finalStore.Close() + + // generate/append some updates to src + data, _ := generateTestData(10) + if err := src1Store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + data, _ = generateTestData(7) + if err := src2Store.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0) + + // forward 5 updates from src1 to fwd + if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0) + + // forward 3 updates from src2 to fwd + if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0) + + // forward 7 updates from fwd to final + if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := finalStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7) + + // forward remaining updates from src1 and src2 to fwd + if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if data, err = src2Store.GetUpdatesAfter(3, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := fwdStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7) + + // forward remainging from fwd to final + if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := finalStore.AppendMany(data); err != nil { + t.Fatalf("unexpected error: %v", err) + } + // check last updates so far + checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17) } func TestGetSources(t *testing.T) { |