summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-06-10 18:00:44 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-06-10 18:00:44 +0200
commite4bd9f8dff474ec37bbacccec1374a39a929fc00 (patch)
treebb4afdd4250bc1ab0aa9f967bfef971c92d276f6
parentadd one more TODO (diff)
improve documentation and more sanity checks
-rw-r--r--ChangeLog7
-rw-r--r--doc/protocol.md67
-rw-r--r--src/daq/README4
-rw-r--r--src/hub/README9
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go38
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go8
10 files changed, 120 insertions, 47 deletions
diff --git a/ChangeLog b/ChangeLog
index 8adb217..16885b5 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,10 +2,13 @@
* major rewrite of hub
- replaced sqlite with bolt-db which is much faster
- - drop vizualisation (viz)
- - added forwarding to Piwik
+ - added sanity checking for incoming data updates
+ - added support for GeoIP lookups
+ - added support for IP address anonymization
- Web API has completely changed
- forwarding using multiple hops works now
+ - added forwarding to Piwik
+ - dropped visualization (viz)
2016.10.17 -- Version 0.1.4
diff --git a/doc/protocol.md b/doc/protocol.md
index 9c4718d..fdd44f8 100644
--- a/doc/protocol.md
+++ b/doc/protocol.md
@@ -1,13 +1,51 @@
+Introduction
+============
+
+There are two types of interfaces: stateful and stateless.
+
+Stateful interfaces use persistent connections and send an init message after the
+connection has been established. The values in this message are treated as defaults
+which will be used if the corresponding value is missing in subsequent data-update
+messages. In any case the values from data updates override values from init messages.
+
+Stateless interfaces do not use persistent connections but are datagram oriented;
+therefore all values must be defined in data-update messages.
+
+
+Structure of data and meaning of data fields:
+---------------------------------------------
+
+Sources of data updates are called streamers. A streamer is defined by the hostname of the
+machine it runs on, a content specifier (room1-audio, room2-av, audio-english, ...),
+a format specifier (flash, webm, hls, dash, ...) and a quality specifier (high, low, ...).
+
+Any data update has a start time and a duration. Those two values specify the timespan
+during which a source gathered the data. Both these values are processed and stored
+with millisecond precision.
+
+The actual data of the update consist of 3 aggregated values: client count, bytes sent and
+bytes received.
+Client count is the number of clients that are or have been connected for at least some
+time within the timespan as specified by start time and duration. Bytes sent is the overall
+number of bytes sent by the source to all the clients combined. Bytes received is the number
+of bytes that the source received from its stream producer to be sent out to the clients.
+In an ideal world those three values have the following relation:
+
+ bytes-sent = bytes-received * client-count
+
+In addition to aggregated data, data updates may contain a list of all connected clients.
+In order to be useful any client entry must contain the IP address of the client as well
+as the bytes sent to it. Client list entries might also contain the port and other
+information such as user agent strings or Geo IP information.
+
+
+
Messages
========
init
----
-All fields except "version" are optional. The values in this message are treated as
-defaults which will be used if the corresponding value is missing in subsequent
-update messages.
-
{
"version": 2,
"SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5",
@@ -19,18 +57,12 @@ update messages.
"tags": [ "elevate", "2014", "discourse" ]
}
+All fields except "version" are optional.
+
data-update
-----------
-All values which have been defined by the init message are optional. In any case the
-values from data updates override values from init. Stateless interfaces will not use
-init messages and therefore all values must be defined here.
-"SourceHubUuid", "SourceHubUpdateId", "ForwardHubUuid", "ForwardHubUpdateId",
-"user-agent", "bytes-received", "tags" and "clients" might be omitted and are treated
-as an empty string, 0 or empty array respectively.
-The start-time will be processesd and stored with millisecond precision.
-
{
"version": 2,
"SourceHubUuid": "f7df89b4-171e-4b2f-a8a4-e58ac99e5dc5",
@@ -54,6 +86,13 @@ The start-time will be processesd and stored with millisecond precision.
}
}
+All values which have been defined by the init message are optional.
+"SourceHubUuid", "SourceHubUpdateId", "ForwardHubUuid", "ForwardHubUpdateId", "tags",
+"data.bytes-received" and "data.clients" might be omitted and are treated as an empty
+string, 0 or empty array respectively. If "clients" is present "port" and "user-agent"
+fields of the entries might be empty or missing. Also in this case "data.client-count"
+and "data.bytes-sent" might be 0 or omitted as those values will be calculated from
+the contents of "data.clients" by the hub while ingesting the data.
In addition to the user-agent string a client entry may have the following geo-info
fields (all of which might be omitted):
@@ -63,5 +102,5 @@ fields (all of which might be omitted):
"region-code" ..... the 2-letter code for the region as defined by the
MaxMind GeoIP2 database
"city" ............ the name of the city
- "latitude" ........ latitude in ° as float value
- "longitude" ....... longitude in ° as float value
+ "latitude" ........ latitude in degrees as float value
+ "longitude" ....... longitude in degrees as float value
diff --git a/src/daq/README b/src/daq/README
index 03814b3..789ddb6 100644
--- a/src/daq/README
+++ b/src/daq/README
@@ -15,3 +15,7 @@ accesslog
: a batch importer for acceslog-based data (nginx, apache)
nginx-lua
: a live importer using nginx Lua module to get access log like information
+s5proxy
+: acts as an HTTP reverse proxy which can be used to add additional request
+ and response headers and to extract statistics info which is forwarded to the
+ hub.
diff --git a/src/hub/README b/src/hub/README
index 0a17056..38a5ec3 100644
--- a/src/hub/README
+++ b/src/hub/README
@@ -1,14 +1,13 @@
sfive-hub
=========
-`sfive-hub` is a service providing data aggregation, processing, persistence,
-filtering and retrieval.
+`sfive-hub` is a service providing data aggregation, processing, persistence, and retrieval.
-It accepts data via unix domain sockets and REST API, either stores it persistently in
-a SQL database or caches it in-memory and queried for data through a REST API.
+It accepts data updates via unix domain sockets and REST API, stores them persistently in
+a key-value database and can be queried for data through a REST API. It can also forward
+the data to a multitude of external analytics tools.
Unix domain sockets accept data in the same format as the REST API but are stateful
and use the information provided from the init message as specified in [sfive-protocol][].
[sfive-protocol]: git.spreadspace.org/sfive/doc/protocol.md
-
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 21aba20..675e101 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -73,7 +73,7 @@ func (sd *StatelessDecoder) Decode() (uf *UpdateFull, err error) {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, ProtocolVersion)
return
}
- err = uf.checkSource()
+ err = uf.checkSourceAndTime()
return
}
@@ -112,7 +112,7 @@ func (sd *StatefulDecoder) Decode() (uf *UpdateFull, err error) {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, sd.Version)
return
}
- err = uf.checkSource()
+ err = uf.checkSourceAndTime()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 95c7958..807c244 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -96,6 +96,28 @@ func TestDecodeStateless(t *testing.T) {
t.Fatalf("decoding message with empty/missing quality should throw an error")
}
+ // missing/invalid start-time and duration
+ missingStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "duration-ms": 15000`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingStartTime)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with missing start-time should throw an error")
+ }
+ invalidStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "this is not a date and time", "duration-ms": 15000`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidStartTime)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with invalid start-time should throw an error")
+ }
+ missingDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z"`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingDuration)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with missing duration should throw an error")
+ }
+ invalidDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": -1`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidDuration)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with invalid duration should throw an error")
+ }
+
// valid message
dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, updateEncoded)))
decoded, err := dec.Decode()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6d07e8c..dcd8df0 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -38,12 +38,12 @@ import (
"sync"
)
-type appendToken struct {
+type ingestToken struct {
update *UpdateFull
response chan error
}
-type appendManyToken struct {
+type ingestManyToken struct {
updates []*UpdateFull
response chan error
}
@@ -55,8 +55,8 @@ type Server struct {
geoip GeoIPLookup
quit chan bool
done *sync.WaitGroup
- appendChan chan appendToken
- appendManyChan chan appendManyToken
+ ingestChan chan ingestToken
+ ingestManyChan chan ingestManyToken
}
func (srv Server) transform(update *UpdateFull) *UpdateFull {
@@ -107,32 +107,32 @@ func (srv Server) transformMany(updates []*UpdateFull) {
}
}
-func (srv Server) appendWorker(idx int) {
+func (srv Server) ingestWorker(idx int) {
for {
select {
case <-srv.quit:
return
- case token := <-srv.appendChan:
+ case token := <-srv.ingestChan:
srv.transform(token.update)
token.response <- srv.store.Append(token.update)
- case token := <-srv.appendManyChan:
+ case token := <-srv.ingestManyChan:
srv.transformMany(token.updates)
token.response <- srv.store.AppendMany(token.updates)
}
}
}
-func (srv Server) Append(update *UpdateFull) error {
- token := appendToken{update: update, response: make(chan error, 1)}
+func (srv Server) Ingest(update *UpdateFull) error {
+ token := ingestToken{update: update, response: make(chan error, 1)}
defer close(token.response)
- srv.appendChan <- token
+ srv.ingestChan <- token
return <-token.response
}
-func (srv Server) AppendMany(updates []*UpdateFull) error {
- token := appendManyToken{updates: updates, response: make(chan error, 1)}
+func (srv Server) IngestMany(updates []*UpdateFull) error {
+ token := ingestManyToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
- srv.appendManyChan <- token
+ srv.ingestManyChan <- token
return <-token.response
}
@@ -141,8 +141,8 @@ func (srv Server) Close() {
close(srv.quit)
srv.done.Wait()
- close(srv.appendChan)
- close(srv.appendManyChan)
+ close(srv.ingestChan)
+ close(srv.ingestManyChan)
srv.store.Close()
s5l.Printf("server: finished")
}
@@ -170,16 +170,16 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile, geoipDB str
s5l.Printf("using Geo-IP Lookup: %s", srv.geoip)
}
- srv.numWorker = runtime.NumCPU()
+ srv.numWorker = runtime.NumCPU() // TODO: make this configurable
srv.quit = make(chan bool)
srv.done = &sync.WaitGroup{}
- srv.appendChan = make(chan appendToken, srv.numWorker)
- srv.appendManyChan = make(chan appendManyToken, srv.numWorker)
+ srv.ingestChan = make(chan ingestToken, srv.numWorker)
+ srv.ingestManyChan = make(chan ingestManyToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
srv.done.Add(1)
go func(idx int) {
defer srv.done.Done()
- srv.appendWorker(idx)
+ srv.ingestWorker(idx)
}(i)
}
s5l.Printf("server: started")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index bfabace..53a5068 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -78,7 +78,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
// TODO: send NACK?
break
@@ -126,7 +126,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
continue
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
s5l.Printf("pipegram: failed to store data: %v\n", err)
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 9fe5b86..64c73e4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -245,7 +245,7 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
@@ -280,7 +280,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- if err = srv.AppendMany(updates); err != nil {
+ if err = srv.IngestMany(updates); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index fa0d7a2..5afb13e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -94,7 +94,7 @@ type UpdateFull struct {
Update
}
-func (uf *UpdateFull) checkSource() error {
+func (uf *UpdateFull) checkSourceAndTime() error {
if uf.Source.Hostname == "" {
return errors.New("empty hostname is not allowed")
}
@@ -107,6 +107,12 @@ func (uf *UpdateFull) checkSource() error {
if uf.Source.Stream.Quality == "" {
return errors.New("empty quality is not allowed")
}
+ if uf.StartTime.IsZero() {
+ return errors.New("empty start-time is not allowed")
+ }
+ if uf.Duration < 1 {
+ return errors.New("duration must be > 0")
+ }
return nil
}