diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 80 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 121 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 3 |
5 files changed, 139 insertions, 69 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 48eac4b..415cd3f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -41,57 +41,75 @@ const ( ProtocolVersion = 1 ) -type FullDecoder interface { +// +// Decoder +// + +type Decoder interface { Decode(jsonString []byte) (DataUpdateFull, error) } -type FullEncoder interface { - Encode(data DataUpdateFull) []byte +// stateless protocol interfaces +type StatelessDecoder struct{} + +func NewStatelessDecoder() Decoder { + return &StatelessDecoder{} +} + +func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + if err = json.Unmarshal(jsonString, &res); err != nil { + return + } + if res.Version != ProtocolVersion { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) + } + return } +// stateful protocol interfaces type StatefulDecoder struct { Version uint SourceId } -type PlainDecoder struct{} - -type PlainEncoder struct{} +func NewStatefulDecoder(jsonString []byte) (Decoder, error) { + res := &StatefulDecoder{} + if err := json.Unmarshal(jsonString, &res); err != nil { + return nil, err + } + if res.Version != ProtocolVersion { + return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion) + } + return res, nil +} -func NewStatefulDecoder(jsonString []byte) (decoder FullDecoder, err error) { - res := new(StatefulDecoder) - err = json.Unmarshal(jsonString, &res) - if err != nil { +func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) { + res.Version = sd.Version + res.CopyFromSourceId(&sd.SourceId) + if err = json.Unmarshal(jsonString, &res); err != nil { return } - if res.Version != ProtocolVersion { - err = fmt.Errorf("unsupported version, expected %d, actual %v", ProtocolVersion, res.Version) + if res.Version != sd.Version { + err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version) } - decoder = res return } -func NewPlainDecoder() FullDecoder { - return new(PlainDecoder) -} +// +// Encoder +// -func (sd *StatefulDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { - dat.CopyFromSourceId(&sd.SourceId) - err = json.Unmarshal(jsonString, &dat) - // like in PlainDecoder, let the client decide how to use partial results - // (Unmarshal returns partial results in case of errors) - return +type Encoder interface { + Encode(data DataUpdateFull) ([]byte, error) } -func (pd *PlainDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { - err = json.Unmarshal(jsonString, &dat) - return +type StatelessEncoder struct{} + +func NewStatelessEncoder() Encoder { + return &StatelessEncoder{} } -func (pe *PlainEncoder) Encode(data *DataUpdateFull) []byte { - res, err := json.Marshal(data) - if err != nil { - s5l.Panicln("failed to encode DataUpdateFull") - } - return res +func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) { + data.Version = ProtocolVersion + return json.Marshal(data) } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 9e30b59..2d89485 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -33,67 +33,118 @@ package sfive import ( + "fmt" "reflect" "testing" "time" ) var ( - sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` - sourceIdData = `{` + sourceIdFields + `}` - sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} - updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` - updateData = "{" + updateFields + "}" - updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} - - statefulInitMsg = `{ "version": 1, ` + sourceIdFields + `}` - statefulDataMsg = `{ "version": 1, ` + updateFields + "}" - - statelessDataMsg = `{ "version": 1, ` + sourceIdFields + "," + updateFields + "}" + initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` + initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` + updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) -func GetExpected() *DataUpdateFull { - expected := new(DataUpdateFull) +func GetExpected() (expected DataUpdateFull) { expected.Version = ProtocolVersion - expected.CopyFromSourceId(&sourceIdDataStruct) + expected.CopyFromSourceId(&initDataStruct) expected.CopyFromUpdate(&updateDataStruct) - return expected + return } func TestDecodeStateful(t *testing.T) { - dc, err := NewStatefulDecoder([]byte(statefulInitMsg)) + // invalid init message + if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil { + t.Fatalf("creating decoder with invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded) + if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("creating decoder with wrong protocol version should throw an error") + } + + // valid init message + statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded) + dec, err := NewStatefulDecoder([]byte(statefulInitMsg)) if err != nil { - t.Errorf("Creating decoder failed with %v", err) - return + t.Fatalf("creating decoder failed: %v", err) + } + + // invalid data-update message + if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil { + t.Fatalf("decoding message with invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") } - dat, err := dc.Decode([]byte(statefulDataMsg)) + + // valid data-update message + statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded) + decoded, err := dec.Decode([]byte(statefulUpdateMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestDecodePlain(t *testing.T) { - ec := new(PlainDecoder) - dat, err := ec.Decode([]byte(statelessDataMsg)) +func TestDecodeStateless(t *testing.T) { + dec := NewStatelessDecoder() + + // invalid message + if _, err := dec.Decode([]byte("this is still not json")); err == nil { + t.Fatalf("decoding invalid json should throw an error") + } + + // wrong protocol version + wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded) + if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil { + t.Fatalf("decoding message with wrong protocol version should throw an error") + } + + // valid message + statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded) + decoded, err := dec.Decode([]byte(statelessDataMsg)) if err != nil { - t.Errorf("Decode failed with %v", err) - return + t.Fatalf("decoding message failed: %v", err) } + + // compare with expected result expected := GetExpected() - if !reflect.DeepEqual(dat, *expected) { - t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected) } } -func TestEncode(t *testing.T) { - ec := new(PlainEncoder) - td := new(DataUpdateFull) - td.CopyFromSourceId(&sourceIdDataStruct) +func TestEncodeStateless(t *testing.T) { + enc := NewStatelessEncoder() + + var td DataUpdateFull + td.CopyFromSourceId(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) - t.Logf("dada: %v", ec.Encode(td)) + + encoded, err := enc.Encode(td) + if err != nil { + t.Fatalf("encoding message failed: %v", err) + } + + decoded, err := NewStatelessDecoder().Decode(encoded) + if err != nil { + t.Fatalf("decoding message failed: %v", err) + } + + expected := td + expected.Version = ProtocolVersion + if !reflect.DeepEqual(decoded, expected) { + t.Fatalf("Encoding failed:\n actual: %v\n expected: %v\n", decoded, expected) + } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 23429a9..e49c30d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -160,7 +160,7 @@ func (srv Server) Close() { func NewServer(dbPath string, readOnly bool) (server *Server, err error) { // TODO read configuration and create instance with correct settings - server = new(Server) + server = &Server{} server.store, err = NewStore(dbPath, readOnly) if err != nil { return diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index 050726a..9aa5e49 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -73,7 +73,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } func (srv Server) pipegramHandle(pconn net.PacketConn) { - decoder := NewPlainDecoder() + decoder := NewStatelessDecoder() buffer := make([]byte, 64*1024) for { n, _, err := pconn.ReadFrom(buffer) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 8ecb673..7f727ac 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -128,7 +128,7 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request) func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" - decoder := NewPlainDecoder() + decoder := NewStatelessDecoder() buffer, err := ioutil.ReadAll(r.Body) if err != nil { @@ -137,6 +137,7 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) return } + // TODO: add different API endpoint called bulk container := DataUpdateFullContainer{} err = json.Unmarshal(buffer, &container) if err == nil { |