summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-19 00:19:05 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-19 00:19:05 +0200
commit843ba562b751d9f7ede1e11a44b8c976b3288d37 (patch)
treedddebeaa1016cabcc9c1d6bdb9f4d038dff039a8
parentmake variable names more comlient to golang convention (diff)
added some basic sanity checks @ decoder
use pointer to data-update datastructures
-rw-r--r--doc/TODO6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go29
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go16
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go80
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go26
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go25
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go12
10 files changed, 134 insertions, 88 deletions
diff --git a/doc/TODO b/doc/TODO
index 28f20ec..c50c3cd 100644
--- a/doc/TODO
+++ b/doc/TODO
@@ -1,12 +1,8 @@
* how to handle NaN values form batch importer like flumotion-rrd?
* should we allow floats for client-count, bytes-sent etc.?
-* address issues with python-twisted dropping messages when transmit
- buffer is full
+* address issues with python-twisted dropping messages when transmit buffer is full
* move common code of python-twisted based importer to seperate module
* fix wrong usage of twisted
-* hub: implement foward-only mode (delete dataupdates from local store
- as soon as they are forwareded)
* hub: add geo-ip lookups
* hub: add support for Elasticsearch 5.x
-* hub: add additional checks at protocol parser
* hub: sanity check len(clients) and clients field in date update (as well as bytes-sent/received)
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index ccf35d2..21aba20 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -47,7 +47,7 @@ const (
//
type Decoder interface {
- Decode() (UpdateFull, error)
+ Decode() (*UpdateFull, error)
Slug() string
}
@@ -64,13 +64,16 @@ func (sd *StatelessDecoder) Slug() string {
return ""
}
-func (sd *StatelessDecoder) Decode() (uf UpdateFull, err error) {
- if err = sd.dec.Decode(&uf); err != nil {
+func (sd *StatelessDecoder) Decode() (uf *UpdateFull, err error) {
+ uf = &UpdateFull{}
+ if err = sd.dec.Decode(uf); err != nil {
return
}
if uf.Version != ProtocolVersion {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, ProtocolVersion)
+ return
}
+ err = uf.checkSource()
return
}
@@ -98,15 +101,18 @@ func (sd *StatefulDecoder) Slug() string {
return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality)
}
-func (sd *StatefulDecoder) Decode() (uf UpdateFull, err error) {
+func (sd *StatefulDecoder) Decode() (uf *UpdateFull, err error) {
+ uf = &UpdateFull{}
uf.Version = sd.Version
uf.CopyFromSource(&sd.Source)
- if err = sd.dec.Decode(&uf); err != nil {
+ if err = sd.dec.Decode(uf); err != nil {
return
}
if uf.Version != sd.Version {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, sd.Version)
+ return
}
+ err = uf.checkSource()
return
}
@@ -115,7 +121,7 @@ func (sd *StatefulDecoder) Decode() (uf UpdateFull, err error) {
//
type Encoder interface {
- Encode(UpdateFull) error
+ Encode(*UpdateFull) error
Slug() string
}
@@ -132,7 +138,7 @@ func (se *StatelessEncoder) Slug() string {
return ""
}
-func (se *StatelessEncoder) Encode(uf UpdateFull) error {
+func (se *StatelessEncoder) Encode(uf *UpdateFull) error {
uf.Version = ProtocolVersion
uf.StartTime = uf.StartTime.UTC()
return se.enc.Encode(uf)
@@ -155,7 +161,7 @@ func (se *StatefulEncoder) Slug() string {
return ""
}
-func (se *StatefulEncoder) Encode(uf UpdateFull) error {
+func (se *StatefulEncoder) Encode(uf *UpdateFull) error {
uf.Version = 0 // the init message took care of that
uf.StartTime = uf.StartTime.UTC()
return se.enc.Encode(uf)
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 0b13043..95c7958 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -48,7 +48,8 @@ var (
updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
)
-func GetExpected() (expected UpdateFull) {
+func GetExpected() (expected *UpdateFull) {
+ expected = &UpdateFull{}
expected.Version = ProtocolVersion
expected.CopyFromSource(&initStruct)
expected.CopyFromUpdate(&updateStruct)
@@ -73,6 +74,28 @@ func TestDecodeStateless(t *testing.T) {
t.Fatalf("decoding message with wrong protocol version should throw an error")
}
+ // missing/empty source fields
+ emptyHostname := `"hostname": "", "stream": {"quality": "low", "content": "av", "format": "webm"}`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyHostname, updateEncoded)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with empty/missing hostname should throw an error")
+ }
+ emptyContentid := `"hostname": "localhost", "stream": {"quality": "low", "format": "webm"}`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyContentid, updateEncoded)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with empty/missing content-id should throw an error")
+ }
+ emptyFormat := `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": ""}`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyFormat, updateEncoded)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with empty/missing format should throw an error")
+ }
+ emptyQuality := `"hostname": "localhost", "stream": {"content": "av", "format": "webm"}`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, emptyQuality, updateEncoded)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with empty/missing quality should throw an error")
+ }
+
// valid message
dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, updateEncoded)))
decoded, err := dec.Decode()
@@ -155,7 +178,7 @@ func TestDecodeStateful(t *testing.T) {
}
func TestEncodeStateless(t *testing.T) {
- var uf UpdateFull
+ uf := &UpdateFull{}
uf.CopyFromSource(&initStruct)
uf.CopyFromUpdate(&updateStruct)
@@ -184,7 +207,7 @@ func TestEncodeStateless(t *testing.T) {
}
func TestEncodeStateful(t *testing.T) {
- var uf UpdateFull
+ uf := &UpdateFull{}
uf.CopyFromSource(&initStruct)
uf.CopyFromUpdate(&updateStruct)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index bc9dc43..ee1cba0 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -39,17 +39,17 @@ import (
)
type appendToken struct {
- update UpdateFull
+ update *UpdateFull
response chan error
}
type appendManyToken struct {
- updates []UpdateFull
+ updates []*UpdateFull
response chan error
}
type Server struct {
- store Store
+ store *Store
numWorker int
anonymization AnonymizationAlgo
quit chan bool
@@ -58,7 +58,7 @@ type Server struct {
appendManyChan chan appendManyToken
}
-func (srv Server) Anonymize(update UpdateFull) UpdateFull {
+func (srv Server) Anonymize(update *UpdateFull) *UpdateFull {
anonymized := []Client{}
for _, client := range update.Data.Clients {
aIP, err := srv.anonymization.Anonymize(client.IP)
@@ -73,8 +73,8 @@ func (srv Server) Anonymize(update UpdateFull) UpdateFull {
return update
}
-func (srv Server) AnonymizeMany(updates []UpdateFull) []UpdateFull {
- anonymized := []UpdateFull{}
+func (srv Server) AnonymizeMany(updates []*UpdateFull) []*UpdateFull {
+ anonymized := []*UpdateFull{}
for _, update := range updates {
anonymized = append(anonymized, srv.Anonymize(update))
}
@@ -103,14 +103,14 @@ func (srv Server) appendWorker(idx int) {
}
}
-func (srv Server) Append(update UpdateFull) error {
+func (srv Server) Append(update *UpdateFull) error {
token := appendToken{update: update, response: make(chan error, 1)}
defer close(token.response)
srv.appendChan <- token
return <-token.response
}
-func (srv Server) AppendMany(updates []UpdateFull) error {
+func (srv Server) AppendMany(updates []*UpdateFull) error {
token := appendManyToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
srv.appendManyChan <- token
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 866efe9..2d07844 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -40,7 +40,7 @@ import (
"time"
)
-func findMaxID(updates []UpdateFull) int {
+func findMaxID(updates []*UpdateFull) int {
maxID := -1
for _, value := range updates {
if id := value.SourceHubUpdateID; id > maxID {
@@ -76,7 +76,7 @@ func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (la
return
}
-func fwdWriteUpdates(updates []UpdateFull, pw *io.PipeWriter) {
+func fwdWriteUpdates(updates []*UpdateFull, pw *io.PipeWriter) {
defer pw.Close()
enc, err := NewStatefulEncoder(pw)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index bde8abc..cf08e72 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -243,7 +243,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- updates := []UpdateFull{}
+ updates := []*UpdateFull{}
for {
update, err := dec.Decode()
if err != nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 86a4fa0..e8aebf5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -146,10 +146,10 @@ func createDB(dbPath string) (db *bolt.DB, version int, hubUUID string, err erro
return
}
-func NewStore(dbPath string, readOnly bool) (Store, error) {
+func NewStore(dbPath string, readOnly bool) (*Store, error) {
db, version, hubid, err := openDB(dbPath, readOnly)
if err != nil {
- return Store{}, err
+ return nil, err
}
if db != nil {
@@ -158,22 +158,22 @@ func NewStore(dbPath string, readOnly bool) (Store, error) {
} else {
s5l.Printf("store: opened (UUID: %s)", hubid)
}
- return Store{version, hubid, db, readOnly}, nil
+ return &Store{version, hubid, db, readOnly}, nil
}
if readOnly {
- return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.")
+ return nil, errors.New("store: failed to open, requested read-only mode but store file does not exist.")
}
db, version, hubid, err = createDB(dbPath)
if err != nil {
- return Store{}, err
+ return nil, err
}
s5l.Printf("store: initialized (UUID: %s)", hubid)
- return Store{version, hubid, db, readOnly}, nil
+ return &Store{version, hubid, db, readOnly}, nil
}
-func (st Store) Close() {
+func (st *Store) Close() {
s5l.Printf("store: closing")
st.db.Close()
}
@@ -184,7 +184,7 @@ func (st Store) Close() {
// append key-value pairs to buckets
-func (st Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) {
+func (st *Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) {
if hubUUID == "" {
return
}
@@ -211,7 +211,7 @@ func (st Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error)
return hubID, err
}
-func (st Store) insertNewSource(tx *bolt.Tx, src sourceDB) (srcID int, err error) {
+func (st *Store) insertNewSource(tx *bolt.Tx, src *sourceDB) (srcID int, err error) {
bf := tx.Bucket([]byte(sourcesFwdBn))
// br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id)
br := tx.Bucket([]byte(sourcesRevBn))
@@ -240,7 +240,7 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDB) (srcID int, err error
return srcID, err
}
-func (st Store) insertUpdate(tx *bolt.Tx, du updateDB) (duID int, err error) {
+func (st *Store) insertUpdate(tx *bolt.Tx, du *updateDB) (duID int, err error) {
b := tx.Bucket([]byte(updatesBn))
b.FillPercent = 1.0 // we only do appends
@@ -255,7 +255,7 @@ func (st Store) insertUpdate(tx *bolt.Tx, du updateDB) (duID int, err error) {
return
}
-func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) {
+func (st *Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) {
bf := tx.Bucket([]byte(userAgentsFwdBn))
bf.FillPercent = 1.0 // we only do appends
br := tx.Bucket([]byte(userAgentsRevBn))
@@ -278,7 +278,7 @@ func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error)
return uaID, err
}
-func (st Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error {
+func (st *Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error {
if len(cd) == 0 {
return nil
}
@@ -302,7 +302,7 @@ func (st Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error {
return b.Put(itob(uID), jsonData)
}
-func (st Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error {
+func (st *Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error {
b := tx.Bucket([]byte(latestUpdatesBn))
b.FillPercent = 1.0 // we only do appends
@@ -315,7 +315,7 @@ func (st Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error {
// Split up the multidimensional dataupdate and append all the key-value pairs
-func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uID int, err error) {
+func (st *Store) appendItem(tx *bolt.Tx, uf *UpdateFull) (uID int, err error) {
u := NewUpdateDB(uf)
s := NewSourceDB(uf)
@@ -343,7 +343,7 @@ func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uID int, err error) {
// Public Append Interface
-func (st Store) AppendMany(updates []UpdateFull) (err error) {
+func (st *Store) AppendMany(updates []*UpdateFull) (err error) {
if st.readOnly {
return ErrReadOnly
}
@@ -357,8 +357,8 @@ func (st Store) AppendMany(updates []UpdateFull) (err error) {
})
}
-func (st Store) Append(update UpdateFull) (err error) {
- return st.AppendMany([]UpdateFull{update})
+func (st *Store) Append(update *UpdateFull) (err error) {
+ return st.AppendMany([]*UpdateFull{update})
}
//
@@ -367,7 +367,7 @@ func (st Store) Append(update UpdateFull) (err error) {
// fetch key-value pairs from buckets
-func (st Store) getHub(tx *bolt.Tx, id int) string {
+func (st *Store) getHub(tx *bolt.Tx, id int) string {
b := tx.Bucket([]byte(hubUUIDsRevBn))
uuid := b.Get(itob(id))
if uuid == nil {
@@ -376,7 +376,7 @@ func (st Store) getHub(tx *bolt.Tx, id int) string {
return string(uuid)
}
-func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDB, err error) {
+func (st *Store) getSource(tx *bolt.Tx, id int) (source *sourceDB, err error) {
b := tx.Bucket([]byte(sourcesRevBn))
jsonData := b.Get(itob(id))
@@ -384,13 +384,14 @@ func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDB, err error) {
err = ErrSourceNotFound
return
}
- if err = json.Unmarshal(jsonData, &source); err != nil {
+ source = &sourceDB{}
+ if err = json.Unmarshal(jsonData, source); err != nil {
return
}
return
}
-func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) {
+func (st *Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -415,14 +416,15 @@ func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) {
// fetch all the key-value pairs and merge them into the multidimensional dataupdate
-func (st Store) fetchItem(tx *bolt.Tx, uID int, u updateDB) (updates UpdateFull, err error) {
- updates.CopyFromUpdateDB(u, st.getHub(tx, u.SourceHubID), st.hubUUID, uID)
- var src sourceDB
+func (st *Store) fetchItem(tx *bolt.Tx, uID int, u *updateDB) (update *UpdateFull, err error) {
+ update = &UpdateFull{}
+ update.CopyFromUpdateDB(u, st.getHub(tx, u.SourceHubID), st.hubUUID, uID)
+ var src *sourceDB
if src, err = st.getSource(tx, u.SourceID); err != nil {
return
}
- updates.CopyFromSourceDB(src)
- if updates.Data.Clients, err = st.getClients(tx, uID); err != nil {
+ update.CopyFromSourceDB(src)
+ if update.Data.Clients, err = st.getClients(tx, uID); err != nil {
return
}
return
@@ -430,8 +432,8 @@ func (st Store) fetchItem(tx *bolt.Tx, uID int, u updateDB) (updates UpdateFull,
// Public Fetch Interface
-func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error) {
- updates = []UpdateFull{}
+func (st *Store) GetUpdatesAfter(id, limit int) (updates []*UpdateFull, err error) {
+ updates = []*UpdateFull{}
if id < 0 { // TODO: interpret negative values as last x values
id = 0
}
@@ -451,7 +453,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error)
k, v = c.Next()
}
for ; k != nil; k, v = c.Next() {
- var d updateDB
+ var d *updateDB
if err := json.Unmarshal(v, &d); err != nil {
return err
}
@@ -470,7 +472,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error)
return
}
-func (st Store) GetUpdate(id int) (update UpdateFull, err error) {
+func (st *Store) GetUpdate(id int) (update *UpdateFull, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(updatesBn))
@@ -479,7 +481,7 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) {
return ErrNotFound
}
- var d updateDB
+ var d *updateDB
if err := json.Unmarshal(jsonData, &d); err != nil {
return err
}
@@ -497,11 +499,11 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) {
// Auxilliary Data
//
-func (st Store) GetHubUUID() string {
+func (st *Store) GetHubUUID() string {
return st.hubUUID
}
-func (st Store) GetLastUpdateID() (updateID int, err error) {
+func (st *Store) GetLastUpdateID() (updateID int, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
updateID = int(tx.Bucket([]byte(updatesBn)).Sequence())
return nil
@@ -509,7 +511,7 @@ func (st Store) GetLastUpdateID() (updateID int, err error) {
return
}
-func (st Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) {
+func (st *Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) {
if uuid == st.hubUUID {
return st.GetLastUpdateID()
}
@@ -525,7 +527,7 @@ func (st Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) {
return
}
-func (st Store) GetHubs() (hubs []string, err error) {
+func (st *Store) GetHubs() (hubs []string, err error) {
hubs = []string{st.hubUUID}
err = st.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket([]byte(hubUUIDsRevBn)).Cursor()
@@ -537,16 +539,16 @@ func (st Store) GetHubs() (hubs []string, err error) {
return
}
-func (st Store) GetSources() (sources []Source, err error) {
- sources = []Source{}
+func (st *Store) GetSources() (sources []*Source, err error) {
+ sources = []*Source{}
err = st.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
- var s sourceDB
+ var s *sourceDB
if err := json.Unmarshal(v, &s); err != nil {
return err
}
- var src Source
+ src := &Source{}
src.CopyFromSourceDB(s)
sources = append(sources, src)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 5ad129a..a82c208 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -61,7 +61,7 @@ var (
Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
)
-func generateTestData(n int) ([]UpdateFull, int) {
+func generateTestData(n int) ([]*UpdateFull, int) {
hostnames := []string{"streamer1", "streamer2"}
contents := []string{"av", "audio"}
formats := []string{"webm", "flash", "hls"}
@@ -74,13 +74,13 @@ func generateTestData(n int) ([]UpdateFull, int) {
if n < 0 {
n = numSrcs
}
- var data []UpdateFull
+ var data []*UpdateFull
for i := 0; i < n; i += numSrcs {
for _, hostname := range hostnames {
for _, content := range contents {
for _, format := range formats {
for _, quality := range qualities {
- d := UpdateFull{}
+ d := &UpdateFull{}
d.Version = ProtocolVersion
d.Source.Hostname = hostname
@@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("failed to append update: %v", err)
@@ -332,12 +332,12 @@ func TestAppendAndFetch(t *testing.T) {
}
// append many
- var ins []UpdateFull
+ var ins []*UpdateFull
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
- ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
+ ins = append(ins, &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
upd.Data.Clients = nil
- ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
+ ins = append(ins, &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
if err = store.AppendMany(ins); err != nil {
t.Fatalf("failed to append update: %v", err)
}
@@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -414,9 +414,9 @@ func TestGetUpdatesAfter(t *testing.T) {
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- expected := []UpdateFull{}
+ expected := []*UpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -540,9 +540,9 @@ func TestForwardedUpdates(t *testing.T) {
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- expected := []UpdateFull{}
+ expected := []*UpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := &UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
in.SourceHubUUID = forwardedHub
in.SourceHubUpdateID = 3 - i // out of order
if err = store.Append(in); err != nil {
@@ -597,7 +597,7 @@ func TestForwardedUpdates(t *testing.T) {
}
}
-func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1ID, fwdSrc2ID, finalSrc1ID, finalSrc2ID, finalFwdID int) {
+func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore *Store, fwdSrc1ID, fwdSrc2ID, finalSrc1ID, finalSrc2ID, finalFwdID int) {
lastID, err := fwdStore.GetLastUpdateIDForUUID(src1Store.GetHubUUID())
if err != nil {
t.Fatalf("unexpected error: %v", err)
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 28f1fc7..90a413a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -32,7 +32,10 @@
package sfive
-import "time"
+import (
+ "errors"
+ "time"
+)
type Stream struct {
ContentID string `json:"content"`
@@ -79,6 +82,22 @@ type UpdateFull struct {
Update
}
+func (uf *UpdateFull) checkSource() error {
+ if uf.Source.Hostname == "" {
+ return errors.New("empty hostname ist not allowed")
+ }
+ if uf.Source.Stream.ContentID == "" {
+ return errors.New("empty content-id ist not allowed")
+ }
+ if uf.Source.Stream.Format == "" {
+ return errors.New("empty format ist not allowed")
+ }
+ if uf.Source.Stream.Quality == "" {
+ return errors.New("empty quality ist not allowed")
+ }
+ return nil
+}
+
func (uf *UpdateFull) CopyFromSource(src *Source) {
uf.Hostname = src.Hostname
uf.Stream = src.Stream
@@ -107,11 +126,11 @@ type WebHubsResponse struct {
}
type WebSourcesResponse struct {
- Sources []Source `json:"sources"`
+ Sources []*Source `json:"sources"`
}
type WebUpdatesGetResponse struct {
- Updates []UpdateFull `json:"updates"`
+ Updates []*UpdateFull `json:"updates"`
}
type WebUpdatesPostResponse struct {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index f4bf6e8..92147e1 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -77,8 +77,8 @@ type sourceDB struct {
Tags []string `json:"t"`
}
-func NewSourceDB(value UpdateFull) sourceDB {
- return sourceDB{
+func NewSourceDB(value *UpdateFull) *sourceDB {
+ return &sourceDB{
Hostname: value.Source.Hostname,
Stream: streamDB{
ContentID: value.Source.Stream.ContentID,
@@ -93,7 +93,7 @@ func (s sourceDB) Slug() string {
return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ","))
}
-func (s *Source) CopyFromSourceDB(sdb sourceDB) {
+func (s *Source) CopyFromSourceDB(sdb *sourceDB) {
s.Hostname = sdb.Hostname
s.Stream.ContentID = sdb.Stream.ContentID
s.Stream.Format = sdb.Stream.Format
@@ -120,8 +120,8 @@ type updateDB struct {
BytesSent uint `json:"bs,omitempty"`
}
-func NewUpdateDB(uf UpdateFull) updateDB {
- return updateDB{
+func NewUpdateDB(uf *UpdateFull) *updateDB {
+ return &updateDB{
-1,
uf.SourceHubUpdateID,
-1,
@@ -133,7 +133,7 @@ func NewUpdateDB(uf UpdateFull) updateDB {
}
}
-func (uf *UpdateFull) CopyFromUpdateDB(udb updateDB, srcHubUUID, hubUUID string, id int) {
+func (uf *UpdateFull) CopyFromUpdateDB(udb *updateDB, srcHubUUID, hubUUID string, id int) {
if srcHubUUID == "" {
uf.SourceHubUUID = hubUUID
uf.SourceHubUpdateID = id