From fda0f759703ecb394d504fee834b61b8cfb97bf6 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 10 May 2017 21:54:28 +0200 Subject: major variable name refactoring --- src/hub/src/spreadspace.org/sfive/s5cvt.go | 20 ++--- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 20 ++--- src/hub/src/spreadspace.org/sfive/s5srv.go | 12 +-- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 6 +- .../spreadspace.org/sfive/s5srvForwardGraphite.go | 6 +- .../src/spreadspace.org/sfive/s5srvForwardPiwik.go | 2 +- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 2 +- src/hub/src/spreadspace.org/sfive/s5store.go | 56 ++++++------- src/hub/src/spreadspace.org/sfive/s5store_test.go | 96 +++++++++++----------- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 58 ++++++------- src/hub/src/spreadspace.org/sfive/s5typesStore.go | 96 +++++++++++----------- 11 files changed, 187 insertions(+), 187 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 9db899c..8d84fbc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -47,7 +47,7 @@ const ( // type Decoder interface { - Decode() (DataUpdateFull, error) + Decode() (UpdateFull, error) Slug() string } @@ -64,7 +64,7 @@ func (sd *StatelessDecoder) Slug() string { return "" } -func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) { +func (sd *StatelessDecoder) Decode() (res UpdateFull, err error) { if err = sd.dec.Decode(&res); err != nil { return } @@ -78,7 +78,7 @@ func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) { type StatefulDecoder struct { dec *json.Decoder Header - SourceId + Source } func NewStatefulDecoder(r io.Reader) (Decoder, error) { @@ -94,13 +94,13 @@ func NewStatefulDecoder(r io.Reader) (Decoder, error) { } func (sd *StatefulDecoder) Slug() string { - s := sd.SourceId - return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality) + s := sd.Source + return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality) } -func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) { +func (sd *StatefulDecoder) Decode() (res UpdateFull, err error) { res.Version = sd.Version - res.CopyFromSourceId(&sd.SourceId) + res.CopyFromSource(&sd.Source) if err = sd.dec.Decode(&res); err != nil { return } @@ -115,7 +115,7 @@ func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) { // type Encoder interface { - Encode(data DataUpdateFull) error + Encode(data UpdateFull) error Slug() string } @@ -132,7 +132,7 @@ func (se *StatelessEncoder) Slug() string { return "" } -func (se *StatelessEncoder) Encode(data DataUpdateFull) error { +func (se *StatelessEncoder) Encode(data UpdateFull) error { data.Version = ProtocolVersion data.StartTime = data.StartTime.UTC() return se.enc.Encode(data) @@ -155,7 +155,7 @@ func (se *StatefulEncoder) Slug() string { return "" } -func (se *StatefulEncoder) Encode(data DataUpdateFull) error { +func (se *StatefulEncoder) Encode(data UpdateFull) error { data.Version = 0 // the init message took care of that data.StartTime = data.StartTime.UTC() return se.enc.Encode(data) diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index b44c628..722d675 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -43,14 +43,14 @@ import ( var ( initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]` - initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} + initDataStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}} updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000` - updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} + updateDataStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000} ) -func GetExpected() (expected DataUpdateFull) { +func GetExpected() (expected UpdateFull) { expected.Version = ProtocolVersion - expected.CopyFromSourceId(&initDataStruct) + expected.CopyFromSource(&initDataStruct) expected.CopyFromUpdate(&updateDataStruct) return } @@ -146,8 +146,8 @@ func TestDecodeStateful(t *testing.T) { } slug := dec.Slug() - expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.StreamId.ContentId, - initDataStruct.StreamId.Format, initDataStruct.StreamId.Quality}, "/") + expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.Stream.ContentId, + initDataStruct.Stream.Format, initDataStruct.Stream.Quality}, "/") if slug != expectedSlug { t.Fatalf("slug failed:\n actual: %v\n expected: %v\n", slug, expectedSlug) } @@ -155,8 +155,8 @@ func TestDecodeStateful(t *testing.T) { } func TestEncodeStateless(t *testing.T) { - var td DataUpdateFull - td.CopyFromSourceId(&initDataStruct) + var td UpdateFull + td.CopyFromSource(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) encoded := &bytes.Buffer{} @@ -184,8 +184,8 @@ func TestEncodeStateless(t *testing.T) { } func TestEncodeStateful(t *testing.T) { - var td DataUpdateFull - td.CopyFromSourceId(&initDataStruct) + var td UpdateFull + td.CopyFromSource(&initDataStruct) td.CopyFromUpdate(&updateDataStruct) encoded := &bytes.Buffer{} diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 75460d1..91ab598 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,17 +33,17 @@ package sfive type appendToken struct { - data DataUpdateFull + data UpdateFull response chan error } type appendManyToken struct { - data []DataUpdateFull + data []UpdateFull response chan error } type getUpdatesResult struct { - values []DataUpdateFull + values []UpdateFull err error } @@ -103,21 +103,21 @@ func (srv Server) appendActor() { } } -func (srv Server) Append(data DataUpdateFull) error { +func (srv Server) Append(data UpdateFull) error { token := appendToken{data: data, response: make(chan error, 1)} defer close(token.response) srv.appendChan <- token return <-token.response } -func (srv Server) AppendMany(data []DataUpdateFull) error { +func (srv Server) AppendMany(data []UpdateFull) error { token := appendManyToken{data: data, response: make(chan error, 1)} defer close(token.response) srv.appendManyChan <- token return <-token.response } -func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) { +func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) { token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)} defer close(token.response) srv.getUpdatesAfterChan <- token diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 19dbf16..11709a1 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -40,10 +40,10 @@ import ( "time" ) -func findMaxId(values []DataUpdateFull) int { +func findMaxId(values []UpdateFull) int { maxId := -1 for _, value := range values { - if id := value.SourceHubDataUpdateId; id > maxId { + if id := value.SourceHubUpdateId; id > maxId { maxId = id } } @@ -77,7 +77,7 @@ func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (last return } -func fwdWriteUpdates(updates []DataUpdateFull, pw *io.PipeWriter) { +func fwdWriteUpdates(updates []UpdateFull, pw *io.PipeWriter) { defer pw.Close() enc, err := NewStatefulEncoder(pw) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index b4ab8b2..8665f4c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -74,9 +74,9 @@ tryResync: metrics := make([]graphite.Metric, len(updates)*3) for i, update := range updates { path := basePath + "." + update.Hostname - path = path + "." + update.StreamId.ContentId - path = path + "." + update.StreamId.Format - path = path + "." + update.StreamId.Quality + path = path + "." + update.Stream.ContentId + path = path + "." + update.Stream.Format + path = path + "." + update.Stream.Quality metrics[i*3] = graphite.NewMetric(path+".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix()) metrics[i*3+1] = graphite.NewMetric(path+".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix()) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 9e7817a..6caa26f 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -97,7 +97,7 @@ tryResync: q := make(url.Values) q.Add("rec", "1") q.Add("idsite", strconv.FormatUint(uint64(siteID), 10)) - q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.StreamId.ContentId, update.StreamId.Format, update.StreamId.Quality, update.Hostname)) + q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Stream.ContentId, update.Stream.Format, update.Stream.Quality, update.Hostname)) q.Add("cip", ip) q.Add("cdt", strconv.FormatInt(update.StartTime.Unix(), 10)) q.Add("ua", client.UserAgent) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 962187f..29143c9 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -243,7 +243,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { return } - values := []DataUpdateFull{} + values := []UpdateFull{} for { value, err := decoder.Decode() if err != nil { diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 8dda6af..567cadc 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -49,7 +49,7 @@ const ( ) var ( - storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn, + storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, updatesBn, sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn} ) @@ -240,8 +240,8 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error return srcId, err } -func (st Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) { - b := tx.Bucket([]byte(dataUpdatesBn)) +func (st Store) insertUpdate(tx *bolt.Tx, du updateDb) (duId int, err error) { + b := tx.Bucket([]byte(updatesBn)) b.FillPercent = 1.0 // we only do appends next, _ := b.NextSequence() @@ -278,7 +278,7 @@ func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error) return uaId, err } -func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error { +func (st Store) insertClient(tx *bolt.Tx, duId int, cd []Client) error { if len(cd) == 0 { return nil } @@ -315,8 +315,8 @@ func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error { // Split up the multidimensional dataupdate and append all the key-value pairs -func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) { - du := NewDataUpdateDb(update) +func (st Store) appendItem(tx *bolt.Tx, update UpdateFull) (duId int, err error) { + du := NewUpdateDb(update) src := NewSourceDb(update) if du.SourceHubId, err = st.insertNewHub(tx, update.SourceHubUuid); err != nil { @@ -325,25 +325,25 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er if du.SourceId, err = st.insertNewSource(tx, src); err != nil { return } - if duId, err = st.insertDataUpdate(tx, du); err != nil { + if duId, err = st.insertUpdate(tx, du); err != nil { return } - if err = st.insertClientData(tx, duId, update.Data.Clients); err != nil { + if err = st.insertClient(tx, duId, update.Data.Clients); err != nil { return } if update.SourceHubUuid != "" { - err = st.setLastUpdateForUuid(tx, update.SourceHubUuid, du.SourceHubDataUpdateId) + err = st.setLastUpdateForUuid(tx, update.SourceHubUuid, du.SourceHubUpdateId) } if update.ForwardHubUuid != "" { - err = st.setLastUpdateForUuid(tx, update.ForwardHubUuid, update.ForwardHubDataUpdateId) + err = st.setLastUpdateForUuid(tx, update.ForwardHubUuid, update.ForwardHubUpdateId) } return } // Public Append Interface -func (st Store) AppendMany(updates []DataUpdateFull) (err error) { +func (st Store) AppendMany(updates []UpdateFull) (err error) { if st.readOnly { return ErrReadOnly } @@ -357,8 +357,8 @@ func (st Store) AppendMany(updates []DataUpdateFull) (err error) { }) } -func (st Store) Append(update DataUpdateFull) (err error) { - return st.AppendMany([]DataUpdateFull{update}) +func (st Store) Append(update UpdateFull) (err error) { + return st.AppendMany([]UpdateFull{update}) } // @@ -390,7 +390,7 @@ func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) { return } -func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { +func (st Store) getClients(tx *bolt.Tx, id int) (res []Client, err error) { bc := tx.Bucket([]byte(clientDataBn)) bu := tx.Bucket([]byte(userAgentsRevBn)) @@ -403,7 +403,7 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { return } for _, c := range data { - cd := ClientData{Ip: c.Ip, BytesSent: c.BytesSent} + cd := Client{Ip: c.Ip, BytesSent: c.BytesSent} ua := bu.Get(itob(c.UserAgentId)) if ua != nil { cd.UserAgent = string(ua) @@ -415,8 +415,8 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) { // fetch all the key-value pairs and merge them into the multidimensional dataupdate -func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) { - res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) +func (st Store) fetchItem(tx *bolt.Tx, duId int, du updateDb) (res UpdateFull, err error) { + res.CopyFromUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId) var src sourceDb if src, err = st.getSource(tx, du.SourceId); err != nil { return @@ -430,8 +430,8 @@ func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdat // Public Fetch Interface -func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) { - res = []DataUpdateFull{} +func (st Store) GetUpdatesAfter(id, limit int) (res []UpdateFull, err error) { + res = []UpdateFull{} if id < 0 { // TODO: interpret negative values as last x values id = 0 } @@ -442,7 +442,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) limit = StoreGetUpdatesLimit } err = st.db.View(func(tx *bolt.Tx) error { - c := tx.Bucket([]byte(dataUpdatesBn)).Cursor() + c := tx.Bucket([]byte(updatesBn)).Cursor() k, v := c.Seek(itob(id)) if k == nil { return nil @@ -451,7 +451,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) k, v = c.Next() } for ; k != nil; k, v = c.Next() { - var d dataUpdateDb + var d updateDb if err := json.Unmarshal(v, &d); err != nil { return err } @@ -470,16 +470,16 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) return } -func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) { +func (st Store) GetUpdate(id int) (res UpdateFull, err error) { err = st.db.View(func(tx *bolt.Tx) error { - b := tx.Bucket([]byte(dataUpdatesBn)) + b := tx.Bucket([]byte(updatesBn)) jsonData := b.Get(itob(id)) if jsonData == nil { return ErrNotFound } - var d dataUpdateDb + var d updateDb if err := json.Unmarshal(jsonData, &d); err != nil { return err } @@ -503,7 +503,7 @@ func (st Store) GetHubUuid() string { func (st Store) GetLastUpdateId() (updateId int, err error) { err = st.db.View(func(tx *bolt.Tx) error { - updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence()) + updateId = int(tx.Bucket([]byte(updatesBn)).Sequence()) return nil }) return @@ -537,8 +537,8 @@ func (st Store) GetHubs() (res []string, err error) { return } -func (st Store) GetSources() (res []SourceId, err error) { - res = []SourceId{} +func (st Store) GetSources() (res []Source, err error) { + res = []Source{} err = st.db.View(func(tx *bolt.Tx) error { c := tx.Bucket([]byte(sourcesRevBn)).Cursor() for k, v := c.First(); k != nil; k, v = c.Next() { @@ -546,7 +546,7 @@ func (st Store) GetSources() (res []SourceId, err error) { if err := json.Unmarshal(v, &s); err != nil { return err } - var src SourceId + var src Source src.CopyFromSourceDb(s) res = append(res, src) } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index 8b7c385..3bc912b 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -50,18 +50,18 @@ var ( testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt" testBoltPathFinal = "/run/s5hub_testing_db_final.bolt" - streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"} - sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData} - updateData = DataUpdate{Data: SourceData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000} - clientsData = []ClientData{ - ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, - ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, - ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} + streamIdData = Stream{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"} + sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIdData} + updateData = Update{Data: UpdateData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000} + clientsData = []Client{ + Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, + Client{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, + Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400}, + Client{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}, + Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}} ) -func generateTestData(n int) ([]DataUpdateFull, int) { +func generateTestData(n int) ([]UpdateFull, int) { hostnames := []string{"streamer1", "streamer2"} contents := []string{"av", "audio"} formats := []string{"webm", "flash", "hls"} @@ -74,27 +74,27 @@ func generateTestData(n int) ([]DataUpdateFull, int) { if n < 0 { n = numSrcs } - var data []DataUpdateFull + var data []UpdateFull for i := 0; i < n; i += numSrcs { for _, hostname := range hostnames { for _, content := range contents { for _, format := range formats { for _, quality := range qualities { - d := DataUpdateFull{} + d := UpdateFull{} d.Version = ProtocolVersion - d.SourceId.Hostname = hostname - d.SourceId.StreamId.ContentId = content - d.SourceId.StreamId.Format = format - d.SourceId.StreamId.Quality = quality - d.SourceId.Tags = tags + d.Source.Hostname = hostname + d.Source.Stream.ContentId = content + d.Source.Stream.Format = format + d.Source.Stream.Quality = quality + d.Source.Tags = tags - d.DataUpdate.StartTime = starttime - d.DataUpdate.Duration = duration - d.DataUpdate.Data.ClientCount = uint(len(clientsData)) - d.DataUpdate.Data.BytesSent = 6400 * uint(len(clientsData)) + d.Update.StartTime = starttime + d.Update.Duration = duration + d.Update.Data.ClientCount = uint(len(clientsData)) + d.Update.Data.BytesSent = 6400 * uint(len(clientsData)) - d.DataUpdate.Data.Clients = clientsData + d.Update.Data.Clients = clientsData data = append(data, d) } } @@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("failed to append update: %v", err) @@ -323,21 +323,21 @@ func TestAppendAndFetch(t *testing.T) { expected := in expected.SourceHubUuid = store.GetHubUuid() - expected.SourceHubDataUpdateId = 1 + expected.SourceHubUpdateId = 1 expected.ForwardHubUuid = "" - expected.ForwardHubDataUpdateId = 0 + expected.ForwardHubUpdateId = 0 if !reflect.DeepEqual(expected, out) { t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) } // append many - var ins []DataUpdateFull + var ins []UpdateFull upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) - ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) + ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) upd.Data.Clients = nil - ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) + ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}) if err = store.AppendMany(ins); err != nil { t.Fatalf("failed to append update: %v", err) } @@ -351,9 +351,9 @@ func TestAppendAndFetch(t *testing.T) { out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder expected = ins[i] expected.SourceHubUuid = store.GetHubUuid() - expected.SourceHubDataUpdateId = i + 2 + expected.SourceHubUpdateId = i + 2 expected.ForwardHubUuid = "" - expected.ForwardHubDataUpdateId = 0 + expected.ForwardHubUpdateId = 0 if !reflect.DeepEqual(expected, out) { t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected) @@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) { upd := updateData upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } @@ -414,17 +414,17 @@ func TestGetUpdatesAfter(t *testing.T) { upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - expected := []DataUpdateFull{} + expected := []UpdateFull{} for i := 0; i < 3; i = i + 1 { - in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } e := in e.SourceHubUuid = store.hubUuid - e.SourceHubDataUpdateId = i + 1 + e.SourceHubUpdateId = i + 1 e.ForwardHubUuid = "" - e.ForwardHubDataUpdateId = 0 + e.ForwardHubUpdateId = 0 expected = append(expected, e) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) } @@ -497,7 +497,7 @@ func TestGetUpdatesAfter(t *testing.T) { } } -func TestForwardedDataUpdates(t *testing.T) { +func TestForwardedUpdates(t *testing.T) { // prepare a new store os.Remove(testBoltPath) store, err := NewStore(testBoltPath, false) @@ -540,17 +540,17 @@ func TestForwardedDataUpdates(t *testing.T) { upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC) upd.Data.Clients = clientsData - expected := []DataUpdateFull{} + expected := []UpdateFull{} for i := 0; i < 3; i = i + 1 { - in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} + in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd} in.SourceHubUuid = forwardedHub - in.SourceHubDataUpdateId = 3 - i // out of order + in.SourceHubUpdateId = 3 - i // out of order if err = store.Append(in); err != nil { t.Fatalf("unexpected error: %v", err) } myLastId = myLastId + 1 in.ForwardHubUuid = store.GetHubUuid() - in.ForwardHubDataUpdateId = myLastId + in.ForwardHubUpdateId = myLastId expected = append(expected, in) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) } @@ -597,7 +597,7 @@ func TestForwardedDataUpdates(t *testing.T) { } } -func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { +func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -635,7 +635,7 @@ func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, fi } } -func TestForwardedDataUpdates2(t *testing.T) { +func TestForwardedUpdates2(t *testing.T) { // prepare 4 new stores os.Remove(testBoltPath) src1Store, err := NewStore(testBoltPath, false) @@ -676,7 +676,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0) // forward 5 updates from src1 to fwd if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil { @@ -687,7 +687,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0) // forward 3 updates from src2 to fwd if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil { @@ -698,7 +698,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0) // forward 7 updates from fwd to final if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil { @@ -709,7 +709,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7) // forward remaining updates from src1 and src2 to fwd if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil { @@ -726,7 +726,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7) // forward remainging from fwd to final if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil { @@ -737,7 +737,7 @@ func TestForwardedDataUpdates2(t *testing.T) { } // check last updates so far - checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17) + checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17) } func TestGetSources(t *testing.T) { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index 6220c89..5d36e0d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -34,61 +34,61 @@ package sfive import "time" -type StreamId struct { +type Stream struct { ContentId string `json:"content-id"` Format string `json:"format"` Quality string `json:"quality"` } -type SourceId struct { +type Source struct { Hostname string `json:"hostname"` - StreamId StreamId `json:"streamer-id"` + Stream Stream `json:"streamer-id"` Tags []string `json:"tags,omitempty"` } -type ClientData struct { +type Client struct { Ip string `json:"ip"` UserAgent string `json:"user-agent,omitempty"` BytesSent uint `json:"bytes-sent"` } -type SourceData struct { - ClientCount uint `json:"client-count"` - BytesReceived uint `json:"bytes-received,omitempty"` - BytesSent uint `json:"bytes-sent"` - Clients []ClientData `json:"clients,omitempty"` +type UpdateData struct { + ClientCount uint `json:"client-count"` + BytesReceived uint `json:"bytes-received,omitempty"` + BytesSent uint `json:"bytes-sent"` + Clients []Client `json:"clients,omitempty"` } -type DataUpdate struct { +type Update struct { StartTime time.Time `json:"start-time"` Duration int64 `json:"duration-ms"` - Data SourceData `json:"data"` + Data UpdateData `json:"data"` } type Header struct { - Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API - SourceHubUuid string `json:"SourceHubUuid,omitempty"` - SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"` - ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` - ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"` + Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API + SourceHubUuid string `json:"SourceHubUuid,omitempty"` + SourceHubUpdateId int `json:"SourceHubDataUpdateId,omitempty"` + ForwardHubUuid string `json:"ForwardHubUuid,omitempty"` + ForwardHubUpdateId int `json:"ForwardHubDataUpdateId,omitempty"` } -type DataUpdateFull struct { +type UpdateFull struct { Header - SourceId - DataUpdate + Source + Update } -func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) { - duf.Hostname = src.Hostname - duf.StreamId = src.StreamId - duf.Tags = src.Tags +func (uf *UpdateFull) CopyFromSource(src *Source) { + uf.Hostname = src.Hostname + uf.Stream = src.Stream + uf.Tags = src.Tags } -func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) { - duf.StartTime = du.StartTime - duf.Duration = du.Duration - duf.Data = du.Data +func (uf *UpdateFull) CopyFromUpdate(u *Update) { + uf.StartTime = u.StartTime + uf.Duration = u.Duration + uf.Data = u.Data } // web @@ -107,11 +107,11 @@ type WebHubsResponse struct { } type WebSourcesResponse struct { - Sources []SourceId `json:"sources"` + Sources []Source `json:"sources"` } type WebUpdatesGetResponse struct { - Updates []DataUpdateFull `json:"updates"` + Updates []UpdateFull `json:"updates"` } type WebUpdatesPostResponse struct { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go index 41fb29a..5234bb3 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go @@ -52,7 +52,7 @@ const ( latestUpdatesBn = "LatestUpdates" hubUuidsFwdBn = "HubUUIDsFwd" hubUuidsRevBn = "HubUUIDsRev" - dataUpdatesBn = "DataUpdates" + updatesBn = "Updates" sourcesFwdBn = "SourcesFwd" sourcesRevBn = "SourcesRev" clientDataBn = "ClientData" @@ -65,40 +65,40 @@ const ( ) // stored in sourcesRevBn -type streamIdDb struct { +type streamDb struct { ContentId string `json:"c"` Format string `json:"f"` Quality string `json:"q"` } type sourceDb struct { - Hostname string `json:"h"` - StreamId streamIdDb `json:"s"` - Tags []string `json:"t"` + Hostname string `json:"h"` + Stream streamDb `json:"s"` + Tags []string `json:"t"` } -func NewSourceDb(value DataUpdateFull) sourceDb { +func NewSourceDb(value UpdateFull) sourceDb { return sourceDb{ - Hostname: value.SourceId.Hostname, - StreamId: streamIdDb{ - ContentId: value.SourceId.StreamId.ContentId, - Format: value.SourceId.StreamId.Format, - Quality: value.SourceId.StreamId.Quality, + Hostname: value.Source.Hostname, + Stream: streamDb{ + ContentId: value.Source.Stream.ContentId, + Format: value.Source.Stream.Format, + Quality: value.Source.Stream.Quality, }, - Tags: value.SourceId.Tags, + Tags: value.Source.Tags, } } func (s sourceDb) Slug() string { - return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ",")) + return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ",")) } -func (s *SourceId) CopyFromSourceDb(v sourceDb) { - s.Hostname = v.Hostname - s.StreamId.ContentId = v.StreamId.ContentId - s.StreamId.Format = v.StreamId.Format - s.StreamId.Quality = v.StreamId.Quality - s.Tags = v.Tags +func (s *Source) CopyFromSourceDb(sdb sourceDb) { + s.Hostname = sdb.Hostname + s.Stream.ContentId = sdb.Stream.ContentId + s.Stream.Format = sdb.Stream.Format + s.Stream.Quality = sdb.Stream.Quality + s.Tags = sdb.Tags } // stored in clientDataBn @@ -109,46 +109,46 @@ type clientDataDb struct { } // stored in dataUpdatesBn -type dataUpdateDb struct { - SourceHubId int `json:"h,omitempty"` - SourceHubDataUpdateId int `json:"hi,omitempty"` - SourceId int `json:"si"` - StartTime int64 `json:"st"` // unix timestamp in milliseconds - Duration int64 `json:"du"` // duration in milliseconds - ClientCount uint `json:"cc,omitempty"` - BytesReceived uint `json:"br,omitempty"` - BytesSent uint `json:"bs,omitempty"` +type updateDb struct { + SourceHubId int `json:"h,omitempty"` + SourceHubUpdateId int `json:"hi,omitempty"` + SourceId int `json:"si"` + StartTime int64 `json:"st"` // unix timestamp in milliseconds + Duration int64 `json:"du"` // duration in milliseconds + ClientCount uint `json:"cc,omitempty"` + BytesReceived uint `json:"br,omitempty"` + BytesSent uint `json:"bs,omitempty"` } -func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb { - return dataUpdateDb{ +func NewUpdateDb(uf UpdateFull) updateDb { + return updateDb{ -1, - v.SourceHubDataUpdateId, + uf.SourceHubUpdateId, -1, - int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000), - v.Duration, - v.Data.ClientCount, - v.Data.BytesReceived, - v.Data.BytesSent, + int64(uf.StartTime.Unix()*1000) + int64(uf.StartTime.Nanosecond()/1000000), + uf.Duration, + uf.Data.ClientCount, + uf.Data.BytesReceived, + uf.Data.BytesSent, } } -func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) { +func (uf *UpdateFull) CopyFromUpdateDb(udb updateDb, srcHubUuid, hubUuid string, id int) { if srcHubUuid == "" { - s.SourceHubUuid = hubUuid - s.SourceHubDataUpdateId = id + uf.SourceHubUuid = hubUuid + uf.SourceHubUpdateId = id } else { - s.SourceHubUuid = srcHubUuid - s.SourceHubDataUpdateId = v.SourceHubDataUpdateId - s.ForwardHubUuid = hubUuid - s.ForwardHubDataUpdateId = id + uf.SourceHubUuid = srcHubUuid + uf.SourceHubUpdateId = udb.SourceHubUpdateId + uf.ForwardHubUuid = hubUuid + uf.ForwardHubUpdateId = id } - s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000) - s.Duration = v.Duration - s.Data.ClientCount = v.ClientCount - s.Data.BytesReceived = v.BytesReceived - s.Data.BytesSent = v.BytesSent + uf.StartTime = time.Unix((udb.StartTime / 1000), (udb.StartTime%1000)*1000000) + uf.Duration = udb.Duration + uf.Data.ClientCount = udb.ClientCount + uf.Data.BytesReceived = udb.BytesReceived + uf.Data.BytesSent = udb.BytesSent } func itob(v int) []byte { -- cgit v1.2.3