summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-04-27 01:12:24 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-04-27 01:12:24 +0200
commit1461c1f8588809c5dd7c52ca92c05ea7eb6e526e (patch)
treee58833395199af3c029d6b0e596de68bda161ab2
parentalso move source info into bolt (diff)
remove most of no-longer-needed stats stuff
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go15
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go31
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go85
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go126
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go41
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go13
-rwxr-xr-xsrc/hub/test-client13
7 files changed, 12 insertions, 312 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index cd65cf7..a8ea4f8 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -13,10 +13,6 @@ type StatsEncoder interface {
Encode(data StatisticsData) []byte
}
-type FilterDecoder interface {
- Decode(jsonString []byte) (StatsFilter, error)
-}
-
type StatefulDecoder struct {
sourceId SourceId
}
@@ -25,8 +21,6 @@ type PlainDecoder struct{}
type PlainEncoder struct{}
-type filterDecoder struct{}
-
func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) {
res := new(StatefulDecoder)
err = json.Unmarshal(jsonString, &res.sourceId)
@@ -44,10 +38,6 @@ func NewPlainDecoder() StatsDecoder {
return new(PlainDecoder)
}
-func NewFilterDecoder() FilterDecoder {
- return new(filterDecoder)
-}
-
func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) {
dat.CopyFromSourceId(&self.sourceId)
err = json.Unmarshal(jsonString, &dat)
@@ -68,8 +58,3 @@ func (self *PlainEncoder) Encode(data *StatisticsData) []byte {
}
return res
}
-
-func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) {
- err = json.Unmarshal(jsonString, &dat)
- return
-}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 3dbd6e7..a69adfe 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -10,16 +10,6 @@ type appendManyToken struct {
response chan bool
}
-type queryStatsResult struct {
- stats StatsResult
- err error
-}
-
-type queryStatsToken struct {
- filter *StatsFilter
- response chan queryStatsResult
-}
-
type getUpdatesResult struct {
values []StatisticsData
err error
@@ -31,7 +21,6 @@ type getUpdatesAfterToken struct {
}
type getUpdatesToken struct {
- filter *StatsFilter
response chan getUpdatesResult
}
@@ -59,7 +48,6 @@ type StatsSinkServer struct {
done chan bool
appendData chan StatisticsData
appendManyData chan appendManyToken // chan []StatisticsData
- getStatsChan chan queryStatsToken
getUpdatesAfterChan chan getUpdatesAfterToken
getUpdatesChan chan getUpdatesToken
getHubIdChan chan getHubIdToken
@@ -93,14 +81,11 @@ func (self StatsSinkServer) appendActor() {
} else {
token.response <- true
}
- case token := <-self.getStatsChan:
- stats, err := self.store.GetStats(token.filter)
- token.response <- queryStatsResult{stats, err}
case token := <-self.getUpdatesAfterChan:
values, err := self.store.GetUpdatesAfter(token.id)
token.response <- getUpdatesResult{values, err}
case token := <-self.getUpdatesChan:
- values, err := self.store.GetUpdates(token.filter)
+ values, err := self.store.GetUpdates()
token.response <- getUpdatesResult{values, err}
case token := <-self.getHubIdChan:
storeId, err := self.store.GetStoreId()
@@ -124,22 +109,14 @@ func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, err
return res.values, res.err
}
-func (self StatsSinkServer) getUpdatesInvoke(filter *StatsFilter) ([]StatisticsData, error) {
- token := getUpdatesToken{filter: filter, response: make(chan getUpdatesResult, 1)}
+func (self StatsSinkServer) getUpdatesInvoke() ([]StatisticsData, error) {
+ token := getUpdatesToken{response: make(chan getUpdatesResult, 1)}
defer close(token.response)
self.getUpdatesChan <- token
res := <-token.response
return res.values, res.err
}
-func (self StatsSinkServer) getStatsInvoke(filter *StatsFilter) (StatsResult, error) {
- token := queryStatsToken{filter: filter, response: make(chan queryStatsResult, 1)}
- defer close(token.response)
- self.getStatsChan <- token
- res := <-token.response
- return res.stats, res.err
-}
-
func (self StatsSinkServer) getHubIdInvoke() (string, error) {
token := getHubIdToken{response: make(chan getHubIdResult, 1)}
defer close(token.response)
@@ -163,7 +140,6 @@ func (self StatsSinkServer) Close() {
close(self.done)
close(self.appendData)
close(self.appendManyData)
- close(self.getStatsChan)
close(self.getUpdatesAfterChan)
close(self.getUpdatesChan)
close(self.getHubIdChan)
@@ -186,7 +162,6 @@ func NewServer(dbPath string) (server *StatsSinkServer, err error) {
server.done = make(chan bool)
server.appendData = make(chan StatisticsData, 5)
server.appendManyData = make(chan appendManyToken, 5)
- server.getStatsChan = make(chan queryStatsToken, 5)
server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1)
server.getUpdatesChan = make(chan getUpdatesToken, 3)
server.getHubIdChan = make(chan getHubIdToken, 1)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index aa98532..54daa65 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -7,7 +7,6 @@ import (
"net/http"
"os"
"strconv"
- "time"
"github.com/zenazn/goji"
"github.com/zenazn/goji/web"
@@ -57,73 +56,9 @@ func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Re
fmt.Fprintf(w, "%s", jsonString)
}
-func getFilter(r *http.Request) (filter StatsFilter) {
- from := r.FormValue("from")
- if from != "" {
- fromT, err := time.Parse(time.RFC3339, from)
- if err == nil {
- filter.start = &fromT
- }
- }
-
- to := r.FormValue("to")
- if to != "" {
- toT, err := time.Parse(time.RFC3339, to)
- if err == nil {
- filter.end = &toT
- }
- }
-
- hostname := r.FormValue("hostname")
- if hostname != "" {
- filter.hostname = &hostname
- }
-
- contentId := r.FormValue("contentId")
- if contentId != "" {
- filter.contentId = &contentId
- }
-
- format := r.FormValue("format")
- if format != "" {
- filter.format = &format
- }
-
- quality := r.FormValue("quality")
- if quality != "" {
- filter.quality = &quality
- }
-
- afterUpdateId := r.FormValue("afterUpdateId")
- if afterUpdateId != "" {
- id, err := strconv.ParseInt(afterUpdateId, 10, 32)
- if err == nil {
- idInt := int(id)
- filter.afterUpdateId = &idInt
- }
- }
-
- limit := r.FormValue("limit")
- if limit != "" {
- limitInt, err := strconv.ParseInt(limit, 10, 32)
- if err == nil {
- limitIntInt := int(limitInt)
- filter.limit = &limitIntInt
- }
- }
-
- sortOrder := r.FormValue("sortOrder")
- if sortOrder != "" {
- filter.sortOrder = &sortOrder
- }
-
- return
-}
-
func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) {
const resourceName = "updates"
- filter := getFilter(r)
- values, err := self.getUpdatesInvoke(&filter)
+ values, err := self.getUpdatesInvoke()
if err != nil {
http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
return
@@ -207,23 +142,6 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite
}
}
-func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) {
- const resourceName = "stats"
- filter := getFilter(r)
- values, err := self.getStatsInvoke(&filter)
-
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- jsonString, err := json.Marshal(values)
- if err != nil {
- http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError)
- return
- }
- fmt.Fprintf(w, "%s", jsonString)
-}
-
func (self StatsSinkServer) ServeWeb(vizAppLocation string) {
if _, err := os.Stat(vizAppLocation); err != nil {
if os.IsNotExist(err) {
@@ -240,7 +158,6 @@ func (self StatsSinkServer) ServeWeb(vizAppLocation string) {
goji.Get("/updates/:id", self.getUpdate)
goji.Post("/updates", self.postUpdate)
goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid)
- goji.Get("/stats", self.getStats)
goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation))))
goji.Serve()
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 07a73e9..c4e1676 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -3,7 +3,6 @@ package sfive
import (
"database/sql"
"encoding/json"
- "fmt"
"time"
// needed for gorp tracing
@@ -114,14 +113,6 @@ func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, hubId string, err error
return
}
-func insertAnd(needsAnd *bool) (res string) {
- if *needsAnd {
- res = " and"
- *needsAnd = false
- }
- return
-}
-
func (s sqliteStore) insertDataUpdateEntry(srcId int, du *dataUpdateDb) (err error) {
du.SourceId = srcId
err = s.db.Insert(du)
@@ -364,7 +355,7 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
parameters := make(map[string]interface{})
sql := "SELECT * FROM " + dataUpdatesTn + " WHERE Id > :afterUpdateId limit :limit"
parameters["afterUpdateId"] = id
- parameters["limit"] = 5000
+ parameters["limit"] = 5000 // TODO: hardcoded value
var updates []interface{}
updates, err = s.db.Select(dataUpdateDb{}, sql, parameters)
s5tl.Printf("sql: %s", sql)
@@ -374,24 +365,8 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) {
return
}
-func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) {
- err = fmt.Errorf("not implemented!")
- return
- // limit := 5000
- // if filter.limit == nil {
- // filter.limit = &limit
- // } else if *filter.limit > limit {
- // *filter.limit = limit
- // }
- // sourceSql, parameters := getFilteredDataUpdateSelect(filter)
- // sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql
- // s5tl.Printf("store: sql: %s", sql)
- // var updates []interface{}
- // updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters)
- // if err == nil {
- // res, _ = s.CreateStatisticsDatasFrom(updates)
- // }
- // return
+func (s sqliteStore) GetUpdates() (res []StatisticsData, err error) {
+ return s.GetUpdatesAfter(-1)
}
type lastUpdateQueryResult struct {
@@ -423,101 +398,6 @@ func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) {
return
}
-type statsResult struct {
- UpdateCount *int
- HubCount *int
- SourcesCount *int
- ClientCount *float32
- BytesSent *uint
- BytesReceived *uint
- StartTime *int64
- LastStartTime *int64
-}
-
-type StatsResult struct {
- UpdateCount int
- HubCount int
- SourcesCount int
- ClientCount float32
- BytesSent uint
- BytesReceived uint
- StartTime time.Time
- LastStartTime time.Time
-}
-
-func toApiStatsResult(value statsResult) (res StatsResult) {
- if value.UpdateCount != nil {
- res.UpdateCount = *value.UpdateCount
- }
- if value.HubCount != nil {
- res.HubCount = *value.HubCount
- }
- if value.SourcesCount != nil {
- res.SourcesCount = *value.SourcesCount
- }
- if value.ClientCount != nil {
- res.ClientCount = *value.ClientCount
- }
- if value.BytesSent != nil {
- res.BytesSent = *value.BytesSent
- }
- if value.BytesReceived != nil {
- res.BytesReceived = *value.BytesReceived
- }
- if value.StartTime != nil {
- res.StartTime = time.Unix(*value.StartTime, 0)
- }
- if value.LastStartTime != nil {
- res.LastStartTime = time.Unix(*value.LastStartTime, 0)
- }
- return res
-}
-
-// var (
-// statsGroupSelect = `
-// SELECT
-// count(*) as UpdateCount,
-// SourceHubUuid as SourceHubUuid,
-// count(distinct SourceId) as SourcesCount,
-// avg(ClientCount) as ClientCount,
-// sum(BytesSent) as BytesSent,
-// sum(BytesReceived) as BytesReceived,
-// min(StartTime) as StartTime,
-// max(StartTime) as LastStartTime
-// FROM
-// `
-// statsGroupClause = `
-// GROUP BY
-// SourceHubUuid
-// `
-// statsAggregateSelect = `
-// SELECT
-// sum(UpdateCount) as UpdateCount,
-// count(distinct SourceHubUuid) as HubCount,
-// sum(SourcesCount) as SourcesCount,
-// sum(ClientCount) as ClientCount,
-// sum(BytesSent) as BytesSent,
-// sum(BytesReceived) as BytesReceived,
-// min(StartTime) as StartTime,
-// max(LastStartTime) as LastStartTime
-// FROM
-// `
-// )
-
-func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) {
- return StatsResult{}, fmt.Errorf("not implemented!")
- // sourceSql, parameters := getFilteredDataUpdateSelect(filter)
- // _ = sourceSql
- // sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause)
- // s5tl.Printf("store: stats sql: %s", sql)
- // res := statsResult{}
- // err := s.db.SelectOne(&res, sql, parameters)
- // if err == nil {
- // return toApiStatsResult(res), nil
- // }
- // return StatsResult{}, err
-}
-
func (s sqliteStore) GetStoreId() (uuid string, err error) {
uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid")
return
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 27732a5..fe43bf5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -42,47 +42,6 @@ func TestAppend(t *testing.T) {
t.Errorf("Failed to append: %v", err)
return
}
-
- // stats, err := store.GetStats(nil)
- // if err != nil {
- // t.Errorf("Failed to get stats: %v", err)
- // } else {
- // clientCount := int(stats.ClientCount)
- // updateCount := stats.UpdateCount
- // if 3 != clientCount {
- // t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount)
- // }
- // if 1 != updateCount {
- // t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount)
- // }
- // }
-
- // queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC)
- // filterStruct := StatsFilter{start: &queryStartTime}
- // stats, err = store.GetStats(&filterStruct)
- // if err != nil {
- // t.Errorf("Failed to get stats: %v", err)
- // } else {
- // updateCount := stats.UpdateCount
- // if 0 != updateCount {
- // t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount)
- // }
- // }
-}
-
-func TestCount(t *testing.T) {
- os.Remove(__boltPath)
- store, err := NewStore(__sqlitePath, __boltPath)
- if err != nil {
- t.Errorf("Failed to initialize: %v", err)
- }
- defer store.Close()
-
- stats, err := store.GetStats(nil)
- clientCount := int(stats.ClientCount)
- if 0 != clientCount {
- t.Errorf("Failed to count correctly.")
- }
}
func TestGetUpdatesAfter(t *testing.T) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 515b869..525b6d3 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -55,19 +55,6 @@ type StatisticsDataContainer struct {
Data []StatisticsData `json:"data"`
}
-type StatsFilter struct {
- start *time.Time
- end *time.Time
- hostname *string
- contentId *string
- format *string
- quality *string
- tagsAny []string
- afterUpdateId *int
- limit *int
- sortOrder *string
-}
-
func (self *StatisticsData) CopyFromSourceId(id *SourceId) {
self.Hostname = id.Hostname
self.StreamId = id.StreamId
diff --git a/src/hub/test-client b/src/hub/test-client
index d5756e7..fcb9a98 100755
--- a/src/hub/test-client
+++ b/src/hub/test-client
@@ -10,16 +10,13 @@ echo pipe-gram: import sample-gram.json
echo ----------------------------------
while read x; do echo "$x" | socat stdio "unix-sendto:$TEST_D/pipegram"; done < ../../dat/sample-gram.json
+echo post update
+echo -----------
+curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates'
+
echo show query result
echo -----------------
-curl -i 'http://localhost:8000/updates?from=2013-10-21T00:00:00Z&to=2013-10-21T12:31:00Z'
-
-echo '\npost update'
-echo ------------
-curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates'
+curl -i 'http://localhost:8000/updates'
-echo show stats
-echo ----------
-curl -i 'http://localhost:8000/stats'
echo '\n\ndone'