From 1461c1f8588809c5dd7c52ca92c05ea7eb6e526e Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Thu, 27 Apr 2017 01:12:24 +0200 Subject: remove most of no-longer-needed stats stuff --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 15 --- src/hub/src/spreadspace.org/sfive/s5srv.go | 31 +----- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 85 +-------------- src/hub/src/spreadspace.org/sfive/s5store.go | 126 +--------------------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 41 ------- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 13 --- src/hub/test-client | 13 +-- 7 files changed, 12 insertions(+), 312 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index cd65cf7..a8ea4f8 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -13,10 +13,6 @@ type StatsEncoder interface { Encode(data StatisticsData) []byte } -type FilterDecoder interface { - Decode(jsonString []byte) (StatsFilter, error) -} - type StatefulDecoder struct { sourceId SourceId } @@ -25,8 +21,6 @@ type PlainDecoder struct{} type PlainEncoder struct{} -type filterDecoder struct{} - func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) { res := new(StatefulDecoder) err = json.Unmarshal(jsonString, &res.sourceId) @@ -44,10 +38,6 @@ func NewPlainDecoder() StatsDecoder { return new(PlainDecoder) } -func NewFilterDecoder() FilterDecoder { - return new(filterDecoder) -} - func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { dat.CopyFromSourceId(&self.sourceId) err = json.Unmarshal(jsonString, &dat) @@ -68,8 +58,3 @@ func (self *PlainEncoder) Encode(data *StatisticsData) []byte { } return res } - -func (self *filterDecoder) Decode(jsonString []byte) (dat StatsFilter, err error) { - err = json.Unmarshal(jsonString, &dat) - return -} diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 3dbd6e7..a69adfe 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -10,16 +10,6 @@ type appendManyToken struct { response chan bool } -type queryStatsResult struct { - stats StatsResult - err error -} - -type queryStatsToken struct { - filter *StatsFilter - response chan queryStatsResult -} - type getUpdatesResult struct { values []StatisticsData err error @@ -31,7 +21,6 @@ type getUpdatesAfterToken struct { } type getUpdatesToken struct { - filter *StatsFilter response chan getUpdatesResult } @@ -59,7 +48,6 @@ type StatsSinkServer struct { done chan bool appendData chan StatisticsData appendManyData chan appendManyToken // chan []StatisticsData - getStatsChan chan queryStatsToken getUpdatesAfterChan chan getUpdatesAfterToken getUpdatesChan chan getUpdatesToken getHubIdChan chan getHubIdToken @@ -93,14 +81,11 @@ func (self StatsSinkServer) appendActor() { } else { token.response <- true } - case token := <-self.getStatsChan: - stats, err := self.store.GetStats(token.filter) - token.response <- queryStatsResult{stats, err} case token := <-self.getUpdatesAfterChan: values, err := self.store.GetUpdatesAfter(token.id) token.response <- getUpdatesResult{values, err} case token := <-self.getUpdatesChan: - values, err := self.store.GetUpdates(token.filter) + values, err := self.store.GetUpdates() token.response <- getUpdatesResult{values, err} case token := <-self.getHubIdChan: storeId, err := self.store.GetStoreId() @@ -124,22 +109,14 @@ func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, err return res.values, res.err } -func (self StatsSinkServer) getUpdatesInvoke(filter *StatsFilter) ([]StatisticsData, error) { - token := getUpdatesToken{filter: filter, response: make(chan getUpdatesResult, 1)} +func (self StatsSinkServer) getUpdatesInvoke() ([]StatisticsData, error) { + token := getUpdatesToken{response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesChan <- token res := <-token.response return res.values, res.err } -func (self StatsSinkServer) getStatsInvoke(filter *StatsFilter) (StatsResult, error) { - token := queryStatsToken{filter: filter, response: make(chan queryStatsResult, 1)} - defer close(token.response) - self.getStatsChan <- token - res := <-token.response - return res.stats, res.err -} - func (self StatsSinkServer) getHubIdInvoke() (string, error) { token := getHubIdToken{response: make(chan getHubIdResult, 1)} defer close(token.response) @@ -163,7 +140,6 @@ func (self StatsSinkServer) Close() { close(self.done) close(self.appendData) close(self.appendManyData) - close(self.getStatsChan) close(self.getUpdatesAfterChan) close(self.getUpdatesChan) close(self.getHubIdChan) @@ -186,7 +162,6 @@ func NewServer(dbPath string) (server *StatsSinkServer, err error) { server.done = make(chan bool) server.appendData = make(chan StatisticsData, 5) server.appendManyData = make(chan appendManyToken, 5) - server.getStatsChan = make(chan queryStatsToken, 5) server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) server.getUpdatesChan = make(chan getUpdatesToken, 3) server.getHubIdChan = make(chan getHubIdToken, 1) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index aa98532..54daa65 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -7,7 +7,6 @@ import ( "net/http" "os" "strconv" - "time" "github.com/zenazn/goji" "github.com/zenazn/goji/web" @@ -57,73 +56,9 @@ func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Re fmt.Fprintf(w, "%s", jsonString) } -func getFilter(r *http.Request) (filter StatsFilter) { - from := r.FormValue("from") - if from != "" { - fromT, err := time.Parse(time.RFC3339, from) - if err == nil { - filter.start = &fromT - } - } - - to := r.FormValue("to") - if to != "" { - toT, err := time.Parse(time.RFC3339, to) - if err == nil { - filter.end = &toT - } - } - - hostname := r.FormValue("hostname") - if hostname != "" { - filter.hostname = &hostname - } - - contentId := r.FormValue("contentId") - if contentId != "" { - filter.contentId = &contentId - } - - format := r.FormValue("format") - if format != "" { - filter.format = &format - } - - quality := r.FormValue("quality") - if quality != "" { - filter.quality = &quality - } - - afterUpdateId := r.FormValue("afterUpdateId") - if afterUpdateId != "" { - id, err := strconv.ParseInt(afterUpdateId, 10, 32) - if err == nil { - idInt := int(id) - filter.afterUpdateId = &idInt - } - } - - limit := r.FormValue("limit") - if limit != "" { - limitInt, err := strconv.ParseInt(limit, 10, 32) - if err == nil { - limitIntInt := int(limitInt) - filter.limit = &limitIntInt - } - } - - sortOrder := r.FormValue("sortOrder") - if sortOrder != "" { - filter.sortOrder = &sortOrder - } - - return -} - func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "updates" - filter := getFilter(r) - values, err := self.getUpdatesInvoke(&filter) + values, err := self.getUpdatesInvoke() if err != nil { http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) return @@ -207,23 +142,6 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite } } -func (self StatsSinkServer) getStats(c web.C, w http.ResponseWriter, r *http.Request) { - const resourceName = "stats" - filter := getFilter(r) - values, err := self.getStatsInvoke(&filter) - - if err != nil { - http.Error(w, fmt.Sprintf("failed to retrieve %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - jsonString, err := json.Marshal(values) - if err != nil { - http.Error(w, fmt.Sprintf("failed to marshal %s: %v", resourceName, err), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, "%s", jsonString) -} - func (self StatsSinkServer) ServeWeb(vizAppLocation string) { if _, err := os.Stat(vizAppLocation); err != nil { if os.IsNotExist(err) { @@ -240,7 +158,6 @@ func (self StatsSinkServer) ServeWeb(vizAppLocation string) { goji.Get("/updates/:id", self.getUpdate) goji.Post("/updates", self.postUpdate) goji.Get("/lastupdate/:id", self.getLastUpdateIdForUuid) - goji.Get("/stats", self.getStats) goji.Handle("/viz/*", http.StripPrefix("/viz/", http.FileServer(http.Dir(vizAppLocation)))) goji.Serve() diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 07a73e9..c4e1676 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -3,7 +3,6 @@ package sfive import ( "database/sql" "encoding/json" - "fmt" "time" // needed for gorp tracing @@ -114,14 +113,6 @@ func initDbSqlite(sqlitePath string) (dbmap *gorp.DbMap, hubId string, err error return } -func insertAnd(needsAnd *bool) (res string) { - if *needsAnd { - res = " and" - *needsAnd = false - } - return -} - func (s sqliteStore) insertDataUpdateEntry(srcId int, du *dataUpdateDb) (err error) { du.SourceId = srcId err = s.db.Insert(du) @@ -364,7 +355,7 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { parameters := make(map[string]interface{}) sql := "SELECT * FROM " + dataUpdatesTn + " WHERE Id > :afterUpdateId limit :limit" parameters["afterUpdateId"] = id - parameters["limit"] = 5000 + parameters["limit"] = 5000 // TODO: hardcoded value var updates []interface{} updates, err = s.db.Select(dataUpdateDb{}, sql, parameters) s5tl.Printf("sql: %s", sql) @@ -374,24 +365,8 @@ func (s sqliteStore) GetUpdatesAfter(id int) (res []StatisticsData, err error) { return } -func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []StatisticsData, err error) { - err = fmt.Errorf("not implemented!") - return - // limit := 5000 - // if filter.limit == nil { - // filter.limit = &limit - // } else if *filter.limit > limit { - // *filter.limit = limit - // } - // sourceSql, parameters := getFilteredDataUpdateSelect(filter) - // sql := "SELECT " + updateColumnSelect + " FROM " + sourceSql - // s5tl.Printf("store: sql: %s", sql) - // var updates []interface{} - // updates, err = s.db.Select(dataUpdateQueryResult{}, sql, parameters) - // if err == nil { - // res, _ = s.CreateStatisticsDatasFrom(updates) - // } - // return +func (s sqliteStore) GetUpdates() (res []StatisticsData, err error) { + return s.GetUpdatesAfter(-1) } type lastUpdateQueryResult struct { @@ -423,101 +398,6 @@ func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) { return } -type statsResult struct { - UpdateCount *int - HubCount *int - SourcesCount *int - ClientCount *float32 - BytesSent *uint - BytesReceived *uint - StartTime *int64 - LastStartTime *int64 -} - -type StatsResult struct { - UpdateCount int - HubCount int - SourcesCount int - ClientCount float32 - BytesSent uint - BytesReceived uint - StartTime time.Time - LastStartTime time.Time -} - -func toApiStatsResult(value statsResult) (res StatsResult) { - if value.UpdateCount != nil { - res.UpdateCount = *value.UpdateCount - } - if value.HubCount != nil { - res.HubCount = *value.HubCount - } - if value.SourcesCount != nil { - res.SourcesCount = *value.SourcesCount - } - if value.ClientCount != nil { - res.ClientCount = *value.ClientCount - } - if value.BytesSent != nil { - res.BytesSent = *value.BytesSent - } - if value.BytesReceived != nil { - res.BytesReceived = *value.BytesReceived - } - if value.StartTime != nil { - res.StartTime = time.Unix(*value.StartTime, 0) - } - if value.LastStartTime != nil { - res.LastStartTime = time.Unix(*value.LastStartTime, 0) - } - return res -} - -// var ( -// statsGroupSelect = ` -// SELECT -// count(*) as UpdateCount, -// SourceHubUuid as SourceHubUuid, -// count(distinct SourceId) as SourcesCount, -// avg(ClientCount) as ClientCount, -// sum(BytesSent) as BytesSent, -// sum(BytesReceived) as BytesReceived, -// min(StartTime) as StartTime, -// max(StartTime) as LastStartTime -// FROM -// ` -// statsGroupClause = ` -// GROUP BY -// SourceHubUuid -// ` -// statsAggregateSelect = ` -// SELECT -// sum(UpdateCount) as UpdateCount, -// count(distinct SourceHubUuid) as HubCount, -// sum(SourcesCount) as SourcesCount, -// sum(ClientCount) as ClientCount, -// sum(BytesSent) as BytesSent, -// sum(BytesReceived) as BytesReceived, -// min(StartTime) as StartTime, -// max(LastStartTime) as LastStartTime -// FROM -// ` -// ) - -func (s sqliteStore) GetStats(filter *StatsFilter) (StatsResult, error) { // (map[string]interface{}, error) { - return StatsResult{}, fmt.Errorf("not implemented!") - // sourceSql, parameters := getFilteredDataUpdateSelect(filter) - // _ = sourceSql - // sql := fmt.Sprintf("%s (%s %s %s)", statsAggregateSelect, statsGroupSelect, sourceSql, statsGroupClause) - // s5tl.Printf("store: stats sql: %s", sql) - // res := statsResult{} - // err := s.db.SelectOne(&res, sql, parameters) - // if err == nil { - // return toApiStatsResult(res), nil - // } - // return StatsResult{}, err -} - func (s sqliteStore) GetStoreId() (uuid string, err error) { uuid, err = s.db.SelectStr("select Value from HubInfo where Name = ?", "HubUuid") return diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 27732a5..fe43bf5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -42,47 +42,6 @@ func TestAppend(t *testing.T) { t.Errorf("Failed to append: %v", err) return } - - // stats, err := store.GetStats(nil) - // if err != nil { - // t.Errorf("Failed to get stats: %v", err) - // } else { - // clientCount := int(stats.ClientCount) - // updateCount := stats.UpdateCount - // if 3 != clientCount { - // t.Errorf("Failed fo append, invalid number of clients, 3 != %v", clientCount) - // } - // if 1 != updateCount { - // t.Errorf("Failed to append, invalid number of updates, 1 != %v", updateCount) - // } - // } - - // queryStartTime := time.Date(2015, time.December, 24, 1, 1, 1, 0, time.UTC) - // filterStruct := StatsFilter{start: &queryStartTime} - // stats, err = store.GetStats(&filterStruct) - // if err != nil { - // t.Errorf("Failed to get stats: %v", err) - // } else { - // updateCount := stats.UpdateCount - // if 0 != updateCount { - // t.Errorf("Failed to filter entries by start time, 0 != %v", updateCount) - // } - // } -} - -func TestCount(t *testing.T) { - os.Remove(__boltPath) - store, err := NewStore(__sqlitePath, __boltPath) - if err != nil { - t.Errorf("Failed to initialize: %v", err) - } - defer store.Close() - - stats, err := store.GetStats(nil) - clientCount := int(stats.ClientCount) - if 0 != clientCount { - t.Errorf("Failed to count correctly.") - } } func TestGetUpdatesAfter(t *testing.T) { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 515b869..525b6d3 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -55,19 +55,6 @@ type StatisticsDataContainer struct { Data []StatisticsData `json:"data"` } -type StatsFilter struct { - start *time.Time - end *time.Time - hostname *string - contentId *string - format *string - quality *string - tagsAny []string - afterUpdateId *int - limit *int - sortOrder *string -} - func (self *StatisticsData) CopyFromSourceId(id *SourceId) { self.Hostname = id.Hostname self.StreamId = id.StreamId diff --git a/src/hub/test-client b/src/hub/test-client index d5756e7..fcb9a98 100755 --- a/src/hub/test-client +++ b/src/hub/test-client @@ -10,16 +10,13 @@ echo pipe-gram: import sample-gram.json echo ---------------------------------- while read x; do echo "$x" | socat stdio "unix-sendto:$TEST_D/pipegram"; done < ../../dat/sample-gram.json +echo post update +echo ----------- +curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates' + echo show query result echo ----------------- -curl -i 'http://localhost:8000/updates?from=2013-10-21T00:00:00Z&to=2013-10-21T12:31:00Z' - -echo '\npost update' -echo ------------ -curl -i --data @../../dat/sample-post.json 'http://localhost:8000/updates' +curl -i 'http://localhost:8000/updates' -echo show stats -echo ---------- -curl -i 'http://localhost:8000/stats' echo '\n\ndone' -- cgit v1.2.3