diff options
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 144 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 6 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store_test.go | 20 |
6 files changed, 116 insertions, 80 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 76805c3..75460d1 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -53,12 +53,12 @@ type getUpdatesAfterToken struct { response chan getUpdatesResult } -type getHubIdResult struct { +type getHubUuidResult struct { id string } -type getHubIdToken struct { - response chan getHubIdResult +type getHubUuidToken struct { + response chan getHubUuidResult } type getLastUpdateIdResult struct { @@ -77,7 +77,7 @@ type Server struct { appendChan chan appendToken appendManyChan chan appendManyToken getUpdatesAfterChan chan getUpdatesAfterToken - getHubIdChan chan getHubIdToken + getHubUuidChan chan getHubUuidToken getLastUpdateIdChan chan getLastUpdateIdToken } @@ -94,8 +94,8 @@ func (srv Server) appendActor() { case token := <-srv.getUpdatesAfterChan: values, err := srv.store.GetUpdatesAfter(token.id, token.limit) token.response <- getUpdatesResult{values, err} - case token := <-srv.getHubIdChan: - token.response <- getHubIdResult{srv.store.GetStoreId()} + case token := <-srv.getHubUuidChan: + token.response <- getHubUuidResult{srv.store.GetHubUuid()} case token := <-srv.getLastUpdateIdChan: lastUpdateId, err := srv.store.GetLastUpdateId() token.response <- getLastUpdateIdResult{lastUpdateId, err} @@ -125,10 +125,10 @@ func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) { return res.values, res.err } -func (srv Server) GetHubId() string { - token := getHubIdToken{response: make(chan getHubIdResult, 1)} +func (srv Server) GetHubUuid() string { + token := getHubUuidToken{response: make(chan getHubUuidResult, 1)} defer close(token.response) - srv.getHubIdChan <- token + srv.getHubUuidChan <- token res := <-token.response return res.id } @@ -150,7 +150,7 @@ func (srv Server) Close() { close(srv.appendChan) close(srv.appendManyChan) close(srv.getUpdatesAfterChan) - close(srv.getHubIdChan) + close(srv.getHubUuidChan) close(srv.getLastUpdateIdChan) srv.store.Close() s5l.Printf("server: finished\n") @@ -169,7 +169,7 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) { server.appendChan = make(chan appendToken, 32) server.appendManyChan = make(chan appendManyToken, 32) server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32) - server.getHubIdChan = make(chan getHubIdToken, 32) + server.getHubUuidChan = make(chan getHubUuidToken, 32) server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32) go server.appendActor() s5l.Printf("server: started\n") diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index d6874de..144601e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -53,7 +53,7 @@ func findMaxId(values []DataUpdateFull) int { } func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { - storeId = srv.GetHubId() + storeId = srv.GetHubUuid() var resp *http.Response resp, err = client.Get(baseurl + "/lastupdate/" + storeId) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 8880b8a..4297bbf 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -50,7 +50,7 @@ const forwardEsLastUpdateJson = `{ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { url := baseurl + "/dataupdate/_search?search_type=count" - storeId = srv.GetHubId() + storeId = srv.GetHubUuid() queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId) s5dl.Printf("fwd-es: query: %s", queryJson) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index cbcf2d4..687aa82 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -51,11 +51,13 @@ func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) { sendWebResponse(w, http.StatusNotFound, webErrorResponse{"not found"}) } +// // /healthz +// type webHealthzResponse struct { - Status string `json:"status"` - HubId string `json:"hub-uuid"` + Status string `json:"status"` + HubUuid string `json:"hub-uuid"` } func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { @@ -65,12 +67,14 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { } resp := webHealthzResponse{} - resp.HubId = srv.GetHubId() + resp.HubUuid = srv.GetHubUuid() resp.Status = "OK" // TODO: do a more sophisticated check sendWebResponse(w, http.StatusOK, resp) } +// // /hubs +// type webHubsResponse struct { Hubs []string `json:"hubs"` @@ -91,7 +95,9 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) { sendWebResponse(w, http.StatusOK, resp) } +// // /sources +// type webSourcesResponse struct { Sources []SourceId `json:"sources"` @@ -112,14 +118,39 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) { sendWebResponse(w, http.StatusOK, resp) } -// /updates/:ID +// +// /updates/(:ID|_bulk) +// -func webUpdate(srv *Server, w http.ResponseWriter, r *http.Request) { - if r.Method != "GET" { +func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + webUpdateGet(srv, w, r) + case "POST": + webUpdatesPost(srv, w, r) + default: + sendInvalidMethod(w, r.Method) + } +} + +// +// /updates +// + +func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + webUpdatesGet(srv, w, r) + case "POST": + webUpdatePost(srv, w, r) + default: sendInvalidMethod(w, r.Method) - return } +} +// get one update + +func webUpdateGet(srv *Server, w http.ResponseWriter, r *http.Request) { if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched { sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id"}) return @@ -141,38 +172,24 @@ func webUpdate(srv *Server, w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) encoder := NewStatelessEncoder(w) - encoder.Encode(upd) // TODO: handle error -} - -// /updates - -func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - webUpdatesGet(srv, w, r) - case "POST": - q := r.URL.Query() - if _, exists := q["bulk"]; !exists { - webUpdatesPost(srv, w, r) - } else { - webUpdatesPostBulk(srv, w, r) - } - default: - sendInvalidMethod(w, r.Method) + if err := encoder.Encode(upd); err != nil { + s5l.Println("Error while sending data: %v", err) } } +// get list of updates + func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) { - from := 0 + after := 0 limit := -1 q := r.URL.Query() var err error - fromStr := q.Get("from") - if fromStr != "" { - if from, err = strconv.Atoi(fromStr); err != nil { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid from value: " + err.Error()}) + afterStr := q.Get("after") + if afterStr != "" { + if after, err = strconv.Atoi(afterStr); err != nil { + sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid after value: " + err.Error()}) return } } @@ -185,7 +202,7 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) { } var upds []DataUpdateFull - if upds, err = srv.store.GetUpdatesAfter(from, limit); err != nil { + if upds, err = srv.store.GetUpdatesAfter(after, limit); err != nil { sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) return } @@ -193,11 +210,42 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) encoder := NewStatelessEncoder(w) for _, upd := range upds { // TODO: inside container? - encoder.Encode(upd) // TODO: handle error + if err := encoder.Encode(upd); err != nil { + s5l.Println("Error while sending data: %v", err) + return + } } } +// post one update + +func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { + // TODO: implement this! + + // decoder := NewStatelessDecoder(r.Body) + // data, err := decoder.Decode() + // if err != nil && err != io.EOF { + // s5l.Printf("web: failed to decode: %v\n", err) + // http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) + // return + // } + + // if err = srv.Append(data); err != nil { + // http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) + // } else { + // fmt.Fprintf(w, "1 update successfully stored.") + // } + + sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting signle update data not yet implemented!"}) +} + +// post updates in bulk mode + func webUpdatesPost(srv *Server, w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/updates/_bulk" { + sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update parameter"}) + return + } // TODO: implement this! // decoder, err := NewStatefulDecoder(r.Body) @@ -226,30 +274,12 @@ func webUpdatesPost(srv *Server, w http.ResponseWriter, r *http.Request) { // fmt.Fprintf(w, "%d update(s) successfully stored.", len(data)) // } - sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting signle update data not yet implemented!"}) -} - -func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { - // TODO: implement this! - - // decoder := NewStatelessDecoder(r.Body) - // data, err := decoder.Decode() - // if err != nil && err != io.EOF { - // s5l.Printf("web: failed to decode: %v\n", err) - // http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest) - // return - // } - - // if err = srv.Append(data); err != nil { - // http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError) - // } else { - // fmt.Fprintf(w, "1 update successfully stored.") - // } - sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting bulk update data not yet implemented!"}) } +// // /lastupdate/:UUID +// type webLastUpdateIdForUuidResponse struct { HubUuid string `json:"hub-uuid"` @@ -277,7 +307,9 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) sendWebResponse(w, http.StatusOK, resp) } +// // /lastupdate +// type webLastUpdateIdResponse struct { HubUuid string `json:"hub-uuid"` @@ -292,7 +324,7 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) { var err error resp := webLastUpdateIdResponse{} - resp.HubUuid = srv.GetHubId() + resp.HubUuid = srv.GetHubUuid() if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil { sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) return @@ -312,7 +344,9 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) encoder := json.NewEncoder(w) - encoder.Encode(respdata) + if err := encoder.Encode(respdata); err != nil { + s5l.Println("Error while sending data: %v", err) + } } type webHandler struct { @@ -344,7 +378,7 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) { mux.Handle("/healthz", webHandler{srv, webHealthz}) mux.Handle("/hubs", webHandler{srv, webHubs}) mux.Handle("/sources", webHandler{srv, webSources}) - mux.Handle("/updates/", webHandler{srv, webUpdate}) + mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam}) mux.Handle("/updates", webHandler{srv, webUpdates}) mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIdForUuid}) mux.Handle("/lastupdate", webHandler{srv, webLastUpdateId}) diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 48fee9c..8dda6af 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -436,7 +436,9 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error) id = 0 } if limit < 0 || limit > StoreGetUpdatesLimit { - s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit) + if limit > 0 { + s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit) + } limit = StoreGetUpdatesLimit } err = st.db.View(func(tx *bolt.Tx) error { @@ -495,7 +497,7 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) { // Auxilliary Data // -func (st Store) GetStoreId() string { +func (st Store) GetHubUuid() string { return st.hubUuid } diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go index a6b2178..8b7c385 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store_test.go +++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go @@ -322,7 +322,7 @@ func TestAppendAndFetch(t *testing.T) { out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder expected := in - expected.SourceHubUuid = store.GetStoreId() + expected.SourceHubUuid = store.GetHubUuid() expected.SourceHubDataUpdateId = 1 expected.ForwardHubUuid = "" expected.ForwardHubDataUpdateId = 0 @@ -350,7 +350,7 @@ func TestAppendAndFetch(t *testing.T) { } out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder expected = ins[i] - expected.SourceHubUuid = store.GetStoreId() + expected.SourceHubUuid = store.GetHubUuid() expected.SourceHubDataUpdateId = i + 2 expected.ForwardHubUuid = "" expected.ForwardHubDataUpdateId = 0 @@ -474,8 +474,8 @@ func TestGetUpdatesAfter(t *testing.T) { t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[:2]) } - // all after id 2 - updList, err = store.GetUpdatesAfter(2, -1) + // 100000 after id 2 + updList, err = store.GetUpdatesAfter(2, 100000) if err != nil { t.Fatalf("failed to fetch updates: %v", err) } @@ -549,7 +549,7 @@ func TestForwardedDataUpdates(t *testing.T) { t.Fatalf("unexpected error: %v", err) } myLastId = myLastId + 1 - in.ForwardHubUuid = store.GetStoreId() + in.ForwardHubUuid = store.GetHubUuid() in.ForwardHubDataUpdateId = myLastId expected = append(expected, in) upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond) @@ -598,35 +598,35 @@ func TestForwardedDataUpdates(t *testing.T) { } func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) { - lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetStoreId()) + lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) } if lastId != fwdSrc1Id { t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id) } - lastId, err = fwdStore.GetLastUpdateIdForUuid(src2Store.GetStoreId()) + lastId, err = fwdStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) } if lastId != fwdSrc2Id { t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id) } - lastId, err = finalStore.GetLastUpdateIdForUuid(src1Store.GetStoreId()) + lastId, err = finalStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) } if lastId != finalSrc1Id { t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id) } - lastId, err = finalStore.GetLastUpdateIdForUuid(src2Store.GetStoreId()) + lastId, err = finalStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) } if lastId != finalSrc2Id { t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id) } - lastId, err = finalStore.GetLastUpdateIdForUuid(fwdStore.GetStoreId()) + lastId, err = finalStore.GetLastUpdateIdForUuid(fwdStore.GetHubUuid()) if err != nil { t.Fatalf("unexpected error: %v", err) } |