summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go5
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go59
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go275
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go37
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5types.go38
5 files changed, 361 insertions, 53 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 3402e7f..5a2ef70 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -47,11 +47,10 @@ func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err err
return
}
-func (self *PlainEncoder) Encode(data StatisticsData) []byte {
- res, err := json.Marshal(&data)
+func (self *PlainEncoder) Encode(data *StatisticsData) []byte {
+ res, err := json.Marshal(data)
if err != nil {
panic("oh fuck I cannot event marshal my own data")
}
return res
}
-
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 8ffcbd7..5b53160 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -1,25 +1,58 @@
package sfive
import (
+ "reflect"
"testing"
- "fmt"
+ "time"
)
-func TestEncode(t *testing.T) {
- testData := `{"streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["elevate", "2014"]}`
-// otherDingy := `
-// {"data": {"bytes-sent": 0, "client-count": 3, "bytes-received": 0}, "start-time": "2014-08-24Z14:35:33.847282", "duration-ms": 5000}
-// {"data": {"bytes-sent": 1183266, "client-count": 3, "bytes-received": 394422}, "start-time": "2014-08-24Z14:35:38.848950", "duration-ms": 5000}
-// {"data": {"bytes-sent": 1199616, "client-count": 3, "bytes-received": 399872}, "start-time": "2014-08-24Z14:35:43.851006", "duration-ms": 5000}
-// {"data": {"bytes-sent": 1181094, "client-count": 3, "bytes-received": 393698}, "start-time": "2014-08-24Z14:35:48.852863", "duration-ms": 5000}
-// {"data": {"bytes-sent": 1190148, "client-count": 3, "bytes-received": 396716}, "start-time": "2014-08-24Z14:35:53.854541", "duration-ms": 5000}
-// `
- dc := new(StatefulDecoder)
- res, err := dc.Decode([]byte(testData))
+var (
+ sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
+ sourceIdData = `{` + sourceIdFields + `}`
+ sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
+ updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
+ updateData = "{" + updateFields + "}"
+ updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
+ testData = "{" + sourceIdFields + "," + updateFields + "}"
+)
+
+func GetExpected() *StatisticsData {
+ expected := new(StatisticsData)
+ expected.CopyFrom(&sourceIdDataStruct)
+ expected.CopyFromUpdate(&updateDataStruct)
+ return expected
+}
+
+func TestDecodeStateful(t *testing.T) {
+ dc := NewStatefulDecoder([]byte(sourceIdData))
+ dat, err := dc.Decode([]byte(testData))
if err != nil {
t.Errorf("Decode failed with %v", err)
return
}
- fmt.Println("%q", res)
+ expected := GetExpected()
+ if !reflect.DeepEqual(dat, *expected) {
+ t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ }
}
+func TestDecodePlain(t *testing.T) {
+ ec := new(PlainDecoder)
+ dat, err := ec.Decode([]byte(testData))
+ if err != nil {
+ t.Errorf("Decode failed with %v", err)
+ return
+ }
+ expected := GetExpected()
+ if !reflect.DeepEqual(dat, *expected) {
+ t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected)
+ }
+}
+
+func TestEncode(t *testing.T) {
+ ec := new(PlainEncoder)
+ td := new(StatisticsData)
+ td.CopyFrom(&sourceIdDataStruct)
+ td.CopyFromUpdate(&updateDataStruct)
+ t.Logf("dada: %v", ec.Encode(td))
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 604a9ff..9d3d57e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -2,9 +2,141 @@ package sfive
import (
"database/sql"
+ "log"
+ "os"
"time"
+
+ _ "github.com/mattn/go-sqlite3"
+
+ "github.com/coopernurse/gorp"
+)
+
+// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections
+// this is very much not normalized at all, because I'm too lazy to type
+
+const (
+ TagsTn = "Tags"
+ SourceTagsTn = "StreamToTagMap"
+ SourcesTn = "Sources"
+ ClientDataUpdatesTn = "ClientDataUpdates"
+ DataUpdatesTn = "DataUpdates"
)
+// stored in TagsTn
+type tagDb struct {
+ Id int
+ Name string
+}
+
+func tagsFromStatisticsData(value StatisticsData) []tagDb {
+ tags := make([]tagDb, len(value.SourceId.Tags))
+ for i := range value.SourceId.Tags {
+ tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]}
+ }
+ return tags
+}
+
+// stored in SourceTagsTn
+// Stream m:n Tag
+type sourceTagsDb struct {
+ TagId int // foreign key to TagsTn
+ SourceId int // foreign key to SourcesTn
+}
+
+// stored in SourcesTn
+type sourceDb struct {
+ Id int
+ StreamId
+ SourceId
+}
+
+func sourceFromStatisticsData(value StatisticsData) sourceDb {
+ return sourceDb{
+ -1,
+ StreamId{
+ ContentId: value.SourceId.StreamId.ContentId,
+ Format: value.SourceId.StreamId.Format,
+ Quality: value.SourceId.StreamId.Quality,
+ },
+ SourceId{
+ Hostname: value.SourceId.Hostname},
+ }
+}
+
+// stored in ClientDataUpdatesTn
+// ClientData n:1 DataUpdate
+type clientDataDb struct {
+ Id int
+ DataUpdatesId int // foreign key to DataUpdatesTn
+ ClientData
+}
+
+func clientsFromStatisticsData(value StatisticsData) []clientDataDb {
+ res := make([]clientDataDb, len(value.Data.Clients))
+ for i := range value.Data.Clients {
+ res[i] = clientDataDb{-1, -1, value.Data.Clients[i]}
+ }
+ return res
+}
+
+// stored in DataUpdatesTn
+// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
+type dataUpdateDb struct {
+ Id int
+ SourceId int // foreign key to SourcesTn
+ StartTime time.Time
+ Duration time.Duration
+ ClientCount uint
+ BytesReceived uint
+ BytesSent uint
+}
+
+func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
+ return dataUpdateDb{
+ -1,
+ -1,
+ value.StartTime,
+ value.Duration,
+ value.Data.ClientCount,
+ value.Data.BytesReceived,
+ value.Data.BytesSent}
+}
+
+func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) {
+ du := dataUpdateFromStatisticsData(value)
+ cd := clientsFromStatisticsData(value)
+ src := sourceFromStatisticsData(value)
+ tags := tagsFromStatisticsData(value)
+
+ return du, cd, src, tags
+}
+
+func initDb() *gorp.DbMap {
+ // connect to db using standard Go database/sql API
+ db, err := sql.Open("sqlite3", "/home/gimpf/test.sqlite")
+ checkErr(err, "sql.Open failed")
+
+ dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}}
+
+ dbmap.AddTableWithName(tagDb{}, TagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true)
+ dbmap.AddTableWithName(sourceTagsDb{}, SourceTagsTn).SetKeys(false, "TagId", "SourceId")
+ dbmap.AddTableWithName(sourceDb{}, SourcesTn).SetKeys(true, "Id")
+ dbmap.AddTableWithName(clientDataDb{}, ClientDataUpdatesTn).SetKeys(true, "Id")
+ dbmap.AddTableWithName(dataUpdateDb{}, DataUpdatesTn).SetKeys(true, "Id")
+
+ // TODO use some real migration, yadda yadda
+ err = dbmap.CreateTablesIfNotExists()
+ checkErr(err, "Create tables failed")
+
+ return dbmap
+}
+
+func checkErr(err error, msg string) {
+ if err != nil {
+ log.Fatalln(msg, err)
+ }
+}
+
type StatsFilter struct {
start *time.Time
end *time.Time
@@ -15,47 +147,148 @@ type StatsFilter struct {
tagsAny []string
}
+type Closer interface {
+ Close()
+}
+
type StatsContainer interface {
Append(update StatisticsData) error
- ClientCount(filter StatsFilter) uint
- AverageBps(filter StatsFilter) uint
- Locations(filter StatsFilter) map[string]int
+ CountUpdateEntries() (int64, error)
+ GetTags() ([]string, error)
+ ClientCount(filter *StatsFilter) uint
+ AverageBps(filter *StatsFilter) (uint, error)
+ Locations(filter *StatsFilter) map[string]int
}
-type SqliteStore struct {
- db *sql.DB
+type sqliteStore struct {
+ db *gorp.DbMap
}
-func InitSqlDb(db *sql.DB) {
+func (s sqliteStore) Append(update StatisticsData) (err error) {
+ du, cd, src, tags := updateFromStatisticsData(update)
-}
+ s.db.TraceOn("", log.New(os.Stdout, "gorptest: ", log.Lmicroseconds))
+ tx, err := s.db.Begin()
+ if err != nil {
+ //fmt.Printf("tx\n")
+ return
+ }
-func NewStore() (store StatsContainer, err error) {
- db, err := sql.Open("sqlite3", ":memory:")
+ for i := range tags {
+ err = s.db.Insert(&tags[i])
+ if err != nil {
+ //fmt.Printf("tags\n")
+ return
+ }
+ }
+
+ err = s.db.Insert(&src)
if err != nil {
+ //fmt.Printf("src\n")
return
}
- res := &SqliteStore{db}
- store = res
+
+ st := make([]sourceTagsDb, len(tags))
+ for i := range tags {
+ st[i].TagId = tags[i].Id
+ st[i].SourceId = src.Id
+ }
+ for i := range st {
+ err = s.db.Insert(&st[i])
+ if err != nil {
+ //fmt.Printf("st\n")
+ return
+ }
+ }
+
+ du.SourceId = src.Id
+ err = s.db.Insert(&du)
+ if err != nil {
+ //fmt.Printf("du\n")
+ return
+ }
+
+ for i := range cd {
+ cd[i].DataUpdatesId = du.Id
+ err = s.db.Insert(&cd[i])
+ if err != nil {
+ return
+ }
+
+ }
+ return tx.Commit()
+}
+
+func (s sqliteStore) CountUpdateEntries() (count int64, err error) {
+ count, err = s.db.SelectInt("select count(*) from " + DataUpdatesTn)
return
}
-func (s *SqliteStore) Append(update StatisticsData) (err error) {
- // TODO
- return nil
+func (s sqliteStore) GetTags() ([]string, error) {
+ res, dbErr := s.db.Select("", "select Name from "+TagsTn)
+ if dbErr == nil {
+ sRes := ToString(res)
+ return sRes, nil
+ }
+ return nil, dbErr
}
-func (s *SqliteStore) ClientCount(filter StatsFilter) uint {
- return 0
- // TODO
+func ToString(value []interface{}) []string {
+ res := make([]string, len(value))
+ for i := range value {
+ res[i] = value[i].(string)
+ }
+ return res
}
-func (s *SqliteStore) AverageBps(filter StatsFilter) uint {
- return 0
- // TODO
+func (s sqliteStore) ClientCount(filter *StatsFilter) uint {
+ count, _ := s.db.SelectInt(
+ "select count(distinct (Ip, UserAgent)) from " + ClientDataUpdatesTn)
+ return uint(count)
+}
+
+type bpsQueryResult struct {
+ BytesReceived uint
+ BytesSent uint
+ StartTime time.Time
+ LastStartTime time.Time
+ LastDuration time.Duration
}
-func (s *SqliteStore) Locations(filter StatsFilter) map[string]int {
+func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) {
+ res := bpsQueryResult{}
+ err := s.db.SelectOne(&res, "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+DataUpdatesTn)
+ if err == nil {
+ bps := (res.BytesSent + res.BytesReceived) / uint(res.LastStartTime.Sub(res.StartTime).Seconds())
+ return bps, nil
+ }
+ return 0, err
+}
+
+func (s sqliteStore) Locations(filter *StatsFilter) map[string]int {
return nil
// TODO
}
+
+func NewStore() (store StatsContainer, err error) {
+ db := initDb()
+ if db == nil {
+ return
+ }
+ res := sqliteStore{db}
+ store = res
+ return
+}
+
+func EatDataAndClose(sc StatsContainer) {
+ s := sc.(sqliteStore)
+ // if s == nil {
+ // return
+ // }
+ s.db.TruncateTables()
+ s.Close()
+}
+
+func (s *sqliteStore) Close() {
+ s.db.Db.Close()
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
new file mode 100644
index 0000000..3ff437e
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -0,0 +1,37 @@
+package sfive
+
+import (
+ "testing"
+ "time"
+)
+
+func TestAppend(t *testing.T) {
+ store, err := NewStore()
+ if err != nil {
+ t.Errorf("Failed to initialize: %v", err)
+ }
+ defer EatDataAndClose(store)
+
+ startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
+ update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond}
+ streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
+ source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId}
+ dat := StatisticsData{source, update}
+
+ err = store.Append(dat)
+ if err != nil {
+ t.Errorf("Failed to append: %v", err)
+ }
+}
+
+func IgnoreTestCount(t *testing.T) {
+ store, err := NewStore()
+ if err != nil {
+ t.Errorf("Failed to initialize: %v", err)
+ }
+ defer EatDataAndClose(store)
+
+ if 0 != store.ClientCount(nil) {
+ t.Errorf("Failed to count correctly.")
+ }
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go
index 56b1552..64b3f48 100644
--- a/src/hub/src/spreadspace.org/sfive/s5types.go
+++ b/src/hub/src/spreadspace.org/sfive/s5types.go
@@ -9,34 +9,34 @@ const (
)
type StreamId struct {
- ContentId string `json:"content-id"`
- Format string `json:"format"`
- Quality string `json:"quality"`
+ ContentId string `json:"content-id"`
+ Format string `json:"format"`
+ Quality string `json:"quality"`
}
type SourceId struct {
- Hostname string `json:"hostname"`
- StreamId StreamId `json:"stream-id"`
- Tags []string `json:"tags"`
+ Hostname string `json:"hostname"`
+ StreamId StreamId `json:"streamer-id" db:"-"`
+ Tags []string `json:"tags" db:"-"`
}
type ClientData struct {
- Ip string
- BytesTransferred uint
- UserAgent string
+ Ip string `json:"ip"`
+ BytesTransferred uint `json:"bytes-transferred"`
+ UserAgent string `json:"user-agent"`
}
type SourceData struct {
- ClientCount uint
- BytesReceived uint
- BytesSent uint
- Clients []ClientData
+ ClientCount uint `json:"client-count"`
+ BytesReceived uint `json:"bytes-received"`
+ BytesSent uint `json:"bytes-sent"`
+ Clients []ClientData `json:"clients"`
}
type DataUpdate struct {
- StartTime time.Time
- Duration time.Duration
- Data SourceData
+ StartTime time.Time `json:"start-time"`
+ Duration time.Duration `json:"duration-ms"`
+ Data SourceData `json:"data"`
}
type StatisticsData struct {
@@ -49,3 +49,9 @@ func (self *StatisticsData) CopyFrom(id *SourceId) {
self.StreamId = id.StreamId
self.Tags = id.Tags
}
+
+func (self *StatisticsData) CopyFromUpdate(id *DataUpdate) {
+ self.StartTime = id.StartTime
+ self.Duration = id.Duration
+ self.Data = id.Data
+}