From e4bd9f8dff474ec37bbacccec1374a39a929fc00 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 10 Jun 2017 18:00:44 +0200 Subject: improve documentation and more sanity checks --- src/hub/README | 9 +++--- src/hub/src/spreadspace.org/sfive/s5cvt.go | 4 +-- src/hub/src/spreadspace.org/sfive/s5cvt_test.go | 22 ++++++++++++++ src/hub/src/spreadspace.org/sfive/s5srv.go | 38 ++++++++++++------------- src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 4 +-- src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 4 +-- src/hub/src/spreadspace.org/sfive/s5typesApi.go | 8 +++++- 7 files changed, 58 insertions(+), 31 deletions(-) (limited to 'src/hub') diff --git a/src/hub/README b/src/hub/README index 0a17056..38a5ec3 100644 --- a/src/hub/README +++ b/src/hub/README @@ -1,14 +1,13 @@ sfive-hub ========= -`sfive-hub` is a service providing data aggregation, processing, persistence, -filtering and retrieval. +`sfive-hub` is a service providing data aggregation, processing, persistence, and retrieval. -It accepts data via unix domain sockets and REST API, either stores it persistently in -a SQL database or caches it in-memory and queried for data through a REST API. +It accepts data updates via unix domain sockets and REST API, stores it persistently in +a key-value database and can be queried for data through a REST API. It can also forward +the data to a multitude of external analytics tools. Unix domain sockets accept data in the same format as the REST API but are stateful and use the information provided from the init message as specified in [sfive-protocol][]. [sfive-protocol]: git.spreadspace.org/sfive/doc/protocol.md - diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt.go b/src/hub/src/spreadspace.org/sfive/s5cvt.go index 21aba20..675e101 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt.go @@ -73,7 +73,7 @@ func (sd *StatelessDecoder) Decode() (uf *UpdateFull, err error) { err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, ProtocolVersion) return } - err = uf.checkSource() + err = uf.checkSourceAndTime() return } @@ -112,7 +112,7 @@ func (sd *StatefulDecoder) Decode() (uf *UpdateFull, err error) { err = fmt.Errorf("unsupported version: %d, expected: %d", uf.Version, sd.Version) return } - err = uf.checkSource() + err = uf.checkSourceAndTime() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go index 95c7958..807c244 100644 --- a/src/hub/src/spreadspace.org/sfive/s5cvt_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5cvt_test.go @@ -96,6 +96,28 @@ func TestDecodeStateless(t *testing.T) { t.Fatalf("decoding message with empty/missing quality should throw an error") } + // missing/invalid start-time and duration + missingStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "duration-ms": 15000` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingStartTime))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with missing start-time should throw an error") + } + invalidStartTime := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "this is not a date and time", "duration-ms": 15000` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidStartTime))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with invalid start-time should throw an error") + } + missingDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z"` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, missingDuration))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with missing duration should throw an error") + } + invalidDuration := `"data": {"bytes-sent": 1, "client-count": 3, "bytes-received": 1}, "start-time": "2014-08-24T14:35:33.847282Z", "duration-ms": -1` + dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, invalidDuration))) + if _, err := dec.Decode(); err == nil { + t.Fatalf("decoding message with invalid duration should throw an error") + } + // valid message dec = NewStatelessDecoder(strings.NewReader(fmt.Sprintf(`{ "version": %d, %s, %s }`, ProtocolVersion, initEncoded, updateEncoded))) decoded, err := dec.Decode() diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 6d07e8c..dcd8df0 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -38,12 +38,12 @@ import ( "sync" ) -type appendToken struct { +type ingestToken struct { update *UpdateFull response chan error } -type appendManyToken struct { +type ingestManyToken struct { updates []*UpdateFull response chan error } @@ -55,8 +55,8 @@ type Server struct { geoip GeoIPLookup quit chan bool done *sync.WaitGroup - appendChan chan appendToken - appendManyChan chan appendManyToken + ingestChan chan ingestToken + ingestManyChan chan ingestManyToken } func (srv Server) transform(update *UpdateFull) *UpdateFull { @@ -107,32 +107,32 @@ func (srv Server) transformMany(updates []*UpdateFull) { } } -func (srv Server) appendWorker(idx int) { +func (srv Server) ingestWorker(idx int) { for { select { case <-srv.quit: return - case token := <-srv.appendChan: + case token := <-srv.ingestChan: srv.transform(token.update) token.response <- srv.store.Append(token.update) - case token := <-srv.appendManyChan: + case token := <-srv.ingestManyChan: srv.transformMany(token.updates) token.response <- srv.store.AppendMany(token.updates) } } } -func (srv Server) Append(update *UpdateFull) error { - token := appendToken{update: update, response: make(chan error, 1)} +func (srv Server) Ingest(update *UpdateFull) error { + token := ingestToken{update: update, response: make(chan error, 1)} defer close(token.response) - srv.appendChan <- token + srv.ingestChan <- token return <-token.response } -func (srv Server) AppendMany(updates []*UpdateFull) error { - token := appendManyToken{updates: updates, response: make(chan error, 1)} +func (srv Server) IngestMany(updates []*UpdateFull) error { + token := ingestManyToken{updates: updates, response: make(chan error, 1)} defer close(token.response) - srv.appendManyChan <- token + srv.ingestManyChan <- token return <-token.response } @@ -141,8 +141,8 @@ func (srv Server) Close() { close(srv.quit) srv.done.Wait() - close(srv.appendChan) - close(srv.appendManyChan) + close(srv.ingestChan) + close(srv.ingestManyChan) srv.store.Close() s5l.Printf("server: finished") } @@ -170,16 +170,16 @@ func NewServer(dbPath string, readOnly, anonymize bool, anonKeyfile, geoipDB str s5l.Printf("using Geo-IP Lookup: %s", srv.geoip) } - srv.numWorker = runtime.NumCPU() + srv.numWorker = runtime.NumCPU() // TODO: make this configurable srv.quit = make(chan bool) srv.done = &sync.WaitGroup{} - srv.appendChan = make(chan appendToken, srv.numWorker) - srv.appendManyChan = make(chan appendManyToken, srv.numWorker) + srv.ingestChan = make(chan ingestToken, srv.numWorker) + srv.ingestManyChan = make(chan ingestManyToken, srv.numWorker) for i := 0; i < srv.numWorker; i = i + 1 { srv.done.Add(1) go func(idx int) { defer srv.done.Done() - srv.appendWorker(idx) + srv.ingestWorker(idx) }(i) } s5l.Printf("server: started") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index bfabace..53a5068 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -78,7 +78,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } } - if err = srv.Append(update); err != nil { + if err = srv.Ingest(update); err != nil { s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err) // TODO: send NACK? break @@ -126,7 +126,7 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { continue } - if err = srv.Append(update); err != nil { + if err = srv.Ingest(update); err != nil { s5l.Printf("pipegram: failed to store data: %v\n", err) } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 9fe5b86..64c73e4 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -245,7 +245,7 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { return } - if err = srv.Append(update); err != nil { + if err = srv.Ingest(update); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } @@ -280,7 +280,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { return } - if err = srv.AppendMany(updates); err != nil { + if err = srv.IngestMany(updates); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index fa0d7a2..5afb13e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -94,7 +94,7 @@ type UpdateFull struct { Update } -func (uf *UpdateFull) checkSource() error { +func (uf *UpdateFull) checkSourceAndTime() error { if uf.Source.Hostname == "" { return errors.New("empty hostname is not allowed") } @@ -107,6 +107,12 @@ func (uf *UpdateFull) checkSource() error { if uf.Source.Stream.Quality == "" { return errors.New("empty quality is not allowed") } + if uf.StartTime.IsZero() { + return errors.New("empty start-time is not allowed") + } + if uf.Duration < 1 { + return errors.New("duration must be > 0") + } return nil } -- cgit v1.2.3