summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-10 21:54:28 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-10 21:54:28 +0200
commitfda0f759703ecb394d504fee834b61b8cfb97bf6 (patch)
tree203310454f642ddd42901860c04d0465fd22c6dc /src
parentsome cleanup @ forwarder (diff)
major variable name refactoring
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go20
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go20
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go56
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go96
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go58
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesStore.go96
11 files changed, 187 insertions, 187 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 9db899c..8d84fbc 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -47,7 +47,7 @@ const (
//
type Decoder interface {
- Decode() (DataUpdateFull, error)
+ Decode() (UpdateFull, error)
Slug() string
}
@@ -64,7 +64,7 @@ func (sd *StatelessDecoder) Slug() string {
return ""
}
-func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) {
+func (sd *StatelessDecoder) Decode() (res UpdateFull, err error) {
if err = sd.dec.Decode(&res); err != nil {
return
}
@@ -78,7 +78,7 @@ func (sd *StatelessDecoder) Decode() (res DataUpdateFull, err error) {
type StatefulDecoder struct {
dec *json.Decoder
Header
- SourceId
+ Source
}
func NewStatefulDecoder(r io.Reader) (Decoder, error) {
@@ -94,13 +94,13 @@ func NewStatefulDecoder(r io.Reader) (Decoder, error) {
}
func (sd *StatefulDecoder) Slug() string {
- s := sd.SourceId
- return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality)
+ s := sd.Source
+ return fmt.Sprintf("%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality)
}
-func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) {
+func (sd *StatefulDecoder) Decode() (res UpdateFull, err error) {
res.Version = sd.Version
- res.CopyFromSourceId(&sd.SourceId)
+ res.CopyFromSource(&sd.Source)
if err = sd.dec.Decode(&res); err != nil {
return
}
@@ -115,7 +115,7 @@ func (sd *StatefulDecoder) Decode() (res DataUpdateFull, err error) {
//
type Encoder interface {
- Encode(data DataUpdateFull) error
+ Encode(data UpdateFull) error
Slug() string
}
@@ -132,7 +132,7 @@ func (se *StatelessEncoder) Slug() string {
return ""
}
-func (se *StatelessEncoder) Encode(data DataUpdateFull) error {
+func (se *StatelessEncoder) Encode(data UpdateFull) error {
data.Version = ProtocolVersion
data.StartTime = data.StartTime.UTC()
return se.enc.Encode(data)
@@ -155,7 +155,7 @@ func (se *StatefulEncoder) Slug() string {
return ""
}
-func (se *StatefulEncoder) Encode(data DataUpdateFull) error {
+func (se *StatefulEncoder) Encode(data UpdateFull) error {
data.Version = 0 // the init message took care of that
data.StartTime = data.StartTime.UTC()
return se.enc.Encode(data)
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index b44c628..722d675 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -43,14 +43,14 @@ import (
var (
initDataEncoded = `"hostname": "localhost", "streamer-id": {"quality": "low", "content-id": "av", "format": "webm"}, "tags": ["elevate", "2014"]`
- initDataStruct = SourceId{Hostname: "localhost", StreamId: StreamId{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
+ initDataStruct = Source{Hostname: "localhost", Stream: Stream{Quality: "low", ContentId: "av", Format: "webm"}, Tags: []string{"elevate", "2014"}}
updateDataEncoded = `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": 5000`
- updateDataStruct = DataUpdate{Data: SourceData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
+ updateDataStruct = Update{Data: UpdateData{BytesSent: 1, ClientCount: 3, BytesReceived: 1}, StartTime: time.Date(2014, time.August, 24, 14, 35, 33, 847282000, time.UTC), Duration: 5000}
)
-func GetExpected() (expected DataUpdateFull) {
+func GetExpected() (expected UpdateFull) {
expected.Version = ProtocolVersion
- expected.CopyFromSourceId(&initDataStruct)
+ expected.CopyFromSource(&initDataStruct)
expected.CopyFromUpdate(&updateDataStruct)
return
}
@@ -146,8 +146,8 @@ func TestDecodeStateful(t *testing.T) {
}
slug := dec.Slug()
- expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.StreamId.ContentId,
- initDataStruct.StreamId.Format, initDataStruct.StreamId.Quality}, "/")
+ expectedSlug := strings.Join([]string{initDataStruct.Hostname, initDataStruct.Stream.ContentId,
+ initDataStruct.Stream.Format, initDataStruct.Stream.Quality}, "/")
if slug != expectedSlug {
t.Fatalf("slug failed:\n actual: %v\n expected: %v\n", slug, expectedSlug)
}
@@ -155,8 +155,8 @@ func TestDecodeStateful(t *testing.T) {
}
func TestEncodeStateless(t *testing.T) {
- var td DataUpdateFull
- td.CopyFromSourceId(&initDataStruct)
+ var td UpdateFull
+ td.CopyFromSource(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
encoded := &bytes.Buffer{}
@@ -184,8 +184,8 @@ func TestEncodeStateless(t *testing.T) {
}
func TestEncodeStateful(t *testing.T) {
- var td DataUpdateFull
- td.CopyFromSourceId(&initDataStruct)
+ var td UpdateFull
+ td.CopyFromSource(&initDataStruct)
td.CopyFromUpdate(&updateDataStruct)
encoded := &bytes.Buffer{}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 75460d1..91ab598 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -33,17 +33,17 @@
package sfive
type appendToken struct {
- data DataUpdateFull
+ data UpdateFull
response chan error
}
type appendManyToken struct {
- data []DataUpdateFull
+ data []UpdateFull
response chan error
}
type getUpdatesResult struct {
- values []DataUpdateFull
+ values []UpdateFull
err error
}
@@ -103,21 +103,21 @@ func (srv Server) appendActor() {
}
}
-func (srv Server) Append(data DataUpdateFull) error {
+func (srv Server) Append(data UpdateFull) error {
token := appendToken{data: data, response: make(chan error, 1)}
defer close(token.response)
srv.appendChan <- token
return <-token.response
}
-func (srv Server) AppendMany(data []DataUpdateFull) error {
+func (srv Server) AppendMany(data []UpdateFull) error {
token := appendManyToken{data: data, response: make(chan error, 1)}
defer close(token.response)
srv.appendManyChan <- token
return <-token.response
}
-func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) {
+func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) {
token := getUpdatesAfterToken{id: id, limit: limit, response: make(chan getUpdatesResult, 1)}
defer close(token.response)
srv.getUpdatesAfterChan <- token
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 19dbf16..11709a1 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -40,10 +40,10 @@ import (
"time"
)
-func findMaxId(values []DataUpdateFull) int {
+func findMaxId(values []UpdateFull) int {
maxId := -1
for _, value := range values {
- if id := value.SourceHubDataUpdateId; id > maxId {
+ if id := value.SourceHubUpdateId; id > maxId {
maxId = id
}
}
@@ -77,7 +77,7 @@ func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (last
return
}
-func fwdWriteUpdates(updates []DataUpdateFull, pw *io.PipeWriter) {
+func fwdWriteUpdates(updates []UpdateFull, pw *io.PipeWriter) {
defer pw.Close()
enc, err := NewStatefulEncoder(pw)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index b4ab8b2..8665f4c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -74,9 +74,9 @@ tryResync:
metrics := make([]graphite.Metric, len(updates)*3)
for i, update := range updates {
path := basePath + "." + update.Hostname
- path = path + "." + update.StreamId.ContentId
- path = path + "." + update.StreamId.Format
- path = path + "." + update.StreamId.Quality
+ path = path + "." + update.Stream.ContentId
+ path = path + "." + update.Stream.Format
+ path = path + "." + update.Stream.Quality
metrics[i*3] = graphite.NewMetric(path+".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix())
metrics[i*3+1] = graphite.NewMetric(path+".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix())
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 9e7817a..6caa26f 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -97,7 +97,7 @@ tryResync:
q := make(url.Values)
q.Add("rec", "1")
q.Add("idsite", strconv.FormatUint(uint64(siteID), 10))
- q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.StreamId.ContentId, update.StreamId.Format, update.StreamId.Quality, update.Hostname))
+ q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Stream.ContentId, update.Stream.Format, update.Stream.Quality, update.Hostname))
q.Add("cip", ip)
q.Add("cdt", strconv.FormatInt(update.StartTime.Unix(), 10))
q.Add("ua", client.UserAgent)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 962187f..29143c9 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -243,7 +243,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- values := []DataUpdateFull{}
+ values := []UpdateFull{}
for {
value, err := decoder.Decode()
if err != nil {
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 8dda6af..567cadc 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -49,7 +49,7 @@ const (
)
var (
- storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, dataUpdatesBn,
+ storeBuckets = []string{latestUpdatesBn, hubUuidsFwdBn, hubUuidsRevBn, updatesBn,
sourcesFwdBn, sourcesRevBn, clientDataBn, userAgentsFwdBn, userAgentsRevBn}
)
@@ -240,8 +240,8 @@ func (st Store) insertNewSource(tx *bolt.Tx, src sourceDb) (srcId int, err error
return srcId, err
}
-func (st Store) insertDataUpdate(tx *bolt.Tx, du dataUpdateDb) (duId int, err error) {
- b := tx.Bucket([]byte(dataUpdatesBn))
+func (st Store) insertUpdate(tx *bolt.Tx, du updateDb) (duId int, err error) {
+ b := tx.Bucket([]byte(updatesBn))
b.FillPercent = 1.0 // we only do appends
next, _ := b.NextSequence()
@@ -278,7 +278,7 @@ func (st Store) insertNewUserAgent(tx *bolt.Tx, ua string) (uaId int, err error)
return uaId, err
}
-func (st Store) insertClientData(tx *bolt.Tx, duId int, cd []ClientData) error {
+func (st Store) insertClient(tx *bolt.Tx, duId int, cd []Client) error {
if len(cd) == 0 {
return nil
}
@@ -315,8 +315,8 @@ func (st Store) setLastUpdateForUuid(tx *bolt.Tx, uuid string, duId int) error {
// Split up the multidimensional dataupdate and append all the key-value pairs
-func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err error) {
- du := NewDataUpdateDb(update)
+func (st Store) appendItem(tx *bolt.Tx, update UpdateFull) (duId int, err error) {
+ du := NewUpdateDb(update)
src := NewSourceDb(update)
if du.SourceHubId, err = st.insertNewHub(tx, update.SourceHubUuid); err != nil {
@@ -325,25 +325,25 @@ func (st Store) appendItem(tx *bolt.Tx, update DataUpdateFull) (duId int, err er
if du.SourceId, err = st.insertNewSource(tx, src); err != nil {
return
}
- if duId, err = st.insertDataUpdate(tx, du); err != nil {
+ if duId, err = st.insertUpdate(tx, du); err != nil {
return
}
- if err = st.insertClientData(tx, duId, update.Data.Clients); err != nil {
+ if err = st.insertClient(tx, duId, update.Data.Clients); err != nil {
return
}
if update.SourceHubUuid != "" {
- err = st.setLastUpdateForUuid(tx, update.SourceHubUuid, du.SourceHubDataUpdateId)
+ err = st.setLastUpdateForUuid(tx, update.SourceHubUuid, du.SourceHubUpdateId)
}
if update.ForwardHubUuid != "" {
- err = st.setLastUpdateForUuid(tx, update.ForwardHubUuid, update.ForwardHubDataUpdateId)
+ err = st.setLastUpdateForUuid(tx, update.ForwardHubUuid, update.ForwardHubUpdateId)
}
return
}
// Public Append Interface
-func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
+func (st Store) AppendMany(updates []UpdateFull) (err error) {
if st.readOnly {
return ErrReadOnly
}
@@ -357,8 +357,8 @@ func (st Store) AppendMany(updates []DataUpdateFull) (err error) {
})
}
-func (st Store) Append(update DataUpdateFull) (err error) {
- return st.AppendMany([]DataUpdateFull{update})
+func (st Store) Append(update UpdateFull) (err error) {
+ return st.AppendMany([]UpdateFull{update})
}
//
@@ -390,7 +390,7 @@ func (st Store) getSource(tx *bolt.Tx, id int) (res sourceDb, err error) {
return
}
-func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
+func (st Store) getClients(tx *bolt.Tx, id int) (res []Client, err error) {
bc := tx.Bucket([]byte(clientDataBn))
bu := tx.Bucket([]byte(userAgentsRevBn))
@@ -403,7 +403,7 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
return
}
for _, c := range data {
- cd := ClientData{Ip: c.Ip, BytesSent: c.BytesSent}
+ cd := Client{Ip: c.Ip, BytesSent: c.BytesSent}
ua := bu.Get(itob(c.UserAgentId))
if ua != nil {
cd.UserAgent = string(ua)
@@ -415,8 +415,8 @@ func (st Store) getClients(tx *bolt.Tx, id int) (res []ClientData, err error) {
// fetch all the key-value pairs and merge them into the multidimensional dataupdate
-func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdateFull, err error) {
- res.CopyFromDataUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
+func (st Store) fetchItem(tx *bolt.Tx, duId int, du updateDb) (res UpdateFull, err error) {
+ res.CopyFromUpdateDb(du, st.getHub(tx, du.SourceHubId), st.hubUuid, duId)
var src sourceDb
if src, err = st.getSource(tx, du.SourceId); err != nil {
return
@@ -430,8 +430,8 @@ func (st Store) fetchItem(tx *bolt.Tx, duId int, du dataUpdateDb) (res DataUpdat
// Public Fetch Interface
-func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) {
- res = []DataUpdateFull{}
+func (st Store) GetUpdatesAfter(id, limit int) (res []UpdateFull, err error) {
+ res = []UpdateFull{}
if id < 0 { // TODO: interpret negative values as last x values
id = 0
}
@@ -442,7 +442,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error)
limit = StoreGetUpdatesLimit
}
err = st.db.View(func(tx *bolt.Tx) error {
- c := tx.Bucket([]byte(dataUpdatesBn)).Cursor()
+ c := tx.Bucket([]byte(updatesBn)).Cursor()
k, v := c.Seek(itob(id))
if k == nil {
return nil
@@ -451,7 +451,7 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error)
k, v = c.Next()
}
for ; k != nil; k, v = c.Next() {
- var d dataUpdateDb
+ var d updateDb
if err := json.Unmarshal(v, &d); err != nil {
return err
}
@@ -470,16 +470,16 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error)
return
}
-func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
+func (st Store) GetUpdate(id int) (res UpdateFull, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
- b := tx.Bucket([]byte(dataUpdatesBn))
+ b := tx.Bucket([]byte(updatesBn))
jsonData := b.Get(itob(id))
if jsonData == nil {
return ErrNotFound
}
- var d dataUpdateDb
+ var d updateDb
if err := json.Unmarshal(jsonData, &d); err != nil {
return err
}
@@ -503,7 +503,7 @@ func (st Store) GetHubUuid() string {
func (st Store) GetLastUpdateId() (updateId int, err error) {
err = st.db.View(func(tx *bolt.Tx) error {
- updateId = int(tx.Bucket([]byte(dataUpdatesBn)).Sequence())
+ updateId = int(tx.Bucket([]byte(updatesBn)).Sequence())
return nil
})
return
@@ -537,8 +537,8 @@ func (st Store) GetHubs() (res []string, err error) {
return
}
-func (st Store) GetSources() (res []SourceId, err error) {
- res = []SourceId{}
+func (st Store) GetSources() (res []Source, err error) {
+ res = []Source{}
err = st.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket([]byte(sourcesRevBn)).Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
@@ -546,7 +546,7 @@ func (st Store) GetSources() (res []SourceId, err error) {
if err := json.Unmarshal(v, &s); err != nil {
return err
}
- var src SourceId
+ var src Source
src.CopyFromSourceDb(s)
res = append(res, src)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index 8b7c385..3bc912b 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -50,18 +50,18 @@ var (
testBoltPathFwd = "/run/s5hub_testing_db_fwd.bolt"
testBoltPathFinal = "/run/s5hub_testing_db_final.bolt"
- streamIdData = StreamId{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"}
- sourceData = SourceId{Hostname: "streamer", Tags: []string{"tag1", "master"}, StreamId: streamIdData}
- updateData = DataUpdate{Data: SourceData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000}
- clientsData = []ClientData{
- ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- ClientData{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- ClientData{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
- ClientData{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
- ClientData{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
+ streamIdData = Stream{ContentId: "talkingheads", Format: "7bitascii", Quality: "high"}
+ sourceData = Source{Hostname: "streamer", Tags: []string{"tag1", "master"}, Stream: streamIdData}
+ updateData = Update{Data: UpdateData{ClientCount: 3, BytesReceived: 42, BytesSent: 136}, Duration: 5000}
+ clientsData = []Client{
+ Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
+ Client{"10.12.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
+ Client{"127.0.0.1", "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:53.0) Gecko/20100101 Firefox/53.0", 6400},
+ Client{"192.168.0.1", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400},
+ Client{"172.16.0.2", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/57.0.2987.98 Chrome/57.0.2987.98 Safari/537.36", 6400}}
)
-func generateTestData(n int) ([]DataUpdateFull, int) {
+func generateTestData(n int) ([]UpdateFull, int) {
hostnames := []string{"streamer1", "streamer2"}
contents := []string{"av", "audio"}
formats := []string{"webm", "flash", "hls"}
@@ -74,27 +74,27 @@ func generateTestData(n int) ([]DataUpdateFull, int) {
if n < 0 {
n = numSrcs
}
- var data []DataUpdateFull
+ var data []UpdateFull
for i := 0; i < n; i += numSrcs {
for _, hostname := range hostnames {
for _, content := range contents {
for _, format := range formats {
for _, quality := range qualities {
- d := DataUpdateFull{}
+ d := UpdateFull{}
d.Version = ProtocolVersion
- d.SourceId.Hostname = hostname
- d.SourceId.StreamId.ContentId = content
- d.SourceId.StreamId.Format = format
- d.SourceId.StreamId.Quality = quality
- d.SourceId.Tags = tags
+ d.Source.Hostname = hostname
+ d.Source.Stream.ContentId = content
+ d.Source.Stream.Format = format
+ d.Source.Stream.Quality = quality
+ d.Source.Tags = tags
- d.DataUpdate.StartTime = starttime
- d.DataUpdate.Duration = duration
- d.DataUpdate.Data.ClientCount = uint(len(clientsData))
- d.DataUpdate.Data.BytesSent = 6400 * uint(len(clientsData))
+ d.Update.StartTime = starttime
+ d.Update.Duration = duration
+ d.Update.Data.ClientCount = uint(len(clientsData))
+ d.Update.Data.BytesSent = 6400 * uint(len(clientsData))
- d.DataUpdate.Data.Clients = clientsData
+ d.Update.Data.Clients = clientsData
data = append(data, d)
}
}
@@ -309,7 +309,7 @@ func TestAppendAndFetch(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("failed to append update: %v", err)
@@ -323,21 +323,21 @@ func TestAppendAndFetch(t *testing.T) {
expected := in
expected.SourceHubUuid = store.GetHubUuid()
- expected.SourceHubDataUpdateId = 1
+ expected.SourceHubUpdateId = 1
expected.ForwardHubUuid = ""
- expected.ForwardHubDataUpdateId = 0
+ expected.ForwardHubUpdateId = 0
if !reflect.DeepEqual(expected, out) {
t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected)
}
// append many
- var ins []DataUpdateFull
+ var ins []UpdateFull
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
- ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
+ ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
upd.Data.Clients = nil
- ins = append(ins, DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
+ ins = append(ins, UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd})
if err = store.AppendMany(ins); err != nil {
t.Fatalf("failed to append update: %v", err)
}
@@ -351,9 +351,9 @@ func TestAppendAndFetch(t *testing.T) {
out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder
expected = ins[i]
expected.SourceHubUuid = store.GetHubUuid()
- expected.SourceHubDataUpdateId = i + 2
+ expected.SourceHubUpdateId = i + 2
expected.ForwardHubUuid = ""
- expected.ForwardHubDataUpdateId = 0
+ expected.ForwardHubUpdateId = 0
if !reflect.DeepEqual(expected, out) {
t.Fatalf("failed to fetch update\nactual: %v\nexpected: %v\n", out, expected)
@@ -377,7 +377,7 @@ func TestReadOnly(t *testing.T) {
upd := updateData
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
@@ -414,17 +414,17 @@ func TestGetUpdatesAfter(t *testing.T) {
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- expected := []DataUpdateFull{}
+ expected := []UpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
e := in
e.SourceHubUuid = store.hubUuid
- e.SourceHubDataUpdateId = i + 1
+ e.SourceHubUpdateId = i + 1
e.ForwardHubUuid = ""
- e.ForwardHubDataUpdateId = 0
+ e.ForwardHubUpdateId = 0
expected = append(expected, e)
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
}
@@ -497,7 +497,7 @@ func TestGetUpdatesAfter(t *testing.T) {
}
}
-func TestForwardedDataUpdates(t *testing.T) {
+func TestForwardedUpdates(t *testing.T) {
// prepare a new store
os.Remove(testBoltPath)
store, err := NewStore(testBoltPath, false)
@@ -540,17 +540,17 @@ func TestForwardedDataUpdates(t *testing.T) {
upd.StartTime = time.Date(2014, time.August, 24, 14, 35, 33, 847000000, time.UTC)
upd.Data.Clients = clientsData
- expected := []DataUpdateFull{}
+ expected := []UpdateFull{}
for i := 0; i < 3; i = i + 1 {
- in := DataUpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
+ in := UpdateFull{Header{0, "", -1, "", -1}, sourceData, upd}
in.SourceHubUuid = forwardedHub
- in.SourceHubDataUpdateId = 3 - i // out of order
+ in.SourceHubUpdateId = 3 - i // out of order
if err = store.Append(in); err != nil {
t.Fatalf("unexpected error: %v", err)
}
myLastId = myLastId + 1
in.ForwardHubUuid = store.GetHubUuid()
- in.ForwardHubDataUpdateId = myLastId
+ in.ForwardHubUpdateId = myLastId
expected = append(expected, in)
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
}
@@ -597,7 +597,7 @@ func TestForwardedDataUpdates(t *testing.T) {
}
}
-func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) {
+func checkForwardedUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) {
lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
@@ -635,7 +635,7 @@ func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, fi
}
}
-func TestForwardedDataUpdates2(t *testing.T) {
+func TestForwardedUpdates2(t *testing.T) {
// prepare 4 new stores
os.Remove(testBoltPath)
src1Store, err := NewStore(testBoltPath, false)
@@ -676,7 +676,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 0, 0, 0, 0, 0)
// forward 5 updates from src1 to fwd
if data, err = src1Store.GetUpdatesAfter(0, 5); err != nil {
@@ -687,7 +687,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 0, 0, 0, 0)
// forward 3 updates from src2 to fwd
if data, err = src2Store.GetUpdatesAfter(0, 3); err != nil {
@@ -698,7 +698,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 0, 0, 0)
// forward 7 updates from fwd to final
if data, err = fwdStore.GetUpdatesAfter(0, 7); err != nil {
@@ -709,7 +709,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 5, 3, 5, 2, 7)
// forward remaining updates from src1 and src2 to fwd
if data, err = src1Store.GetUpdatesAfter(5, -1); err != nil {
@@ -726,7 +726,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 5, 2, 7)
// forward remainging from fwd to final
if data, err = fwdStore.GetUpdatesAfter(7, -1); err != nil {
@@ -737,7 +737,7 @@ func TestForwardedDataUpdates2(t *testing.T) {
}
// check last updates so far
- checkForwardedDataUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17)
+ checkForwardedUpdates2(t, src1Store, src2Store, fwdStore, finalStore, 10, 7, 10, 7, 17)
}
func TestGetSources(t *testing.T) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index 6220c89..5d36e0d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -34,61 +34,61 @@ package sfive
import "time"
-type StreamId struct {
+type Stream struct {
ContentId string `json:"content-id"`
Format string `json:"format"`
Quality string `json:"quality"`
}
-type SourceId struct {
+type Source struct {
Hostname string `json:"hostname"`
- StreamId StreamId `json:"streamer-id"`
+ Stream Stream `json:"streamer-id"`
Tags []string `json:"tags,omitempty"`
}
-type ClientData struct {
+type Client struct {
Ip string `json:"ip"`
UserAgent string `json:"user-agent,omitempty"`
BytesSent uint `json:"bytes-sent"`
}
-type SourceData struct {
- ClientCount uint `json:"client-count"`
- BytesReceived uint `json:"bytes-received,omitempty"`
- BytesSent uint `json:"bytes-sent"`
- Clients []ClientData `json:"clients,omitempty"`
+type UpdateData struct {
+ ClientCount uint `json:"client-count"`
+ BytesReceived uint `json:"bytes-received,omitempty"`
+ BytesSent uint `json:"bytes-sent"`
+ Clients []Client `json:"clients,omitempty"`
}
-type DataUpdate struct {
+type Update struct {
StartTime time.Time `json:"start-time"`
Duration int64 `json:"duration-ms"`
- Data SourceData `json:"data"`
+ Data UpdateData `json:"data"`
}
type Header struct {
- Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
- SourceHubUuid string `json:"SourceHubUuid,omitempty"`
- SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
- ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
- ForwardHubDataUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
+ Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
+ SourceHubUuid string `json:"SourceHubUuid,omitempty"`
+ SourceHubUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
+ ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`
+ ForwardHubUpdateId int `json:"ForwardHubDataUpdateId,omitempty"`
}
-type DataUpdateFull struct {
+type UpdateFull struct {
Header
- SourceId
- DataUpdate
+ Source
+ Update
}
-func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) {
- duf.Hostname = src.Hostname
- duf.StreamId = src.StreamId
- duf.Tags = src.Tags
+func (uf *UpdateFull) CopyFromSource(src *Source) {
+ uf.Hostname = src.Hostname
+ uf.Stream = src.Stream
+ uf.Tags = src.Tags
}
-func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) {
- duf.StartTime = du.StartTime
- duf.Duration = du.Duration
- duf.Data = du.Data
+func (uf *UpdateFull) CopyFromUpdate(u *Update) {
+ uf.StartTime = u.StartTime
+ uf.Duration = u.Duration
+ uf.Data = u.Data
}
// web
@@ -107,11 +107,11 @@ type WebHubsResponse struct {
}
type WebSourcesResponse struct {
- Sources []SourceId `json:"sources"`
+ Sources []Source `json:"sources"`
}
type WebUpdatesGetResponse struct {
- Updates []DataUpdateFull `json:"updates"`
+ Updates []UpdateFull `json:"updates"`
}
type WebUpdatesPostResponse struct {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesStore.go b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
index 41fb29a..5234bb3 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesStore.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesStore.go
@@ -52,7 +52,7 @@ const (
latestUpdatesBn = "LatestUpdates"
hubUuidsFwdBn = "HubUUIDsFwd"
hubUuidsRevBn = "HubUUIDsRev"
- dataUpdatesBn = "DataUpdates"
+ updatesBn = "Updates"
sourcesFwdBn = "SourcesFwd"
sourcesRevBn = "SourcesRev"
clientDataBn = "ClientData"
@@ -65,40 +65,40 @@ const (
)
// stored in sourcesRevBn
-type streamIdDb struct {
+type streamDb struct {
ContentId string `json:"c"`
Format string `json:"f"`
Quality string `json:"q"`
}
type sourceDb struct {
- Hostname string `json:"h"`
- StreamId streamIdDb `json:"s"`
- Tags []string `json:"t"`
+ Hostname string `json:"h"`
+ Stream streamDb `json:"s"`
+ Tags []string `json:"t"`
}
-func NewSourceDb(value DataUpdateFull) sourceDb {
+func NewSourceDb(value UpdateFull) sourceDb {
return sourceDb{
- Hostname: value.SourceId.Hostname,
- StreamId: streamIdDb{
- ContentId: value.SourceId.StreamId.ContentId,
- Format: value.SourceId.StreamId.Format,
- Quality: value.SourceId.StreamId.Quality,
+ Hostname: value.Source.Hostname,
+ Stream: streamDb{
+ ContentId: value.Source.Stream.ContentId,
+ Format: value.Source.Stream.Format,
+ Quality: value.Source.Stream.Quality,
},
- Tags: value.SourceId.Tags,
+ Tags: value.Source.Tags,
}
}
func (s sourceDb) Slug() string {
- return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.StreamId.ContentId, s.StreamId.Format, s.StreamId.Quality, strings.Join(s.Tags, ","))
+ return fmt.Sprintf("%s/%s/%s/%s/%s", s.Hostname, s.Stream.ContentId, s.Stream.Format, s.Stream.Quality, strings.Join(s.Tags, ","))
}
-func (s *SourceId) CopyFromSourceDb(v sourceDb) {
- s.Hostname = v.Hostname
- s.StreamId.ContentId = v.StreamId.ContentId
- s.StreamId.Format = v.StreamId.Format
- s.StreamId.Quality = v.StreamId.Quality
- s.Tags = v.Tags
+func (s *Source) CopyFromSourceDb(sdb sourceDb) {
+ s.Hostname = sdb.Hostname
+ s.Stream.ContentId = sdb.Stream.ContentId
+ s.Stream.Format = sdb.Stream.Format
+ s.Stream.Quality = sdb.Stream.Quality
+ s.Tags = sdb.Tags
}
// stored in clientDataBn
@@ -109,46 +109,46 @@ type clientDataDb struct {
}
// stored in dataUpdatesBn
-type dataUpdateDb struct {
- SourceHubId int `json:"h,omitempty"`
- SourceHubDataUpdateId int `json:"hi,omitempty"`
- SourceId int `json:"si"`
- StartTime int64 `json:"st"` // unix timestamp in milliseconds
- Duration int64 `json:"du"` // duration in milliseconds
- ClientCount uint `json:"cc,omitempty"`
- BytesReceived uint `json:"br,omitempty"`
- BytesSent uint `json:"bs,omitempty"`
+type updateDb struct {
+ SourceHubId int `json:"h,omitempty"`
+ SourceHubUpdateId int `json:"hi,omitempty"`
+ SourceId int `json:"si"`
+ StartTime int64 `json:"st"` // unix timestamp in milliseconds
+ Duration int64 `json:"du"` // duration in milliseconds
+ ClientCount uint `json:"cc,omitempty"`
+ BytesReceived uint `json:"br,omitempty"`
+ BytesSent uint `json:"bs,omitempty"`
}
-func NewDataUpdateDb(v DataUpdateFull) dataUpdateDb {
- return dataUpdateDb{
+func NewUpdateDb(uf UpdateFull) updateDb {
+ return updateDb{
-1,
- v.SourceHubDataUpdateId,
+ uf.SourceHubUpdateId,
-1,
- int64(v.StartTime.Unix()*1000) + int64(v.StartTime.Nanosecond()/1000000),
- v.Duration,
- v.Data.ClientCount,
- v.Data.BytesReceived,
- v.Data.BytesSent,
+ int64(uf.StartTime.Unix()*1000) + int64(uf.StartTime.Nanosecond()/1000000),
+ uf.Duration,
+ uf.Data.ClientCount,
+ uf.Data.BytesReceived,
+ uf.Data.BytesSent,
}
}
-func (s *DataUpdateFull) CopyFromDataUpdateDb(v dataUpdateDb, srcHubUuid, hubUuid string, id int) {
+func (uf *UpdateFull) CopyFromUpdateDb(udb updateDb, srcHubUuid, hubUuid string, id int) {
if srcHubUuid == "" {
- s.SourceHubUuid = hubUuid
- s.SourceHubDataUpdateId = id
+ uf.SourceHubUuid = hubUuid
+ uf.SourceHubUpdateId = id
} else {
- s.SourceHubUuid = srcHubUuid
- s.SourceHubDataUpdateId = v.SourceHubDataUpdateId
- s.ForwardHubUuid = hubUuid
- s.ForwardHubDataUpdateId = id
+ uf.SourceHubUuid = srcHubUuid
+ uf.SourceHubUpdateId = udb.SourceHubUpdateId
+ uf.ForwardHubUuid = hubUuid
+ uf.ForwardHubUpdateId = id
}
- s.StartTime = time.Unix((v.StartTime / 1000), (v.StartTime%1000)*1000000)
- s.Duration = v.Duration
- s.Data.ClientCount = v.ClientCount
- s.Data.BytesReceived = v.BytesReceived
- s.Data.BytesSent = v.BytesSent
+ uf.StartTime = time.Unix((udb.StartTime / 1000), (udb.StartTime%1000)*1000000)
+ uf.Duration = udb.Duration
+ uf.Data.ClientCount = udb.ClientCount
+ uf.Data.BytesReceived = udb.BytesReceived
+ uf.Data.BytesSent = udb.BytesSent
}
func itob(v int) []byte {