From 48cb92410d0012ac72a5d126b73fc827c41f45e2 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Fri, 24 Oct 2014 12:38:39 +0200 Subject: hub: Fix stats query, return accurate client count. --- src/hub/src/spreadspace.org/sfive/s5log.go | 8 ++- src/hub/src/spreadspace.org/sfive/s5store.go | 60 +++++++++++++++++------ src/hub/src/spreadspace.org/sfive/s5store_test.go | 36 ++++++-------- src/hub/test-client | 14 ++++-- 4 files changed, 77 insertions(+), 41 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go index 679718b..9b80f6f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5log.go +++ b/src/hub/src/spreadspace.org/sfive/s5log.go @@ -1,8 +1,14 @@ package sfive import ( + "io/ioutil" "log" "os" ) -var s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags) +var ( + s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags) + // use ioutil.Discard to switch that thing off + // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags) + s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags) +) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 24e9ae6..082a611 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -2,8 +2,8 @@ package sfive import ( "database/sql" + "fmt" "time" - //"time" _ "github.com/mattn/go-sqlite3" @@ -374,21 +374,10 @@ func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId int, err error) return } -func getCountEntriesSql() string { - return "count(*) as UpdateCount" -} - -// TODO this is totally utterly wrong -func getClientCountSql() string { - return "avg(ClientCount) as ClientCount" -} - -func getBpsStatsSql() string { - return "sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime" -} - type statsResult struct { UpdateCount *int + HubCount *int + SourcesCount *int ClientCount *float32 BytesSent *uint BytesReceived *uint @@ -398,6 +387,8 @@ type statsResult struct { type StatsResult struct { UpdateCount int + HubCount int + SourcesCount int ClientCount float32 BytesSent uint BytesReceived uint @@ -409,6 +400,12 @@ func toApiStatsResult(value statsResult) (res StatsResult) { if value.UpdateCount != nil { res.UpdateCount = *value.UpdateCount } + if value.HubCount != nil { + res.HubCount = *value.HubCount + } + if value.SourcesCount != nil { + res.SourcesCount = *value.SourcesCount + } if value.ClientCount != nil { res.ClientCount = *value.ClientCount } @@ -427,11 +424,42 @@ func toApiStatsResult(value statsResult) (res StatsResult) { return res } +var ( + statsGroupSelect = ` +SELECT + count(*) as UpdateCount, + SourceHubUuid as SourceHubUuid, + count(distinct SourceId) as SourcesCount, + avg(ClientCount) as ClientCount, + sum(BytesSent) as BytesSent, + sum(BytesReceived) as BytesReceived, + min(StartTime) as StartTime, + max(StartTime) as LastStartTime +FROM +` + statsGroupClause = ` +GROUP BY + SourceHubUuid +` + statsAggregateSelect = ` +SELECT + sum(UpdateCount) as UpdateCount, + count(distinct SourceHubUuid) as HubCount, + sum(SourcesCount) as SourcesCount, + sum(ClientCount) as ClientCount, + sum(BytesSent) as BytesSent, + sum(BytesReceived) as BytesReceived, + min(StartTime) as StartTime, + max(LastStartTime) as LastStartTime +FROM +` +) + func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) _ = sourceSql - sql := "select " + getCountEntriesSql() + "," + getClientCountSql() + "," + getBpsStatsSql() + " from " + sourceSql - // s5l.Printf("stats sql: %v", sql) + sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause) + s5tl.Printf("store: stats sql: %s", sql) res := statsResult{} err := s.db.SelectOne(&res, sql, parameters) if err == nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index a34633e..ed49175 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -5,22 +5,11 @@ import ( "time" ) -func ignoreTestGetFilter(t *testing.T) { - queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) - filterStruct := StatsFilter{start: &queryStartTime} - fe, np := getFilteredDataUpdateSelect(&filterStruct) - if fe != "(select * from DataUpdates where StartTime >= :filterstart)" { - t.Errorf("get filter sql failed: fe: %v", fe) - } - if queryStartTime.Unix() != np["filterstart"].(int64) { - t.Errorf("get filter sql failed: np: %v != %v", np["filterstart"], queryStartTime) - } -} - func TestAppend(t *testing.T) { store, err := NewStore("file:memdb1?mode=memory&cache=shared") if err != nil { t.Errorf("Failed to initialize: %v", err) + return } defer store.Close() @@ -33,16 +22,21 @@ func TestAppend(t *testing.T) { err = store.Append(dat) if err != nil { t.Errorf("Failed to append: %v", err) + return } stats, err := store.GetStats(nil) - clientCount := int(stats.ClientCount) - updateCount := stats.UpdateCount - if 3 != clientCount { - t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount) - } - if 1 != updateCount { - t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount) + if err != nil { + t.Errorf("Failed to get stats: %v", err) + } else { + clientCount := int(stats.ClientCount) + updateCount := stats.UpdateCount + if 3 != clientCount { + t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount) + } + if 1 != updateCount { + t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount) + } } queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) @@ -51,14 +45,14 @@ func TestAppend(t *testing.T) { if err != nil { t.Errorf("Failed to get stats: %v", err) } else { - updateCount = stats.UpdateCount + updateCount := stats.UpdateCount if 0 != updateCount { t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount) } } } -func IgnoreTestCount(t *testing.T) { +func TestCount(t *testing.T) { store, err := NewStore("file:memdb1?mode=memory&cache=shared") if err != nil { t.Errorf("Failed to initialize: %v", err) diff --git a/src/hub/test-client b/src/hub/test-client index 859b2da..f6c2c18 100755 --- a/src/hub/test-client +++ b/src/hub/test-client @@ -1,15 +1,23 @@ #!/bin/sh echo pipe: import sample.json +echo ------------------------ socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe echo pipe-gram: import sample-gram.json +echo ---------------------------------- while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json echo show query result -wget -q -S -O - 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z' +echo ----------------- +curl -i 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z' echo '\npost update' -wget -q -S -O - --post-file='../../dat/sample-post.json' 'http://localhost:8000/updates' +echo ------------ +curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates' -echo '\ndone' +echo show stats +echo ---------- +curl -i 'http://localhost:8000/stats' + +echo '\n\ndone' -- cgit v1.2.3