diff options
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/.gitignore | 1 | ||||
-rw-r--r-- | src/hub/Makefile | 5 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 80 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 121 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 3 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 16 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 21 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 6 |
10 files changed, 173 insertions, 84 deletions
diff --git a/src/hub/.gitignore b/src/hub/.gitignore index 7be1b96..dd65bbd 100644 --- a/src/hub/.gitignore +++ b/src/hub/.gitignore @@ -6,3 +6,4 @@ *.a *.o /test +/coverage.out diff --git a/src/hub/Makefile b/src/hub/Makefile index dd77ded..daf06ef 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -80,6 +80,11 @@ bench: getlibs @echo "testing and benchmarking: sfive" @$(GOCMD) test -bench=. spreadspace.org/sfive +cover: getlibs + @echo "testing and benchmarking: sfive" + @$(GOCMD) test -coverprofile=coverage.out spreadspace.org/sfive + @$(GOCMD) tool cover -html=coverage.out + clean: rm -rf pkg/*/spreadspace.org rm -rf bin diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 48eac4b..415cd3f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -41,57 +41,75 @@ const ( ProtocolVersion = 1 ) -type FullDecoder interface { +// +// Decoder +// + +type Decoder interface { Decode(jsonString []byte) (DataUpdateFull, error) } -type FullEncoder interface { - Encode(data DataUpdateFull) []byte +// stateless protocol interfaces +type StatelessDecoder struct{} + +func NewStatelessDecoder() Decoder { + return &StatelessDecoder{} +} + +func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + if err = json.Unmarshal(jsonString, &res); err != nil { + return + } + if res.Version != ProtocolVersion { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) + } + return } +// stateful protocol interfaces type StatefulDecoder struct { Version uint SourceId } -type PlainDecoder struct{} - -type PlainEncoder struct{} +func NewStatefulDecoder(jsonString []byte) (Decoder, error) { + res := &StatefulDecoder{} + if err := json.Unmarshal(jsonString, &res); err != nil { + return nil, err + } + if res.Version != ProtocolVersion { + return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) + } + return res, nil +} -func NewStatefulDecoder(jsonString []byte) (decoder FullDecoder, err error) { - res := new(StatefulDecoder) - err = json.Unmarshal(jsonString, &res) - if err != nil { +func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + res.Version = sd.Version + res.CopyFromSourceId(&sd.SourceId) + if err = json.Unmarshal(jsonString, &res); err != nil { return } - if res.Version != ProtocolVersion { - err = fmt.Errorf("unsupported version, expected %d, actual %v", ProtocolVersion, res.Version) + if res.Version != sd.Version { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version) } - decoder = res return } -func NewPlainDecoder() FullDecoder { - return new(PlainDecoder) -} +// +// Encoder +// -func (sd *StatefulDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { - dat.CopyFromSourceId(&sd.SourceId) - err = json.Unmarshal(jsonString, &dat) - // like in PlainDecoder, let the client decide how to use partial results - // (Unmarshal returns partial results in case of errors) - return +type Encoder interface { + Encode(data DataUpdateFull) ([]byte, error) } -func (pd *PlainDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { - err = json.Unmarshal(jsonString, &dat) - return +type StatelessEncoder struct{} + +func NewStatelessEncoder() Encoder { + return &StatelessEncoder{} } -func (pe *PlainEncoder) Encode(data *DataUpdateFull) []byte { - res, err := json.Marshal(data) - if err != nil { - s5l.Panicln("failed to encode DataUpdateFull") - } - return res +func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) { + data.Version = ProtocolVersion + return json.Marshal(data) } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 9e30b59..2d89485 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -33,67 +33,118 @@ package sfive import ( + "fmt" "reflect" "testing" "time" ) var ( - sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` - sourceIdData = `{` + sourceIdFields + `}` - sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} - updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` - updateData = "{" + updateFields + "}" - updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} - - statefulInitMsg = `{ "version": 1, ` + sourceIdFields + `}` - statefulDataMsg = `{ "version": 1, ` + updateFields + "}" - - statelessDataMsg = `{ "version": 1, ` + sourceIdFields + "," + updateFields + "}" + initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` + initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` + updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) -func GetExpected() *DataUpdateFull { - expected := new(DataUpdateFull) +func GetExpected() (expected DataUpdateFull) { expected.Version = ProtocolVersion - expected.CopyFromSourceId(&sourceIdDataStruct) + expected.CopyFromSourceId(&initDataStruct) expected.CopyFromUpdate(&updateDataStruct) - return expected + return } func TestDecodeStateful(t *testing.T) { - dc, err := NewStatefulDecoder([]byte(statefulInitMsg)) + // invalid init message + if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil { + t.Fatalf("creating decoder with invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded) + if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("creating decoder with wrong protocol version should throw an error") + } + + // valid init message + statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + dec, err := NewStatefulDecoder([]byte(statefulInitMsg)) if err != nil { - t.Errorf("Creating decoder failed with %v", err) - return + t.Fatalf("creating decoder failed: %v", err) + } + + // invalid data-update message + if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil { + t.Fatalf("decoding message with invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") } - dat, err := dc.Decode([]byte(statefulDataMsg)) + + // valid data-update message + statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded) + decoded, err := dec.Decode([]byte(statefulUpdateMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestDecodePlain(t *testing.T) { - ec := new(PlainDecoder) - dat, err := ec.Decode([]byte(statelessDataMsg)) +func TestDecodeStateless(t *testing.T) { + dec := NewStatelessDecoder() + + // invalid message + if _, err := dec.Decode([]byte("this is still not json")); err == nil { + t.Fatalf("decoding invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") + } + + // valid message + statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded) + decoded, err := dec.Decode([]byte(statelessDataMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestEncode(t *testing.T) { - ec := new(PlainEncoder) - td := new(DataUpdateFull) - td.CopyFromSourceId(&sourceIdDataStruct) +func TestEncodeStateless(t *testing.T) { + enc := NewStatelessEncoder() + + var td DataUpdateFull + td.CopyFromSourceId(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) - t.Logf("dada: %v", ec.Encode(td)) + + encoded, err := enc.Encode(td) + if err != nil { + t.Fatalf("encoding message failed: %v", err) + } + + decoded, err := NewStatelessDecoder().Decode(encoded) + if err != nil { + t.Fatalf("decoding message failed: %v", err) + } + + expected := td + expected.Version = ProtocolVersion + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("Encoding failed:\n actual: %v\n expected: %v\n", decoded, expected) + } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 23429a9..e49c30d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -160,7 +160,7 @@ func (srv Server) Close() { func NewServer(dbPath string, readOnly bool) (server *Server, err error) { // TODO read configuration and create instance with correct settings - server = new(Server) + server = &Server{} server.store, err = NewStore(dbPath, readOnly) if err != nil { return diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 050726a..9aa5e49 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -73,7 +73,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } func (srv Server) pipegramHandle(pconn net.PacketConn) { - decoder := NewPlainDecoder() + decoder := NewStatelessDecoder() buffer := make([]byte, 64*1024) for { n, _, err := pconn.ReadFrom(buffer) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index fd7e214..657492f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -143,7 +143,7 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" - decoder := NewPlainDecoder() + decoder := NewStatelessDecoder() buffer, err := ioutil.ReadAll(r.Body) if err != nil { @@ -152,6 +152,7 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) return } + // TODO: add different API endpoint called bulk container := DataUpdateFullContainer{} err = json.Unmarshal(buffer, &container) if err == nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 8b9cae6..f097e8e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -400,21 +400,15 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { // fetch all the key-value pairs and merge them into the multidimensional dataupdate func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { - var clients []ClientData - if clients, err = st.getClients(tx, duId); err != nil { - return - } + res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) var src sourceDb if src, err = st.getSource(tx, du.SourceId); err != nil { return } - res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) - res.Hostname = src.Hostname - res.StreamId.ContentId = src.StreamId.ContentId - res.StreamId.Format = src.StreamId.Format - res.StreamId.Quality = src.StreamId.Quality - res.Tags = src.Tags - res.Data.Clients = clients + res.CopyFromSourceDb(src) + if res.Data.Clients, err = st.getClients(tx, duId); err != nil { + return + } return } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index b9aa36c..4c74a81 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -223,8 +223,27 @@ func TestAppendAndFetch(t *testing.T) { t.Fatalf("should have been equal\nactual: %v\nexpected: %v\n", out, expected) } + // remote dataupdate + in = DataUpdateFull{0, "7411836d-58f7-4a36-85d6-409b4e4ca30c", 3, source, update} + + if err = store.Append(in); err != nil { + t.Fatalf("Failed to append update: %v", err) + } + + out, err = store.GetUpdate(2) + if err != nil { + t.Fatalf("Failed to fetch update: %v", err) + + } + out.StartTime = out.StartTime.UTC() // this should be handled by the protocol encoder + expected = in + + if !reflect.DeepEqual(expected, out) { + t.Fatalf("should have been equal\nactual: %v\nexpected: %v\n", out, expected) + } + // - // TODO: more example data + // TODO: more example data and cleanup this copy&paste hell... // } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index 7154740..e940dff 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -113,9 +113,9 @@ type dataUpdateDb struct { SourceId int `json:"si"` StartTime int64 `json:"st"` // unix timestamp in milliseconds Duration int64 `json:"du"` // duration in milliseconds - ClientCount uint `json:"cc"` - BytesReceived uint `json:"br"` - BytesSent uint `json:"bs"` + ClientCount uint `json:"cc,omitempty"` + BytesReceived uint `json:"br,omitempty"` + BytesSent uint `json:"bs,omitempty"` } func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { |