From 90f180067badeabd989eab64cbec811af1d6383f Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Fri, 24 Oct 2014 14:04:07 +0200 Subject: hub: Fix DataUpdate queries in store. - known issue: clients and tags still not part of the result --- src/hub/src/spreadspace.org/sfive/s5store.go | 93 +++++++++++++++++------ src/hub/src/spreadspace.org/sfive/s5typesApi.go | 8 ++ src/hub/src/spreadspace.org/sfive/s5typesStore.go | 17 ++++- 3 files changed, 92 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 082a611..fa0ba3a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -12,7 +12,8 @@ import ( ) type sqliteStore struct { - db *gorp.DbMap + db *gorp.DbMap + hubId string } func tagsFromStatisticsData(value StatisticsData) []tagDb { @@ -66,7 +67,7 @@ func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataD return du, cd, src, tags } -func initDb(path string) (res *gorp.DbMap, err error) { +func initDb(path string) (res *gorp.DbMap, hubId string, err error) { // connect to db using standard Go database/sql API db, err := sql.Open("sqlite3", path) if err != nil { @@ -89,12 +90,13 @@ func initDb(path string) (res *gorp.DbMap, err error) { return } - oldid, err := dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'") + hubId, err = dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'") // TODO handle only not-found this way - if err != nil || oldid == "" { - newid := uuid.New() - _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", newid) + if err != nil || hubId == "" { + hubId = uuid.New() + _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", hubId) if err != nil { + hubId = "" return } } @@ -127,6 +129,12 @@ func insertAnd(needsAnd *bool) (res string) { return } +type dataUpdateQueryResult struct { + dataUpdateDb + StreamId + SourceId +} + func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) { const baseQuery = "(select * from " + dataUpdatesTn + "," + sourcesTn + " on " + dataUpdatesTn + ".SourceId = " + sourcesTn + ".Id" if isEmptyFilter(filter) { @@ -330,15 +338,39 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { return } -func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) { +var ( + updateColumnSelect = ` + Id, + SourceHubUuid, + SourceHubDataUpdateId, + StartTime, + Duration, + ClientCount, + BytesReceived, + BytesSent, + Hostname, + ContentId, + Format, + Quality +` +) + +func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { updates, err := s.db.Select( - dataUpdateDb{}, - "select * from "+dataUpdatesTn+" where Id > ?", + dataUpdateQueryResult{}, + "SELECT "+updateColumnSelect+" FROM "+dataUpdatesTn+","+sourcesTn+" ON "+dataUpdatesTn+".SourceId = "+sourcesTn+".Id WHERE "+dataUpdatesTn+".Id > ?", id) if err == nil { - res = make([]dataUpdateDb, len(updates)) + res = make([]StatisticsData, len(updates)) for i := range updates { - res[i] = *updates[i].(*dataUpdateDb) + t := *updates[i].(*dataUpdateQueryResult) + res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId) + res[i].Hostname = t.Hostname + res[i].StreamId.ContentId = t.ContentId + res[i].StreamId.Format = t.Format + res[i].StreamId.Quality = t.Quality + // TODO clients + // TODO tags } } return @@ -347,27 +379,37 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) { func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) updates, err := s.db.Select( - StatisticsData{}, - "select * from "+sourceSql, + dataUpdateQueryResult{}, + "SELECT "+updateColumnSelect+" FROM "+sourceSql, parameters) if err == nil { res = make([]StatisticsData, len(updates)) for i := range updates { - res[i] = *updates[i].(*StatisticsData) + t := *updates[i].(*dataUpdateQueryResult) + res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId) + res[i].Hostname = t.Hostname + res[i].StreamId.ContentId = t.ContentId + res[i].StreamId.Format = t.Format + res[i].StreamId.Quality = t.Quality + // TODO clients + // TODO tags } - - // TODO clients - // TODO tags } return } -func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId int, err error) { - res, err := s.db.SelectInt( - "select max(SourceHubDataUpdateId) from "+dataUpdatesTn+" where SourceHubUuid = ?", +type lastUpdateQueryResult struct { + MaxDataUpdateId *int +} + +func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) { + result := lastUpdateQueryResult{} + err = s.db.SelectOne( + &result, + "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?", uuid) if err == nil { - updateId = int(res) + updateId = result.MaxDataUpdateId } else { s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err) } @@ -468,12 +510,17 @@ func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (ma return StatsResult{}, err } +func (s sqliteStore) GetStoreId() (uuid string, err error) { + uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid") + return +} + func NewStore(path string) (store sqliteStore, err error) { - db, err := initDb(path) + db, hubid, err := initDb(path) if err != nil { return } - store = sqliteStore{db} + store = sqliteStore{db, hubid} return } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 871a903..ac89b0c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -47,6 +47,14 @@ type StatisticsData struct { DataUpdate } +type DataContainer struct { + Data interface{} `json:"data"` +} + +type StatisticsDataContainer struct { + Data []StatisticsData `json:"data"` +} + type StatsFilter struct { start *time.Time end *time.Time diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index cb73d2e..f757f42 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -80,7 +80,18 @@ func (self *SourceId) CopyFromTagsDb(values []tagDb) { self.Tags = tags } -func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb) { +func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId string) { + if value.SourceHubUuid == nil { + self.SourceHubUuid = &hubId + } else { + self.SourceHubUuid = value.SourceHubUuid + } + if value.SourceHubDataUpdateId == nil { + self.SourceHubDataUpdateId = &value.Id + } else { + self.SourceHubDataUpdateId = value.SourceHubDataUpdateId + } + self.StartTime = time.Unix(value.StartTime, 0) self.Duration = time.Duration(value.Duration) * time.Second self.Data.ClientCount = value.ClientCount @@ -99,10 +110,10 @@ func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) { } func cvtToApiStatisticsData( - source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData { + hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData { res := StatisticsData{} res.CopyFromSourceDb(source) - res.CopyFromDataUpdateDb(update) + res.CopyFromDataUpdateDb(update, hubId) res.CopyFromClientDataDb(clients) res.CopyFromTagsDb(tags) return res -- cgit v1.2.3