summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-06-10 18:00:44 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-06-10 18:00:44 +0200
commite4bd9f8dff474ec37bbacccec1374a39a929fc00 (patch)
treebb4afdd4250bc1ab0aa9f967bfef971c92d276f6 /src/hub
parentadd one more TODO (diff)
improve documentation and more sanity checks
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/README9
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5cvt_test.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go38
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go8
7 files changed, 58 insertions, 31 deletions
diff --git a/src/hub/README b/src/hub/README
index 0a17056..38a5ec3 100644
--- a/src/hub/README
+++ b/src/hub/README
@@ -1,14 +1,13 @@
sfive-hub
=========
-`sfive-hub` is a service providing data aggregation, processing, persistence,
-filtering and retrieval.
+`sfive-hub` is a service providing data aggregation, processing, persistence, and retrieval.
-It accepts data via unix domain sockets and REST API, either stores it persistently in
-a SQL database or caches it in-memory and queried for data through a REST API.
+It accepts data updates via unix domain sockets and REST API, stores it persistently in
+a key-value database and can be queried for data through a REST API. It can also forward
+the data to a multitude of external analytics tools.
Unix domain sockets accept data in the same format as the REST API but are stateful
and use the information provided from the init message as specified in [sfive-protocol][].
[sfive-protocol]: git.spreadspace.org/sfive/doc/protocol.md
-
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go
index 21aba20..675e101 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go
@@ -73,7 +73,7 @@ func (sd *StatelessDecoder) Decode() (uf *UpdateFull, err error) {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, ProtocolVersion)
return
}
- err = uf.checkSource()
+ err = uf.checkSourceAndTime()
return
}
@@ -112,7 +112,7 @@ func (sd *StatefulDecoder) Decode() (uf *UpdateFull, err error) {
err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, sd.Version)
return
}
- err = uf.checkSource()
+ err = uf.checkSourceAndTime()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
index 95c7958..807c244 100644
--- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go
@@ -96,6 +96,28 @@ func TestDecodeStateless(t *testing.T) {
t.Fatalf("decoding message with empty/missing quality should throw an error")
}
+ // missing/invalid start-time and duration
+ missingStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "duration-ms": 15000`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingStartTime)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with missing start-time should throw an error")
+ }
+ invalidStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "this is not a date and time", "duration-ms": 15000`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidStartTime)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with invalid start-time should throw an error")
+ }
+ missingDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z"`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingDuration)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with missing duration should throw an error")
+ }
+ invalidDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": -1`
+ dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidDuration)))
+ if _, err := dec.Decode(); err == nil {
+ t.Fatalf("decoding message with invalid duration should throw an error")
+ }
+
// valid message
dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, updateEncoded)))
decoded, err := dec.Decode()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 6d07e8c..dcd8df0 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -38,12 +38,12 @@ import (
"sync"
)
-type appendToken struct {
+type ingestToken struct {
update *UpdateFull
response chan error
}
-type appendManyToken struct {
+type ingestManyToken struct {
updates []*UpdateFull
response chan error
}
@@ -55,8 +55,8 @@ type Server struct {
geoip GeoIPLookup
quit chan bool
done *sync.WaitGroup
- appendChan chan appendToken
- appendManyChan chan appendManyToken
+ ingestChan chan ingestToken
+ ingestManyChan chan ingestManyToken
}
func (srv Server) transform(update *UpdateFull) *UpdateFull {
@@ -107,32 +107,32 @@ func (srv Server) transformMany(updates []*UpdateFull) {
}
}
-func (srv Server) appendWorker(idx int) {
+func (srv Server) ingestWorker(idx int) {
for {
select {
case <-srv.quit:
return
- case token := <-srv.appendChan:
+ case token := <-srv.ingestChan:
srv.transform(token.update)
token.response <- srv.store.Append(token.update)
- case token := <-srv.appendManyChan:
+ case token := <-srv.ingestManyChan:
srv.transformMany(token.updates)
token.response <- srv.store.AppendMany(token.updates)
}
}
}
-func (srv Server) Append(update *UpdateFull) error {
- token := appendToken{update: update, response: make(chan error, 1)}
+func (srv Server) Ingest(update *UpdateFull) error {
+ token := ingestToken{update: update, response: make(chan error, 1)}
defer close(token.response)
- srv.appendChan <- token
+ srv.ingestChan <- token
return <-token.response
}
-func (srv Server) AppendMany(updates []*UpdateFull) error {
- token := appendManyToken{updates: updates, response: make(chan error, 1)}
+func (srv Server) IngestMany(updates []*UpdateFull) error {
+ token := ingestManyToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
- srv.appendManyChan <- token
+ srv.ingestManyChan <- token
return <-token.response
}
@@ -141,8 +141,8 @@ func (srv Server) Close() {
close(srv.quit)
srv.done.Wait()
- close(srv.appendChan)
- close(srv.appendManyChan)
+ close(srv.ingestChan)
+ close(srv.ingestManyChan)
srv.store.Close()
s5l.Printf("server: finished")
}
@@ -170,16 +170,16 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile, geoipDB str
s5l.Printf("using Geo-IP Lookup: %s", srv.geoip)
}
- srv.numWorker = runtime.NumCPU()
+ srv.numWorker = runtime.NumCPU() // TODO: make this configurable
srv.quit = make(chan bool)
srv.done = &sync.WaitGroup{}
- srv.appendChan = make(chan appendToken, srv.numWorker)
- srv.appendManyChan = make(chan appendManyToken, srv.numWorker)
+ srv.ingestChan = make(chan ingestToken, srv.numWorker)
+ srv.ingestManyChan = make(chan ingestManyToken, srv.numWorker)
for i := 0; i < srv.numWorker; i = i + 1 {
srv.done.Add(1)
go func(idx int) {
defer srv.done.Done()
- srv.appendWorker(idx)
+ srv.ingestWorker(idx)
}(i)
}
s5l.Printf("server: started")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index bfabace..53a5068 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -78,7 +78,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
// TODO: send NACK?
break
@@ -126,7 +126,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
continue
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
s5l.Printf("pipegram: failed to store data: %v\n", err)
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 9fe5b86..64c73e4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -245,7 +245,7 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- if err = srv.Append(update); err != nil {
+ if err = srv.Ingest(update); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
@@ -280,7 +280,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- if err = srv.AppendMany(updates); err != nil {
+ if err = srv.IngestMany(updates); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index fa0d7a2..5afb13e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -94,7 +94,7 @@ type UpdateFull struct {
Update
}
-func (uf *UpdateFull) checkSource() error {
+func (uf *UpdateFull) checkSourceAndTime() error {
if uf.Source.Hostname == "" {
return errors.New("empty hostname is not allowed")
}
@@ -107,6 +107,12 @@ func (uf *UpdateFull) checkSource() error {
if uf.Source.Stream.Quality == "" {
return errors.New("empty quality is not allowed")
}
+ if uf.StartTime.IsZero() {
+ return errors.New("empty start-time is not allowed")
+ }
+ if uf.Duration < 1 {
+ return errors.New("duration must be > 0")
+ }
return nil
}