summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/.gitignore1
-rw-r--r--src/hub/Makefile5
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go80
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go121
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go3
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go16
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go21
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go6
10 files changed, 173 insertions, 84 deletions
diff --git a/src/hub/.gitignore b/src/hub/.gitignore
index 7be1b96..dd65bbd 100644
--- a/src/hub/.gitignore
+++ b/src/hub/.gitignore
@@ -6,3 +6,4 @@
*.a
*.o
/test
+/coverage.out
diff --git a/src/hub/Makefile b/src/hub/Makefile
index dd77ded..daf06ef 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -80,6 +80,11 @@ bench: getlibs
@echo "testing and benchmarking: sfive"
@$(GOCMD) test -bench=. spreadspace.org/sfive
+cover: getlibs
+ @echo "testing and benchmarking: sfive"
+ @$(GOCMD) test -coverprofile=coverage.out spreadspace.org/sfive
+ @$(GOCMD) tool cover -html=coverage.out
+
clean:
rm -rf pkg/*/spreadspace.org
rm -rf bin
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 48eac4b..415cd3f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -41,57 +41,75 @@ const (
ProtocolVersion = 1
)
-type FullDecoder interface {
+//
+// Decoder
+//
+
+type Decoder interface {
Decode(jsonString []byte) (DataUpdateFull, error)
}
-type FullEncoder interface {
- Encode(data DataUpdateFull) []byte
+// stateless protocol interfaces
+type StatelessDecoder struct{}
+
+func NewStatelessDecoder() Decoder {
+ return &StatelessDecoder{}
+}
+
+func (pd *StatelessDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ if err = json.Unmarshal(jsonString, &res); err != nil {
+ return
+ }
+ if res.Version != ProtocolVersion {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
+ }
+ return
}
+// stateful protocol interfaces
type StatefulDecoder struct {
Version uint
SourceId
}
-type PlainDecoder struct{}
-
-type PlainEncoder struct{}
+func NewStatefulDecoder(jsonString []byte) (Decoder, error) {
+ res := &StatefulDecoder{}
+ if err := json.Unmarshal(jsonString, &res); err != nil {
+ return nil, err
+ }
+ if res.Version != ProtocolVersion {
+ return nil, fmt.Errorf("unsupported version: %d, expected: %d", res.Version, ProtocolVersion)
+ }
+ return res, nil
+}
-func NewStatefulDecoder(jsonString []byte) (decoder FullDecoder, err error) {
- res := new(StatefulDecoder)
- err = json.Unmarshal(jsonString, &res)
- if err != nil {
+func (sd *StatefulDecoder) Decode(jsonString []byte) (res DataUpdateFull, err error) {
+ res.Version = sd.Version
+ res.CopyFromSourceId(&sd.SourceId)
+ if err = json.Unmarshal(jsonString, &res); err != nil {
return
}
- if res.Version != ProtocolVersion {
- err = fmt.Errorf("unsupported version, expected %d, actual %v", ProtocolVersion, res.Version)
+ if res.Version != sd.Version {
+ err = fmt.Errorf("unsupported version: %d, expected: %d", res.Version, sd.Version)
}
- decoder = res
return
}
-func NewPlainDecoder() FullDecoder {
- return new(PlainDecoder)
-}
+//
+// Encoder
+//
-func (sd *StatefulDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) {
- dat.CopyFromSourceId(&sd.SourceId)
- err = json.Unmarshal(jsonString, &dat)
- // like in PlainDecoder, let the client decide how to use partial results
- // (Unmarshal returns partial results in case of errors)
- return
+type Encoder interface {
+ Encode(data DataUpdateFull) ([]byte, error)
}
-func (pd *PlainDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) {
- err = json.Unmarshal(jsonString, &dat)
- return
+type StatelessEncoder struct{}
+
+func NewStatelessEncoder() Encoder {
+ return &StatelessEncoder{}
}
-func (pe *PlainEncoder) Encode(data *DataUpdateFull) []byte {
- res, err := json.Marshal(data)
- if err != nil {
- s5l.Panicln("failed to encode DataUpdateFull")
- }
- return res
+func (pe *StatelessEncoder) Encode(data DataUpdateFull) (res []byte, err error) {
+ data.Version = ProtocolVersion
+ return json.Marshal(data)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 9e30b59..2d89485 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -33,67 +33,118 @@
package sfive
import (
+ "fmt"
"reflect"
"testing"
"time"
)
var (
- sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
- sourceIdData = `{` + sourceIdFields + `}`
- sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
- updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
- updateData = "{" + updateFields + "}"
- updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
-
- statefulInitMsg = `{ "version": 1, ` + sourceIdFields + `}`
- statefulDataMsg = `{ "version": 1, ` + updateFields + "}"
-
- statelessDataMsg = `{ "version": 1, ` + sourceIdFields + "," + updateFields + "}"
+ initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
+ updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
+ updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
)
-func GetExpected() *DataUpdateFull {
- expected := new(DataUpdateFull)
+func GetExpected() (expected DataUpdateFull) {
expected.Version = ProtocolVersion
- expected.CopyFromSourceId(&sourceIdDataStruct)
+ expected.CopyFromSourceId(&initDataStruct)
expected.CopyFromUpdate(&updateDataStruct)
- return expected
+ return
}
func TestDecodeStateful(t *testing.T) {
- dc, err := NewStatefulDecoder([]byte(statefulInitMsg))
+ // invalid init message
+ if _, err := NewStatefulDecoder([]byte("this is not json")); err == nil {
+ t.Fatalf("creating decoder with invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, initDataEncoded)
+ if _, err := NewStatefulDecoder([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("creating decoder with wrong protocol version should throw an error")
+ }
+
+ // valid init message
+ statefulInitMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, initDataEncoded)
+ dec, err := NewStatefulDecoder([]byte(statefulInitMsg))
if err != nil {
- t.Errorf("Creating decoder failed with %v", err)
- return
+ t.Fatalf("creating decoder failed: %v", err)
+ }
+
+ // invalid data-update message
+ if _, err := dec.Decode([]byte("this isn't valid json either")); err == nil {
+ t.Fatalf("decoding message with invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg = fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion+1, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
}
- dat, err := dc.Decode([]byte(statefulDataMsg))
+
+ // valid data-update message
+ statefulUpdateMsg := fmt.Sprintf(`{ "version": %d, %s }`, ProtocolVersion, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statefulUpdateMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestDecodePlain(t *testing.T) {
- ec := new(PlainDecoder)
- dat, err := ec.Decode([]byte(statelessDataMsg))
+func TestDecodeStateless(t *testing.T) {
+ dec := NewStatelessDecoder()
+
+ // invalid message
+ if _, err := dec.Decode([]byte("this is still not json")); err == nil {
+ t.Fatalf("decoding invalid json should throw an error")
+ }
+
+ // wrong protocol version
+ wrongProtoVerMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion+1, initDataEncoded, updateDataEncoded)
+ if _, err := dec.Decode([]byte(wrongProtoVerMsg)); err == nil {
+ t.Fatalf("decoding message with wrong protocol version should throw an error")
+ }
+
+ // valid message
+ statelessDataMsg := fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initDataEncoded, updateDataEncoded)
+ decoded, err := dec.Decode([]byte(statelessDataMsg))
if err != nil {
- t.Errorf("Decode failed with %v", err)
- return
+ t.Fatalf("decoding message failed: %v", err)
}
+
+ // compare with expected result
expected := GetExpected()
- if !reflect.DeepEqual(dat, *expected) {
- t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("decoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
}
}
-func TestEncode(t *testing.T) {
- ec := new(PlainEncoder)
- td := new(DataUpdateFull)
- td.CopyFromSourceId(&sourceIdDataStruct)
+func TestEncodeStateless(t *testing.T) {
+ enc := NewStatelessEncoder()
+
+ var td DataUpdateFull
+ td.CopyFromSourceId(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
- t.Logf("dada: %v", ec.Encode(td))
+
+ encoded, err := enc.Encode(td)
+ if err != nil {
+ t.Fatalf("encoding message failed: %v", err)
+ }
+
+ decoded, err := NewStatelessDecoder().Decode(encoded)
+ if err != nil {
+ t.Fatalf("decoding message failed: %v", err)
+ }
+
+ expected := td
+ expected.Version = ProtocolVersion
+ if !reflect.DeepEqual(decoded, expected) {
+ t.Fatalf("Encoding failed:\n actual: %v\n expected: %v\n", decoded, expected)
+ }
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 23429a9..e49c30d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -160,7 +160,7 @@ func (srv Server) Close() {
func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
// TODO read configuration and create instance with correct settings
- server = new(Server)
+ server = &Server{}
server.store, err = NewStore(dbPath, readOnly)
if err != nil {
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index 050726a..9aa5e49 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -73,7 +73,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
func (srv Server) pipegramHandle(pconn net.PacketConn) {
- decoder := NewPlainDecoder()
+ decoder := NewStatelessDecoder()
buffer := make([]byte, 64*1024)
for {
n, _, err := pconn.ReadFrom(buffer)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index fd7e214..657492f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -143,7 +143,7 @@ func (srv Server) webGetUpdate(c web.C, w http.ResponseWriter, r *http.Request)
func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "update"
- decoder := NewPlainDecoder()
+ decoder := NewStatelessDecoder()
buffer, err := ioutil.ReadAll(r.Body)
if err != nil {
@@ -152,6 +152,7 @@ func (srv Server) webPostUpdate(c web.C, w http.ResponseWriter, r *http.Request)
return
}
+ // TODO: add different API endpoint called bulk
container := DataUpdateFullContainer{}
err = json.Unmarshal(buffer, &container)
if err == nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 8b9cae6..f097e8e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -400,21 +400,15 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
// fetch all the key-value pairs and merge them into the multidimensional dataupdate
func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
- var clients []ClientData
- if clients, err = st.getClients(tx, duId); err != nil {
- return
- }
+ res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
var src sourceDb
if src, err = st.getSource(tx, du.SourceId); err != nil {
return
}
- res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
- res.Hostname = src.Hostname
- res.StreamId.ContentId = src.StreamId.ContentId
- res.StreamId.Format = src.StreamId.Format
- res.StreamId.Quality = src.StreamId.Quality
- res.Tags = src.Tags
- res.Data.Clients = clients
+ res.CopyFromSourceDb(src)
+ if res.Data.Clients, err = st.getClients(tx, duId); err != nil {
+ return
+ }
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index b9aa36c..4c74a81 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -223,8 +223,27 @@ func TestAppendAndFetch(t *testing.T) {
t.Fatalf("should have been equal\nactual: %v\nexpected: %v\n", out, expected)
}
+ // remote dataupdate
+ in = DataUpdateFull{0, "7411836d-58f7-4a36-85d6-409b4e4ca30c", 3, source, update}
+
+ if err = store.Append(in); err != nil {
+ t.Fatalf("Failed to append update: %v", err)
+ }
+
+ out, err = store.GetUpdate(2)
+ if err != nil {
+ t.Fatalf("Failed to fetch update: %v", err)
+
+ }
+ out.StartTime = out.StartTime.UTC() // this should be handled by the protocol encoder
+ expected = in
+
+ if !reflect.DeepEqual(expected, out) {
+ t.Fatalf("should have been equal\nactual: %v\nexpected: %v\n", out, expected)
+ }
+
//
- // TODO: more example data
+ // TODO: more example data and cleanup this copy&paste hell...
//
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index 7154740..e940dff 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -113,9 +113,9 @@ type dataUpdateDb struct {
SourceId int `json:"si"`
StartTime int64 `json:"st"` // unix timestamp in milliseconds
Duration int64 `json:"du"` // duration in milliseconds
- ClientCount uint `json:"cc"`
- BytesReceived uint `json:"br"`
- BytesSent uint `json:"bs"`
+ ClientCount uint `json:"cc,omitempty"`
+ BytesReceived uint `json:"br,omitempty"`
+ BytesSent uint `json:"bs,omitempty"`
}
func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb {