From 3bf7274a0414a987136f3bbacf817d9d046aa201 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 11 Oct 2014 20:54:05 +0200 Subject: fix data conversion and tests --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 5 +-- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 59 +++++++++++++++++++------ src/hub/src/spreadspace.org/sfive/s5types.go | 38 +++++++++------- 3 files changed, 70 insertions(+), 32 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 3402e7f..5a2ef70 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -47,11 +47,10 @@ func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err err return } -func (self *PlainEncoder) Encode(data StatisticsData) []byte { - res, err := json.Marshal(&data) +func (self *PlainEncoder) Encode(data *StatisticsData) []byte { + res, err := json.Marshal(data) if err != nil { panic("oh fuck I cannot event marshal my own data") } return res } - diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 8ffcbd7..5b53160 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -1,25 +1,58 @@ package sfive import ( + "reflect" "testing" - "fmt" + "time" ) -func TestEncode(t *testing.T) { - testData := `{"streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "hostname": "localhost", "tags": ["elevate", "2014"]}` -// otherDingy := ` -// {"data": {"bytes-sent": 0, "client-count": 3, "bytes-received": 0}, "start-time": "2014-08-24Z14:35:33.847282", "duration-ms": 5000} -// {"data": {"bytes-sent": 1183266, "client-count": 3, "bytes-received": 394422}, "start-time": "2014-08-24Z14:35:38.848950", "duration-ms": 5000} -// {"data": {"bytes-sent": 1199616, "client-count": 3, "bytes-received": 399872}, "start-time": "2014-08-24Z14:35:43.851006", "duration-ms": 5000} -// {"data": {"bytes-sent": 1181094, "client-count": 3, "bytes-received": 393698}, "start-time": "2014-08-24Z14:35:48.852863", "duration-ms": 5000} -// {"data": {"bytes-sent": 1190148, "client-count": 3, "bytes-received": 396716}, "start-time": "2014-08-24Z14:35:53.854541", "duration-ms": 5000} -// ` - dc := new(StatefulDecoder) - res, err := dc.Decode([]byte(testData)) +var ( + sourceIdFields = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` + sourceIdData = `{` + sourceIdFields + `}` + sourceIdDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + updateFields = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` + updateData = "{" + updateFields + "}" + updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} + testData = "{" + sourceIdFields + "," + updateFields + "}" +) + +func GetExpected() *StatisticsData { + expected := new(StatisticsData) + expected.CopyFrom(&sourceIdDataStruct) + expected.CopyFromUpdate(&updateDataStruct) + return expected +} + +func TestDecodeStateful(t *testing.T) { + dc := NewStatefulDecoder([]byte(sourceIdData)) + dat, err := dc.Decode([]byte(testData)) if err != nil { t.Errorf("Decode failed with %v", err) return } - fmt.Println("%q", res) + expected := GetExpected() + if !reflect.DeepEqual(dat, *expected) { + t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + } } +func TestDecodePlain(t *testing.T) { + ec := new(PlainDecoder) + dat, err := ec.Decode([]byte(testData)) + if err != nil { + t.Errorf("Decode failed with %v", err) + return + } + expected := GetExpected() + if !reflect.DeepEqual(dat, *expected) { + t.Errorf("should have been equal\nactual: %v\nexpected: %v\n", &dat, expected) + } +} + +func TestEncode(t *testing.T) { + ec := new(PlainEncoder) + td := new(StatisticsData) + td.CopyFrom(&sourceIdDataStruct) + td.CopyFromUpdate(&updateDataStruct) + t.Logf("dada: %v", ec.Encode(td)) +} diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go index 56b1552..3b7dc31 100644 --- a/src/hub/src/spreadspace.org/sfive/s5types.go +++ b/src/hub/src/spreadspace.org/sfive/s5types.go @@ -9,34 +9,34 @@ const ( ) type StreamId struct { - ContentId string `json:"content-id"` - Format string `json:"format"` - Quality string `json:"quality"` + ContentId string `json:"content-id"` + Format string `json:"format"` + Quality string `json:"quality"` } type SourceId struct { - Hostname string `json:"hostname"` - StreamId StreamId `json:"stream-id"` - Tags []string `json:"tags"` + Hostname string `json:"hostname"` + StreamId StreamId `json:"streamer-id"` + Tags []string `json:"tags"` } type ClientData struct { - Ip string - BytesTransferred uint - UserAgent string + Ip string `json:"ip"` + BytesTransferred uint `json:"bytes-transferred"` + UserAgent string `json:"user-agent"` } type SourceData struct { - ClientCount uint - BytesReceived uint - BytesSent uint - Clients []ClientData + ClientCount uint `json:"client-count"` + BytesReceived uint `json:"bytes-received"` + BytesSent uint `json:"bytes-sent"` + Clients []ClientData `json:"clients"` } type DataUpdate struct { - StartTime time.Time - Duration time.Duration - Data SourceData + StartTime time.Time `json:"start-time"` + Duration time.Duration `json:"duration-ms"` + Data SourceData `json:"data"` } type StatisticsData struct { @@ -49,3 +49,9 @@ func (self *StatisticsData) CopyFrom(id *SourceId) { self.StreamId = id.StreamId self.Tags = id.Tags } + +func (self *StatisticsData) CopyFromUpdate(id *DataUpdate) { + self.StartTime = id.StartTime + self.Duration = id.Duration + self.Data = id.Data +} -- cgit v1.2.3 From 79407cfc14d8141957c4c42f02f088d0691bcf1f Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Mon, 13 Oct 2014 18:11:41 +0200 Subject: add db (gorp) initialization --- src/hub/src/spreadspace.org/sfive/s5store.go | 87 +++++++++++++++++------ src/hub/src/spreadspace.org/sfive/s5store_test.go | 13 ++++ 2 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5store_test.go (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 604a9ff..28b6210 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -2,9 +2,60 @@ package sfive import ( "database/sql" + "log" "time" + + _ "github.com/mattn/go-sqlite3" + + "github.com/coopernurse/gorp" ) +type streamIdDb struct { + Id int + StreamId +} + +type dataUpdateDb struct { + Id int + DataUpdate +} + +type dataUpdateToStreamIdDb struct { + StreamId int + DataUpdateId int +} + +func initDb() *gorp.DbMap { + // connect to db using standard Go database/sql API + // use whatever database/sql driver you wish + db, err := sql.Open("sqlite3", "/home/gimpf/test.sqlite") + checkErr(err, "sql.Open failed") + + // construct a gorp DbMap + dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} + + // add a table, setting the table name to 'posts' and + // specifying that the Id property is an auto incrementing PK + dbmap.AddTableWithName(streamIdDb{}, "StreamIds").SetKeys(true, "Id") + dbmap.AddTableWithName(dataUpdateDb{}, "DataUpdates").SetKeys(true, "Id") + // dbmap.AddTableWithName(dataUpdateToStreamIdDb{}, "DataUpdatesPerStream").SetKeys(true, "StreamId", "DataUpdateId") + + // create the table. in a production system you'd generally + // use a migration tool, or create the tables via scripts + err = dbmap.DropTablesIfExists() + checkErr(err, "Drop tables failed") + err = dbmap.CreateTables() + checkErr(err, "Create tables failed") + + return dbmap +} + +func checkErr(err error, msg string) { + if err != nil { + log.Fatalln(msg, err) + } +} + type StatsFilter struct { start *time.Time end *time.Time @@ -22,40 +73,36 @@ type StatsContainer interface { Locations(filter StatsFilter) map[string]int } -type SqliteStore struct { - db *sql.DB +type sqliteStore struct { + db *gorp.DbMap } -func InitSqlDb(db *sql.DB) { - -} - -func NewStore() (store StatsContainer, err error) { - db, err := sql.Open("sqlite3", ":memory:") - if err != nil { - return - } - res := &SqliteStore{db} - store = res - return -} - -func (s *SqliteStore) Append(update StatisticsData) (err error) { +func (s *sqliteStore) Append(update StatisticsData) (err error) { // TODO return nil } -func (s *SqliteStore) ClientCount(filter StatsFilter) uint { +func (s *sqliteStore) ClientCount(filter StatsFilter) uint { return 0 // TODO } -func (s *SqliteStore) AverageBps(filter StatsFilter) uint { +func (s *sqliteStore) AverageBps(filter StatsFilter) uint { return 0 // TODO } -func (s *SqliteStore) Locations(filter StatsFilter) map[string]int { +func (s *sqliteStore) Locations(filter StatsFilter) map[string]int { return nil // TODO } + +func NewStore() (store StatsContainer, err error) { + db := initDb() + if db == nil { + return + } + res := &sqliteStore{db} + store = res + return +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go new file mode 100644 index 0000000..7708525 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -0,0 +1,13 @@ +package sfive + +import ( + "testing" + "time" +) + +func TestStuff(t *testing.T) { + gd := initDb() + dat := dataUpdateDb{-1, DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}} + gd.Insert(dat) + // _ = gd.Db +} -- cgit v1.2.3 From 7348b5e9b62275aa93ede91048b628b393ae7884 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Mon, 13 Oct 2014 21:54:48 +0200 Subject: add parts of delusional data store --- src/hub/src/spreadspace.org/sfive/s5store.go | 252 +++++++++++++++++++--- src/hub/src/spreadspace.org/sfive/s5store_test.go | 34 ++- src/hub/src/spreadspace.org/sfive/s5types.go | 4 +- 3 files changed, 250 insertions(+), 40 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 28b6210..9d3d57e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -3,6 +3,7 @@ package sfive import ( "database/sql" "log" + "os" "time" _ "github.com/mattn/go-sqlite3" @@ -10,41 +11,121 @@ import ( "github.com/coopernurse/gorp" ) -type streamIdDb struct { +// compared to JSON DTOs, DB types are flattened, and use key-relations instead of collections +// this is very much not normalized at all, because I'm too lazy to type + +const ( + TagsTn = "Tags" + SourceTagsTn = "StreamToTagMap" + SourcesTn = "Sources" + ClientDataUpdatesTn = "ClientDataUpdates" + DataUpdatesTn = "DataUpdates" +) + +// stored in TagsTn +type tagDb struct { + Id int + Name string +} + +func tagsFromStatisticsData(value StatisticsData) []tagDb { + tags := make([]tagDb, len(value.SourceId.Tags)) + for i := range value.SourceId.Tags { + tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} + } + return tags +} + +// stored in SourceTagsTn +// Stream m:n Tag +type sourceTagsDb struct { + TagId int // foreign key to TagsTn + SourceId int // foreign key to SourcesTn +} + +// stored in SourcesTn +type sourceDb struct { Id int StreamId + SourceId +} + +func sourceFromStatisticsData(value StatisticsData) sourceDb { + return sourceDb{ + -1, + StreamId{ + ContentId: value.SourceId.StreamId.ContentId, + Format: value.SourceId.StreamId.Format, + Quality: value.SourceId.StreamId.Quality, + }, + SourceId{ + Hostname: value.SourceId.Hostname}, + } } +// stored in ClientDataUpdatesTn +// ClientData n:1 DataUpdate +type clientDataDb struct { + Id int + DataUpdatesId int // foreign key to DataUpdatesTn + ClientData +} + +func clientsFromStatisticsData(value StatisticsData) []clientDataDb { + res := make([]clientDataDb, len(value.Data.Clients)) + for i := range value.Data.Clients { + res[i] = clientDataDb{-1, -1, value.Data.Clients[i]} + } + return res +} + +// stored in DataUpdatesTn +// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs type dataUpdateDb struct { - Id int - DataUpdate + Id int + SourceId int // foreign key to SourcesTn + StartTime time.Time + Duration time.Duration + ClientCount uint + BytesReceived uint + BytesSent uint +} + +func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { + return dataUpdateDb{ + -1, + -1, + value.StartTime, + value.Duration, + value.Data.ClientCount, + value.Data.BytesReceived, + value.Data.BytesSent} } -type dataUpdateToStreamIdDb struct { - StreamId int - DataUpdateId int +func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataDb, sourceDb, []tagDb) { + du := dataUpdateFromStatisticsData(value) + cd := clientsFromStatisticsData(value) + src := sourceFromStatisticsData(value) + tags := tagsFromStatisticsData(value) + + return du, cd, src, tags } func initDb() *gorp.DbMap { // connect to db using standard Go database/sql API - // use whatever database/sql driver you wish db, err := sql.Open("sqlite3", "/home/gimpf/test.sqlite") checkErr(err, "sql.Open failed") - // construct a gorp DbMap dbmap := &gorp.DbMap{Db: db, Dialect: gorp.SqliteDialect{}} - // add a table, setting the table name to 'posts' and - // specifying that the Id property is an auto incrementing PK - dbmap.AddTableWithName(streamIdDb{}, "StreamIds").SetKeys(true, "Id") - dbmap.AddTableWithName(dataUpdateDb{}, "DataUpdates").SetKeys(true, "Id") - // dbmap.AddTableWithName(dataUpdateToStreamIdDb{}, "DataUpdatesPerStream").SetKeys(true, "StreamId", "DataUpdateId") - - // create the table. in a production system you'd generally - // use a migration tool, or create the tables via scripts - err = dbmap.DropTablesIfExists() - checkErr(err, "Drop tables failed") - err = dbmap.CreateTables() + dbmap.AddTableWithName(tagDb{}, TagsTn).SetKeys(true, "Id").ColMap("Name").SetUnique(true) + dbmap.AddTableWithName(sourceTagsDb{}, SourceTagsTn).SetKeys(false, "TagId", "SourceId") + dbmap.AddTableWithName(sourceDb{}, SourcesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(clientDataDb{}, ClientDataUpdatesTn).SetKeys(true, "Id") + dbmap.AddTableWithName(dataUpdateDb{}, DataUpdatesTn).SetKeys(true, "Id") + + // TODO use some real migration, yadda yadda + err = dbmap.CreateTablesIfNotExists() checkErr(err, "Create tables failed") return dbmap @@ -66,33 +147,125 @@ type StatsFilter struct { tagsAny []string } +type Closer interface { + Close() +} + type StatsContainer interface { Append(update StatisticsData) error - ClientCount(filter StatsFilter) uint - AverageBps(filter StatsFilter) uint - Locations(filter StatsFilter) map[string]int + CountUpdateEntries() (int64, error) + GetTags() ([]string, error) + ClientCount(filter *StatsFilter) uint + AverageBps(filter *StatsFilter) (uint, error) + Locations(filter *StatsFilter) map[string]int } type sqliteStore struct { db *gorp.DbMap } -func (s *sqliteStore) Append(update StatisticsData) (err error) { - // TODO - return nil +func (s sqliteStore) Append(update StatisticsData) (err error) { + du, cd, src, tags := updateFromStatisticsData(update) + + s.db.TraceOn("", log.New(os.Stdout, "gorptest: ", log.Lmicroseconds)) + tx, err := s.db.Begin() + if err != nil { + //fmt.Printf("tx\n") + return + } + + for i := range tags { + err = s.db.Insert(&tags[i]) + if err != nil { + //fmt.Printf("tags\n") + return + } + } + + err = s.db.Insert(&src) + if err != nil { + //fmt.Printf("src\n") + return + } + + st := make([]sourceTagsDb, len(tags)) + for i := range tags { + st[i].TagId = tags[i].Id + st[i].SourceId = src.Id + } + for i := range st { + err = s.db.Insert(&st[i]) + if err != nil { + //fmt.Printf("st\n") + return + } + } + + du.SourceId = src.Id + err = s.db.Insert(&du) + if err != nil { + //fmt.Printf("du\n") + return + } + + for i := range cd { + cd[i].DataUpdatesId = du.Id + err = s.db.Insert(&cd) + if err != nil { + return + } + + } + return tx.Commit() } -func (s *sqliteStore) ClientCount(filter StatsFilter) uint { - return 0 - // TODO +func (s sqliteStore) CountUpdateEntries() (count int64, err error) { + count, err = s.db.SelectInt("select count(*) from " + DataUpdatesTn) + return } -func (s *sqliteStore) AverageBps(filter StatsFilter) uint { - return 0 - // TODO +func (s sqliteStore) GetTags() ([]string, error) { + res, dbErr := s.db.Select("", "select Name from "+TagsTn) + if dbErr == nil { + sRes := ToString(res) + return sRes, nil + } + return nil, dbErr +} + +func ToString(value []interface{}) []string { + res := make([]string, len(value)) + for i := range value { + res[i] = value[i].(string) + } + return res +} + +func (s sqliteStore) ClientCount(filter *StatsFilter) uint { + count, _ := s.db.SelectInt( + "select count(distict (Ip, UserAgent)) from " + ClientDataUpdatesTn) + return uint(count) +} + +type bpsQueryResult struct { + BytesReceived uint + BytesSent uint + StartTime time.Time + LastStartTime time.Time + LastDuration time.Duration +} + +func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) { + res := bpsQueryResult{} + err := s.db.SelectOne(res, "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+DataUpdatesTn) + if err == nil { + bps := (res.BytesSent + res.BytesReceived) / uint(res.StartTime.Sub(res.LastStartTime).Seconds()) + return bps, nil + } + return 0, err } -func (s *sqliteStore) Locations(filter StatsFilter) map[string]int { +func (s sqliteStore) Locations(filter *StatsFilter) map[string]int { return nil // TODO } @@ -102,7 +275,20 @@ func NewStore() (store StatsContainer, err error) { if db == nil { return } - res := &sqliteStore{db} + res := sqliteStore{db} store = res return } + +func EatDataAndClose(sc StatsContainer) { + s := sc.(sqliteStore) + // if s == nil { + // return + // } + s.db.TruncateTables() + s.Close() +} + +func (s *sqliteStore) Close() { + s.db.Db.Close() +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 7708525..3ff437e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -5,9 +5,33 @@ import ( "time" ) -func TestStuff(t *testing.T) { - gd := initDb() - dat := dataUpdateDb{-1, DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}} - gd.Insert(dat) - // _ = gd.Db +func TestAppend(t *testing.T) { + store, err := NewStore() + if err != nil { + t.Errorf("Failed to initialize: %v", err) + } + defer EatDataAndClose(store) + + startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) + update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond} + streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} + source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId} + dat := StatisticsData{source, update} + + err = store.Append(dat) + if err != nil { + t.Errorf("Failed to append: %v", err) + } +} + +func IgnoreTestCount(t *testing.T) { + store, err := NewStore() + if err != nil { + t.Errorf("Failed to initialize: %v", err) + } + defer EatDataAndClose(store) + + if 0 != store.ClientCount(nil) { + t.Errorf("Failed to count correctly.") + } } diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go index 3b7dc31..64b3f48 100644 --- a/src/hub/src/spreadspace.org/sfive/s5types.go +++ b/src/hub/src/spreadspace.org/sfive/s5types.go @@ -16,8 +16,8 @@ type StreamId struct { type SourceId struct { Hostname string `json:"hostname"` - StreamId StreamId `json:"streamer-id"` - Tags []string `json:"tags"` + StreamId StreamId `json:"streamer-id" db:"-"` + Tags []string `json:"tags" db:"-"` } type ClientData struct { -- cgit v1.2.3