summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-07 17:58:54 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-07 17:58:54 +0200
commit5e83447ec336f98c5bfa511d0e895b323c07362e (patch)
tree5feb92e2e2e5a32d65a2e8d451bb9c59837507e7
parentadded dispatcher for bulk vs single updates post (diff)
use /updates/_bulk for bulk imports (like Elasticsearch)
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go144
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go6
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store_test.go20
6 files changed, 116 insertions, 80 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 76805c3..75460d1 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -53,12 +53,12 @@ type getUpdatesAfterToken struct {
response chan getUpdatesResult
}
-type getHubIdResult struct {
+type getHubUuidResult struct {
id string
}
-type getHubIdToken struct {
- response chan getHubIdResult
+type getHubUuidToken struct {
+ response chan getHubUuidResult
}
type getLastUpdateIdResult struct {
@@ -77,7 +77,7 @@ type Server struct {
appendChan chan appendToken
appendManyChan chan appendManyToken
getUpdatesAfterChan chan getUpdatesAfterToken
- getHubIdChan chan getHubIdToken
+ getHubUuidChan chan getHubUuidToken
getLastUpdateIdChan chan getLastUpdateIdToken
}
@@ -94,8 +94,8 @@ func (srv Server) appendActor() {
case token := <-srv.getUpdatesAfterChan:
values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
token.response <- getUpdatesResult{values, err}
- case token := <-srv.getHubIdChan:
- token.response <- getHubIdResult{srv.store.GetStoreId()}
+ case token := <-srv.getHubUuidChan:
+ token.response <- getHubUuidResult{srv.store.GetHubUuid()}
case token := <-srv.getLastUpdateIdChan:
lastUpdateId, err := srv.store.GetLastUpdateId()
token.response <- getLastUpdateIdResult{lastUpdateId, err}
@@ -125,10 +125,10 @@ func (srv Server) GetUpdatesAfter(id, limit int) ([]DataUpdateFull, error) {
return res.values, res.err
}
-func (srv Server) GetHubId() string {
- token := getHubIdToken{response: make(chan getHubIdResult, 1)}
+func (srv Server) GetHubUuid() string {
+ token := getHubUuidToken{response: make(chan getHubUuidResult, 1)}
defer close(token.response)
- srv.getHubIdChan <- token
+ srv.getHubUuidChan <- token
res := <-token.response
return res.id
}
@@ -150,7 +150,7 @@ func (srv Server) Close() {
close(srv.appendChan)
close(srv.appendManyChan)
close(srv.getUpdatesAfterChan)
- close(srv.getHubIdChan)
+ close(srv.getHubUuidChan)
close(srv.getLastUpdateIdChan)
srv.store.Close()
s5l.Printf("server: finished\n")
@@ -169,7 +169,7 @@ func NewServer(dbPath string, readOnly bool) (server *Server, err error) {
server.appendChan = make(chan appendToken, 32)
server.appendManyChan = make(chan appendManyToken, 32)
server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 32)
- server.getHubIdChan = make(chan getHubIdToken, 32)
+ server.getHubUuidChan = make(chan getHubUuidToken, 32)
server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 32)
go server.appendActor()
s5l.Printf("server: started\n")
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index d6874de..144601e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -53,7 +53,7 @@ func findMaxId(values []DataUpdateFull) int {
}
func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
- storeId = srv.GetHubId()
+ storeId = srv.GetHubUuid()
var resp *http.Response
resp, err = client.Get(baseurl + "/lastupdate/" + storeId)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 8880b8a..4297bbf 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -50,7 +50,7 @@ const forwardEsLastUpdateJson = `{
func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
url := baseurl + "/dataupdate/_search?search_type=count"
- storeId = srv.GetHubId()
+ storeId = srv.GetHubUuid()
queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId)
s5dl.Printf("fwd-es: query: %s", queryJson)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index cbcf2d4..687aa82 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -51,11 +51,13 @@ func webNotFound(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusNotFound, webErrorResponse{"not found"})
}
+//
// /healthz
+//
type webHealthzResponse struct {
- Status string `json:"status"`
- HubId string `json:"hub-uuid"`
+ Status string `json:"status"`
+ HubUuid string `json:"hub-uuid"`
}
func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
@@ -65,12 +67,14 @@ func webHealthz(srv *Server, w http.ResponseWriter, r *http.Request) {
}
resp := webHealthzResponse{}
- resp.HubId = srv.GetHubId()
+ resp.HubUuid = srv.GetHubUuid()
resp.Status = "OK" // TODO: do a more sophisticated check
sendWebResponse(w, http.StatusOK, resp)
}
+//
// /hubs
+//
type webHubsResponse struct {
Hubs []string `json:"hubs"`
@@ -91,7 +95,9 @@ func webHubs(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusOK, resp)
}
+//
// /sources
+//
type webSourcesResponse struct {
Sources []SourceId `json:"sources"`
@@ -112,14 +118,39 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusOK, resp)
}
-// /updates/:ID
+//
+// /updates/(:ID|_bulk)
+//
-func webUpdate(srv *Server, w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
+func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "GET":
+ webUpdateGet(srv, w, r)
+ case "POST":
+ webUpdatesPost(srv, w, r)
+ default:
+ sendInvalidMethod(w, r.Method)
+ }
+}
+
+//
+// /updates
+//
+
+func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "GET":
+ webUpdatesGet(srv, w, r)
+ case "POST":
+ webUpdatePost(srv, w, r)
+ default:
sendInvalidMethod(w, r.Method)
- return
}
+}
+// get one update
+
+func webUpdateGet(srv *Server, w http.ResponseWriter, r *http.Request) {
if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched {
sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id"})
return
@@ -141,38 +172,24 @@ func webUpdate(srv *Server, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
encoder := NewStatelessEncoder(w)
- encoder.Encode(upd) // TODO: handle error
-}
-
-// /updates
-
-func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) {
- switch r.Method {
- case "GET":
- webUpdatesGet(srv, w, r)
- case "POST":
- q := r.URL.Query()
- if _, exists := q["bulk"]; !exists {
- webUpdatesPost(srv, w, r)
- } else {
- webUpdatesPostBulk(srv, w, r)
- }
- default:
- sendInvalidMethod(w, r.Method)
+ if err := encoder.Encode(upd); err != nil {
+ s5l.Println("Error while sending data: %v", err)
}
}
+// get list of updates
+
func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
- from := 0
+ after := 0
limit := -1
q := r.URL.Query()
var err error
- fromStr := q.Get("from")
- if fromStr != "" {
- if from, err = strconv.Atoi(fromStr); err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid from value: " + err.Error()})
+ afterStr := q.Get("after")
+ if afterStr != "" {
+ if after, err = strconv.Atoi(afterStr); err != nil {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid after value: " + err.Error()})
return
}
}
@@ -185,7 +202,7 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
}
var upds []DataUpdateFull
- if upds, err = srv.store.GetUpdatesAfter(from, limit); err != nil {
+ if upds, err = srv.store.GetUpdatesAfter(after, limit); err != nil {
sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
return
}
@@ -193,11 +210,42 @@ func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
encoder := NewStatelessEncoder(w)
for _, upd := range upds { // TODO: inside container?
- encoder.Encode(upd) // TODO: handle error
+ if err := encoder.Encode(upd); err != nil {
+ s5l.Println("Error while sending data: %v", err)
+ return
+ }
}
}
+// post one update
+
+func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
+ // TODO: implement this!
+
+ // decoder := NewStatelessDecoder(r.Body)
+ // data, err := decoder.Decode()
+ // if err != nil && err != io.EOF {
+ // s5l.Printf("web: failed to decode: %v\n", err)
+ // http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
+ // return
+ // }
+
+ // if err = srv.Append(data); err != nil {
+ // http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
+ // } else {
+ // fmt.Fprintf(w, "1 update successfully stored.")
+ // }
+
+ sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting signle update data not yet implemented!"})
+}
+
+// post updates in bulk mode
+
func webUpdatesPost(srv *Server, w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/updates/_bulk" {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update parameter"})
+ return
+ }
// TODO: implement this!
// decoder, err := NewStatefulDecoder(r.Body)
@@ -226,30 +274,12 @@ func webUpdatesPost(srv *Server, w http.ResponseWriter, r *http.Request) {
// fmt.Fprintf(w, "%d update(s) successfully stored.", len(data))
// }
- sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting signle update data not yet implemented!"})
-}
-
-func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
- // TODO: implement this!
-
- // decoder := NewStatelessDecoder(r.Body)
- // data, err := decoder.Decode()
- // if err != nil && err != io.EOF {
- // s5l.Printf("web: failed to decode: %v\n", err)
- // http.Error(w, fmt.Sprintf("failed to read/decode update message: %v", err), http.StatusBadRequest)
- // return
- // }
-
- // if err = srv.Append(data); err != nil {
- // http.Error(w, fmt.Sprintf("failed to store data: %s", err), http.StatusInternalServerError)
- // } else {
- // fmt.Fprintf(w, "1 update successfully stored.")
- // }
-
sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting bulk update data not yet implemented!"})
}
+//
// /lastupdate/:UUID
+//
type webLastUpdateIdForUuidResponse struct {
HubUuid string `json:"hub-uuid"`
@@ -277,7 +307,9 @@ func webLastUpdateIdForUuid(srv *Server, w http.ResponseWriter, r *http.Request)
sendWebResponse(w, http.StatusOK, resp)
}
+//
// /lastupdate
+//
type webLastUpdateIdResponse struct {
HubUuid string `json:"hub-uuid"`
@@ -292,7 +324,7 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) {
var err error
resp := webLastUpdateIdResponse{}
- resp.HubUuid = srv.GetHubId()
+ resp.HubUuid = srv.GetHubUuid()
if resp.LastUpdateId, err = srv.store.GetLastUpdateId(); err != nil {
sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
return
@@ -312,7 +344,9 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
encoder := json.NewEncoder(w)
- encoder.Encode(respdata)
+ if err := encoder.Encode(respdata); err != nil {
+ s5l.Println("Error while sending data: %v", err)
+ }
}
type webHandler struct {
@@ -344,7 +378,7 @@ func webRun(listener *net.TCPListener, srv *Server) (err error) {
mux.Handle("/healthz", webHandler{srv, webHealthz})
mux.Handle("/hubs", webHandler{srv, webHubs})
mux.Handle("/sources", webHandler{srv, webSources})
- mux.Handle("/updates/", webHandler{srv, webUpdate})
+ mux.Handle("/updates/", webHandler{srv, webUpdatesWithParam})
mux.Handle("/updates", webHandler{srv, webUpdates})
mux.Handle("/lastupdate/", webHandler{srv, webLastUpdateIdForUuid})
mux.Handle("/lastupdate", webHandler{srv, webLastUpdateId})
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 48fee9c..8dda6af 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -436,7 +436,9 @@ func (st Store) GetUpdatesAfter(id, limit int) (res []DataUpdateFull, err error)
id = 0
}
if limit < 0 || limit > StoreGetUpdatesLimit {
- s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit)
+ if limit > 0 {
+ s5l.Printf("store: truncating get-update limit to %d (from %d)", StoreGetUpdatesLimit, limit)
+ }
limit = StoreGetUpdatesLimit
}
err = st.db.View(func(tx *bolt.Tx) error {
@@ -495,7 +497,7 @@ func (st Store) GetUpdate(id int) (res DataUpdateFull, err error) {
// Auxilliary Data
//
-func (st Store) GetStoreId() string {
+func (st Store) GetHubUuid() string {
return st.hubUuid
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store_test.go b/src/hub/src/spreadspace.org/sfive/s5store_test.go
index a6b2178..8b7c385 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store_test.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store_test.go
@@ -322,7 +322,7 @@ func TestAppendAndFetch(t *testing.T) {
out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder
expected := in
- expected.SourceHubUuid = store.GetStoreId()
+ expected.SourceHubUuid = store.GetHubUuid()
expected.SourceHubDataUpdateId = 1
expected.ForwardHubUuid = ""
expected.ForwardHubDataUpdateId = 0
@@ -350,7 +350,7 @@ func TestAppendAndFetch(t *testing.T) {
}
out.StartTime = out.StartTime.UTC() // this would normally be handled by the protocol encoder
expected = ins[i]
- expected.SourceHubUuid = store.GetStoreId()
+ expected.SourceHubUuid = store.GetHubUuid()
expected.SourceHubDataUpdateId = i + 2
expected.ForwardHubUuid = ""
expected.ForwardHubDataUpdateId = 0
@@ -474,8 +474,8 @@ func TestGetUpdatesAfter(t *testing.T) {
t.Fatalf("failed to fetch updates\nactual: %v\nexpected: %v\n", updList, expected[:2])
}
- // all after id 2
- updList, err = store.GetUpdatesAfter(2, -1)
+ // 100000 after id 2
+ updList, err = store.GetUpdatesAfter(2, 100000)
if err != nil {
t.Fatalf("failed to fetch updates: %v", err)
}
@@ -549,7 +549,7 @@ func TestForwardedDataUpdates(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
myLastId = myLastId + 1
- in.ForwardHubUuid = store.GetStoreId()
+ in.ForwardHubUuid = store.GetHubUuid()
in.ForwardHubDataUpdateId = myLastId
expected = append(expected, in)
upd.StartTime = upd.StartTime.Add(time.Duration(upd.Duration) * time.Millisecond)
@@ -598,35 +598,35 @@ func TestForwardedDataUpdates(t *testing.T) {
}
func checkForwardedDataUpdates2(t *testing.T, src1Store, src2Store, fwdStore, finalStore Store, fwdSrc1Id, fwdSrc2Id, finalSrc1Id, finalSrc2Id, finalFwdId int) {
- lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetStoreId())
+ lastId, err := fwdStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lastId != fwdSrc1Id {
t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc1Id)
}
- lastId, err = fwdStore.GetLastUpdateIdForUuid(src2Store.GetStoreId())
+ lastId, err = fwdStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lastId != fwdSrc2Id {
t.Fatalf("failed to get last update ID: %d, expected %d", lastId, fwdSrc2Id)
}
- lastId, err = finalStore.GetLastUpdateIdForUuid(src1Store.GetStoreId())
+ lastId, err = finalStore.GetLastUpdateIdForUuid(src1Store.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lastId != finalSrc1Id {
t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc1Id)
}
- lastId, err = finalStore.GetLastUpdateIdForUuid(src2Store.GetStoreId())
+ lastId, err = finalStore.GetLastUpdateIdForUuid(src2Store.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if lastId != finalSrc2Id {
t.Fatalf("failed to get last update ID: %d, expected %d", lastId, finalSrc2Id)
}
- lastId, err = finalStore.GetLastUpdateIdForUuid(fwdStore.GetStoreId())
+ lastId, err = finalStore.GetLastUpdateIdForUuid(fwdStore.GetHubUuid())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}