summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5store.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5store.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go89
1 files changed, 61 insertions, 28 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 17ddbd1..ef5be34 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -7,7 +7,6 @@ import (
"code.google.com/p/go-uuid/uuid"
"github.com/coopernurse/gorp"
- _ "github.com/go-sql-driver/mysql"
_ "github.com/mattn/go-sqlite3"
)
@@ -52,7 +51,7 @@ func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
value.SourceHubUuid,
value.SourceHubDataUpdateId,
value.StartTime.Unix(),
- int64(value.Duration.Seconds()),
+ int64(value.Duration / 1000),
value.Data.ClientCount,
value.Data.BytesReceived,
value.Data.BytesSent}
@@ -261,8 +260,8 @@ func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error
for i := range st {
_, err = s.db.Exec(
"insert or ignore into "+sourceTagsTn+" values (?,?)",
- st[i].SourceId,
- st[i].TagId)
+ st[i].TagId,
+ st[i].SourceId)
// err = s.db.Insert(&st[i])
if err != nil {
// TODO
@@ -333,6 +332,7 @@ func (s sqliteStore) Append(update StatisticsData) (err error) {
}
du, cd, src, tags := updateFromStatisticsData(update)
+ s5l.Printf("blah: %v", du)
err = s.appendItem(du, cd, src, tags)
if err != nil {
tx.Rollback()
@@ -377,6 +377,19 @@ func (s sqliteStore) GetTags() ([]string, error) {
return nil, dbErr
}
+func (s sqliteStore) GetTagsByDataUpdateId(id int) (res []string, err error) {
+ qres, err := s.db.Select(
+ tagDb{},
+ "select * from "+tagsTn+" where Id in (select TagId from "+sourceTagsTn+" where SourceId = (select SourceId from "+dataUpdatesTn+" where Id = ?))", id)
+ if err == nil {
+ res = make([]string, len(qres))
+ for i := range qres {
+ res[i] = qres[i].(*tagDb).Name
+ }
+ }
+ return
+}
+
func (s sqliteStore) GetSources() (res []sourceDb, err error) {
qres, err := s.db.Select(sourceDb{}, "select * from "+sourcesTn)
if err == nil {
@@ -398,6 +411,17 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
return
}
+func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) {
+ qres, err := s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id)
+ if err == nil {
+ res = make([]clientDataDb, len(qres))
+ for i := range qres {
+ res[i] = *qres[i].(*clientDataDb)
+ }
+ }
+ return
+}
+
var (
updateColumnSelect = `
Id,
@@ -415,6 +439,36 @@ var (
`
)
+func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) {
+ clientsDb, err := s.GetClientsByUpdateId(dat.Id)
+ if err != nil {
+ s5l.Printf("store GetClients failed: %v", err)
+ return
+ }
+ tagsDb, err := s.GetTagsByDataUpdateId(dat.Id)
+ if err != nil {
+ s5l.Printf("store GetClients failed: %v", err)
+ return
+ }
+ res.CopyFromDataUpdateDb(dat.dataUpdateDb, s.hubId)
+ res.Hostname = dat.Hostname
+ res.StreamId.ContentId = dat.ContentId
+ res.StreamId.Format = dat.Format
+ res.StreamId.Quality = dat.Quality
+ res.CopyFromClientDataDb(clientsDb)
+ res.Tags = tagsDb
+ return
+}
+
+func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) {
+ res = make([]StatisticsData, len(dat))
+ for i := range dat {
+ t := *dat[i].(*dataUpdateQueryResult)
+ res[i], _ = s.CreateStatisticsDataFrom(t)
+ }
+ return
+}
+
func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
limit := 5000
sourceSql, parameters := getFilteredDataUpdateSelect(&StatsFilter{afterUpdateId: &id, limit: &limit})
@@ -422,17 +476,7 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters)
s5tl.Printf("sql: %s", sql)
if err == nil {
- res = make([]StatisticsData, len(updates))
- for i := range updates {
- t := *updates[i].(*dataUpdateQueryResult)
- res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
- res[i].Hostname = t.Hostname
- res[i].StreamId.ContentId = t.ContentId
- res[i].StreamId.Format = t.Format
- res[i].StreamId.Quality = t.Quality
- // TODO clients
- // TODO tags
- }
+ res, _ = s.CreateStatisticsDatasFrom(updates)
}
return
}
@@ -441,20 +485,9 @@ func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err
sourceSql, parameters := getFilteredDataUpdateSelect(filter)
sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql
s5tl.Printf("store: sql: %s", sql)
- updates, err := s.db.Select(
- dataUpdateQueryResult{}, sql, parameters)
+ updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters)
if err == nil {
- res = make([]StatisticsData, len(updates))
- for i := range updates {
- t := *updates[i].(*dataUpdateQueryResult)
- res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
- res[i].Hostname = t.Hostname
- res[i].StreamId.ContentId = t.ContentId
- res[i].StreamId.Format = t.Format
- res[i].StreamId.Quality = t.Quality
- // TODO clients
- // TODO tags
- }
+ res, _ = s.CreateStatisticsDatasFrom(updates)
}
return
}