From 91379f77d7093fe1f667147e3099e2163419cddb Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Tue, 30 Sep 2014 21:48:58 +0200 Subject: add sfive-hub project scaffolding --- src/hub/README | 3 +- src/hub/build | 6 +++ src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 7 +++ src/hub/src/spreadspace.org/sfive-hub/sfive-hub.go | 8 --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 57 ++++++++++++++++++++ src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 25 +++++++++ src/hub/src/spreadspace.org/sfive/s5store.go | 61 ++++++++++++++++++++++ src/hub/src/spreadspace.org/sfive/s5types.go | 51 ++++++++++++++++++ 8 files changed, 209 insertions(+), 9 deletions(-) create mode 100755 src/hub/build create mode 100644 src/hub/src/spreadspace.org/sfive-hub/s5hub.go delete mode 100644 src/hub/src/spreadspace.org/sfive-hub/sfive-hub.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5cvt.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5cvt_test.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5store.go create mode 100644 src/hub/src/spreadspace.org/sfive/s5types.go (limited to 'src') diff --git a/src/hub/README b/src/hub/README index f69f1cc..0a17056 100644 --- a/src/hub/README +++ b/src/hub/README @@ -1,7 +1,8 @@ sfive-hub ========= -A service providing data aggregation, processing, persistence, filtering and retrieval. +`sfive-hub` is a service providing data aggregation, processing, persistence, +filtering and retrieval. It accepts data via unix domain sockets and REST API, either stores it persistently in a SQL database or caches it in-memory and queried for data through a REST API. diff --git a/src/hub/build b/src/hub/build new file mode 100755 index 0000000..3c42290 --- /dev/null +++ b/src/hub/build @@ -0,0 +1,6 @@ +#!/bin/sh +export GOPATH=$(pwd) +export PATH=$GOPATH/bin:$PATH +go install spreadspace.org/sfive-hub +go test spreadspace.org/sfive + diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go new file mode 100644 index 0000000..0166cf9 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Printf("Hello, world.\n") +} diff --git a/src/hub/src/spreadspace.org/sfive-hub/sfive-hub.go b/src/hub/src/spreadspace.org/sfive-hub/sfive-hub.go deleted file mode 100644 index 57ddeaf..0000000 --- a/src/hub/src/spreadspace.org/sfive-hub/sfive-hub.go +++ /dev/null @@ -1,8 +0,0 @@ -package main - -import "fmt" - -func main() { - fmt.Printf("Hello, world.\n") -} - diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go new file mode 100644 index 0000000..3402e7f --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -0,0 +1,57 @@ +package sfive + +import ( + "encoding/json" +) + +type StatsDecoder interface { + Decode(jsonString []byte) (StatisticsData, error) +} + +type StatsEncoder interface { + Encode(data StatisticsData) []byte +} + +type StatefulDecoder struct { + sourceId SourceId +} + +type PlainDecoder struct{} + +type PlainEncoder struct{} + +func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder) { + res := new(StatefulDecoder) + err := json.Unmarshal(jsonString, &res.sourceId) + if err != nil { + return nil + } + return res +} + +func NewPlainDecoder() (decoder StatsDecoder) { + return new(PlainDecoder) +} + +func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { + err = json.Unmarshal(jsonString, &dat) + if err != nil { + return + } + dat.CopyFrom(&self.sourceId) + return +} + +func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { + err = json.Unmarshal(jsonString, &dat) + return +} + +func (self *PlainEncoder) Encode(data StatisticsData) []byte { + res, err := json.Marshal(&data) + if err != nil { + panic("oh fuck I cannot event marshal my own data") + } + return res +} + diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go new file mode 100644 index 0000000..8ffcbd7 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -0,0 +1,25 @@ +package sfive + +import ( + "testing" + "fmt" +) + +func TestEncode(t *testing.T) { + testData := `{"streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["elevate", "2014"]}` +// otherDingy := ` +// {"data": {"bytes-sent": 0, "client-count": 3, "bytes-received": 0}, "start-time": "2014-08-24Z14:35:33.847282", "duration-ms": 5000} +// {"data": {"bytes-sent": 1183266, "client-count": 3, "bytes-received": 394422}, "start-time": "2014-08-24Z14:35:38.848950", "duration-ms": 5000} +// {"data": {"bytes-sent": 1199616, "client-count": 3, "bytes-received": 399872}, "start-time": "2014-08-24Z14:35:43.851006", "duration-ms": 5000} +// {"data": {"bytes-sent": 1181094, "client-count": 3, "bytes-received": 393698}, "start-time": "2014-08-24Z14:35:48.852863", "duration-ms": 5000} +// {"data": {"bytes-sent": 1190148, "client-count": 3, "bytes-received": 396716}, "start-time": "2014-08-24Z14:35:53.854541", "duration-ms": 5000} +// ` + dc := new(StatefulDecoder) + res, err := dc.Decode([]byte(testData)) + if err != nil { + t.Errorf("Decode failed with %v", err) + return + } + fmt.Println("%q", res) +} + diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go new file mode 100644 index 0000000..604a9ff --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -0,0 +1,61 @@ +package sfive + +import ( + "database/sql" + "time" +) + +type StatsFilter struct { + start *time.Time + end *time.Time + hostname *string + contentId *string + format *string + quality *string + tagsAny []string +} + +type StatsContainer interface { + Append(update StatisticsData) error + ClientCount(filter StatsFilter) uint + AverageBps(filter StatsFilter) uint + Locations(filter StatsFilter) map[string]int +} + +type SqliteStore struct { + db *sql.DB +} + +func InitSqlDb(db *sql.DB) { + +} + +func NewStore() (store StatsContainer, err error) { + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + return + } + res := &SqliteStore{db} + store = res + return +} + +func (s *SqliteStore) Append(update StatisticsData) (err error) { + // TODO + return nil +} + +func (s *SqliteStore) ClientCount(filter StatsFilter) uint { + return 0 + // TODO +} + +func (s *SqliteStore) AverageBps(filter StatsFilter) uint { + return 0 + // TODO +} + +func (s *SqliteStore) Locations(filter StatsFilter) map[string]int { + return nil + // TODO +} diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go new file mode 100644 index 0000000..56b1552 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5types.go @@ -0,0 +1,51 @@ +package sfive + +import "time" + +const ( + QualityLow = "low" + QualityMedium = "medium" + QualityHigh = "high" +) + +type StreamId struct { + ContentId string `json:"content-id"` + Format string `json:"format"` + Quality string `json:"quality"` +} + +type SourceId struct { + Hostname string `json:"hostname"` + StreamId StreamId `json:"stream-id"` + Tags []string `json:"tags"` +} + +type ClientData struct { + Ip string + BytesTransferred uint + UserAgent string +} + +type SourceData struct { + ClientCount uint + BytesReceived uint + BytesSent uint + Clients []ClientData +} + +type DataUpdate struct { + StartTime time.Time + Duration time.Duration + Data SourceData +} + +type StatisticsData struct { + SourceId + DataUpdate +} + +func (self *StatisticsData) CopyFrom(id *SourceId) { + self.Hostname = id.Hostname + self.StreamId = id.StreamId + self.Tags = id.Tags +} -- cgit v1.2.3