summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-11-27 22:22:12 +0100
committerMarkus Grüneis <gimpf@gimpf.org>2014-11-27 22:25:31 +0100
commit2d8280a225d57d30f083f4d0334124c80411773c (patch)
treebca35f7a92cf7303f1a3515db01c04bb2cfac0b9
parentcreating index with keyword tokenizer (diff)
hub: fix storing tags, returning clients,tags
-rw-r--r--src/hub/Makefile2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go89
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go2
5 files changed, 68 insertions, 35 deletions
diff --git a/src/hub/Makefile b/src/hub/Makefile
index eab73bd..1cb9eee 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -41,7 +41,7 @@ getlibs:
$(GOCMD) get "github.com/mattn/go-sqlite3"
$(GOCMD) get "github.com/zenazn/goji"
$(GOCMD) get "code.google.com/p/go-uuid/uuid"
- $(GOCMD) get "github.com/go-sql-driver/mysql"
+# $(GOCMD) get "github.com/go-sql-driver/mysql"
# $(GOCMD) get "github.com/ziutek/mymysql/godrv"
build: export GOPATH=$(curdir)
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 17ddbd1..ef5be34 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -7,7 +7,6 @@ import (
"code.google.com/p/go-uuid/uuid"
"github.com/coopernurse/gorp"
- _ "github.com/go-sql-driver/mysql"
_ "github.com/mattn/go-sqlite3"
)
@@ -52,7 +51,7 @@ func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
value.SourceHubUuid,
value.SourceHubDataUpdateId,
value.StartTime.Unix(),
- int64(value.Duration.Seconds()),
+ int64(value.Duration / 1000),
value.Data.ClientCount,
value.Data.BytesReceived,
value.Data.BytesSent}
@@ -261,8 +260,8 @@ func (s sqliteStore) insertSourceTagLinks(src sourceDb, tags []tagDb) (err error
for i := range st {
_, err = s.db.Exec(
"insert or ignore into "+sourceTagsTn+" values (?,?)",
- st[i].SourceId,
- st[i].TagId)
+ st[i].TagId,
+ st[i].SourceId)
// err = s.db.Insert(&st[i])
if err != nil {
// TODO
@@ -333,6 +332,7 @@ func (s sqliteStore) Append(update StatisticsData) (err error) {
}
du, cd, src, tags := updateFromStatisticsData(update)
+ s5l.Printf("blah: %v", du)
err = s.appendItem(du, cd, src, tags)
if err != nil {
tx.Rollback()
@@ -377,6 +377,19 @@ func (s sqliteStore) GetTags() ([]string, error) {
return nil, dbErr
}
+func (s sqliteStore) GetTagsByDataUpdateId(id int) (res []string, err error) {
+ qres, err := s.db.Select(
+ tagDb{},
+ "select * from "+tagsTn+" where Id in (select TagId from "+sourceTagsTn+" where SourceId = (select SourceId from "+dataUpdatesTn+" where Id = ?))", id)
+ if err == nil {
+ res = make([]string, len(qres))
+ for i := range qres {
+ res[i] = qres[i].(*tagDb).Name
+ }
+ }
+ return
+}
+
func (s sqliteStore) GetSources() (res []sourceDb, err error) {
qres, err := s.db.Select(sourceDb{}, "select * from "+sourcesTn)
if err == nil {
@@ -398,6 +411,17 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
return
}
+func (s sqliteStore) GetClientsByUpdateId(id int) (res []clientDataDb, err error) {
+ qres, err := s.db.Select(clientDataDb{}, "select * from "+clientdataUpdatesTn+" where DataUpdatesId = ?", id)
+ if err == nil {
+ res = make([]clientDataDb, len(qres))
+ for i := range qres {
+ res[i] = *qres[i].(*clientDataDb)
+ }
+ }
+ return
+}
+
var (
updateColumnSelect = `
Id,
@@ -415,6 +439,36 @@ var (
`
)
+func (s sqliteStore) CreateStatisticsDataFrom(dat dataUpdateQueryResult) (res StatisticsData, err error) {
+ clientsDb, err := s.GetClientsByUpdateId(dat.Id)
+ if err != nil {
+ s5l.Printf("store GetClients failed: %v", err)
+ return
+ }
+ tagsDb, err := s.GetTagsByDataUpdateId(dat.Id)
+ if err != nil {
+ s5l.Printf("store GetClients failed: %v", err)
+ return
+ }
+ res.CopyFromDataUpdateDb(dat.dataUpdateDb, s.hubId)
+ res.Hostname = dat.Hostname
+ res.StreamId.ContentId = dat.ContentId
+ res.StreamId.Format = dat.Format
+ res.StreamId.Quality = dat.Quality
+ res.CopyFromClientDataDb(clientsDb)
+ res.Tags = tagsDb
+ return
+}
+
+func (s sqliteStore) CreateStatisticsDatasFrom(dat []interface{}) (res []StatisticsData, err error) {
+ res = make([]StatisticsData, len(dat))
+ for i := range dat {
+ t := *dat[i].(*dataUpdateQueryResult)
+ res[i], _ = s.CreateStatisticsDataFrom(t)
+ }
+ return
+}
+
func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
limit := 5000
sourceSql, parameters := getFilteredDataUpdateSelect(&StatsFilter{afterUpdateId: &id, limit: &limit})
@@ -422,17 +476,7 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters)
s5tl.Printf("sql: %s", sql)
if err == nil {
- res = make([]StatisticsData, len(updates))
- for i := range updates {
- t := *updates[i].(*dataUpdateQueryResult)
- res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
- res[i].Hostname = t.Hostname
- res[i].StreamId.ContentId = t.ContentId
- res[i].StreamId.Format = t.Format
- res[i].StreamId.Quality = t.Quality
- // TODO clients
- // TODO tags
- }
+ res, _ = s.CreateStatisticsDatasFrom(updates)
}
return
}
@@ -441,20 +485,9 @@ func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err
sourceSql, parameters := getFilteredDataUpdateSelect(filter)
sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql
s5tl.Printf("store: sql: %s", sql)
- updates, err := s.db.Select(
- dataUpdateQueryResult{}, sql, parameters)
+ updates, err := s.db.Select(dataUpdateQueryResult{}, sql, parameters)
if err == nil {
- res = make([]StatisticsData, len(updates))
- for i := range updates {
- t := *updates[i].(*dataUpdateQueryResult)
- res[i].CopyFromDataUpdateDb(t.dataUpdateDb, s.hubId)
- res[i].Hostname = t.Hostname
- res[i].StreamId.ContentId = t.ContentId
- res[i].StreamId.Format = t.Format
- res[i].StreamId.Quality = t.Quality
- // TODO clients
- // TODO tags
- }
+ res, _ = s.CreateStatisticsDatasFrom(updates)
}
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 927d84a..d8e831d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -14,7 +14,7 @@ func TestAppend(t *testing.T) {
defer store.Close()
startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
- update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond}
+ update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
dat := StatisticsData{nil, nil, source, update}
@@ -75,7 +75,7 @@ func TestGetUpdatesAfter(t *testing.T) {
defer store.Close()
startTime := time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC)
- update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5 * time.Millisecond}
+ update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5}
streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh}
source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1}
dat := StatisticsData{nil, nil, source, update}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index e3a397e..5b2b29f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -35,9 +35,9 @@ type SourceData struct {
}
type DataUpdate struct {
- StartTime time.Time `json:"start-time"`
- Duration time.Duration `json:"duration-ms"`
- Data SourceData `json:"data"`
+ StartTime time.Time `json:"start-time"`
+ Duration int64 `json:"duration-ms"`
+ Data SourceData `json:"data"`
}
type StatisticsData struct {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index f757f42..7ee5bce 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -93,7 +93,7 @@ func (self *StatisticsData) CopyFromDataUpdateDb(value dataUpdateDb, hubId strin
}
self.StartTime = time.Unix(value.StartTime, 0)
- self.Duration = time.Duration(value.Duration) * time.Second
+ self.Duration = value.Duration
self.Data.ClientCount = value.ClientCount
self.Data.BytesReceived = value.BytesReceived
self.Data.BytesSent = value.BytesSent