summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-07 20:51:11 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-07 20:51:11 +0200
commit9a9c4d4eab5e3fe5baa7a727deff4989f896f097 (patch)
treec709fb2ce4d1639d5716c06b8f7c426721b0b5df
parentupdate test client to use new _bulk api (diff)
forwarding works now with new web api
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go56
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go100
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go36
3 files changed, 91 insertions, 101 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 144601e..10e4b9d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -35,9 +35,7 @@ package sfive
import (
"bytes"
"encoding/json"
- "io/ioutil"
"net/http"
- "strconv"
"time"
)
@@ -52,11 +50,12 @@ func findMaxId(values []DataUpdateFull) int {
return maxId
}
-func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
- storeId = srv.GetHubUuid()
+func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lastId int, hubUuid string, err error) {
+ hubUuid = srv.GetHubUuid()
+ lastId = -1
var resp *http.Response
- resp, err = client.Get(baseurl + "/lastupdate/" + storeId)
+ resp, err = client.Get(baseurl + "/lastupdate/" + hubUuid)
if err != nil {
s5l.Printf("fwd: failed to query for lastupdate: %v\n", err)
return
@@ -68,34 +67,22 @@ func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lat
return
}
- var body []byte
- body, err = ioutil.ReadAll(resp.Body)
- if err != nil {
- s5l.Printf("fwd: failed to read lastupdate response: %v\n", err)
+ dec := json.NewDecoder(resp.Body)
+ result := WebLastUpdateIdResponse{}
+ if err = dec.Decode(&result); err != nil {
+ s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err)
return
}
- if len(body) == 0 {
- latestId = -1
- } else {
- var tid int64
- tid, err = strconv.ParseInt(string(body), 10, 32)
- if err != nil {
- s5l.Printf("fwd: invalid lastupdate response: %v\n", err)
- return
- }
- latestId = int(tid)
- }
-
+ lastId = result.LastUpdateId
return
}
func (srv Server) forwardRun(baseurl string, client *http.Client) {
- url := baseurl + "/updates"
+ url := baseurl + "/updates/_bulk"
tryResync:
for {
lastId, _, err := srv.forwardGetLastUpdate(baseurl, client)
-
if err != nil {
s5l.Printf("fwd: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -120,19 +107,26 @@ tryResync:
continue nextBatch
}
- data, err := json.Marshal(DataUpdateFullContainer{updates})
-
- if err != nil {
- s5l.Panicf("fwd: encode failed: %v\n", err)
+ // TODO: use StatefulEncoder
+ buf := &bytes.Buffer{}
+ if err := json.NewEncoder(buf).Encode(Header{Version: 1}); err != nil {
+ s5l.Printf("fwd: failed encoding init message: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+ enc := NewStatelessEncoder(buf)
+ for _, upd := range updates {
+ if err := enc.Encode(upd); err != nil {
+ s5l.Printf("fwd: failed encoding updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
}
- s5l.Printf("fwd: marshal OK")
-
- resp, err := client.Post(url, "application/json", bytes.NewBuffer(data))
+ resp, err := client.Post(url, "application/json", buf)
if err != nil {
s5l.Printf("fwd: post failed: %v\n", err)
continue tryResync
- // TODO retry etc.
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index d098626..ee9f131 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -44,31 +44,22 @@ import (
"time"
)
-type webErrorResponse struct {
- Error string `json:"error,omitempty"`
-}
-
func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) {
// TODO: show index on '^/$'
- sendWebResponse(w, http.StatusNotFound, webErrorResponse{"not found"})
+ sendWebResponse(w, http.StatusNotFound, WebErrorResponse{"not found"})
}
//
// /healthz
//
-type webHealthzResponse struct {
- Status string `json:"status"`
- HubUuid string `json:"hub-uuid"`
-}
-
func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
sendInvalidMethod(w, r.Method)
return
}
- resp := webHealthzResponse{}
+ resp := WebHealthzResponse{}
resp.HubUuid = srv.GetHubUuid()
resp.Status = "OK" // TODO: do a more sophisticated check
sendWebResponse(w, http.StatusOK, resp)
@@ -78,10 +69,6 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
// /hubs
//
-type webHubsResponse struct {
- Hubs []string `json:"hubs"`
-}
-
func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
sendInvalidMethod(w, r.Method)
@@ -89,9 +76,9 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) {
}
var err error
- resp := webHubsResponse{}
+ resp := WebHubsResponse{}
if resp.Hubs, err = srv.store.GetHubs(); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
sendWebResponse(w, http.StatusOK, resp)
@@ -101,10 +88,6 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) {
// /sources
//
-type webSourcesResponse struct {
- Sources []SourceId `json:"sources"`
-}
-
func webSources(srv *Server, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
sendInvalidMethod(w, r.Method)
@@ -112,9 +95,9 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) {
}
var err error
- resp := webSourcesResponse{}
+ resp := WebSourcesResponse{}
if resp.Sources, err = srv.store.GetSources(); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
sendWebResponse(w, http.StatusOK, resp)
@@ -135,12 +118,12 @@ func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) {
}
if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + strings.TrimPrefix("/updates/", r.URL.Path)})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid update id: " + strings.TrimPrefix("/updates/", r.URL.Path)})
return
}
id, err := strconv.Atoi(path.Base(r.URL.Path))
if err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid update id: " + err.Error()})
return
}
webUpdateGet(srv, id, w, r)
@@ -150,7 +133,7 @@ func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) {
return
}
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid parameter: " + strings.TrimPrefix("/updates/", r.URL.Path)})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid parameter: " + strings.TrimPrefix("/updates/", r.URL.Path)})
default:
sendInvalidMethod(w, r.Method)
}
@@ -180,7 +163,7 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) {
if err == ErrNotFound {
status = http.StatusNotFound
}
- sendWebResponse(w, status, webErrorResponse{err.Error()})
+ sendWebResponse(w, status, WebErrorResponse{err.Error()})
return
}
@@ -189,19 +172,15 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) {
// get list of updates (as json array)
-type webUpdatesGetResponse struct {
- Updates []DataUpdateFull `json:"updates"`
-}
-
func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
after, limit, err := parseAfterAndLimit(r)
if err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()})
}
- resp := webUpdatesGetResponse{}
+ resp := WebUpdatesGetResponse{}
if resp.Updates, err = srv.store.GetUpdatesAfter(after, limit); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
sendWebResponse(w, http.StatusOK, resp)
@@ -212,17 +191,24 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
after, limit, err := parseAfterAndLimit(r)
if err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()})
}
upds, err := srv.store.GetUpdatesAfter(after, limit)
if err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
w.Header().Set("Content-Type", "application/json") // this is actually multiple json documents...
w.WriteHeader(http.StatusOK)
+
+ // TODO: use StatefulEncoder
+ if err := json.NewEncoder(w).Encode(Header{Version: 1}); err != nil {
+ s5l.Printf("Error while sending data: %v", err)
+ return
+ s5l.Printf("fwd: failed encoding init message: %v\n", err)
+ }
encoder := NewStatelessEncoder(w)
for _, upd := range upds {
if err := encoder.Encode(upd); err != nil {
@@ -234,24 +220,20 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
// post one update
-type webUpdatesPostResponse struct {
- NumUpdates int `json:"num-updates"`
-}
-
func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
decoder := NewStatelessDecoder(r.Body)
value, err := decoder.Decode()
if err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"error while decoding update: " + err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"error while decoding update: " + err.Error()})
return
}
if err = srv.Append(value); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
- sendWebResponse(w, http.StatusOK, webUpdatesPostResponse{1})
+ sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{1})
}
// post multiple updates in bulk mode
@@ -259,7 +241,7 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
decoder, err := NewStatefulDecoder(r.Body)
if err != nil && err != io.EOF {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"failed to read/decode init message: " + err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode init message: " + err.Error()})
return
}
@@ -268,7 +250,7 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
value, err := decoder.Decode()
if err != nil {
if err != io.EOF {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"failed to read/decode data message: " + err.Error()})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode data message: " + err.Error()})
return
}
break
@@ -277,27 +259,22 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
}
numValues := len(values)
if numValues < 1 {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"got no data messages"})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"got no data messages"})
return
}
if err = srv.AppendMany(values); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
- sendWebResponse(w, http.StatusOK, webUpdatesPostResponse{numValues})
+ sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numValues})
}
//
// /lastupdate/:UUID
//
-type webLastUpdateIdForUuidResponse struct {
- HubUuid string `json:"hub-uuid"`
- LastUpdateId int `json:"lastupdate"`
-}
-
func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
sendInvalidMethod(w, r.Method)
@@ -305,15 +282,15 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request)
}
if matched, err := path.Match("/lastupdate/?*", r.URL.Path); err != nil || !matched {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid uuid"})
+ sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"invalid uuid"})
return
}
var err error
- resp := webLastUpdateIdForUuidResponse{}
+ resp := WebLastUpdateIdResponse{}
resp.HubUuid = path.Base(r.URL.Path)
if resp.LastUpdateId, err = srv.store.GetLastUpdateIdForUuid(resp.HubUuid); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
sendWebResponse(w, http.StatusOK, resp)
@@ -323,11 +300,6 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request)
// /lastupdate
//
-type webLastUpdateIdResponse struct {
- HubUuid string `json:"hub-uuid"`
- LastUpdateId int `json:"lastupdate"`
-}
-
func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
sendInvalidMethod(w, r.Method)
@@ -335,10 +307,10 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) {
}
var err error
- resp := webLastUpdateIdResponse{}
+ resp := WebLastUpdateIdResponse{}
resp.HubUuid = srv.GetHubUuid()
if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil {
- sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
sendWebResponse(w, http.StatusOK, resp)
@@ -372,7 +344,7 @@ func parseAfterAndLimit(r *http.Request) (after, limit int, err error) {
}
func sendInvalidMethod(w http.ResponseWriter, method string) {
- sendWebResponse(w, http.StatusMethodNotAllowed, webErrorResponse{"invalid request method: " + method})
+ sendWebResponse(w, http.StatusMethodNotAllowed, WebErrorResponse{"invalid request method: " + method})
}
func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index a7816f1..6220c89 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -79,10 +79,6 @@ type DataUpdateFull struct {
DataUpdate
}
-type DataUpdateFullContainer struct {
- Data []DataUpdateFull `json:"data"`
-}
-
func (duf *DataUpdateFull) CopyFromSourceId(src *SourceId) {
duf.Hostname = src.Hostname
duf.StreamId = src.StreamId
@@ -95,6 +91,34 @@ func (duf *DataUpdateFull) CopyFromUpdate(du *DataUpdate) {
duf.Data = du.Data
}
-type GenericDataContainer struct {
- Data interface{} `json:"data"`
+// web
+
+type WebErrorResponse struct {
+ Error string `json:"error,omitempty"`
+}
+
+type WebHealthzResponse struct {
+ Status string `json:"status"`
+ HubUuid string `json:"hub-uuid"`
+}
+
+type WebHubsResponse struct {
+ Hubs []string `json:"hubs"`
+}
+
+type WebSourcesResponse struct {
+ Sources []SourceId `json:"sources"`
+}
+
+type WebUpdatesGetResponse struct {
+ Updates []DataUpdateFull `json:"updates"`
+}
+
+type WebUpdatesPostResponse struct {
+ NumUpdates int `json:"num-updates"`
+}
+
+type WebLastUpdateIdResponse struct {
+ HubUuid string `json:"hub-uuid"`
+ LastUpdateId int `json:"lastupdate"`
}