diff options
Diffstat (limited to 'src/hub')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 56 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 100 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5typesApi.go | 36 |
3 files changed, 91 insertions, 101 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 144601e..10e4b9d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -35,9 +35,7 @@ package sfive import ( "bytes" "encoding/json" - "io/ioutil" "net/http" - "strconv" "time" ) @@ -52,11 +50,12 @@ func findMaxId(values []DataUpdateFull) int { return maxId } -func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { - storeId = srv.GetHubUuid() +func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lastId int, hubUuid string, err error) { + hubUuid = srv.GetHubUuid() + lastId = -1 var resp *http.Response - resp, err = client.Get(baseurl + "/lastupdate/" + storeId) + resp, err = client.Get(baseurl + "/lastupdate/" + hubUuid) if err != nil { s5l.Printf("fwd: failed to query for lastupdate: %v\n", err) return @@ -68,34 +67,22 @@ func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lat return } - var body []byte - body, err = ioutil.ReadAll(resp.Body) - if err != nil { - s5l.Printf("fwd: failed to read lastupdate response: %v\n", err) + dec := json.NewDecoder(resp.Body) + result := WebLastUpdateIdResponse{} + if err = dec.Decode(&result); err != nil { + s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err) return } - if len(body) == 0 { - latestId = -1 - } else { - var tid int64 - tid, err = strconv.ParseInt(string(body), 10, 32) - if err != nil { - s5l.Printf("fwd: invalid lastupdate response: %v\n", err) - return - } - latestId = int(tid) - } - + lastId = result.LastUpdateId return } func (srv Server) forwardRun(baseurl string, client *http.Client) { - url := baseurl + "/updates" + url := baseurl + "/updates/_bulk" tryResync: for { lastId, _, err := srv.forwardGetLastUpdate(baseurl, client) - if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -120,19 +107,26 @@ tryResync: continue nextBatch } - data, err := json.Marshal(DataUpdateFullContainer{updates}) - - if err != nil { - s5l.Panicf("fwd: encode failed: %v\n", err) + // TODO: use StatefulEncoder + buf := &bytes.Buffer{} + if err := json.NewEncoder(buf).Encode(Header{Version: 1}); err != nil { + s5l.Printf("fwd: failed encoding init message: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + enc := NewStatelessEncoder(buf) + for _, upd := range updates { + if err := enc.Encode(upd); err != nil { + s5l.Printf("fwd: failed encoding updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } } - s5l.Printf("fwd: marshal OK") - - resp, err := client.Post(url, "application/json", bytes.NewBuffer(data)) + resp, err := client.Post(url, "application/json", buf) if err != nil { s5l.Printf("fwd: post failed: %v\n", err) continue tryResync - // TODO retry etc. } resp.Body.Close() if resp.StatusCode != http.StatusOK { diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index d098626..ee9f131 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -44,31 +44,22 @@ import ( "time" ) -type webErrorResponse struct { - Error string `json:"error,omitempty"` -} - func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) { // TODO: show index on '^/$' - sendWebResponse(w, http.StatusNotFound, webErrorResponse{"not found"}) + sendWebResponse(w, http.StatusNotFound, WebErrorResponse{"not found"}) } // // /healthz // -type webHealthzResponse struct { - Status string `json:"status"` - HubUuid string `json:"hub-uuid"` -} - func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) return } - resp := webHealthzResponse{} + resp := WebHealthzResponse{} resp.HubUuid = srv.GetHubUuid() resp.Status = "OK" // TODO: do a more sophisticated check sendWebResponse(w, http.StatusOK, resp) @@ -78,10 +69,6 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) { // /hubs // -type webHubsResponse struct { - Hubs []string `json:"hubs"` -} - func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) @@ -89,9 +76,9 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) { } var err error - resp := webHubsResponse{} + resp := WebHubsResponse{} if resp.Hubs, err = srv.store.GetHubs(); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } sendWebResponse(w, http.StatusOK, resp) @@ -101,10 +88,6 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) { // /sources // -type webSourcesResponse struct { - Sources []SourceId `json:"sources"` -} - func webSources(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) @@ -112,9 +95,9 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) { } var err error - resp := webSourcesResponse{} + resp := WebSourcesResponse{} if resp.Sources, err = srv.store.GetSources(); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } sendWebResponse(w, http.StatusOK, resp) @@ -135,12 +118,12 @@ func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) { } if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + strings.TrimPrefix("/updates/", r.URL.Path)}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid update id: " + strings.TrimPrefix("/updates/", r.URL.Path)}) return } id, err := strconv.Atoi(path.Base(r.URL.Path)) if err != nil { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid update id: " + err.Error()}) return } webUpdateGet(srv, id, w, r) @@ -150,7 +133,7 @@ func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) { return } - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid parameter: " + strings.TrimPrefix("/updates/", r.URL.Path)}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid parameter: " + strings.TrimPrefix("/updates/", r.URL.Path)}) default: sendInvalidMethod(w, r.Method) } @@ -180,7 +163,7 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) { if err == ErrNotFound { status = http.StatusNotFound } - sendWebResponse(w, status, webErrorResponse{err.Error()}) + sendWebResponse(w, status, WebErrorResponse{err.Error()}) return } @@ -189,19 +172,15 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) { // get list of updates (as json array) -type webUpdatesGetResponse struct { - Updates []DataUpdateFull `json:"updates"` -} - func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) { after, limit, err := parseAfterAndLimit(r) if err != nil { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()}) } - resp := webUpdatesGetResponse{} + resp := WebUpdatesGetResponse{} if resp.Updates, err = srv.store.GetUpdatesAfter(after, limit); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } sendWebResponse(w, http.StatusOK, resp) @@ -212,17 +191,24 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) { func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { after, limit, err := parseAfterAndLimit(r) if err != nil { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()}) } upds, err := srv.store.GetUpdatesAfter(after, limit) if err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } w.Header().Set("Content-Type", "application/json") // this is actually multiple json documents... w.WriteHeader(http.StatusOK) + + // TODO: use StatefulEncoder + if err := json.NewEncoder(w).Encode(Header{Version: 1}); err != nil { + s5l.Printf("Error while sending data: %v", err) + return + s5l.Printf("fwd: failed encoding init message: %v\n", err) + } encoder := NewStatelessEncoder(w) for _, upd := range upds { if err := encoder.Encode(upd); err != nil { @@ -234,24 +220,20 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { // post one update -type webUpdatesPostResponse struct { - NumUpdates int `json:"num-updates"` -} - func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { decoder := NewStatelessDecoder(r.Body) value, err := decoder.Decode() if err != nil { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"error while decoding update: " + err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"error while decoding update: " + err.Error()}) return } if err = srv.Append(value); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } - sendWebResponse(w, http.StatusOK, webUpdatesPostResponse{1}) + sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{1}) } // post multiple updates in bulk mode @@ -259,7 +241,7 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { decoder, err := NewStatefulDecoder(r.Body) if err != nil && err != io.EOF { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"failed to read/decode init message: " + err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode init message: " + err.Error()}) return } @@ -268,7 +250,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { value, err := decoder.Decode() if err != nil { if err != io.EOF { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"failed to read/decode data message: " + err.Error()}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode data message: " + err.Error()}) return } break @@ -277,27 +259,22 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { } numValues := len(values) if numValues < 1 { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"got no data messages"}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"got no data messages"}) return } if err = srv.AppendMany(values); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } - sendWebResponse(w, http.StatusOK, webUpdatesPostResponse{numValues}) + sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numValues}) } // // /lastupdate/:UUID // -type webLastUpdateIdForUuidResponse struct { - HubUuid string `json:"hub-uuid"` - LastUpdateId int `json:"lastupdate"` -} - func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) @@ -305,15 +282,15 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) } if matched, err := path.Match("/lastupdate/?*", r.URL.Path); err != nil || !matched { - sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid uuid"}) + sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid uuid"}) return } var err error - resp := webLastUpdateIdForUuidResponse{} + resp := WebLastUpdateIdResponse{} resp.HubUuid = path.Base(r.URL.Path) if resp.LastUpdateId, err = srv.store.GetLastUpdateIdForUuid(resp.HubUuid); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } sendWebResponse(w, http.StatusOK, resp) @@ -323,11 +300,6 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) // /lastupdate // -type webLastUpdateIdResponse struct { - HubUuid string `json:"hub-uuid"` - LastUpdateId int `json:"lastupdate"` -} - func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { sendInvalidMethod(w, r.Method) @@ -335,10 +307,10 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) { } var err error - resp := webLastUpdateIdResponse{} + resp := WebLastUpdateIdResponse{} resp.HubUuid = srv.GetHubUuid() if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil { - sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()}) + sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } sendWebResponse(w, http.StatusOK, resp) @@ -372,7 +344,7 @@ func parseAfterAndLimit(r *http.Request) (after, limit int, err error) { } func sendInvalidMethod(w http.ResponseWriter, method string) { - sendWebResponse(w, http.StatusMethodNotAllowed, webErrorResponse{"invalid request method: " + method}) + sendWebResponse(w, http.StatusMethodNotAllowed, WebErrorResponse{"invalid request method: " + method}) } func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go index a7816f1..6220c89 100644 --- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go +++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go @@ -79,10 +79,6 @@ type DataUpdateFull struct { DataUpdate } -type DataUpdateFullContainer struct { - Data []DataUpdateFull `json:"data"` -} - func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) { duf.Hostname = src.Hostname duf.StreamId = src.StreamId @@ -95,6 +91,34 @@ func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) { duf.Data = du.Data } -type GenericDataContainer struct { - Data interface{} `json:"data"` +// web + +type WebErrorResponse struct { + Error string `json:"error,omitempty"` +} + +type WebHealthzResponse struct { + Status string `json:"status"` + HubUuid string `json:"hub-uuid"` +} + +type WebHubsResponse struct { + Hubs []string `json:"hubs"` +} + +type WebSourcesResponse struct { + Sources []SourceId `json:"sources"` +} + +type WebUpdatesGetResponse struct { + Updates []DataUpdateFull `json:"updates"` +} + +type WebUpdatesPostResponse struct { + NumUpdates int `json:"num-updates"` +} + +type WebLastUpdateIdResponse struct { + HubUuid string `json:"hub-uuid"` + LastUpdateId int `json:"lastupdate"` } |