diff options
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 20 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 6 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 28 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 10 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 6 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 6 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 6 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 8 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 18 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 18 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 12 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 10 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 9 |
13 files changed, 78 insertions, 79 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index a8ea4f8..12c0562 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -5,12 +5,12 @@ import ( "fmt" ) -type StatsDecoder interface { - Decode(jsonString []byte) (StatisticsData, error) +type FullDecoder interface { + Decode(jsonString []byte) (DataUpdateFull, error) } -type StatsEncoder interface { - Encode(data StatisticsData) []byte +type FullEncoder interface { + Encode(data DataUpdateFull) []byte } type StatefulDecoder struct { @@ -21,7 +21,7 @@ type PlainDecoder struct{} type PlainEncoder struct{} -func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) { +func NewStatefulDecoder(jsonString []byte) (decoder FullDecoder, err error) { res := new(StatefulDecoder) err = json.Unmarshal(jsonString, &res.sourceId) if err != nil { @@ -34,11 +34,11 @@ func NewStatefulDecoder(jsonString []byte) (decoder StatsDecoder, err error) { return } -func NewPlainDecoder() StatsDecoder { +func NewPlainDecoder() FullDecoder { return new(PlainDecoder) } -func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { +func (self *StatefulDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { dat.CopyFromSourceId(&self.sourceId) err = json.Unmarshal(jsonString, &dat) // like in PlainDecoder, let the client decide how to use partial results @@ -46,15 +46,15 @@ func (self *StatefulDecoder) Decode(jsonString []byte) (dat StatisticsData, err return } -func (self *PlainDecoder) Decode(jsonString []byte) (dat StatisticsData, err error) { +func (self *PlainDecoder) Decode(jsonString []byte) (dat DataUpdateFull, err error) { err = json.Unmarshal(jsonString, &dat) return } -func (self *PlainEncoder) Encode(data *StatisticsData) []byte { +func (self *PlainEncoder) Encode(data *DataUpdateFull) []byte { res, err := json.Marshal(data) if err != nil { - s5l.Panicln("failed to encode StatisticsData") + s5l.Panicln("failed to encode DataUpdateFull") } return res } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 32f35dd..30d45cf 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -16,8 +16,8 @@ var ( testData = "{" + sourceIdFields + "," + updateFields + "}" ) -func GetExpected() *StatisticsData { - expected := new(StatisticsData) +func GetExpected() *DataUpdateFull { + expected := new(DataUpdateFull) expected.CopyFromSourceId(&sourceIdDataStruct) expected.CopyFromUpdate(&updateDataStruct) return expected @@ -55,7 +55,7 @@ func TestDecodePlain(t *testing.T) { func TestEncode(t *testing.T) { ec := new(PlainEncoder) - td := new(StatisticsData) + td := new(DataUpdateFull) td.CopyFromSourceId(&sourceIdDataStruct) td.CopyFromUpdate(&updateDataStruct) t.Logf("dada: %v", ec.Encode(td)) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index c5f6e21..047a318 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -5,12 +5,12 @@ import ( ) type appendManyToken struct { - data []StatisticsData + data []DataUpdateFull response chan bool } type getUpdatesResult struct { - values []StatisticsData + values []DataUpdateFull err error } @@ -40,19 +40,19 @@ type getLastUpdateIdToken struct { response chan getLastUpdateIdResult } -type StatsSinkServer struct { +type Server struct { store Store quit chan bool done chan bool - appendData chan StatisticsData - appendManyData chan appendManyToken // chan []StatisticsData + appendData chan DataUpdateFull + appendManyData chan appendManyToken // chan []DataUpdateFull getUpdatesAfterChan chan getUpdatesAfterToken getUpdatesChan chan getUpdatesToken getHubIdChan chan getHubIdToken getLastUpdateIdChan chan getLastUpdateIdToken } -func (self StatsSinkServer) appendActor() { +func (self Server) appendActor() { defer func() { self.done <- true }() for { select { @@ -94,7 +94,7 @@ func (self StatsSinkServer) appendActor() { } } -func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, error) { +func (self Server) getUpdatesAfterInvoke(id int) ([]DataUpdateFull, error) { token := getUpdatesAfterToken{id: id, response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesAfterChan <- token @@ -102,7 +102,7 @@ func (self StatsSinkServer) getUpdatesAfterInvoke(id int) ([]StatisticsData, err return res.values, res.err } -func (self StatsSinkServer) getUpdatesInvoke() ([]StatisticsData, error) { +func (self Server) getUpdatesInvoke() ([]DataUpdateFull, error) { token := getUpdatesToken{response: make(chan getUpdatesResult, 1)} defer close(token.response) self.getUpdatesChan <- token @@ -110,7 +110,7 @@ func (self StatsSinkServer) getUpdatesInvoke() ([]StatisticsData, error) { return res.values, res.err } -func (self StatsSinkServer) getHubIdInvoke() string { +func (self Server) getHubIdInvoke() string { token := getHubIdToken{response: make(chan getHubIdResult, 1)} defer close(token.response) self.getHubIdChan <- token @@ -118,7 +118,7 @@ func (self StatsSinkServer) getHubIdInvoke() string { return res.id } -func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) { +func (self Server) getLastUpdateIdInvoke() (int, error) { token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)} defer close(token.response) self.getLastUpdateIdChan <- token @@ -126,7 +126,7 @@ func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) { return res.id, res.err } -func (self StatsSinkServer) Close() { +func (self Server) Close() { self.quit <- true <-self.done close(self.quit) @@ -140,9 +140,9 @@ func (self StatsSinkServer) Close() { self.store.Close() } -func NewServer(dbPath string) (server *StatsSinkServer, err error) { +func NewServer(dbPath string) (server *Server, err error) { // TODO read configuration and create instance with correct settings - server = new(StatsSinkServer) + server = new(Server) server.store, err = NewStore(dbPath) if err != nil { return @@ -150,7 +150,7 @@ func NewServer(dbPath string) (server *StatsSinkServer, err error) { server.quit = make(chan bool) server.done = make(chan bool) - server.appendData = make(chan StatisticsData, 5) + server.appendData = make(chan DataUpdateFull, 5) server.appendManyData = make(chan appendManyToken, 5) server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) server.getUpdatesChan = make(chan getUpdatesToken, 3) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index bdb0cbf..abb00fb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -9,7 +9,7 @@ import ( "time" ) -func findMaxId(values []StatisticsData) int { +func findMaxId(values []DataUpdateFull) int { maxId := -1 for i := range values { id := values[i].SourceHubDataUpdateId @@ -20,7 +20,7 @@ func findMaxId(values []StatisticsData) int { return maxId } -func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { +func (self Server) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { storeId = self.getHubIdInvoke() var resp *http.Response @@ -58,7 +58,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } -func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { +func (self Server) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: for { @@ -88,7 +88,7 @@ tryResync: continue nextBatch } - data, err := json.Marshal(StatisticsDataContainer{updates}) + data, err := json.Marshal(DataUpdateFullContainer{updates}) if err != nil { s5l.Panicf("fwd: encode failed: %v\n", err) @@ -116,6 +116,6 @@ tryResync: } } -func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { +func (self Server) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index da5ff80..4a4c838 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -15,7 +15,7 @@ const lastUpdateJson = `{ "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } }` -func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) { +func (self Server) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) { url := baseurl + "/dataupdate/_search?search_type=count" storeId = self.getHubIdInvoke() @@ -68,7 +68,7 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) return } -func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { +func (self Server) handleForwardingToElasticSearch(baseurl string, client *http.Client) { url := baseurl + "/_bulk" tryResync: for { @@ -136,6 +136,6 @@ tryResync: } } -func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { +func (self Server) RunForwardingToElasticSearch(forwardBaseUrl string) { self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index 9779960..42c16bc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -7,7 +7,7 @@ import ( "github.com/equinox0815/graphite-golang" ) -func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) { +func (self Server) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) { latestId, err = self.getLastUpdateIdInvoke() if err != nil { s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err) @@ -17,7 +17,7 @@ func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (late return } -func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) { +func (self Server) handleForwardingToGraphite(forwardHost string, basePath string) { tryResync: for { client, err := graphite.NewGraphiteFromAddress(forwardHost) @@ -80,6 +80,6 @@ tryResync: } } -func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) { +func (self Server) RunForwardingToGraphite(forwardHost string, basePath string) { self.handleForwardingToGraphite(forwardHost, basePath) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 5a25622..55e67e1 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -17,7 +17,7 @@ type PiwikBulkRequest struct { TokenAuth string `json:"token_auth"` } -func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { +func (self Server) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { // TODO: ask piwik what the last update was... latestId, err = 0, nil //self.getLastUpdateIdInvoke() @@ -29,7 +29,7 @@ func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID return } -func (self StatsSinkServer) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +func (self Server) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { tryResync: for { lastId, _, err := self.getLastUpdatePiwik(piwikURL, siteURL, siteID, token, client) @@ -106,6 +106,6 @@ tryResync: } } -func (self StatsSinkServer) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) { +func (self Server) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) { self.handleForwardingToPiwik(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index efc190c..1a4d6a2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -6,7 +6,7 @@ import ( "net" ) -func (self StatsSinkServer) handleConnection(conn net.Conn) { +func (self Server) handleConnection(conn net.Conn) { reader := bufio.NewReader(conn) buffer, err := reader.ReadBytes('\n') if err != nil { @@ -42,7 +42,7 @@ func (self StatsSinkServer) handleConnection(conn net.Conn) { } } -func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) { +func (self Server) handlePacketConn(pconn net.PacketConn) { decoder := NewPlainDecoder() buffer := make([]byte, 64*1024) for { @@ -61,7 +61,7 @@ func (self StatsSinkServer) handlePacketConn(pconn net.PacketConn) { } } -func (self StatsSinkServer) ServePipe(pipePath string) { +func (self Server) ServePipe(pipePath string) { ln, err := net.Listen("unix", pipePath) if err != nil { s5l.Printf("pipe: failed to connect: %v", err) @@ -80,7 +80,7 @@ func (self StatsSinkServer) ServePipe(pipePath string) { } } -func (self StatsSinkServer) ServeGramPipe(pipePath string) { +func (self Server) ServeGramPipe(pipePath string) { pconn, err := net.ListenPacket("unixgram", pipePath) if err != nil { s5l.Printf("p-pipe: failed to listen: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index a474f47..efe4718 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -12,12 +12,12 @@ import ( "github.com/zenazn/goji/web" ) -func (self StatsSinkServer) healthz(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) healthz(c web.C, w http.ResponseWriter, r *http.Request) { // TODO: do a more sophisticated check fmt.Fprintf(w, "%s\n", self.store.GetStoreId()) } -func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) getSourcesList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "sources" values, err := self.store.GetSources() if err != nil { @@ -32,7 +32,7 @@ func (self StatsSinkServer) getSourcesList(c web.C, w http.ResponseWriter, r *ht fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) getSource(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "source" id, err := strconv.ParseInt(c.URLParams["id"], 10, 64) if err != nil { @@ -56,7 +56,7 @@ func (self StatsSinkServer) getSource(c web.C, w http.ResponseWriter, r *http.Re fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) getUpdateList(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "updates" values, err := self.getUpdatesInvoke() if err != nil { @@ -71,7 +71,7 @@ func (self StatsSinkServer) getUpdateList(c web.C, w http.ResponseWriter, r *htt fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) getUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" id, err := strconv.ParseInt(c.URLParams["id"], 10, 64) if err != nil { @@ -91,7 +91,7 @@ func (self StatsSinkServer) getUpdate(c web.C, w http.ResponseWriter, r *http.Re fmt.Fprintf(w, "%s", jsonString) } -func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) postUpdate(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "update" decoder := NewPlainDecoder() @@ -102,7 +102,7 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R return } - container := StatisticsDataContainer{} + container := DataUpdateFullContainer{} err = json.Unmarshal(buffer, &container) if err == nil { token := appendManyToken{ @@ -129,7 +129,7 @@ func (self StatsSinkServer) postUpdate(c web.C, w http.ResponseWriter, r *http.R // TODO send response channel, wait for OK } -func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { +func (self Server) getLastUpdateIdForUuid(c web.C, w http.ResponseWriter, r *http.Request) { const resourceName = "lastupdateid" id := c.URLParams["id"] value, err := self.store.GetLastUpdateForUuid(id) @@ -140,7 +140,7 @@ func (self StatsSinkServer) getLastUpdateIdForUuid(c web.C, w http.ResponseWrite fmt.Fprintf(w, "%d", value) } -func (self StatsSinkServer) ServeWeb(vizAppLocation string) { +func (self Server) ServeWeb(vizAppLocation string) { if _, err := os.Stat(vizAppLocation); err != nil { if os.IsNotExist(err) { s5l.Panicf("web: viz-app at %s does not exist.", vizAppLocation) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 1339a4b..4845660 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -58,7 +58,7 @@ func initDb(boltPath string) (boltDb *bolt.DB, hubId string, err error) { return } -func updateFromStatisticsData(value StatisticsData) (dataUpdateDb, []ClientData, sourceDb) { +func updateFromDataUpdateFull(value DataUpdateFull) (dataUpdateDb, []ClientData, sourceDb) { du := NewDataUpdateDb(value) cd := value.Data.Clients src := NewSourceDb(value) @@ -164,10 +164,10 @@ func (s Store) appendItem(tx *bolt.Tx, du dataUpdateDb, cd []ClientData, src sou return } -func (s Store) AppendMany(updates []StatisticsData) (err error) { +func (s Store) AppendMany(updates []DataUpdateFull) (err error) { return s.db.Update(func(tx *bolt.Tx) error { for _, update := range updates { - du, cd, src := updateFromStatisticsData(update) + du, cd, src := updateFromDataUpdateFull(update) if err := s.appendItem(tx, du, cd, src); err != nil { return err } @@ -176,8 +176,8 @@ func (s Store) AppendMany(updates []StatisticsData) (err error) { }) } -func (s Store) Append(update StatisticsData) (err error) { - return s.AppendMany([]StatisticsData{update}) +func (s Store) Append(update DataUpdateFull) (err error) { + return s.AppendMany([]DataUpdateFull{update}) } func (s Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { @@ -253,7 +253,7 @@ func (s Store) getClientsByUpdateId(tx *bolt.Tx, id int) (res []ClientData, err return } -func (s Store) CreateStatisticsDataFrom(tx *bolt.Tx, duId int, dat dataUpdateDb) (res StatisticsData, err error) { +func (s Store) CreateDataUpdateFullFromDb(tx *bolt.Tx, duId int, dat dataUpdateDb) (res DataUpdateFull, err error) { var clients []ClientData if clients, err = s.getClientsByUpdateId(tx, duId); err != nil { return @@ -273,13 +273,13 @@ func (s Store) CreateStatisticsDataFrom(tx *bolt.Tx, duId int, dat dataUpdateDb) return } -func (s Store) GetUpdatesAfter(id int) (res []StatisticsData, err error) { +func (s Store) GetUpdatesAfter(id int) (res []DataUpdateFull, err error) { // err = s.db.View(func(tx *bolt.Tx) error { // // TODO: iterate over ids // duId := 1 // for i := range dat { - // sd, err := s.CreateStatisticsDataFrom(tx, duId, du) + // sd, err := s.CreateDataUpdateFullFromDb(tx, duId, du) // if err != nil { // return err // } @@ -290,7 +290,7 @@ func (s Store) GetUpdatesAfter(id int) (res []StatisticsData, err error) { return } -func (s Store) GetUpdates() (res []StatisticsData, err error) { +func (s Store) GetUpdates() (res []DataUpdateFull, err error) { return s.GetUpdatesAfter(-1) } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 38e7bf7..7b45e59 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -34,7 +34,7 @@ func TestAppend(t *testing.T) { update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{"", -1, source, update} + dat := DataUpdateFull{"", -1, source, update} err = store.Append(dat) if err != nil { @@ -56,7 +56,7 @@ func TestGetUpdatesAfter(t *testing.T) { update := DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: startTime, Duration: 5000} streamId := StreamId{ContentId: "content", Format: "7bitascii", Quality: QualityHigh} source := SourceId{Hostname: "localhost", Tags: []string{"tag1", "master"}, StreamId: streamId, Version: 1} - dat := StatisticsData{"", -1, source, update} + dat := DataUpdateFull{"", -1, source, update} err = store.Append(dat) if err != nil { @@ -68,7 +68,7 @@ func TestGetUpdatesAfter(t *testing.T) { t.Logf("got updates (err %v):\n%#v", err, res) } -func generateStatisticsData(n int) (data []StatisticsData) { +func generateDataUpdateFull(n int) (data []DataUpdateFull) { hostnames := []string{"streamer1", "streamer2"} contents := []string{"av", "audio"} formats := []string{"webm", "flash", "hls"} @@ -91,7 +91,7 @@ func generateStatisticsData(n int) (data []StatisticsData) { for _, content := range contents { for _, format := range formats { for _, quality := range qualities { - d := StatisticsData{} + d := DataUpdateFull{} d.SourceId.Version = 1 d.SourceId.Hostname = hostname d.SourceId.Tags = tags @@ -126,7 +126,7 @@ func BenchmarkAppendMany(b *testing.B) { b.Errorf("Failed to initialize: %v", err) } defer store.Close() - data := generateStatisticsData(b.N) + data := generateDataUpdateFull(b.N) b.ResetTimer() @@ -142,7 +142,7 @@ func BenchmarkGetUpdatesAfter(b *testing.B) { b.Errorf("Failed to initialize: %v", err) } defer store.Close() - data := generateStatisticsData(b.N) + data := generateDataUpdateFull(b.N) if err := store.AppendMany(data); err != nil { b.Errorf("Failed to append: %v", err) } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index ad6deaf..ae99b03 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -40,7 +40,7 @@ type DataUpdate struct { Data SourceData `json:"data"` } -type StatisticsData struct { +type DataUpdateFull struct { SourceHubUuid string `json:"SourceHubUuid,omitempty"` SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"` SourceId @@ -51,18 +51,18 @@ type DataContainer struct { Data interface{} `json:"data"` } -type StatisticsDataContainer struct { - Data []StatisticsData `json:"data"` +type DataUpdateFullContainer struct { + Data []DataUpdateFull `json:"data"` } -func (self *StatisticsData) CopyFromSourceId(id *SourceId) { +func (self *DataUpdateFull) CopyFromSourceId(id *SourceId) { self.Hostname = id.Hostname self.StreamId = id.StreamId self.Tags = id.Tags self.Version = id.Version } -func (self *StatisticsData) CopyFromUpdate(id *DataUpdate) { +func (self *DataUpdateFull) CopyFromUpdate(id *DataUpdate) { self.StartTime = id.StartTime self.Duration = id.Duration self.Data = id.Data diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index d6de68e..ccbc864 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -39,7 +39,7 @@ type sourceDb struct { Tags []string `json:"t"` } -func NewSourceDb(value StatisticsData) sourceDb { +func NewSourceDb(value DataUpdateFull) sourceDb { return sourceDb{ Hostname: value.SourceId.Hostname, StreamId: streamIdDb{ @@ -70,8 +70,7 @@ type clientDataDb struct { BytesSent uint `json:"bs"` } -// stored in dataUpdatesTn -// in DB, StatisticsData/DataUpdate is flattened compared to JSON DTOs +// stored in dataUpdatesBn type dataUpdateDb struct { SourceHubUuid string `json:"h,omitempty"` SourceHubDataUpdateId int `json:"hi,omitempty"` @@ -83,7 +82,7 @@ type dataUpdateDb struct { BytesSent uint `json:"bs"` } -func NewDataUpdateDb(v StatisticsData) dataUpdateDb { +func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { return dataUpdateDb{ v.SourceHubUuid, v.SourceHubDataUpdateId, @@ -96,7 +95,7 @@ func NewDataUpdateDb(v StatisticsData) dataUpdateDb { } } -func (s *StatisticsData) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) { +func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, vId int, hubId string) { if v.SourceHubUuid == "" { s.SourceHubUuid = hubId s.SourceHubDataUpdateId = vId |