summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-24 12:38:39 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-24 12:38:39 +0200
commit48cb92410d0012ac72a5d126b73fc827c41f45e2 (patch)
treea2dbb2cafd72a7ef92c21d5f3c8714db973bdf2e /src/hub
parentupdated changelog for release (diff)
hub: Fix stats query, return accurate client count.
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5log.go8
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go60
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go36
-rwxr-xr-xsrc/hub/test-client14
4 files changed, 77 insertions, 41 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5log.go b/src/hub/src/spreadspace.org/sfive/s5log.go
index 679718b..9b80f6f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5log.go
+++ b/src/hub/src/spreadspace.org/sfive/s5log.go
@@ -1,8 +1,14 @@
package sfive
import (
+ "io/ioutil"
"log"
"os"
)
-var s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags)
+var (
+ s5l = log.New(os.Stderr, "[s5]\t", log.LstdFlags)
+ // use ioutil.Discard to switch that thing off
+ // s5tl = log.New(os.Stderr, "[s5dbg]\t", log.LstdFlags)
+ s5tl = log.New(ioutil.Discard, "[s5dbg]\t", log.LstdFlags)
+)
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 24e9ae6..082a611 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -2,8 +2,8 @@ package sfive
import (
"database/sql"
+ "fmt"
"time"
- //"time"
_ "github.com/mattn/go-sqlite3"
@@ -374,21 +374,10 @@ func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId int, err error)
return
}
-func getCountEntriesSql() string {
- return "count(*) as UpdateCount"
-}
-
-// TODO this is totally utterly wrong
-func getClientCountSql() string {
- return "avg(ClientCount) as ClientCount"
-}
-
-func getBpsStatsSql() string {
- return "sum(BytesSent) as BytesSent, sum(BytesReceived) as BytesReceived, min(StartTime) as StartTime, max(StartTime) as LastStartTime"
-}
-
type statsResult struct {
UpdateCount *int
+ HubCount *int
+ SourcesCount *int
ClientCount *float32
BytesSent *uint
BytesReceived *uint
@@ -398,6 +387,8 @@ type statsResult struct {
type StatsResult struct {
UpdateCount int
+ HubCount int
+ SourcesCount int
ClientCount float32
BytesSent uint
BytesReceived uint
@@ -409,6 +400,12 @@ func toApiStatsResult(value statsResult) (res StatsResult) {
if value.UpdateCount != nil {
res.UpdateCount = *value.UpdateCount
}
+ if value.HubCount != nil {
+ res.HubCount = *value.HubCount
+ }
+ if value.SourcesCount != nil {
+ res.SourcesCount = *value.SourcesCount
+ }
if value.ClientCount != nil {
res.ClientCount = *value.ClientCount
}
@@ -427,11 +424,42 @@ func toApiStatsResult(value statsResult) (res StatsResult) {
return res
}
+var (
+ statsGroupSelect = `
+SELECT
+ count(*) as UpdateCount,
+ SourceHubUuid as SourceHubUuid,
+ count(distinct SourceId) as SourcesCount,
+ avg(ClientCount) as ClientCount,
+ sum(BytesSent) as BytesSent,
+ sum(BytesReceived) as BytesReceived,
+ min(StartTime) as StartTime,
+ max(StartTime) as LastStartTime
+FROM
+`
+ statsGroupClause = `
+GROUP BY
+ SourceHubUuid
+`
+ statsAggregateSelect = `
+SELECT
+ sum(UpdateCount) as UpdateCount,
+ count(distinct SourceHubUuid) as HubCount,
+ sum(SourcesCount) as SourcesCount,
+ sum(ClientCount) as ClientCount,
+ sum(BytesSent) as BytesSent,
+ sum(BytesReceived) as BytesReceived,
+ min(StartTime) as StartTime,
+ max(LastStartTime) as LastStartTime
+FROM
+`
+)
+
func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) {
sourceSql, parameters := getFilteredDataUpdateSelect(filter)
_ = sourceSql
- sql := "select " + getCountEntriesSql() + "," + getClientCountSql() + "," + getBpsStatsSql() + " from " + sourceSql
- // s5l.Printf("stats sql: %v", sql)
+ sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause)
+ s5tl.Printf("store: stats sql: %s", sql)
res := statsResult{}
err := s.db.SelectOne(&res, sql, parameters)
if err == nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index a34633e..ed49175 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -5,22 +5,11 @@ import (
"time"
)
-func ignoreTestGetFilter(t *testing.T) {
- queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
- filterStruct := StatsFilter{start: &queryStartTime}
- fe, np := getFilteredDataUpdateSelect(&filterStruct)
- if fe != "(select * from DataUpdates where StartTime >= :filterstart)" {
- t.Errorf("get filter sql failed: fe: %v", fe)
- }
- if queryStartTime.Unix() != np["filterstart"].(int64) {
- t.Errorf("get filter sql failed: np: %v != %v", np["filterstart"], queryStartTime)
- }
-}
-
func TestAppend(t *testing.T) {
store, err := NewStore("file:memdb1?mode=memory&cache=shared")
if err != nil {
t.Errorf("Failed to initialize: %v", err)
+ return
}
defer store.Close()
@@ -33,16 +22,21 @@ func TestAppend(t *testing.T) {
err = store.Append(dat)
if err != nil {
t.Errorf("Failed to append: %v", err)
+ return
}
stats, err := store.GetStats(nil)
- clientCount := int(stats.ClientCount)
- updateCount := stats.UpdateCount
- if 3 != clientCount {
- t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount)
- }
- if 1 != updateCount {
- t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount)
+ if err != nil {
+ t.Errorf("Failed to get stats: %v", err)
+ } else {
+ clientCount := int(stats.ClientCount)
+ updateCount := stats.UpdateCount
+ if 3 != clientCount {
+ t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount)
+ }
+ if 1 != updateCount {
+ t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount)
+ }
}
queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
@@ -51,14 +45,14 @@ func TestAppend(t *testing.T) {
if err != nil {
t.Errorf("Failed to get stats: %v", err)
} else {
- updateCount = stats.UpdateCount
+ updateCount := stats.UpdateCount
if 0 != updateCount {
t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount)
}
}
}
-func IgnoreTestCount(t *testing.T) {
+func TestCount(t *testing.T) {
store, err := NewStore("file:memdb1?mode=memory&cache=shared")
if err != nil {
t.Errorf("Failed to initialize: %v", err)
diff --git a/src/hub/test-client b/src/hub/test-client
index 859b2da..f6c2c18 100755
--- a/src/hub/test-client
+++ b/src/hub/test-client
@@ -1,15 +1,23 @@
#!/bin/sh
echo pipe: import sample.json
+echo ------------------------
socat file:../../dat/sample.json,rdonly unix-client:/run/sfive/pipe
echo pipe-gram: import sample-gram.json
+echo ----------------------------------
while read x; do echo "$x" | socat stdio unix-sendto:/run/sfive/pipegram; done < ../../dat/sample-gram.json
echo show query result
-wget -q -S -O - 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z'
+echo -----------------
+curl -i 'http://localhost:8000/updates?from=2013-10-24T05:00:00Z&to=2013-10-24T00:05:20Z'
echo '\npost update'
-wget -q -S -O - --post-file='../../dat/sample-post.json' 'http://localhost:8000/updates'
+echo ------------
+curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates'
-echo '\ndone'
+echo show stats
+echo ----------
+curl -i 'http://localhost:8000/stats'
+
+echo '\n\ndone'