summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-30 17:55:32 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-30 17:55:32 +0200
commit9623607857b2048bd347f1a8a38d0de202abf2be (patch)
treeb4b8b7cb00fb692f5ca1da7e0c581a5f558c9ae9
parentadd one more testcase (diff)
cleaned up proto parser a little
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go80
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go121
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go3
5 files changed, 139 insertions, 69 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 48eac4b..415cd3f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -41,57 +41,75 @@ const (
ProtocolVersion = 1
)
-type FullDecoder interface {
+//
+// Decoder
+//
+
+type Decoder interface {
Decode(jsonString []byte) (DataUpdateFull, error)
}
-type FullEncoder interface {
- Encode(data DataUpdateFull) []byte
+// stateless protocol interfaces
+type StatelessDecoder struct{}
+
+func NewStatelessDecoder() Decoder {
+ return &StatelessDecoder{}
+}
+
+func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ if err = json.Unmarshal(jsonString, &res); err != nil {
+ return
+ }
+ if res.Version != ProtocolVersion {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
+ }
+ return
}
+// stateful protocol interfaces
type StatefulDecoder struct {
Version uint
SourceId
}
-type PlainDecoder struct{}
-
-type PlainEncoder struct{}
+func NewStatefulDecoder(jsonString []byte) (Decoder, error) {
+ res := &StatefulDecoder{}
+ if err := json.Unmarshal(jsonString, &res); err != nil {
+ return nil, err
+ }
+ if res.Version != ProtocolVersion {
+ return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
+ }
+ return res, nil
+}
-func NewStatefulDecoder(jsonString []byte) (decoder FullDecoder, err error) {
- res := new(StatefulDecoder)
- err = json.Unmarshal(jsonString, &res)
- if err != nil {
+func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ res.Version = sd.Version
+ res.CopyFromSourceId(&sd.SourceId)
+ if err = json.Unmarshal(jsonString, &res); err != nil {
return
}
- if res.Version != ProtocolVersion {
- err = fmt.Errorf("unsupported version, expected %d, actual %v", ProtocolVersion, res.Version)
+ if res.Version != sd.Version {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version)
}
- decoder = res
return
}
-func NewPlainDecoder() FullDecoder {
- return new(PlainDecoder)
-}
+//
+// Encoder
+//
-func (sd *StatefulDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) {
- dat.CopyFromSourceId(&sd.SourceId)
- err = json.Unmarshal(jsonString, &dat)
- // like in PlainDecoder, let the client decide how to use partial results
- // (Unmarshal returns partial results in case of errors)
- return
+type Encoder interface {
+ Encode(data DataUpdateFull) ([]byte, error)
}
-func (pd *PlainDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) {
- err = json.Unmarshal(jsonString, &dat)
- return
+type StatelessEncoder struct{}
+
+func NewStatelessEncoder() Encoder {
+ return &StatelessEncoder{}
}
-func (pe *PlainEncoder) Encode(data *DataUpdateFull) []byte {
- res, err := json.Marshal(data)
- if err != nil {
- s5l.Panicln("failed to encode DataUpdateFull")
- }
- return res
+func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) {
+ data.Version = ProtocolVersion
+ return json.Marshal(data)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 9e30b59..2d89485 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -33,67 +33,118 @@
package sfive
import (
+ "fmt"
"reflect"
"testing"
"time"
)
var (
- sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
- sourceIdData = `{` + sourceIdFields + `}`
- sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
- updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
- updateData = "{" + updateFields + "}"
- updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
-
- statefulInitMsg = `{ "version": 1, ` + sourceIdFields + `}`
- statefulDataMsg = `{ "version": 1, ` + updateFields + "}"
-
- statelessDataMsg = `{ "version": 1, ` + sourceIdFields + "," + updateFields + "}"
+ initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
+ updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
+ updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
)
-func GetExpected() *DataUpdateFull {
- expected := new(DataUpdateFull)
+func GetExpected() (expected DataUpdateFull) {
expected.Version = ProtocolVersion
- expected.CopyFromSourceId(&sourceIdDataStruct)
+ expected.CopyFromSourceId(&initDataStruct)
expected.CopyFromUpdate(&updateDataStruct)
- return expected
+ return
}
func TestDecodeStateful(t *testing.T) {
- dc, err := NewStatefulDecoder([]byte(statefulInitMsg))
+ // invalid init message
+ if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil {
+ t.Fatalf("creating decoder with invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded)
+ if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("creating decoder with wrong protocol version should throw an error")
+ }
+
+ // valid init message
+ statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ dec, err := NewStatefulDecoder([]byte(statefulInitMsg))
if err != nil {
- t.Errorf("Creating decoder failed with %v", err)
- return
+ t.Fatalf("creating decoder failed: %v", err)
+ }
+
+ // invalid data-update message
+ if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil {
+ t.Fatalf("decoding message with invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
}
- dat, err := dc.Decode([]byte(statefulDataMsg))
+
+ // valid data-update message
+ statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statefulUpdateMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestDecodePlain(t *testing.T) {
- ec := new(PlainDecoder)
- dat, err := ec.Decode([]byte(statelessDataMsg))
+func TestDecodeStateless(t *testing.T) {
+ dec := NewStatelessDecoder()
+
+ // invalid message
+ if _, err := dec.Decode([]byte("this is still not json")); err == nil {
+ t.Fatalf("decoding invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
+ }
+
+ // valid message
+ statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statelessDataMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestEncode(t *testing.T) {
- ec := new(PlainEncoder)
- td := new(DataUpdateFull)
- td.CopyFromSourceId(&sourceIdDataStruct)
+func TestEncodeStateless(t *testing.T) {
+ enc := NewStatelessEncoder()
+
+ var td DataUpdateFull
+ td.CopyFromSourceId(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
- t.Logf("dada: %v", ec.Encode(td))
+
+ encoded, err := enc.Encode(td)
+ if err != nil {
+ t.Fatalf("encoding message failed: %v", err)
+ }
+
+ decoded, err := NewStatelessDecoder().Decode(encoded)
+ if err != nil {
+ t.Fatalf("decoding message failed: %v", err)
+ }
+
+ expected := td
+ expected.Version = ProtocolVersion
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("Encoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
+ }
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 23429a9..e49c30d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -160,7 +160,7 @@ func (srv Server) Close() {
func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
// TODO read configuration and create instance with correct settings
- server = new(Server)
+ server = &Server{}
server.store, err = NewStore(dbPath, readOnly)
if err != nil {
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 050726a..9aa5e49 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -73,7 +73,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
func (srv Server) pipegramHandle(pconn net.PacketConn) {
- decoder := NewPlainDecoder()
+ decoder := NewStatelessDecoder()
buffer := make([]byte, 64*1024)
for {
n, _, err := pconn.ReadFrom(buffer)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 8ecb673..7f727ac 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -128,7 +128,7 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request)
func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
- decoder := NewPlainDecoder()
+ decoder := NewStatelessDecoder()
buffer, err := ioutil.ReadAll(r.Body)
if err != nil {
@@ -137,6 +137,7 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request)
return
}
+ // TODO: add different API endpoint called bulk
container := DataUpdateFullContainer{}
err = json.Unmarshal(buffer, &container)
if err == nil {