summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-18 21:58:37 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-18 21:59:22 +0200
commit26177d58b82bc11f080ee2dafca366080f084c6b (patch)
treeca7e40d6aa560b42e03cce8d90989322d6397619 /src/hub
parentupdate sample data (diff)
hub: add basic filter-by-time support
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go15
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go153
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go31
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5types.go10
4 files changed, 150 insertions, 59 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 24b458a..e908bc6 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -13,6 +13,10 @@ type StatsEncoder interface {
Encode(data StatisticsData) []byte
}
+type FilterDecoder interface {
+ Decode(jsonString []byte) (StatsFilter, error)
+}
+
type StatefulDecoder struct {
sourceId SourceId
}
@@ -21,6 +25,8 @@ type PlainDecoder struct{}
type PlainEncoder struct{}
+type filterDecoder struct{}
+
func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) {
res := new(StatefulDecoder)
err = json.Unmarshal(jsonString, &res.sourceId)
@@ -38,6 +44,10 @@ func NewPlainDecoder() (decoder StatsDecoder) {
return new(PlainDecoder)
}
+func NewFilterDecoder() (decoder FilterDecoder) {
+ return new(filterDecoder)
+}
+
func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) {
dat.CopyFrom(&self.sourceId)
err = json.Unmarshal(jsonString, &dat)
@@ -58,3 +68,8 @@ func (self *PlainEncoder) Encode(data *StatisticsData) []byte {
}
return res
}
+
+func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) {
+ err = json.Unmarshal(jsonString, &dat)
+ return
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 09ba684..6a6990e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -27,14 +27,6 @@ type tagDb struct {
Name string
}
-func tagsFromStatisticsData(value StatisticsData) []tagDb {
- tags := make([]tagDb, len(value.SourceId.Tags))
- for i := range value.SourceId.Tags {
- tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]}
- }
- return tags
-}
-
// stored in SourceTagsTn
// Stream m:n Tag
type sourceTagsDb struct {
@@ -49,6 +41,47 @@ type sourceDb struct {
SourceId
}
+// stored in ClientDataUpdatesTn
+// ClientData n:1 DataUpdate
+type clientDataDb struct {
+ Id int
+ DataUpdatesId int // foreign key to DataUpdatesTn
+ ClientData
+}
+
+// stored in DataUpdatesTn
+// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
+type dataUpdateDb struct {
+ Id int
+ SourceId int // foreign key to SourcesTn
+ StartTime time.Time
+ Duration time.Duration
+ ClientCount uint
+ BytesReceived uint
+ BytesSent uint
+}
+
+type StatsContainer interface {
+ Close()
+ Append(update StatisticsData) error
+ GetTags() ([]string, error)
+ CountUpdateEntries(filter *StatsFilter) (int64, error)
+ CountClients(filter *StatsFilter) uint
+ GetAverageBps(filter *StatsFilter) (uint, error)
+}
+
+type sqliteStore struct {
+ db *gorp.DbMap
+}
+
+func tagsFromStatisticsData(value StatisticsData) []tagDb {
+ tags := make([]tagDb, len(value.SourceId.Tags))
+ for i := range value.SourceId.Tags {
+ tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]}
+ }
+ return tags
+}
+
func sourceFromStatisticsData(value StatisticsData) sourceDb {
return sourceDb{
-1,
@@ -62,14 +95,6 @@ func sourceFromStatisticsData(value StatisticsData) sourceDb {
}
}
-// stored in ClientDataUpdatesTn
-// ClientData n:1 DataUpdate
-type clientDataDb struct {
- Id int
- DataUpdatesId int // foreign key to DataUpdatesTn
- ClientData
-}
-
func clientsFromStatisticsData(value StatisticsData) []clientDataDb {
res := make([]clientDataDb, len(value.Data.Clients))
for i := range value.Data.Clients {
@@ -78,18 +103,6 @@ func clientsFromStatisticsData(value StatisticsData) []clientDataDb {
return res
}
-// stored in DataUpdatesTn
-// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs
-type dataUpdateDb struct {
- Id int
- SourceId int // foreign key to SourcesTn
- StartTime time.Time
- Duration time.Duration
- ClientCount uint
- BytesReceived uint
- BytesSent uint
-}
-
func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb {
return dataUpdateDb{
-1,
@@ -136,28 +149,53 @@ func checkErr(err error, msg string) {
}
}
-type StatsFilter struct {
- start *time.Time
- end *time.Time
- hostname *string
- contentId *string
- format *string
- quality *string
- tagsAny []string
+func isEmptyFilter(filter *StatsFilter) bool {
+ if filter == nil {
+ return true
+ }
+ if filter.start == nil &&
+ filter.end == nil &&
+ filter.hostname == nil &&
+ filter.contentId == nil &&
+ filter.format == nil &&
+ filter.quality == nil &&
+ (filter.tagsAny == nil || len(filter.tagsAny) == 0) {
+ return true
+ }
+ return false
}
-type StatsContainer interface {
- Append(update StatisticsData) error
- CountUpdateEntries() (int64, error)
- GetTags() ([]string, error)
- ClientCount(filter *StatsFilter) uint
- AverageBps(filter *StatsFilter) (uint, error)
- Locations(filter *StatsFilter) map[string]int
- Close()
+func insertAnd(needsAnd *bool) (res string) {
+ if *needsAnd {
+ res = " and"
+ *needsAnd = false
+ }
+ return
}
-type sqliteStore struct {
- db *gorp.DbMap
+func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) {
+ if isEmptyFilter(filter) {
+ return DataUpdatesTn, nil
+ }
+
+ query := "(select * from " + DataUpdatesTn + " where"
+ parameters := make(map[string]interface{})
+ needsAnd := false
+
+ if filter.start != nil {
+ query += insertAnd(&needsAnd)
+ query += " Start >= :filterstart"
+ parameters["filterstart"] = filter.start
+ }
+ if filter.end != nil {
+ query += insertAnd(&needsAnd)
+ query += " Start < :filterend"
+ parameters["filterend"] = filter.end
+ }
+
+ // TODO other fields
+ query += ")"
+ return query, parameters
}
func (s sqliteStore) findTag(name string) (tag *tagDb, err error) {
@@ -295,8 +333,9 @@ func (s sqliteStore) Append(update StatisticsData) (err error) {
return tx.Commit()
}
-func (s sqliteStore) CountUpdateEntries() (count int64, err error) {
- count, err = s.db.SelectInt("select count(*) from " + DataUpdatesTn)
+func (s sqliteStore) CountUpdateEntries(filter *StatsFilter) (count int64, err error) {
+ sourceSql, parameters := getFilteredDataUpdateSelect(filter)
+ count, err = s.db.SelectInt("select count(*) from "+sourceSql, parameters)
return
}
@@ -317,9 +356,9 @@ func ToString(value []interface{}) []string {
return res
}
-func (s sqliteStore) ClientCount(filter *StatsFilter) uint {
- count, _ := s.db.SelectInt(
- "select count(distict (Ip, UserAgent)) from " + ClientDataUpdatesTn)
+func (s sqliteStore) CountClients(filter *StatsFilter) uint {
+ sourceSql, parameters := getFilteredDataUpdateSelect(filter)
+ count, _ := s.db.SelectInt("select sum(ClientCount) from "+sourceSql, parameters)
return uint(count)
}
@@ -331,9 +370,12 @@ type bpsQueryResult struct {
LastDuration time.Duration
}
-func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) {
+func (s sqliteStore) GetAverageBps(filter *StatsFilter) (uint, error) {
+ sourceSql, parameters := getFilteredDataUpdateSelect(filter)
res := bpsQueryResult{}
- err := s.db.SelectOne(res, "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+DataUpdatesTn)
+ err := s.db.SelectOne(
+ res,
+ "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+sourceSql, parameters)
if err == nil {
bps := (res.BytesSent + res.BytesReceived) / uint(res.StartTime.Sub(res.LastStartTime).Seconds())
return bps, nil
@@ -341,11 +383,6 @@ func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) {
return 0, err
}
-func (s sqliteStore) Locations(filter *StatsFilter) map[string]int {
- return nil
- // TODO
-}
-
func NewStore() (store StatsContainer, err error) {
db := initDb()
if db == nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 356b42f..c3609da 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -5,6 +5,18 @@ import (
"time"
)
+func TestGetFilter(t *testing.T) {
+ queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
+ filterStruct := StatsFilter{start: &queryStartTime}
+ fe, np := getFilteredDataUpdateSelect(&filterStruct)
+ if fe != "(select * from DataUpdates where Start >= :filterstart)" {
+ t.Errorf("get filter sql failed: fe: %v", fe)
+ }
+ if !queryStartTime.Equal(*np["filterstart"].(*time.Time)) {
+ t.Errorf("get filter sql failed: np: %v != %v", np["filterstart"], queryStartTime)
+ }
+}
+
func TestAppend(t *testing.T) {
store, err := NewStore()
if err != nil {
@@ -22,6 +34,23 @@ func TestAppend(t *testing.T) {
if err != nil {
t.Errorf("Failed to append: %v", err)
}
+
+ clientCnt := store.CountClients(nil)
+ if 3 != clientCnt {
+ t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCnt)
+ }
+
+ updateCnt, err := store.CountUpdateEntries(nil)
+ if 1 != updateCnt {
+ t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCnt)
+ }
+
+ queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
+ filterStruct := StatsFilter{start: &queryStartTime}
+ updateCnt, err = store.CountUpdateEntries(&filterStruct)
+ if 0 != updateCnt {
+ t.Errorf("Failed to filter entries by start time, 0 != %v", updateCnt)
+ }
}
func IgnoreTestCount(t *testing.T) {
@@ -31,7 +60,7 @@ func IgnoreTestCount(t *testing.T) {
}
defer EatDataAndClose(store)
- if 0 != store.ClientCount(nil) {
+ if 0 != store.CountClients(nil) {
t.Errorf("Failed to count correctly.")
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go
index 74c7f1c..fe98961 100644
--- a/src/hub/src/spreadspace.org/sfive/s5types.go
+++ b/src/hub/src/spreadspace.org/sfive/s5types.go
@@ -45,6 +45,16 @@ type StatisticsData struct {
DataUpdate
}
+type StatsFilter struct {
+ start *time.Time
+ end *time.Time
+ hostname *string
+ contentId *string
+ format *string
+ quality *string
+ tagsAny []string
+}
+
func (self *StatisticsData) CopyFrom(id *SourceId) {
self.Hostname = id.Hostname
self.StreamId = id.StreamId