diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 29 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 16 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 80 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 26 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 25 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 12 |
9 files changed, 133 insertions, 83 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index ccf35d2..21aba20 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -47,7 +47,7 @@ const ( // type Decoder interface { - Decode() (UpdateFull, error) + Decode() (*UpdateFull, error) Slug() string } @@ -64,13 +64,16 @@ func (sd *StatelessDecoder) Slug() string { return "" } -func (sd *StatelessDecoder) Decode() (uf UpdateFull, err error) { - if err = sd.dec.Decode(&uf); err != nil { +func (sd *StatelessDecoder) Decode() (uf *UpdateFull, err error) { + uf = &UpdateFull{} + if err = sd.dec.Decode(uf); err != nil { return } if uf.Version != ProtocolVersion { err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, ProtocolVersion) + return } + err = uf.checkSource() return } @@ -98,15 +101,18 @@ func (sd *StatefulDecoder) Slug() string { return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality) } -func (sd *StatefulDecoder) Decode() (uf UpdateFull, err error) { +func (sd *StatefulDecoder) Decode() (uf *UpdateFull, err error) { + uf = &UpdateFull{} uf.Version = sd.Version uf.CopyFromSource(&sd.Source) - if err = sd.dec.Decode(&uf); err != nil { + if err = sd.dec.Decode(uf); err != nil { return } if uf.Version != sd.Version { err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, sd.Version) + return } + err = uf.checkSource() return } @@ -115,7 +121,7 @@ func (sd *StatefulDecoder) Decode() (uf UpdateFull, err error) { // type Encoder interface { - Encode(UpdateFull) error + Encode(*UpdateFull) error Slug() string } @@ -132,7 +138,7 @@ func (se *StatelessEncoder) Slug() string { return "" } -func (se *StatelessEncoder) Encode(uf UpdateFull) error { +func (se *StatelessEncoder) Encode(uf *UpdateFull) error { uf.Version = ProtocolVersion uf.StartTime = uf.StartTime.UTC() return se.enc.Encode(uf) @@ -155,7 +161,7 @@ func (se *StatefulEncoder) Slug() string { return "" } -func (se *StatefulEncoder) Encode(uf UpdateFull) error { +func (se *StatefulEncoder) Encode(uf *UpdateFull) error { uf.Version = 0 // the init message took care of that uf.StartTime = uf.StartTime.UTC() return se.enc.Encode(uf) diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 0b13043..95c7958 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -48,7 +48,8 @@ var ( updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) -func GetExpected() (expected UpdateFull) { +func GetExpected() (expected *UpdateFull) { + expected = &UpdateFull{} expected.Version = ProtocolVersion expected.CopyFromSource(&initStruct) expected.CopyFromUpdate(&updateStruct) @@ -73,6 +74,28 @@ func TestDecodeStateless(t *testing.T) { t.Fatalf("decoding message with wrong protocol version should throw an error") } + // missing/empty source fields + emptyHostname := `"hostname": "", "stream": {"quality": "low", "content": "av", "format": "webm"}` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyHostname, updateEncoded))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with empty/missing hostname should throw an error") + } + emptyContentid := `"hostname": "localhost", "stream": {"quality": "low", "format": "webm"}` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyContentid, updateEncoded))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with empty/missing content-id should throw an error") + } + emptyFormat := `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": ""}` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyFormat, updateEncoded))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with empty/missing format should throw an error") + } + emptyQuality := `"hostname": "localhost", "stream": {"content": "av", "format": "webm"}` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyQuality, updateEncoded))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with empty/missing quality should throw an error") + } + // valid message dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, updateEncoded))) decoded, err := dec.Decode() @@ -155,7 +178,7 @@ func TestDecodeStateful(t *testing.T) { } func TestEncodeStateless(t *testing.T) { - var uf UpdateFull + uf := &UpdateFull{} uf.CopyFromSource(&initStruct) uf.CopyFromUpdate(&updateStruct) @@ -184,7 +207,7 @@ func TestEncodeStateless(t *testing.T) { } func TestEncodeStateful(t *testing.T) { - var uf UpdateFull + uf := &UpdateFull{} uf.CopyFromSource(&initStruct) uf.CopyFromUpdate(&updateStruct) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index bc9dc43..ee1cba0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -39,17 +39,17 @@ import ( ) type appendToken struct { - update UpdateFull + update *UpdateFull response chan error } type appendManyToken struct { - updates []UpdateFull + updates []*UpdateFull response chan error } type Server struct { - store Store + store *Store numWorker int anonymization AnonymizationAlgo quit chan bool @@ -58,7 +58,7 @@ type Server struct { appendManyChan chan appendManyToken } -func (srv Server) Anonymize(update UpdateFull) UpdateFull { +func (srv Server) Anonymize(update *UpdateFull) *UpdateFull { anonymized := []Client{} for _, client := range update.Data.Clients { aIP, err := srv.anonymization.Anonymize(client.IP) @@ -73,8 +73,8 @@ func (srv Server) Anonymize(update UpdateFull) UpdateFull { return update } -func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull { - anonymized := []UpdateFull{} +func (srv Server) AnonymizeMany(updates []*UpdateFull) []*UpdateFull { + anonymized := []*UpdateFull{} for _, update := range updates { anonymized = append(anonymized, srv.Anonymize(update)) } @@ -103,14 +103,14 @@ func (srv Server) appendWorker(idx int) { } } -func (srv Server) Append(update UpdateFull) error { +func (srv Server) Append(update *UpdateFull) error { token := appendToken{update: update, response: make(chan error, 1)} defer close(token.response) srv.appendChan <- token return <-token.response } -func (srv Server) AppendMany(updates []UpdateFull) error { +func (srv Server) AppendMany(updates []*UpdateFull) error { token := appendManyToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.appendManyChan <- token diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 866efe9..2d07844 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -40,7 +40,7 @@ import ( "time" ) -func findMaxID(updates []UpdateFull) int { +func findMaxID(updates []*UpdateFull) int { maxID := -1 for _, value := range updates { if id := value.SourceHubUpdateID; id > maxID { @@ -76,7 +76,7 @@ func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (la return } -func fwdWriteUpdates(updates []UpdateFull, pw *io.PipeWriter) { +func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) { defer pw.Close() enc, err := NewStatefulEncoder(pw) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index bde8abc..cf08e72 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -243,7 +243,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { return } - updates := []UpdateFull{} + updates := []*UpdateFull{} for { update, err := dec.Decode() if err != nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 86a4fa0..e8aebf5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -146,10 +146,10 @@ func createDB(dbPath string) (db *bolt.DB, version int, hubUUID string, err erro return } -func NewStore(dbPath string, readOnly bool) (Store, error) { +func NewStore(dbPath string, readOnly bool) (*Store, error) { db, version, hubid, err := openDB(dbPath, readOnly) if err != nil { - return Store{}, err + return nil, err } if db != nil { @@ -158,22 +158,22 @@ func NewStore(dbPath string, readOnly bool) (Store, error) { } else { s5l.Printf("store: opened (UUID: %s)", hubid) } - return Store{version, hubid, db, readOnly}, nil + return &Store{version, hubid, db, readOnly}, nil } if readOnly { - return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.") + return nil, errors.New("store: failed to open, requested read-only mode but store file does not exist.") } db, version, hubid, err = createDB(dbPath) if err != nil { - return Store{}, err + return nil, err } s5l.Printf("store: initialized (UUID: %s)", hubid) - return Store{version, hubid, db, readOnly}, nil + return &Store{version, hubid, db, readOnly}, nil } -func (st Store) Close() { +func (st *Store) Close() { s5l.Printf("store: closing") st.db.Close() } @@ -184,7 +184,7 @@ func (st Store) Close() { // append key-value pairs to buckets -func (st Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) { +func (st *Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) { if hubUUID == "" { return } @@ -211,7 +211,7 @@ func (st Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) return hubID, err } -func (st Store) insertNewSource(tx *bolt.Tx, src sourceDB) (srcID int, err error) { +func (st *Store) insertNewSource(tx *bolt.Tx, src *sourceDB) (srcID int, err error) { bf := tx.Bucket([]byte(sourcesFwdBn)) // br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id) br := tx.Bucket([]byte(sourcesRevBn)) @@ -240,7 +240,7 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDB) (srcID int, err error return srcID, err } -func (st Store) insertUpdate(tx *bolt.Tx, du updateDB) (duID int, err error) { +func (st *Store) insertUpdate(tx *bolt.Tx, du *updateDB) (duID int, err error) { b := tx.Bucket([]byte(updatesBn)) b.FillPercent = 1.0 // we only do appends @@ -255,7 +255,7 @@ func (st Store) insertUpdate(tx *bolt.Tx, du updateDB) (duID int, err error) { return } -func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) { +func (st *Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) { bf := tx.Bucket([]byte(userAgentsFwdBn)) bf.FillPercent = 1.0 // we only do appends br := tx.Bucket([]byte(userAgentsRevBn)) @@ -278,7 +278,7 @@ func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) return uaID, err } -func (st Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { +func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { if len(cd) == 0 { return nil } @@ -302,7 +302,7 @@ func (st Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { return b.Put(itob(uID), jsonData) } -func (st Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error { +func (st *Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error { b := tx.Bucket([]byte(latestUpdatesBn)) b.FillPercent = 1.0 // we only do appends @@ -315,7 +315,7 @@ func (st Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error { // Split up the multidimensional dataupdate and append all the key-value pairs -func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uID int, err error) { +func (st *Store) appendItem(tx *bolt.Tx, uf *UpdateFull) (uID int, err error) { u := NewUpdateDB(uf) s := NewSourceDB(uf) @@ -343,7 +343,7 @@ func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uID int, err error) { // Public Append Interface -func (st Store) AppendMany(updates []UpdateFull) (err error) { +func (st *Store) AppendMany(updates []*UpdateFull) (err error) { if st.readOnly { return ErrReadOnly } @@ -357,8 +357,8 @@ func (st Store) AppendMany(updates []UpdateFull) (err error) { }) } -func (st Store) Append(update UpdateFull) (err error) { - return st.AppendMany([]UpdateFull{update}) +func (st *Store) Append(update *UpdateFull) (err error) { + return st.AppendMany([]*UpdateFull{update}) } // @@ -367,7 +367,7 @@ func (st Store) Append(update UpdateFull) (err error) { // fetch key-value pairs from buckets -func (st Store) getHub(tx *bolt.Tx, id int) string { +func (st *Store) getHub(tx *bolt.Tx, id int) string { b := tx.Bucket([]byte(hubUUIDsRevBn)) uuid := b.Get(itob(id)) if uuid == nil { @@ -376,7 +376,7 @@ func (st Store) getHub(tx *bolt.Tx, id int) string { return string(uuid) } -func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDB, err error) { +func (st *Store) getSource(tx *bolt.Tx, id int) (source *sourceDB, err error) { b := tx.Bucket([]byte(sourcesRevBn)) jsonData := b.Get(itob(id)) @@ -384,13 +384,14 @@ func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDB, err error) { err = ErrSourceNotFound return } - if err = json.Unmarshal(jsonData, &source); err != nil { + source = &sourceDB{} + if err = json.Unmarshal(jsonData, source); err != nil { return } return } -func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { +func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { bc := tx.Bucket([]byte(clientDataBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) @@ -415,14 +416,15 @@ func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { // fetch all the key-value pairs and merge them into the multidimensional dataupdate -func (st Store) fetchItem(tx *bolt.Tx, uID int, u updateDB) (updates UpdateFull, err error) { - updates.CopyFromUpdateDB(u, st.getHub(tx, u.SourceHubID), st.hubUUID, uID) - var src sourceDB +func (st *Store) fetchItem(tx *bolt.Tx, uID int, u *updateDB) (update *UpdateFull, err error) { + update = &UpdateFull{} + update.CopyFromUpdateDB(u, st.getHub(tx, u.SourceHubID), st.hubUUID, uID) + var src *sourceDB if src, err = st.getSource(tx, u.SourceID); err != nil { return } - updates.CopyFromSourceDB(src) - if updates.Data.Clients, err = st.getClients(tx, uID); err != nil { + update.CopyFromSourceDB(src) + if update.Data.Clients, err = st.getClients(tx, uID); err != nil { return } return @@ -430,8 +432,8 @@ func (st Store) fetchItem(tx *bolt.Tx, uID int, u updateDB) (updates UpdateFull, // Public Fetch Interface -func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error) { - updates = []UpdateFull{} +func (st *Store) GetUpdatesAfter(id, limit int) (updates []*UpdateFull, err error) { + updates = []*UpdateFull{} if id < 0 { // TODO: interpret negative values as last x values id = 0 } @@ -451,7 +453,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error) k, v = c.Next() } for ; k != nil; k, v = c.Next() { - var d updateDB + var d *updateDB if err := json.Unmarshal(v, &d); err != nil { return err } @@ -470,7 +472,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error) return } -func (st Store) GetUpdate(id int) (update UpdateFull, err error) { +func (st *Store) GetUpdate(id int) (update *UpdateFull, err error) { err = st.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(updatesBn)) @@ -479,7 +481,7 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) { return ErrNotFound } - var d updateDB + var d *updateDB if err := json.Unmarshal(jsonData, &d); err != nil { return err } @@ -497,11 +499,11 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) { // Auxilliary Data // -func (st Store) GetHubUUID() string { +func (st *Store) GetHubUUID() string { return st.hubUUID } -func (st Store) GetLastUpdateID() (updateID int, err error) { +func (st *Store) GetLastUpdateID() (updateID int, err error) { err = st.db.View(func(tx *bolt.Tx) error { updateID = int(tx.Bucket([]byte(updatesBn)).Sequence()) return nil @@ -509,7 +511,7 @@ func (st Store) GetLastUpdateID() (updateID int, err error) { return } -func (st Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) { +func (st *Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) { if uuid == st.hubUUID { return st.GetLastUpdateID() } @@ -525,7 +527,7 @@ func (st Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) { return } -func (st Store) GetHubs() (hubs []string, err error) { +func (st *Store) GetHubs() (hubs []string, err error) { hubs = []string{st.hubUUID} err = st.db.View(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(hubUUIDsRevBn)).Cursor() @@ -537,16 +539,16 @@ func (st Store) GetHubs() (hubs []string, err error) { return } -func (st Store) GetSources() (sources []Source, err error) { - sources = []Source{} +func (st *Store) GetSources() (sources []*Source, err error) { + sources = []*Source{} err = st.db.View(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(sourcesRevBn)).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - var s sourceDB + var s *sourceDB if err := json.Unmarshal(v, &s); err != nil { return err } - var src Source + src := &Source{} src.CopyFromSourceDB(s) sources = append(sources, src) } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 5ad129a..a82c208 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -61,7 +61,7 @@ var ( Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} ) -func generateTestData(n int) ([]UpdateFull, int) { +func generateTestData(n int) ([]*UpdateFull, int) { hostnames := []string{"streamer1", "streamer2"} contents := []string{"av", "audio"} formats := []string{"webm", "flash", "hls"} @@ -74,13 +74,13 @@ func generateTestData(n int) ([]UpdateFull, int) { if n < 0 { n = numSrcs } - var data []UpdateFull + var data []*UpdateFull for i := 0; i < n; i += numSrcs { for _, hostname := range hostnames { for _, content := range contents { for _, format := range formats { for _, quality := range qualities { - d := UpdateFull{} + d := &UpdateFull{} d.Version = ProtocolVersion d.Source.Hostname = hostname @@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("failed to append update: %v", err) @@ -332,12 +332,12 @@ func TestAppendAndFetch(t *testing.T) { } // append many - var ins []UpdateFull + var ins []*UpdateFull upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) - ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) + ins = append(ins, &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) upd.Data.Clients = nil - ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) + ins = append(ins, &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) if err = store.AppendMany(ins); err != nil { t.Fatalf("failed to append update: %v", err) } @@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -414,9 +414,9 @@ func TestGetUpdatesAfter(t *testing.T) { upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - expected := []UpdateFull{} + expected := []*UpdateFull{} for i := 0; i < 3; i = i + 1 { - in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -540,9 +540,9 @@ func TestForwardedUpdates(t *testing.T) { upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - expected := []UpdateFull{} + expected := []*UpdateFull{} for i := 0; i < 3; i = i + 1 { - in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} in.SourceHubUUID = forwardedHub in.SourceHubUpdateID = 3 - i // out of order if err = store.Append(in); err != nil { @@ -597,7 +597,7 @@ func TestForwardedUpdates(t *testing.T) { } } -func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1ID, fwdSrc2ID, finalSrc1ID, finalSrc2ID, finalFwdID int) { +func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore *Store, fwdSrc1ID, fwdSrc2ID, finalSrc1ID, finalSrc2ID, finalFwdID int) { lastID, err := fwdStore.GetLastUpdateIDForUUID(src1Store.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 28f1fc7..90a413a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -32,7 +32,10 @@ package sfive -import "time" +import ( + "errors" + "time" +) type Stream struct { ContentID string `json:"content"` @@ -79,6 +82,22 @@ type UpdateFull struct { Update } +func (uf *UpdateFull) checkSource() error { + if uf.Source.Hostname == "" { + return errors.New("empty hostname ist not allowed") + } + if uf.Source.Stream.ContentID == "" { + return errors.New("empty content-id ist not allowed") + } + if uf.Source.Stream.Format == "" { + return errors.New("empty format ist not allowed") + } + if uf.Source.Stream.Quality == "" { + return errors.New("empty quality ist not allowed") + } + return nil +} + func (uf *UpdateFull) CopyFromSource(src *Source) { uf.Hostname = src.Hostname uf.Stream = src.Stream @@ -107,11 +126,11 @@ type WebHubsResponse struct { } type WebSourcesResponse struct { - Sources []Source `json:"sources"` + Sources []*Source `json:"sources"` } type WebUpdatesGetResponse struct { - Updates []UpdateFull `json:"updates"` + Updates []*UpdateFull `json:"updates"` } type WebUpdatesPostResponse struct { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index f4bf6e8..92147e1 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -77,8 +77,8 @@ type sourceDB struct { Tags []string `json:"t"` } -func NewSourceDB(value UpdateFull) sourceDB { - return sourceDB{ +func NewSourceDB(value *UpdateFull) *sourceDB { + return &sourceDB{ Hostname: value.Source.Hostname, Stream: streamDB{ ContentID: value.Source.Stream.ContentID, @@ -93,7 +93,7 @@ func (s sourceDB) Slug() string { return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ",")) } -func (s *Source) CopyFromSourceDB(sdb sourceDB) { +func (s *Source) CopyFromSourceDB(sdb *sourceDB) { s.Hostname = sdb.Hostname s.Stream.ContentID = sdb.Stream.ContentID s.Stream.Format = sdb.Stream.Format @@ -120,8 +120,8 @@ type updateDB struct { BytesSent uint `json:"bs,omitempty"` } -func NewUpdateDB(uf UpdateFull) updateDB { - return updateDB{ +func NewUpdateDB(uf *UpdateFull) *updateDB { + return &updateDB{ -1, uf.SourceHubUpdateID, -1, @@ -133,7 +133,7 @@ func NewUpdateDB(uf UpdateFull) updateDB { } } -func (uf *UpdateFull) CopyFromUpdateDB(udb updateDB, srcHubUUID, hubUUID string, id int) { +func (uf *UpdateFull) CopyFromUpdateDB(udb *updateDB, srcHubUUID, hubUUID string, id int) { if srcHubUUID == "" { uf.SourceHubUUID = hubUUID uf.SourceHubUpdateID = id |