summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-07 19:03:20 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-07 19:03:20 +0200
commit416dcceb43bdddb2d83b0cb3db7f6f90a70698e9 (patch)
treeee4567187fc810a9d3da3238f5cbba62762c7bcb
parentuse /updates/_bulk for bulk imports (like Elasticsearch) (diff)
cleaned up web api _bulk vs normal
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go130
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5typesApi.go2
2 files changed, 83 insertions, 49 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 687aa82..efe2ff4 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -35,10 +35,12 @@ package sfive
import (
"encoding/json"
// "io"
+ "fmt"
"net"
"net/http"
"path"
"strconv"
+ "strings"
"time"
)
@@ -123,11 +125,32 @@ func webSources(srv *Server, w http.ResponseWriter, r *http.Request) {
//
func webUpdatesWithParam(srv *Server, w http.ResponseWriter, r *http.Request) {
+ bulk := r.URL.Path == "/updates/_bulk"
+
switch r.Method {
case "GET":
- webUpdateGet(srv, w, r)
+ if bulk {
+ webUpdatesGetBulk(srv, w, r)
+ return
+ }
+
+ if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + strings.TrimPrefix("/updates/", r.URL.Path)})
+ return
+ }
+ id, err := strconv.Atoi(path.Base(r.URL.Path))
+ if err != nil {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + err.Error()})
+ return
+ }
+ webUpdateGet(srv, id, w, r)
case "POST":
- webUpdatesPost(srv, w, r)
+ if bulk {
+ webUpdatesPostBulk(srv, w, r)
+ return
+ }
+
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid parameter: " + strings.TrimPrefix("/updates/", r.URL.Path)})
default:
sendInvalidMethod(w, r.Method)
}
@@ -150,18 +173,9 @@ func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) {
// get one update
-func webUpdateGet(srv *Server, w http.ResponseWriter, r *http.Request) {
- if matched, err := path.Match("/updates/?*", r.URL.Path); err != nil || !matched {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id"})
- return
- }
- id, err := strconv.Atoi(path.Base(r.URL.Path))
+func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) {
+ upd, err := srv.store.GetUpdate(id)
if err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update id: " + err.Error()})
- return
- }
- var upd DataUpdateFull
- if upd, err = srv.store.GetUpdate(id); err != nil {
status := http.StatusInternalServerError
if err == ErrNotFound {
status = http.StatusNotFound
@@ -169,49 +183,50 @@ func webUpdateGet(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, status, webErrorResponse{err.Error()})
return
}
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- encoder := NewStatelessEncoder(w)
- if err := encoder.Encode(upd); err != nil {
- s5l.Println("Error while sending data: %v", err)
- }
+
+ sendWebResponse(w, http.StatusOK, upd)
}
-// get list of updates
+// get list of updates (as json array)
-func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
- after := 0
- limit := -1
+type webUpdatesResponse struct {
+ Updates []DataUpdateFull `json:"updates"`
+}
- q := r.URL.Query()
- var err error
+func webUpdatesGet(srv *Server, w http.ResponseWriter, r *http.Request) {
+ after, limit, err := parseAfterAndLimit(r)
+ if err != nil {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()})
+ }
- afterStr := q.Get("after")
- if afterStr != "" {
- if after, err = strconv.Atoi(afterStr); err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid after value: " + err.Error()})
- return
- }
+ resp := webUpdatesResponse{}
+ if resp.Updates, err = srv.store.GetUpdatesAfter(after, limit); err != nil {
+ sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
+ return
}
- limitStr := q.Get("limit")
- if limitStr != "" {
- if limit, err = strconv.Atoi(limitStr); err != nil {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid limit value: " + err.Error()})
- return
- }
+ sendWebResponse(w, http.StatusOK, resp)
+}
+
+// get list of updates (bulk format)
+
+func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
+ after, limit, err := parseAfterAndLimit(r)
+ if err != nil {
+ sendWebResponse(w, http.StatusBadRequest, webErrorResponse{err.Error()})
}
- var upds []DataUpdateFull
- if upds, err = srv.store.GetUpdatesAfter(after, limit); err != nil {
+ upds, err := srv.store.GetUpdatesAfter(after, limit)
+ if err != nil {
sendWebResponse(w, http.StatusInternalServerError, webErrorResponse{err.Error()})
return
}
+
w.Header().Set("Content-Type", "application/json") // this is actually multiple json documents...
w.WriteHeader(http.StatusOK)
encoder := NewStatelessEncoder(w)
- for _, upd := range upds { // TODO: inside container?
+ for _, upd := range upds {
if err := encoder.Encode(upd); err != nil {
- s5l.Println("Error while sending data: %v", err)
+ s5l.Printf("Error while sending data: %v", err)
return
}
}
@@ -239,13 +254,9 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusNotImplemented, webErrorResponse{"posting signle update data not yet implemented!"})
}
-// post updates in bulk mode
+// post multiple updates in bulk mode
-func webUpdatesPost(srv *Server, w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/updates/_bulk" {
- sendWebResponse(w, http.StatusBadRequest, webErrorResponse{"invalid update parameter"})
- return
- }
+func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
// TODO: implement this!
// decoder, err := NewStatefulDecoder(r.Body)
@@ -336,6 +347,29 @@ func webLastUpdateId(srv *Server, w http.ResponseWriter, r *http.Request) {
// common functions
//
+func parseAfterAndLimit(r *http.Request) (after, limit int, err error) {
+ q := r.URL.Query()
+
+ afterStr := q.Get("after")
+ if afterStr != "" {
+ if after, err = strconv.Atoi(afterStr); err != nil {
+ err = fmt.Errorf("invalid after value: %v", err.Error())
+ return
+ }
+ }
+
+ limit = -1
+ limitStr := q.Get("limit")
+ if limitStr != "" {
+ if limit, err = strconv.Atoi(limitStr); err != nil {
+ err = fmt.Errorf("invalid limit value: %v", err.Error())
+ return
+ }
+ }
+
+ return
+}
+
func sendInvalidMethod(w http.ResponseWriter, method string) {
sendWebResponse(w, http.StatusMethodNotAllowed, webErrorResponse{"invalid request method: " + method})
}
@@ -345,7 +379,7 @@ func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
w.WriteHeader(status)
encoder := json.NewEncoder(w)
if err := encoder.Encode(respdata); err != nil {
- s5l.Println("Error while sending data: %v", err)
+ s5l.Printf("Error while sending data: %v", err)
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5typesApi.go b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
index d028ffb..a7816f1 100644
--- a/src/hub/src/spreadspace.org/sfive/s5typesApi.go
+++ b/src/hub/src/spreadspace.org/sfive/s5typesApi.go
@@ -66,7 +66,7 @@ type DataUpdate struct {
}
type Header struct {
- Version uint `json:"version"`
+ Version uint `json:"version,omitempty"` // omitempty is needed for data only messages and for REST API
SourceHubUuid string `json:"SourceHubUuid,omitempty"`
SourceHubDataUpdateId int `json:"SourceHubDataUpdateId,omitempty"`
ForwardHubUuid string `json:"ForwardHubUuid,omitempty"`