From 2d8280a225d57d30f083f4d0334124c80411773c Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Thu, 27 Nov 2014 22:22:12 +0100 Subject: hub: fix storing tags, returning clients,tags --- src/hub/Makefile | 2 +- src/hub/src/spreadspace.org/sfive/s5store.go | 89 ++++++++++++++++------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 4 +- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 6 +- src/hub/src/spreadspace.org/sfive/s5typesStore.go | 2 +- 5 files changed, 68 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/hub/Makefile b/src/hub/Makefile index eab73bd..1cb9eee 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -41,7 +41,7 @@ getlibs: $(GOCMD) get "github.com/mattn/go-sqlite3" $(GOCMD) get "github.com/zenazn/goji" $(GOCMD) get "code.google.com/p/go-uuid/uuid" - $(GOCMD) get "github.com/go-sql-driver/mysql" +# $(GOCMD) get "github.com/go-sql-driver/mysql" # $(GOCMD) get "github.com/ziutek/mymysql/godrv" build: export GOPATH=$(curdir) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 17ddbd1..ef5be34 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -7,7 +7,6 @@ import ( "code.google.com/p/go-uuid/uuid" "github.com/coopernurse/gorp" - _ "github.com/go-sql-driver/mysql" _ "github.com/mattn/go-sqlite3" ) @@ -52,7 +51,7 @@ func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { value.SourceHubUuid, value.SourceHubDataUpdateId, value.StartTime.Unix(), - int64(value.Duration.Seconds()), + int64(value.Duration / 1000), value.Data.ClientCount, value.Data.BytesReceived, value.Data.BytesSent} @@ -261,8 +260,8 @@ func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error for i := range st { _, err = s.db.Exec( "insert or ignore into "+sourceTagsTn+" values (?,?)", - st[i].SourceId, - st[i].TagId) + st[i].TagId, + st[i].SourceId) // err = s.db.Insert(&st[i]) if err != nil { // TODO @@ -333,6 +332,7 @@ func (s sqliteStore) Append(update StatisticsData) (err error) { } du, cd, src, tags := updateFromStatisticsData(update) + s5l.Printf("blah: %v", du) err = s.appendItem(du, cd, src, tags) if err != nil { tx.Rollback() @@ -377,6 +377,19 @@ func (s sqliteStore) GetTags() ([]string, error) { return nil, dbErr } +func (s sqliteStore) GetTagsByDataUpdateId(id int) (res []string, err error) { + qres, err := s.db.Select( + tagDb{}, + "select * from "+tagsTn+" where Id in (select TagId from "+sourceTagsTn+" where SourceId = (select SourceId from "+dataUpdatesTn+" where Id = ?))", id) + if err == nil { + res = make([]string, len(qres)) + for i := range qres { + res[i] = qres[i].(*tagDb).Name + } + } + return +} + func (s sqliteStore) GetSources() (res []sourceDb, err error) { qres, err := s.db.Select(sourceDb{}, "select * from "+sourcesTn) if err == nil { @@ -398,6 +411,17 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { return } +func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) { + qres, err := s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id) + if err == nil { + res = make([]clientDataDb, len(qres)) + for i := range qres { + res[i] = *qres[i].(*clientDataDb) + } + } + return +} + var ( updateColumnSelect = ` Id, @@ -415,6 +439,36 @@ var ( ` ) +func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) { + clientsDb, err := s.GetClientsByUpdateId(dat.Id) + if err != nil { + s5l.Printf("store GetClients failed: %v", err) + return + } + tagsDb, err := s.GetTagsByDataUpdateId(dat.Id) + if err != nil { + s5l.Printf("store GetClients failed: %v", err) + return + } + res.CopyFromDataUpdateDb(dat.dataUpdateDb, s.hubId) + res.Hostname = dat.Hostname + res.StreamId.ContentId = dat.ContentId + res.StreamId.Format = dat.Format + res.StreamId.Quality = dat.Quality + res.CopyFromClientDataDb(clientsDb) + res.Tags = tagsDb + return +} + +func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) { + res = make([]StatisticsData, len(dat)) + for i := range dat { + t := *dat[i].(*dataUpdateQueryResult) + res[i], _ = s.CreateStatisticsDataFrom(t) + } + return +} + func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { limit := 5000 sourceSql, parameters := getFilteredDataUpdateSelect(&StatsFilter{afterUpdateId: &id, limit: &limit}) @@ -422,17 +476,7 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters) s5tl.Printf("sql: %s", sql) if err == nil { - res = make([]StatisticsData, len(updates)) - for i := range updates { - t := *updates[i].(*dataUpdateQueryResult) - res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId) - res[i].Hostname = t.Hostname - res[i].StreamId.ContentId = t.ContentId - res[i].StreamId.Format = t.Format - res[i].StreamId.Quality = t.Quality - // TODO clients - // TODO tags - } + res, _ = s.CreateStatisticsDatasFrom(updates) } return } @@ -441,20 +485,9 @@ func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err sourceSql, parameters := getFilteredDataUpdateSelect(filter) sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql s5tl.Printf("store: sql: %s", sql) - updates, err := s.db.Select( - dataUpdateQueryResult{}, sql, parameters) + updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters) if err == nil { - res = make([]StatisticsData, len(updates)) - for i := range updates { - t := *updates[i].(*dataUpdateQueryResult) - res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId) - res[i].Hostname = t.Hostname - res[i].StreamId.ContentId = t.ContentId - res[i].StreamId.Format = t.Format - res[i].StreamId.Quality = t.Quality - // TODO clients - // TODO tags - } + res, _ = s.CreateStatisticsDatasFrom(updates) } return } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 927d84a..d8e831d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -14,7 +14,7 @@ func TestAppend(t *testing.T) { defer store.Close() startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) - update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond} + update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} dat := StatisticsData{nil, nil, source, update} @@ -75,7 +75,7 @@ func TestGetUpdatesAfter(t *testing.T) { defer store.Close() startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC) - update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond} + update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} dat := StatisticsData{nil, nil, source, update} diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index e3a397e..5b2b29f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -35,9 +35,9 @@ type SourceData struct { } type DataUpdate struct { - StartTime time.Time `json:"start-time"` - Duration time.Duration `json:"duration-ms"` - Data SourceData `json:"data"` + StartTime time.Time `json:"start-time"` + Duration int64 `json:"duration-ms"` + Data SourceData `json:"data"` } type StatisticsData struct { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index f757f42..7ee5bce 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -93,7 +93,7 @@ func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId strin } self.StartTime = time.Unix(value.StartTime, 0) - self.Duration = time.Duration(value.Duration) * time.Second + self.Duration = value.Duration self.Data.ClientCount = value.ClientCount self.Data.BytesReceived = value.BytesReceived self.Data.BytesSent = value.BytesSent -- cgit v1.2.3