From 26177d58b82bc11f080ee2dafca366080f084c6b Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 18 Oct 2014 21:58:37 +0200 Subject: hub: add basic filter-by-time support --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 15 +++ src/hub/src/spreadspace.org/sfive/s5store.go | 153 ++++++++++++++-------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 31 ++++- src/hub/src/spreadspace.org/sfive/s5types.go | 10 ++ 4 files changed, 150 insertions(+), 59 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 24b458a..e908bc6 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -13,6 +13,10 @@ type StatsEncoder interface { Encode(data StatisticsData) []byte } +type FilterDecoder interface { + Decode(jsonString []byte) (StatsFilter, error) +} + type StatefulDecoder struct { sourceId SourceId } @@ -21,6 +25,8 @@ type PlainDecoder struct{} type PlainEncoder struct{} +type filterDecoder struct{} + func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) { res := new(StatefulDecoder) err = json.Unmarshal(jsonString, &res.sourceId) @@ -38,6 +44,10 @@ func NewPlainDecoder() (decoder StatsDecoder) { return new(PlainDecoder) } +func NewFilterDecoder() (decoder FilterDecoder) { + return new(filterDecoder) +} + func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { dat.CopyFrom(&self.sourceId) err = json.Unmarshal(jsonString, &dat) @@ -58,3 +68,8 @@ func (self *PlainEncoder) Encode(data *StatisticsData) []byte { } return res } + +func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) { + err = json.Unmarshal(jsonString, &dat) + return +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 09ba684..6a6990e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -27,14 +27,6 @@ type tagDb struct { Name string } -func tagsFromStatisticsData(value StatisticsData) []tagDb { - tags := make([]tagDb, len(value.SourceId.Tags)) - for i := range value.SourceId.Tags { - tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} - } - return tags -} - // stored in SourceTagsTn // Stream m:n Tag type sourceTagsDb struct { @@ -49,6 +41,47 @@ type sourceDb struct { SourceId } +// stored in ClientDataUpdatesTn +// ClientData n:1 DataUpdate +type clientDataDb struct { + Id int + DataUpdatesId int // foreign key to DataUpdatesTn + ClientData +} + +// stored in DataUpdatesTn +// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs +type dataUpdateDb struct { + Id int + SourceId int // foreign key to SourcesTn + StartTime time.Time + Duration time.Duration + ClientCount uint + BytesReceived uint + BytesSent uint +} + +type StatsContainer interface { + Close() + Append(update StatisticsData) error + GetTags() ([]string, error) + CountUpdateEntries(filter *StatsFilter) (int64, error) + CountClients(filter *StatsFilter) uint + GetAverageBps(filter *StatsFilter) (uint, error) +} + +type sqliteStore struct { + db *gorp.DbMap +} + +func tagsFromStatisticsData(value StatisticsData) []tagDb { + tags := make([]tagDb, len(value.SourceId.Tags)) + for i := range value.SourceId.Tags { + tags[i] = tagDb{Id: -1, Name: value.SourceId.Tags[i]} + } + return tags +} + func sourceFromStatisticsData(value StatisticsData) sourceDb { return sourceDb{ -1, @@ -62,14 +95,6 @@ func sourceFromStatisticsData(value StatisticsData) sourceDb { } } -// stored in ClientDataUpdatesTn -// ClientData n:1 DataUpdate -type clientDataDb struct { - Id int - DataUpdatesId int // foreign key to DataUpdatesTn - ClientData -} - func clientsFromStatisticsData(value StatisticsData) []clientDataDb { res := make([]clientDataDb, len(value.Data.Clients)) for i := range value.Data.Clients { @@ -78,18 +103,6 @@ func clientsFromStatisticsData(value StatisticsData) []clientDataDb { return res } -// stored in DataUpdatesTn -// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs -type dataUpdateDb struct { - Id int - SourceId int // foreign key to SourcesTn - StartTime time.Time - Duration time.Duration - ClientCount uint - BytesReceived uint - BytesSent uint -} - func dataUpdateFromStatisticsData(value StatisticsData) dataUpdateDb { return dataUpdateDb{ -1, @@ -136,28 +149,53 @@ func checkErr(err error, msg string) { } } -type StatsFilter struct { - start *time.Time - end *time.Time - hostname *string - contentId *string - format *string - quality *string - tagsAny []string +func isEmptyFilter(filter *StatsFilter) bool { + if filter == nil { + return true + } + if filter.start == nil && + filter.end == nil && + filter.hostname == nil && + filter.contentId == nil && + filter.format == nil && + filter.quality == nil && + (filter.tagsAny == nil || len(filter.tagsAny) == 0) { + return true + } + return false } -type StatsContainer interface { - Append(update StatisticsData) error - CountUpdateEntries() (int64, error) - GetTags() ([]string, error) - ClientCount(filter *StatsFilter) uint - AverageBps(filter *StatsFilter) (uint, error) - Locations(filter *StatsFilter) map[string]int - Close() +func insertAnd(needsAnd *bool) (res string) { + if *needsAnd { + res = " and" + *needsAnd = false + } + return } -type sqliteStore struct { - db *gorp.DbMap +func getFilteredDataUpdateSelect(filter *StatsFilter) (string, map[string]interface{}) { + if isEmptyFilter(filter) { + return DataUpdatesTn, nil + } + + query := "(select * from " + DataUpdatesTn + " where" + parameters := make(map[string]interface{}) + needsAnd := false + + if filter.start != nil { + query += insertAnd(&needsAnd) + query += " Start >= :filterstart" + parameters["filterstart"] = filter.start + } + if filter.end != nil { + query += insertAnd(&needsAnd) + query += " Start < :filterend" + parameters["filterend"] = filter.end + } + + // TODO other fields + query += ")" + return query, parameters } func (s sqliteStore) findTag(name string) (tag *tagDb, err error) { @@ -295,8 +333,9 @@ func (s sqliteStore) Append(update StatisticsData) (err error) { return tx.Commit() } -func (s sqliteStore) CountUpdateEntries() (count int64, err error) { - count, err = s.db.SelectInt("select count(*) from " + DataUpdatesTn) +func (s sqliteStore) CountUpdateEntries(filter *StatsFilter) (count int64, err error) { + sourceSql, parameters := getFilteredDataUpdateSelect(filter) + count, err = s.db.SelectInt("select count(*) from "+sourceSql, parameters) return } @@ -317,9 +356,9 @@ func ToString(value []interface{}) []string { return res } -func (s sqliteStore) ClientCount(filter *StatsFilter) uint { - count, _ := s.db.SelectInt( - "select count(distict (Ip, UserAgent)) from " + ClientDataUpdatesTn) +func (s sqliteStore) CountClients(filter *StatsFilter) uint { + sourceSql, parameters := getFilteredDataUpdateSelect(filter) + count, _ := s.db.SelectInt("select sum(ClientCount) from "+sourceSql, parameters) return uint(count) } @@ -331,9 +370,12 @@ type bpsQueryResult struct { LastDuration time.Duration } -func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) { +func (s sqliteStore) GetAverageBps(filter *StatsFilter) (uint, error) { + sourceSql, parameters := getFilteredDataUpdateSelect(filter) res := bpsQueryResult{} - err := s.db.SelectOne(res, "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+DataUpdatesTn) + err := s.db.SelectOne( + res, + "select (sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime) from "+sourceSql, parameters) if err == nil { bps := (res.BytesSent + res.BytesReceived) / uint(res.StartTime.Sub(res.LastStartTime).Seconds()) return bps, nil @@ -341,11 +383,6 @@ func (s sqliteStore) AverageBps(filter *StatsFilter) (uint, error) { return 0, err } -func (s sqliteStore) Locations(filter *StatsFilter) map[string]int { - return nil - // TODO -} - func NewStore() (store StatsContainer, err error) { db := initDb() if db == nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 356b42f..c3609da 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -5,6 +5,18 @@ import ( "time" ) +func TestGetFilter(t *testing.T) { + queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) + filterStruct := StatsFilter{start: &queryStartTime} + fe, np := getFilteredDataUpdateSelect(&filterStruct) + if fe != "(select * from DataUpdates where Start >= :filterstart)" { + t.Errorf("get filter sql failed: fe: %v", fe) + } + if !queryStartTime.Equal(*np["filterstart"].(*time.Time)) { + t.Errorf("get filter sql failed: np: %v != %v", np["filterstart"], queryStartTime) + } +} + func TestAppend(t *testing.T) { store, err := NewStore() if err != nil { @@ -22,6 +34,23 @@ func TestAppend(t *testing.T) { if err != nil { t.Errorf("Failed to append: %v", err) } + + clientCnt := store.CountClients(nil) + if 3 != clientCnt { + t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCnt) + } + + updateCnt, err := store.CountUpdateEntries(nil) + if 1 != updateCnt { + t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCnt) + } + + queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) + filterStruct := StatsFilter{start: &queryStartTime} + updateCnt, err = store.CountUpdateEntries(&filterStruct) + if 0 != updateCnt { + t.Errorf("Failed to filter entries by start time, 0 != %v", updateCnt) + } } func IgnoreTestCount(t *testing.T) { @@ -31,7 +60,7 @@ func IgnoreTestCount(t *testing.T) { } defer EatDataAndClose(store) - if 0 != store.ClientCount(nil) { + if 0 != store.CountClients(nil) { t.Errorf("Failed to count correctly.") } } diff --git a/src/hub/src/spreadspace.org/sfive/s5types.go b/src/hub/src/spreadspace.org/sfive/s5types.go index 74c7f1c..fe98961 100644 --- a/src/hub/src/spreadspace.org/sfive/s5types.go +++ b/src/hub/src/spreadspace.org/sfive/s5types.go @@ -45,6 +45,16 @@ type StatisticsData struct { DataUpdate } +type StatsFilter struct { + start *time.Time + end *time.Time + hostname *string + contentId *string + format *string + quality *string + tagsAny []string +} + func (self *StatisticsData) CopyFrom(id *SourceId) { self.Hostname = id.Hostname self.StreamId = id.StreamId -- cgit v1.2.3