summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-24 14:04:07 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-24 14:04:07 +0200
commit90f180067badeabd989eab64cbec811af1d6383f (patch)
treec850e72cf50e42a5722b0f05cf82bc2b0909321b /src
parentviz: Show more stats, fix refresh issue, show connection-error. (diff)
hub: Fix DataUpdate queries in store.
- known issue: clients and tags still not part of the result
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go93
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go8
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go17
3 files changed, 92 insertions, 26 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 082a611..fa0ba3a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -12,7 +12,8 @@ import (
)
type sqliteStore struct {
- db *gorp.DbMap
+ db *gorp.DbMap
+ hubId string
}
func tagsFromStatisticsData(value StatisticsData) []tagDb {
@@ -66,7 +67,7 @@ func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []clientDataD
return du, cd, src, tags
}
-func initDb(path string) (res *gorp.DbMap, err error) {
+func initDb(path string) (res *gorp.DbMap, hubId string, err error) {
// connect to db using standard Go database/sql API
db, err := sql.Open("sqlite3", path)
if err != nil {
@@ -89,12 +90,13 @@ func initDb(path string) (res *gorp.DbMap, err error) {
return
}
- oldid, err := dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'")
+ hubId, err = dbmap.SelectStr("select Value from " + hubInfoTn + " where Name = 'HubUuid'")
// TODO handle only not-found this way
- if err != nil || oldid == "" {
- newid := uuid.New()
- _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", newid)
+ if err != nil || hubId == "" {
+ hubId = uuid.New()
+ _, err = db.Exec("insert into "+hubInfoTn+" values ('HubUuid', ?)", hubId)
if err != nil {
+ hubId = ""
return
}
}
@@ -127,6 +129,12 @@ func insertAnd(needsAnd *bool) (res string) {
return
}
+type dataUpdateQueryResult struct {
+ dataUpdateDb
+ StreamId
+ SourceId
+}
+
func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) {
const baseQuery = "(select * from " + dataUpdatesTn + "," + sourcesTn + " on " + dataUpdatesTn + ".SourceId = " + sourcesTn + ".Id"
if isEmptyFilter(filter) {
@@ -330,15 +338,39 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
return
}
-func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) {
+var (
+ updateColumnSelect = `
+ Id,
+ SourceHubUuid,
+ SourceHubDataUpdateId,
+ StartTime,
+ Duration,
+ ClientCount,
+ BytesReceived,
+ BytesSent,
+ Hostname,
+ ContentId,
+ Format,
+ Quality
+`
+)
+
+func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
updates, err := s.db.Select(
- dataUpdateDb{},
- "select * from "+dataUpdatesTn+" where Id > ?",
+ dataUpdateQueryResult{},
+ "SELECT "+updateColumnSelect+" FROM "+dataUpdatesTn+","+sourcesTn+" ON "+dataUpdatesTn+".SourceId = "+sourcesTn+".Id WHERE "+dataUpdatesTn+".Id > ?",
id)
if err == nil {
- res = make([]dataUpdateDb, len(updates))
+ res = make([]StatisticsData, len(updates))
for i := range updates {
- res[i] = *updates[i].(*dataUpdateDb)
+ t := *updates[i].(*dataUpdateQueryResult)
+ res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
+ res[i].Hostname = t.Hostname
+ res[i].StreamId.ContentId = t.ContentId
+ res[i].StreamId.Format = t.Format
+ res[i].StreamId.Quality = t.Quality
+ // TODO clients
+ // TODO tags
}
}
return
@@ -347,27 +379,37 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) {
func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) {
sourceSql, parameters := getFilteredDataUpdateSelect(filter)
updates, err := s.db.Select(
- StatisticsData{},
- "select * from "+sourceSql,
+ dataUpdateQueryResult{},
+ "SELECT "+updateColumnSelect+" FROM "+sourceSql,
parameters)
if err == nil {
res = make([]StatisticsData, len(updates))
for i := range updates {
- res[i] = *updates[i].(*StatisticsData)
+ t := *updates[i].(*dataUpdateQueryResult)
+ res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
+ res[i].Hostname = t.Hostname
+ res[i].StreamId.ContentId = t.ContentId
+ res[i].StreamId.Format = t.Format
+ res[i].StreamId.Quality = t.Quality
+ // TODO clients
+ // TODO tags
}
-
- // TODO clients
- // TODO tags
}
return
}
-func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId int, err error) {
- res, err := s.db.SelectInt(
- "select max(SourceHubDataUpdateId) from "+dataUpdatesTn+" where SourceHubUuid = ?",
+type lastUpdateQueryResult struct {
+ MaxDataUpdateId *int
+}
+
+func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error) {
+ result := lastUpdateQueryResult{}
+ err = s.db.SelectOne(
+ &result,
+ "select max(SourceHubDataUpdateId) as MaxDataUpdateId from "+dataUpdatesTn+" where SourceHubUuid = ?",
uuid)
if err == nil {
- updateId = int(res)
+ updateId = result.MaxDataUpdateId
} else {
s5l.Printf("db: failed to find max SourceHubDataUpdateId for %s: %v", uuid, err)
}
@@ -468,12 +510,17 @@ func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (ma
return StatsResult{}, err
}
+func (s sqliteStore) GetStoreId() (uuid string, err error) {
+ uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid")
+ return
+}
+
func NewStore(path string) (store sqliteStore, err error) {
- db, err := initDb(path)
+ db, hubid, err := initDb(path)
if err != nil {
return
}
- store = sqliteStore{db}
+ store = sqliteStore{db, hubid}
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 871a903..ac89b0c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -47,6 +47,14 @@ type StatisticsData struct {
DataUpdate
}
+type DataContainer struct {
+ Data interface{} `json:"data"`
+}
+
+type StatisticsDataContainer struct {
+ Data []StatisticsData `json:"data"`
+}
+
type StatsFilter struct {
start *time.Time
end *time.Time
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index cb73d2e..f757f42 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -80,7 +80,18 @@ func (self *SourceId) CopyFromTagsDb(values []tagDb) {
self.Tags = tags
}
-func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb) {
+func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId string) {
+ if value.SourceHubUuid == nil {
+ self.SourceHubUuid = &hubId
+ } else {
+ self.SourceHubUuid = value.SourceHubUuid
+ }
+ if value.SourceHubDataUpdateId == nil {
+ self.SourceHubDataUpdateId = &value.Id
+ } else {
+ self.SourceHubDataUpdateId = value.SourceHubDataUpdateId
+ }
+
self.StartTime = time.Unix(value.StartTime, 0)
self.Duration = time.Duration(value.Duration) * time.Second
self.Data.ClientCount = value.ClientCount
@@ -99,10 +110,10 @@ func (self *StatisticsData) CopyFromClientDataDb(values []clientDataDb) {
}
func cvtToApiStatisticsData(
- source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData {
+ hubId string, source sourceDb, update dataUpdateDb, clients []clientDataDb, tags []tagDb) StatisticsData {
res := StatisticsData{}
res.CopyFromSourceDb(source)
- res.CopyFromDataUpdateDb(update)
+ res.CopyFromDataUpdateDb(update, hubId)
res.CopyFromClientDataDb(clients)
res.CopyFromTagsDb(tags)
return res