diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-05-18 21:32:06 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-05-18 21:32:06 +0200 |
commit | 83ef99e217b30cec699f06bf7111cc41adc0c299 (patch) | |
tree | 21c686b7ed38d121e3ddb922b7865122bfbe206f /src/hub | |
parent | more accurate back pressure (diff) |
make variable names more comlient to golang convention
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 32 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 24 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 12 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 176 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 124 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 20 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesStore.go | 66 |
12 files changed, 254 insertions, 254 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 0abbdef..ccf35d2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -95,7 +95,7 @@ func NewStatefulDecoder(r io.Reader) (Decoder, error) { func (sd *StatefulDecoder) Slug() string { s := sd.Source - return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality) + return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality) } func (sd *StatefulDecoder) Decode() (uf UpdateFull, err error) { diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 55d8415..0b13043 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -43,7 +43,7 @@ import ( var ( initEncoded = `"hostname": "localhost", "stream": {"quality": "low", "content": "av", "format": "webm"}, "tags": ["elevate", "2014"]` - initStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + initStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentID: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} updateEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` updateStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) @@ -146,7 +146,7 @@ func TestDecodeStateful(t *testing.T) { } slug := dec.Slug() - expectedSlug := strings.Join([]string{initStruct.Hostname, initStruct.Stream.ContentId, + expectedSlug := strings.Join([]string{initStruct.Hostname, initStruct.Stream.ContentID, initStruct.Stream.Format, initStruct.Stream.Quality}, "/") if slug != expectedSlug { t.Fatalf("slug failed:\n actual: %v\n expected: %v\n", slug, expectedSlug) diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index e296fd2..bc9dc43 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -61,11 +61,11 @@ type Server struct { func (srv Server) Anonymize(update UpdateFull) UpdateFull { anonymized := []Client{} for _, client := range update.Data.Clients { - aIp, err := srv.anonymization.Anonymize(client.Ip) + aIP, err := srv.anonymization.Anonymize(client.IP) if err != nil { s5l.Printf("anonymization: failed: %v", err) } else { - client.Ip = aIp + client.IP = aIP } anonymized = append(anonymized, client) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 8ee3999..866efe9 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -40,21 +40,21 @@ import ( "time" ) -func findMaxId(updates []UpdateFull) int { - maxId := -1 +func findMaxID(updates []UpdateFull) int { + maxID := -1 for _, value := range updates { - if id := value.SourceHubUpdateId; id > maxId { - maxId = id + if id := value.SourceHubUpdateID; id > maxID { + maxID = id } } - return maxId + return maxID } -func fwdGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) { - lastId = -1 +func fwdGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (lastID int, err error) { + lastID = -1 var resp *http.Response - resp, err = client.Get(baseUrl + "/lastupdate/" + hubUuid) + resp, err = client.Get(baseUrl + "/lastupdate/" + hubUUID) if err != nil { s5l.Printf("fwd: failed to query for lastupdate: %v\n", err) return @@ -66,13 +66,13 @@ func fwdGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (la return } - result := WebLastUpdateIdResponse{} + result := WebLastUpdateIDResponse{} if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err) return } - lastId = result.LastUpdateId + lastID = result.LastUpdateID return } @@ -111,20 +111,20 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er func (srv Server) forwardRun(baseUrl string, client *http.Client) { url := baseUrl + "/updates/_bulk" - hubUuid := srv.store.GetHubUuid() + hubUUID := srv.store.GetHubUUID() tryResync: for { - lastId, err := fwdGetLastUpdateId(baseUrl, client, hubUuid) + lastID, err := fwdGetLastUpdateID(baseUrl, client, hubUUID) if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd: lastupdate: %d", lastId) + s5l.Printf("fwd: lastupdate: %d", lastID) nextBatch: for { - updates, err := srv.store.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { s5l.Printf("fwd: failed reading updates: %v", err) time.Sleep(500 * time.Millisecond) @@ -145,8 +145,8 @@ tryResync: continue tryResync } - lastId = findMaxId(updates) - s5l.Printf("fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastId) + lastID = findMaxID(updates) + s5l.Printf("fwd: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 2d3cf15..d61c5fc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -42,15 +42,15 @@ import ( "time" ) -const forwardEsLastUpdateIdJson = `{ +const forwardEsLastUpdateIDJson = `{ "query": {"match": { "SourceHubUuid": "%s" } }, "aggregations": { "last-id" : { "max" : { "field": "SourceHubUpdateId" } } } }` -func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) { +func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (lastID int, err error) { url := baseUrl + "/dataupdate/_search?search_type=count" - queryJson := fmt.Sprintf(forwardEsLastUpdateIdJson, hubUuid) + queryJson := fmt.Sprintf(forwardEsLastUpdateIDJson, hubUUID) s5dl.Printf("fwd-es: query: %s", queryJson) var resp *http.Response @@ -76,7 +76,7 @@ func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) ( s5dl.Printf("fwd-es: lastupdate response: %s\n", body) if len(body) == 0 { - lastId = -1 + lastID = -1 } else { var value interface{} err = json.Unmarshal(body, &value) @@ -88,11 +88,11 @@ func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) ( idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] if idstrcntr == nil { s5l.Printf("fwd-es: we are new here\n") - lastId = -1 + lastID = -1 return } tid := idstrcntr.(float64) - lastId = int(tid) + lastID = int(tid) } return @@ -100,20 +100,20 @@ func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) ( func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { url := baseUrl + "/_bulk" - hubUuid := srv.store.GetHubUuid() + hubUUID := srv.store.GetHubUUID() tryResync: for { - lastId, err := fwdEsGetLastUpdateId(baseUrl, client, hubUuid) + lastID, err := fwdEsGetLastUpdateID(baseUrl, client, hubUUID) if err != nil { s5l.Printf("fwd-es: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-es: lastupdate: %d", lastId) + s5l.Printf("fwd-es: lastupdate: %d", lastID) nextBatch: for { - updates, err := srv.store.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { s5l.Printf("fwd-es: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -153,8 +153,8 @@ tryResync: } resp.Body.Close() // TODO: check result from elasticsearch - lastId = findMaxId(updates) - s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastId) + lastID = findMaxID(updates) + s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index c972cbc..2972b91 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -49,18 +49,18 @@ tryResync: continue tryResync } - lastId, err := srv.store.GetLastUpdateId() + lastID, err := srv.store.GetLastUpdateID() if err != nil { s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) client.Disconnect() time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-graphite: lastupdate: %d", lastId) + s5l.Printf("fwd-graphite: lastupdate: %d", lastID) nextBatch: for { - updates, err := srv.store.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { s5l.Printf("fwd-graphite: failed reading updates: %v", err) time.Sleep(500 * time.Millisecond) @@ -74,7 +74,7 @@ tryResync: metrics := make([]graphite.Metric, len(updates)*3) for i, update := range updates { path := basePath + "." + update.Hostname - path = path + "." + update.Stream.ContentId + path = path + "." + update.Stream.ContentID path = path + "." + update.Stream.Format path = path + "." + update.Stream.Quality @@ -90,8 +90,8 @@ tryResync: continue tryResync } - lastId = findMaxId(updates) - s5l.Printf("fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastId) + lastID = findMaxID(updates) + s5l.Printf("fwd-graphite: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 9fa1dcc..047ebcf 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -49,9 +49,9 @@ type forwardPiwikBulkRequest struct { TokenAuth string `json:"token_auth"` } -func fwdPiwikGetLastUpdateId(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) { +func fwdPiwikGetLastUpdateID(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastID int, err error) { // TODO: ask piwik what the last update was... - lastId = 0 + lastID = 0 return } @@ -59,18 +59,18 @@ func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token s // hubUuid := srv.store.GetHubUuid() tryResync: for { - // lastId, err := srv.forwardPiwikGetLastUpdateId(piwikURL, siteURL, siteID, token, client, hubUuid) - lastId, err := srv.store.GetLastUpdateId() + // lastID, err := srv.forwardPiwikGetLastUpdateID(piwikURL, siteURL, siteID, token, client, hubUuid) + lastID, err := srv.store.GetLastUpdateID() if err != nil { s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-piwik: lastupdate: %d", lastId) + s5l.Printf("fwd-piwik: lastupdate: %d", lastID) nextBatch: for { - updates, err := srv.store.GetUpdatesAfter(lastId, 5000) + updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) time.Sleep(500 * time.Millisecond) @@ -89,15 +89,15 @@ tryResync: } for _, client := range update.Data.Clients { - ip, _, err := net.SplitHostPort(client.Ip) + ip, _, err := net.SplitHostPort(client.IP) if err != nil { - ip = client.Ip + ip = client.IP } q := make(url.Values) q.Add("rec", "1") q.Add("idsite", strconv.FormatUint(uint64(siteID), 10)) - q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Stream.ContentId, update.Stream.Format, update.Stream.Quality, update.Hostname)) + q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Stream.ContentID, update.Stream.Format, update.Stream.Quality, update.Hostname)) q.Add("cip", ip) q.Add("cdt", strconv.FormatInt(update.StartTime.Unix(), 10)) q.Add("ua", client.UserAgent) @@ -125,8 +125,8 @@ tryResync: } resp.Body.Close() // TODO: check result from Piwik - lastId = findMaxId(updates) - s5l.Printf("fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastId) + lastID = findMaxID(updates) + s5l.Printf("fwd-piwik: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 21f4083..bde8abc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -60,7 +60,7 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { } resp := WebHealthzResponse{} - resp.HubUuid = srv.store.GetHubUuid() + resp.HubUUID = srv.store.GetHubUUID() resp.Status = "OK" // TODO: do a more sophisticated check sendWebResponse(w, http.StatusOK, resp) } @@ -273,7 +273,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { // /lastupdate/:UUID // -func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) { +func webLastUpdateIDForUUID(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) return @@ -285,9 +285,9 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) } var err error - resp := WebLastUpdateIdResponse{} - resp.HubUuid = path.Base(r.URL.Path) - if resp.LastUpdateId, err = srv.store.GetLastUpdateIdForUuid(resp.HubUuid); err != nil { + resp := WebLastUpdateIDResponse{} + resp.HubUUID = path.Base(r.URL.Path) + if resp.LastUpdateID, err = srv.store.GetLastUpdateIDForUUID(resp.HubUUID); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } @@ -298,16 +298,16 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) // /lastupdate // -func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) { +func webLastUpdateID(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) return } var err error - resp := WebLastUpdateIdResponse{} - resp.HubUuid = srv.store.GetHubUuid() - if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil { + resp := WebLastUpdateIDResponse{} + resp.HubUUID = srv.store.GetHubUUID() + if resp.LastUpdateID, err = srv.store.GetLastUpdateID(); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } @@ -384,8 +384,8 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { mux.Handle("/sources", webHandler{srv, webSources}) mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) mux.Handle("/updates", webHandler{srv, webUpdates}) - mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIdForUuid}) - mux.Handle("/lastupdate", webHandler{srv, webLastUpdateId}) + mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIDForUUID}) + mux.Handle("/lastupdate", webHandler{srv, webLastUpdateID}) // mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir( ..staticDir.. )))) mux.Handle("/", webHandler{srv, webNotFound}) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 75b9bb7..86a4fa0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -49,13 +49,13 @@ const ( ) var ( - storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, updatesBn, + storeBuckets = []string{latestUpdatesBn, hubUUIDsFwdBn, hubUUIDsRevBn, updatesBn, sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} ) type Store struct { version int - hubUuid string + hubUUID string db *bolt.DB readOnly bool } @@ -64,7 +64,7 @@ type Store struct { // Initialization and Destruction // -func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid string, err error) { +func openDB(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUUID string, err error) { if _, err = os.Stat(dbPath); err != nil { if os.IsNotExist(err) { err = nil @@ -91,11 +91,11 @@ func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid str return fmt.Errorf("store: failed to open, wrong hub version: %d (expected: %d)", version, StoreVersion) } - bHubId := b.Get([]byte(hubUuidKey)) - if bHubId != nil { - hubUuid = string(bHubId) + bHubID := b.Get([]byte(hubUUIDKey)) + if bHubID != nil { + hubUUID = string(bHubID) } - if hubUuid == "" { + if hubUUID == "" { return errors.New("store: failed to open, UUID does not exist or is empty.") } @@ -112,7 +112,7 @@ func openDb(dbPath string, readOnly bool) (db *bolt.DB, version int, hubUuid str return } -func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err error) { +func createDB(dbPath string) (db *bolt.DB, version int, hubUUID string, err error) { db, err = bolt.Open(dbPath, 0600, &bolt.Options{Timeout: 100 * time.Millisecond}) if err != nil { return @@ -133,8 +133,8 @@ func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err erro if err := b.Put([]byte(storeVersionKey), itob(version)); err != nil { return err } - hubUuid = uuid.New() - if err := b.Put([]byte(hubUuidKey), []byte(hubUuid)); err != nil { + hubUUID = uuid.New() + if err := b.Put([]byte(hubUUIDKey), []byte(hubUUID)); err != nil { return err } return nil @@ -147,7 +147,7 @@ func createDb(dbPath string) (db *bolt.DB, version int, hubUuid string, err erro } func NewStore(dbPath string, readOnly bool) (Store, error) { - db, version, hubid, err := openDb(dbPath, readOnly) + db, version, hubid, err := openDB(dbPath, readOnly) if err != nil { return Store{}, err } @@ -165,7 +165,7 @@ func NewStore(dbPath string, readOnly bool) (Store, error) { return Store{}, errors.New("store: failed to open, requested read-only mode but store file does not exist.") } - db, version, hubid, err = createDb(dbPath) + db, version, hubid, err = createDB(dbPath) if err != nil { return Store{}, err } @@ -184,43 +184,43 @@ func (st Store) Close() { // append key-value pairs to buckets -func (st Store) insertNewHub(tx *bolt.Tx, hubUuid string) (hubId int, err error) { - if hubUuid == "" { +func (st Store) insertNewHub(tx *bolt.Tx, hubUUID string) (hubID int, err error) { + if hubUUID == "" { return } - bf := tx.Bucket([]byte(hubUuidsFwdBn)) + bf := tx.Bucket([]byte(hubUUIDsFwdBn)) bf.FillPercent = 1.0 // we only do appends - br := tx.Bucket([]byte(hubUuidsRevBn)) + br := tx.Bucket([]byte(hubUUIDsRevBn)) br.FillPercent = 1.0 // we only do appends - bHubId := bf.Get([]byte(hubUuid)) - if bHubId != nil { - return btoi(bHubId), nil + bHubID := bf.Get([]byte(hubUUID)) + if bHubID != nil { + return btoi(bHubID), nil } next, _ := bf.NextSequence() - hubId = int(next) - if err = bf.Put([]byte(hubUuid), itob(hubId)); err != nil { + hubID = int(next) + if err = bf.Put([]byte(hubUUID), itob(hubID)); err != nil { return } - if err = br.Put(itob(hubId), []byte(hubUuid)); err != nil { + if err = br.Put(itob(hubID), []byte(hubUUID)); err != nil { return } - return hubId, err + return hubID, err } -func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error) { +func (st Store) insertNewSource(tx *bolt.Tx, src sourceDB) (srcID int, err error) { bf := tx.Bucket([]byte(sourcesFwdBn)) // br.FillPercent = 1.0 // these appends are not ordered (the key is the slug and not an integer id) br := tx.Bucket([]byte(sourcesRevBn)) br.FillPercent = 1.0 // we only do appends (with ever incrementing interger ids) slug := src.Slug() - bSrcId := bf.Get([]byte(slug)) - if bSrcId != nil { - return btoi(bSrcId), nil + bSrcID := bf.Get([]byte(slug)) + if bSrcID != nil { + return btoi(bSrcID), nil } var jsonData []byte @@ -229,56 +229,56 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error } next, _ := bf.NextSequence() - srcId = int(next) - if err = bf.Put([]byte(slug), itob(srcId)); err != nil { + srcID = int(next) + if err = bf.Put([]byte(slug), itob(srcID)); err != nil { return } - if err = br.Put(itob(srcId), jsonData); err != nil { + if err = br.Put(itob(srcID), jsonData); err != nil { return } - return srcId, err + return srcID, err } -func (st Store) insertUpdate(tx *bolt.Tx, du updateDb) (duId int, err error) { +func (st Store) insertUpdate(tx *bolt.Tx, du updateDB) (duID int, err error) { b := tx.Bucket([]byte(updatesBn)) b.FillPercent = 1.0 // we only do appends next, _ := b.NextSequence() - duId = int(next) + duID = int(next) var jsonData []byte if jsonData, err = json.Marshal(du); err != nil { return } - err = b.Put(itob(duId), jsonData) + err = b.Put(itob(duID), jsonData) return } -func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) { +func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaID int, err error) { bf := tx.Bucket([]byte(userAgentsFwdBn)) bf.FillPercent = 1.0 // we only do appends br := tx.Bucket([]byte(userAgentsRevBn)) br.FillPercent = 1.0 // we only do appends - bUaId := bf.Get([]byte(ua)) - if bUaId != nil { - return btoi(bUaId), nil + bUaID := bf.Get([]byte(ua)) + if bUaID != nil { + return btoi(bUaID), nil } next, _ := bf.NextSequence() - uaId = int(next) - if err = bf.Put([]byte(ua), itob(uaId)); err != nil { + uaID = int(next) + if err = bf.Put([]byte(ua), itob(uaID)); err != nil { return } - if err = br.Put(itob(uaId), []byte(ua)); err != nil { + if err = br.Put(itob(uaID), []byte(ua)); err != nil { return } - return uaId, err + return uaID, err } -func (st Store) insertClient(tx *bolt.Tx, uId int, cd []Client) error { +func (st Store) insertClient(tx *bolt.Tx, uID int, cd []Client) error { if len(cd) == 0 { return nil } @@ -286,57 +286,57 @@ func (st Store) insertClient(tx *bolt.Tx, uId int, cd []Client) error { b := tx.Bucket([]byte(clientDataBn)) b.FillPercent = 1.0 // we only do appends - data := []clientDataDb{} + data := []clientDataDB{} for _, c := range cd { - uaId, err := st.insertNewUserAgent(tx, c.UserAgent) + uaID, err := st.insertNewUserAgent(tx, c.UserAgent) if err != nil { return err } - data = append(data, clientDataDb{c.Ip, uaId, c.BytesSent}) + data = append(data, clientDataDB{c.IP, uaID, c.BytesSent}) } jsonData, err := json.Marshal(data) if err != nil { return err } - return b.Put(itob(uId), jsonData) + return b.Put(itob(uID), jsonData) } -func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, uId int) error { +func (st Store) setLastUpdateForUUID(tx *bolt.Tx, uuid string, uID int) error { b := tx.Bucket([]byte(latestUpdatesBn)) b.FillPercent = 1.0 // we only do appends last := b.Get([]byte(uuid)) - if last != nil && btoi(last) > uId { + if last != nil && btoi(last) > uID { return nil } - return b.Put([]byte(uuid), itob(uId)) + return b.Put([]byte(uuid), itob(uID)) } // Split up the multidimensional dataupdate and append all the key-value pairs -func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uId int, err error) { - u := NewUpdateDb(uf) - s := NewSourceDb(uf) +func (st Store) appendItem(tx *bolt.Tx, uf UpdateFull) (uID int, err error) { + u := NewUpdateDB(uf) + s := NewSourceDB(uf) - if u.SourceHubId, err = st.insertNewHub(tx, uf.SourceHubUuid); err != nil { + if u.SourceHubID, err = st.insertNewHub(tx, uf.SourceHubUUID); err != nil { return } - if u.SourceId, err = st.insertNewSource(tx, s); err != nil { + if u.SourceID, err = st.insertNewSource(tx, s); err != nil { return } - if uId, err = st.insertUpdate(tx, u); err != nil { + if uID, err = st.insertUpdate(tx, u); err != nil { return } - if err = st.insertClient(tx, uId, uf.Data.Clients); err != nil { + if err = st.insertClient(tx, uID, uf.Data.Clients); err != nil { return } - if uf.SourceHubUuid != "" { - err = st.setLastUpdateForUuid(tx, uf.SourceHubUuid, u.SourceHubUpdateId) + if uf.SourceHubUUID != "" { + err = st.setLastUpdateForUUID(tx, uf.SourceHubUUID, u.SourceHubUpdateID) } - if uf.ForwardHubUuid != "" { - err = st.setLastUpdateForUuid(tx, uf.ForwardHubUuid, uf.ForwardHubUpdateId) + if uf.ForwardHubUUID != "" { + err = st.setLastUpdateForUUID(tx, uf.ForwardHubUUID, uf.ForwardHubUpdateID) } return } @@ -368,7 +368,7 @@ func (st Store) Append(update UpdateFull) (err error) { // fetch key-value pairs from buckets func (st Store) getHub(tx *bolt.Tx, id int) string { - b := tx.Bucket([]byte(hubUuidsRevBn)) + b := tx.Bucket([]byte(hubUUIDsRevBn)) uuid := b.Get(itob(id)) if uuid == nil { return "" @@ -376,7 +376,7 @@ func (st Store) getHub(tx *bolt.Tx, id int) string { return string(uuid) } -func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDb, err error) { +func (st Store) getSource(tx *bolt.Tx, id int) (source sourceDB, err error) { b := tx.Bucket([]byte(sourcesRevBn)) jsonData := b.Get(itob(id)) @@ -398,13 +398,13 @@ func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { if jsonData == nil { return } - data := []clientDataDb{} + data := []clientDataDB{} if err = json.Unmarshal(jsonData, &data); err != nil { return } for _, c := range data { - cd := Client{Ip: c.Ip, BytesSent: c.BytesSent} - ua := bu.Get(itob(c.UserAgentId)) + cd := Client{IP: c.IP, BytesSent: c.BytesSent} + ua := bu.Get(itob(c.UserAgentID)) if ua != nil { cd.UserAgent = string(ua) } @@ -415,14 +415,14 @@ func (st Store) getClients(tx *bolt.Tx, id int) (clients []Client, err error) { // fetch all the key-value pairs and merge them into the multidimensional dataupdate -func (st Store) fetchItem(tx *bolt.Tx, uId int, u updateDb) (updates UpdateFull, err error) { - updates.CopyFromUpdateDb(u, st.getHub(tx, u.SourceHubId), st.hubUuid, uId) - var src sourceDb - if src, err = st.getSource(tx, u.SourceId); err != nil { +func (st Store) fetchItem(tx *bolt.Tx, uID int, u updateDB) (updates UpdateFull, err error) { + updates.CopyFromUpdateDB(u, st.getHub(tx, u.SourceHubID), st.hubUUID, uID) + var src sourceDB + if src, err = st.getSource(tx, u.SourceID); err != nil { return } - updates.CopyFromSourceDb(src) - if updates.Data.Clients, err = st.getClients(tx, uId); err != nil { + updates.CopyFromSourceDB(src) + if updates.Data.Clients, err = st.getClients(tx, uID); err != nil { return } return @@ -451,7 +451,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (updates []UpdateFull, err error) k, v = c.Next() } for ; k != nil; k, v = c.Next() { - var d updateDb + var d updateDB if err := json.Unmarshal(v, &d); err != nil { return err } @@ -479,7 +479,7 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) { return ErrNotFound } - var d updateDb + var d updateDB if err := json.Unmarshal(jsonData, &d); err != nil { return err } @@ -497,38 +497,38 @@ func (st Store) GetUpdate(id int) (update UpdateFull, err error) { // Auxilliary Data // -func (st Store) GetHubUuid() string { - return st.hubUuid +func (st Store) GetHubUUID() string { + return st.hubUUID } -func (st Store) GetLastUpdateId() (updateId int, err error) { +func (st Store) GetLastUpdateID() (updateID int, err error) { err = st.db.View(func(tx *bolt.Tx) error { - updateId = int(tx.Bucket([]byte(updatesBn)).Sequence()) + updateID = int(tx.Bucket([]byte(updatesBn)).Sequence()) return nil }) return } -func (st Store) GetLastUpdateIdForUuid(uuid string) (updateId int, err error) { - if uuid == st.hubUuid { - return st.GetLastUpdateId() +func (st Store) GetLastUpdateIDForUUID(uuid string) (updateID int, err error) { + if uuid == st.hubUUID { + return st.GetLastUpdateID() } err = st.db.View(func(tx *bolt.Tx) error { - bUpdateId := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid)) - if bUpdateId == nil { + bUpdateID := tx.Bucket([]byte(latestUpdatesBn)).Get([]byte(uuid)) + if bUpdateID == nil { return nil } - updateId = btoi(bUpdateId) + updateID = btoi(bUpdateID) return nil }) return } func (st Store) GetHubs() (hubs []string, err error) { - hubs = []string{st.hubUuid} + hubs = []string{st.hubUUID} err = st.db.View(func(tx *bolt.Tx) error { - c := tx.Bucket([]byte(hubUuidsRevBn)).Cursor() + c := tx.Bucket([]byte(hubUUIDsRevBn)).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { hubs = append(hubs, string(v)) } @@ -542,12 +542,12 @@ func (st Store) GetSources() (sources []Source, err error) { err = st.db.View(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(sourcesRevBn)).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { - var s sourceDb + var s sourceDB if err := json.Unmarshal(v, &s); err != nil { return err } var src Source - src.CopyFromSourceDb(s) + src.CopyFromSourceDB(s) sources = append(sources, src) } return nil diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 3bc912b..5ad129a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -50,8 +50,8 @@ var ( testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt" testBoltPathFinal = "/run/s5hub_testing_db_final.bolt" - streamIdData = Stream{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"} - sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIdData} + streamIDData = Stream{ContentID: "talkingheads", Format: "7bitascii", Quality: "high"} + sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIDData} updateData = Update{Data: UpdateData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000} clientsData = []Client{ Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, @@ -84,7 +84,7 @@ func generateTestData(n int) ([]UpdateFull, int) { d.Version = ProtocolVersion d.Source.Hostname = hostname - d.Source.Stream.ContentId = content + d.Source.Stream.ContentID = content d.Source.Stream.Format = format d.Source.Stream.Quality = quality d.Source.Tags = tags @@ -233,7 +233,7 @@ func TestOpen(t *testing.T) { } else { err = db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(hubInfoBn)) - if err := b.Put([]byte(hubUuidKey), []byte("")); err != nil { + if err := b.Put([]byte(hubUUIDKey), []byte("")); err != nil { return err } return nil @@ -255,7 +255,7 @@ func TestOpen(t *testing.T) { } else { err = db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(hubInfoBn)) - if err := b.Put([]byte(hubUuidKey), []byte("6d1a2192-7cb6-404d-80fb-add9b40a8f33")); err != nil { + if err := b.Put([]byte(hubUUIDKey), []byte("6d1a2192-7cb6-404d-80fb-add9b40a8f33")); err != nil { return err } return nil @@ -277,15 +277,15 @@ func TestOpen(t *testing.T) { if err != nil { t.Fatalf("creating new store failed: %v", err) } - createdUuid := store.hubUuid + createdUUID := store.hubUUID store.Close() store, err = NewStore(testBoltPath, false) if err != nil { t.Fatalf("re-opening existing store failed: %v", err) } - if createdUuid != store.hubUuid { - t.Fatalf("UUID of opened store differs from the one previously generated: '%s' != '%s'", createdUuid, store.hubUuid) + if createdUUID != store.hubUUID { + t.Fatalf("UUID of opened store differs from the one previously generated: '%s' != '%s'", createdUUID, store.hubUUID) } if _, err := NewStore(testBoltPath, false); err == nil { @@ -322,10 +322,10 @@ func TestAppendAndFetch(t *testing.T) { out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder expected := in - expected.SourceHubUuid = store.GetHubUuid() - expected.SourceHubUpdateId = 1 - expected.ForwardHubUuid = "" - expected.ForwardHubUpdateId = 0 + expected.SourceHubUUID = store.GetHubUUID() + expected.SourceHubUpdateID = 1 + expected.ForwardHubUUID = "" + expected.ForwardHubUpdateID = 0 if !reflect.DeepEqual(expected, out) { t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) @@ -350,10 +350,10 @@ func TestAppendAndFetch(t *testing.T) { } out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder expected = ins[i] - expected.SourceHubUuid = store.GetHubUuid() - expected.SourceHubUpdateId = i + 2 - expected.ForwardHubUuid = "" - expected.ForwardHubUpdateId = 0 + expected.SourceHubUUID = store.GetHubUUID() + expected.SourceHubUpdateID = i + 2 + expected.ForwardHubUUID = "" + expected.ForwardHubUpdateID = 0 if !reflect.DeepEqual(expected, out) { t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) @@ -421,29 +421,29 @@ func TestGetUpdatesAfter(t *testing.T) { t.Fatalf("unexpected error: %v", err) } e := in - e.SourceHubUuid = store.hubUuid - e.SourceHubUpdateId = i + 1 - e.ForwardHubUuid = "" - e.ForwardHubUpdateId = 0 + e.SourceHubUUID = store.hubUUID + e.SourceHubUpdateID = i + 1 + e.ForwardHubUUID = "" + e.ForwardHubUpdateID = 0 expected = append(expected, e) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) } // check if there are 3 updates in store - lastId, err := store.GetLastUpdateId() + lastID, err := store.GetLastUpdateID() if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != 3 { - t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) + if lastID != 3 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastID) } - lastIdForUuid, err := store.GetLastUpdateIdForUuid(store.hubUuid) + lastIDForUUID, err := store.GetLastUpdateIDForUUID(store.hubUUID) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != lastIdForUuid { - t.Fatalf("last update ID from GetLastUpdateId = %d and GetLastUpdateIdForUuid(store.hubUuid) = %d differ", lastId, lastIdForUuid) + if lastID != lastIDForUUID { + t.Fatalf("last update ID from GetLastUpdateID = %d and GetLastUpdateIDForUUID(store.hubUUID) = %d differ", lastID, lastIDForUUID) } // all the updates @@ -511,16 +511,16 @@ func TestForwardedUpdates(t *testing.T) { if err := store.AppendMany(data); err != nil { t.Fatalf("unexpected error: %v", err) } - myLastId := 10 + myLastID := 10 forwardedHub := "05defdfa-e7d1-4ca8-8b5c-02abb0088d29" // check if there are no updates for this hub in store - lastId, err := store.GetLastUpdateIdForUuid(forwardedHub) + lastID, err := store.GetLastUpdateIDForUUID(forwardedHub) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != 0 { - t.Fatalf("failed to get last update ID: %d, expected 0", lastId) + if lastID != 0 { + t.Fatalf("failed to get last update ID: %d, expected 0", lastID) } // get list of all hubs @@ -531,7 +531,7 @@ func TestForwardedUpdates(t *testing.T) { if len(hubs) != 1 { t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 1", len(hubs)) } - if hubs[0] != store.hubUuid { + if hubs[0] != store.hubUUID { t.Fatalf("fist hub should be the own stores UUID but is: %s", hubs[0]) } @@ -543,14 +543,14 @@ func TestForwardedUpdates(t *testing.T) { expected := []UpdateFull{} for i := 0; i < 3; i = i + 1 { in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} - in.SourceHubUuid = forwardedHub - in.SourceHubUpdateId = 3 - i // out of order + in.SourceHubUUID = forwardedHub + in.SourceHubUpdateID = 3 - i // out of order if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } - myLastId = myLastId + 1 - in.ForwardHubUuid = store.GetHubUuid() - in.ForwardHubUpdateId = myLastId + myLastID = myLastID + 1 + in.ForwardHubUUID = store.GetHubUUID() + in.ForwardHubUpdateID = myLastID expected = append(expected, in) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) } @@ -567,12 +567,12 @@ func TestForwardedUpdates(t *testing.T) { } // check if the last update for this hub is 3 - lastId, err = store.GetLastUpdateIdForUuid(forwardedHub) + lastID, err = store.GetLastUpdateIDForUUID(forwardedHub) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != 3 { - t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) + if lastID != 3 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastID) } // get list of all hubs @@ -583,7 +583,7 @@ func TestForwardedUpdates(t *testing.T) { if len(hubs) != 2 { t.Fatalf("failed to get hub UUIDs: got %d hubs, expected 2", len(hubs)) } - if hubs[0] != store.hubUuid { + if hubs[0] != store.hubUUID { t.Fatalf("fist hub should be the own stores UUID but is: %s", hubs[0]) } if hubs[1] != forwardedHub { @@ -591,47 +591,47 @@ func TestForwardedUpdates(t *testing.T) { } // check if the last update is now 13 - lastId, _ = store.GetLastUpdateId() - if lastId != 13 { - t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastId) + lastID, _ = store.GetLastUpdateID() + if lastID != 13 { + t.Fatalf("failed to get last update ID: got %d updates, expected 3", lastID) } } -func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { - lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid()) +func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1ID, fwdSrc2ID, finalSrc1ID, finalSrc2ID, finalFwdID int) { + lastID, err := fwdStore.GetLastUpdateIDForUUID(src1Store.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != fwdSrc1Id { - t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id) + if lastID != fwdSrc1ID { + t.Fatalf("failed to get last update ID: %d, expected %d", lastID, fwdSrc1ID) } - lastId, err = fwdStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid()) + lastID, err = fwdStore.GetLastUpdateIDForUUID(src2Store.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != fwdSrc2Id { - t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id) + if lastID != fwdSrc2ID { + t.Fatalf("failed to get last update ID: %d, expected %d", lastID, fwdSrc2ID) } - lastId, err = finalStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid()) + lastID, err = finalStore.GetLastUpdateIDForUUID(src1Store.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != finalSrc1Id { - t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id) + if lastID != finalSrc1ID { + t.Fatalf("failed to get last update ID: %d, expected %d", lastID, finalSrc1ID) } - lastId, err = finalStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid()) + lastID, err = finalStore.GetLastUpdateIDForUUID(src2Store.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != finalSrc2Id { - t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id) + if lastID != finalSrc2ID { + t.Fatalf("failed to get last update ID: %d, expected %d", lastID, finalSrc2ID) } - lastId, err = finalStore.GetLastUpdateIdForUuid(fwdStore.GetHubUuid()) + lastID, err = finalStore.GetLastUpdateIDForUUID(fwdStore.GetHubUUID()) if err != nil { t.Fatalf("unexpected error: %v", err) } - if lastId != finalFwdId { - t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalFwdId) + if lastID != finalFwdID { + t.Fatalf("failed to get last update ID: %d, expected %d", lastID, finalFwdID) } } @@ -809,15 +809,15 @@ func BenchmarkGetUpdatesAfter(b *testing.B) { b.ResetTimer() - latestId := -1 + latestID := -1 for { - updates, err := store.GetUpdatesAfter(latestId, -1) + updates, err := store.GetUpdatesAfter(latestID, -1) if err != nil { b.Fatalf("failed to retrieve: %v", err) } if len(updates) == 0 { break } - latestId = findMaxId(updates) + latestID = findMaxID(updates) } } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 0a30162..28f1fc7 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -35,7 +35,7 @@ package sfive import "time" type Stream struct { - ContentId string `json:"content"` + ContentID string `json:"content"` Format string `json:"format"` Quality string `json:"quality"` } @@ -47,7 +47,7 @@ type Source struct { } type Client struct { - Ip string `json:"ip"` + IP string `json:"ip"` UserAgent string `json:"user-agent,omitempty"` BytesSent uint `json:"bytes-sent"` } @@ -67,10 +67,10 @@ type Update struct { type Header struct { Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API - SourceHubUuid string `json:"SourceHubUuid,omitempty"` - SourceHubUpdateId int `json:"SourceHubUpdateId,omitempty"` - ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` - ForwardHubUpdateId int `json:"ForwardHubUpdateId,omitempty"` + SourceHubUUID string `json:"SourceHubUuid,omitempty"` + SourceHubUpdateID int `json:"SourceHubUpdateId,omitempty"` + ForwardHubUUID string `json:"ForwardHubUuid,omitempty"` + ForwardHubUpdateID int `json:"ForwardHubUpdateId,omitempty"` } type UpdateFull struct { @@ -99,7 +99,7 @@ type WebErrorResponse struct { type WebHealthzResponse struct { Status string `json:"status"` - HubUuid string `json:"hub-uuid"` + HubUUID string `json:"hub-uuid"` } type WebHubsResponse struct { @@ -118,7 +118,7 @@ type WebUpdatesPostResponse struct { NumUpdates int `json:"num-updates"` } -type WebLastUpdateIdResponse struct { - HubUuid string `json:"hub-uuid"` - LastUpdateId int `json:"lastupdate"` +type WebLastUpdateIDResponse struct { + HubUUID string `json:"hub-uuid"` + LastUpdateID int `json:"lastupdate"` } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index 5234bb3..f4bf6e8 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -50,8 +50,8 @@ const ( // bucket names hubInfoBn = "HubInfo" latestUpdatesBn = "LatestUpdates" - hubUuidsFwdBn = "HubUUIDsFwd" - hubUuidsRevBn = "HubUUIDsRev" + hubUUIDsFwdBn = "HubUUIDsFwd" + hubUUIDsRevBn = "HubUUIDsRev" updatesBn = "Updates" sourcesFwdBn = "SourcesFwd" sourcesRevBn = "SourcesRev" @@ -60,28 +60,28 @@ const ( userAgentsRevBn = "UserAgentsRev" // well-known keys - hubUuidKey = "HubUUID" + hubUUIDKey = "HubUUID" storeVersionKey = "Version" ) // stored in sourcesRevBn -type streamDb struct { - ContentId string `json:"c"` +type streamDB struct { + ContentID string `json:"c"` Format string `json:"f"` Quality string `json:"q"` } -type sourceDb struct { +type sourceDB struct { Hostname string `json:"h"` - Stream streamDb `json:"s"` + Stream streamDB `json:"s"` Tags []string `json:"t"` } -func NewSourceDb(value UpdateFull) sourceDb { - return sourceDb{ +func NewSourceDB(value UpdateFull) sourceDB { + return sourceDB{ Hostname: value.Source.Hostname, - Stream: streamDb{ - ContentId: value.Source.Stream.ContentId, + Stream: streamDB{ + ContentID: value.Source.Stream.ContentID, Format: value.Source.Stream.Format, Quality: value.Source.Stream.Quality, }, @@ -89,30 +89,30 @@ func NewSourceDb(value UpdateFull) sourceDb { } } -func (s sourceDb) Slug() string { - return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ",")) +func (s sourceDB) Slug() string { + return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentID, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ",")) } -func (s *Source) CopyFromSourceDb(sdb sourceDb) { +func (s *Source) CopyFromSourceDB(sdb sourceDB) { s.Hostname = sdb.Hostname - s.Stream.ContentId = sdb.Stream.ContentId + s.Stream.ContentID = sdb.Stream.ContentID s.Stream.Format = sdb.Stream.Format s.Stream.Quality = sdb.Stream.Quality s.Tags = sdb.Tags } // stored in clientDataBn -type clientDataDb struct { - Ip string `json:"ip"` - UserAgentId int `json:"ua"` +type clientDataDB struct { + IP string `json:"ip"` + UserAgentID int `json:"ua"` BytesSent uint `json:"bs"` } // stored in dataUpdatesBn -type updateDb struct { - SourceHubId int `json:"h,omitempty"` - SourceHubUpdateId int `json:"hi,omitempty"` - SourceId int `json:"si"` +type updateDB struct { + SourceHubID int `json:"h,omitempty"` + SourceHubUpdateID int `json:"hi,omitempty"` + SourceID int `json:"si"` StartTime int64 `json:"st"` // unix timestamp in milliseconds Duration int64 `json:"du"` // duration in milliseconds ClientCount uint `json:"cc,omitempty"` @@ -120,10 +120,10 @@ type updateDb struct { BytesSent uint `json:"bs,omitempty"` } -func NewUpdateDb(uf UpdateFull) updateDb { - return updateDb{ +func NewUpdateDB(uf UpdateFull) updateDB { + return updateDB{ -1, - uf.SourceHubUpdateId, + uf.SourceHubUpdateID, -1, int64(uf.StartTime.Unix()*1000) + int64(uf.StartTime.Nanosecond()/1000000), uf.Duration, @@ -133,15 +133,15 @@ func NewUpdateDb(uf UpdateFull) updateDb { } } -func (uf *UpdateFull) CopyFromUpdateDb(udb updateDb, srcHubUuid, hubUuid string, id int) { - if srcHubUuid == "" { - uf.SourceHubUuid = hubUuid - uf.SourceHubUpdateId = id +func (uf *UpdateFull) CopyFromUpdateDB(udb updateDB, srcHubUUID, hubUUID string, id int) { + if srcHubUUID == "" { + uf.SourceHubUUID = hubUUID + uf.SourceHubUpdateID = id } else { - uf.SourceHubUuid = srcHubUuid - uf.SourceHubUpdateId = udb.SourceHubUpdateId - uf.ForwardHubUuid = hubUuid - uf.ForwardHubUpdateId = id + uf.SourceHubUUID = srcHubUUID + uf.SourceHubUpdateID = udb.SourceHubUpdateID + uf.ForwardHubUUID = hubUUID + uf.ForwardHubUpdateID = id } uf.StartTime = time.Unix((udb.StartTime / 1000), (udb.StartTime%1000)*1000000) |