summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-10 22:16:06 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-10 22:16:06 +0200
commitb7bbe18d29d58b6c5bd9f4c0872ed1a0b76da5ef (patch)
treef7161e9143d4e6ddc91e4b2aaf83652dbc9747b9
parentsome more variable renaming (diff)
and some more variable renaming
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go22
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go20
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go14
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go4
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvPipe.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvWeb.go37
6 files changed, 53 insertions, 56 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index 91ab598..f99bffb 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -33,18 +33,18 @@
package sfive
type appendToken struct {
- data UpdateFull
+ update UpdateFull
response chan error
}
type appendManyToken struct {
- data []UpdateFull
+ updates []UpdateFull
response chan error
}
type getUpdatesResult struct {
- values []UpdateFull
- err error
+ updates []UpdateFull
+ err error
}
type getUpdatesAfterToken struct {
@@ -88,9 +88,9 @@ func (srv Server) appendActor() {
case <-srv.quit:
return
case token := <-srv.appendChan:
- token.response <- srv.store.Append(token.data)
+ token.response <- srv.store.Append(token.update)
case token := <-srv.appendManyChan:
- token.response <- srv.store.AppendMany(token.data)
+ token.response <- srv.store.AppendMany(token.updates)
case token := <-srv.getUpdatesAfterChan:
values, err := srv.store.GetUpdatesAfter(token.id, token.limit)
token.response <- getUpdatesResult{values, err}
@@ -103,15 +103,15 @@ func (srv Server) appendActor() {
}
}
-func (srv Server) Append(data UpdateFull) error {
- token := appendToken{data: data, response: make(chan error, 1)}
+func (srv Server) Append(update UpdateFull) error {
+ token := appendToken{update: update, response: make(chan error, 1)}
defer close(token.response)
srv.appendChan <- token
return <-token.response
}
-func (srv Server) AppendMany(data []UpdateFull) error {
- token := appendManyToken{data: data, response: make(chan error, 1)}
+func (srv Server) AppendMany(updates []UpdateFull) error {
+ token := appendManyToken{updates: updates, response: make(chan error, 1)}
defer close(token.response)
srv.appendManyChan <- token
return <-token.response
@@ -122,7 +122,7 @@ func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) {
defer close(token.response)
srv.getUpdatesAfterChan <- token
res := <-token.response
- return res.values, res.err
+ return res.updates, res.err
}
func (srv Server) GetHubUuid() string {
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 11709a1..bd16c78 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -40,9 +40,9 @@ import (
"time"
)
-func findMaxId(values []UpdateFull) int {
+func findMaxId(updates []UpdateFull) int {
maxId := -1
- for _, value := range values {
+ for _, value := range updates {
if id := value.SourceHubUpdateId; id > maxId {
maxId = id
}
@@ -50,11 +50,11 @@ func findMaxId(values []UpdateFull) int {
return maxId
}
-func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) {
+func fwdGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) {
lastId = -1
var resp *http.Response
- resp, err = client.Get(baseurl + "/lastupdate/" + hubUuid)
+ resp, err = client.Get(baseUrl + "/lastupdate/" + hubUuid)
if err != nil {
s5l.Printf("fwd: failed to query for lastupdate: %v\n", err)
return
@@ -66,9 +66,8 @@ func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (last
return
}
- dec := json.NewDecoder(resp.Body)
result := WebLastUpdateIdResponse{}
- if err = dec.Decode(&result); err != nil {
+ if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err)
return
}
@@ -103,20 +102,19 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er
if resp.StatusCode != http.StatusOK {
return 0, errors.New("")
}
- dec := json.NewDecoder(resp.Body)
result := WebUpdatesPostResponse{}
- if err = dec.Decode(&result); err != nil {
+ if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
return 0, err
}
return result.NumUpdates, nil
}
-func (srv Server) forwardRun(baseurl string, client *http.Client) {
- url := baseurl + "/updates/_bulk"
+func (srv Server) forwardRun(baseUrl string, client *http.Client) {
+ url := baseUrl + "/updates/_bulk"
hubUuid := srv.GetHubUuid()
tryResync:
for {
- lastId, err := fwdGetLastUpdate(baseurl, client, hubUuid)
+ lastId, err := fwdGetLastUpdateId(baseUrl, client, hubUuid)
if err != nil {
s5l.Printf("fwd: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 8cf3af5..005e03d 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -42,15 +42,15 @@ import (
"time"
)
-const forwardEsLastUpdateJson = `{
+const forwardEsLastUpdateIdJson = `{
"query": {"match": { "SourceHubUuid": "%s" } },
"aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
}`
-func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) {
- url := baseurl + "/dataupdate/_search?search_type=count"
+func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) {
+ url := baseUrl + "/dataupdate/_search?search_type=count"
- queryJson := fmt.Sprintf(forwardEsLastUpdateJson, hubUuid)
+ queryJson := fmt.Sprintf(forwardEsLastUpdateIdJson, hubUuid)
s5dl.Printf("fwd-es: query: %s", queryJson)
var resp *http.Response
@@ -98,12 +98,12 @@ func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (la
return
}
-func (srv Server) forwardEsRun(baseurl string, client *http.Client) {
- url := baseurl + "/_bulk"
+func (srv Server) forwardEsRun(baseUrl string, client *http.Client) {
+ url := baseUrl + "/_bulk"
hubUuid := srv.GetHubUuid()
tryResync:
for {
- lastId, err := fwdEsGetLastUpdate(baseurl, client, hubUuid)
+ lastId, err := fwdEsGetLastUpdateId(baseUrl, client, hubUuid)
if err != nil {
s5l.Printf("fwd-es: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 6caa26f..ff6d985 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -49,7 +49,7 @@ type forwardPiwikBulkRequest struct {
TokenAuth string `json:"token_auth"`
}
-func fwdPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) {
+func fwdPiwikGetLastUpdateId(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) {
// TODO: ask piwik what the last update was...
lastId = 0
return
@@ -59,7 +59,7 @@ func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token s
// hubUuid := srv.GetHubUuid()
tryResync:
for {
- // lastId, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client, hubUuid)
+ // lastId, err := srv.forwardPiwikGetLastUpdateId(piwikURL, siteURL, siteID, token, client, hubUuid)
lastId, err := srv.GetLastUpdateId()
if err != nil {
s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
index f94b2e7..bfabace 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go
@@ -49,20 +49,20 @@ const (
func (srv Server) pipeHandle(conn net.Conn) {
defer conn.Close()
- decoder, err := NewStatefulDecoder(conn)
+ dec, err := NewStatefulDecoder(conn)
if err != nil {
s5l.Printf("pipe: failed to read init message: %v\n", err)
return
}
- slug := decoder.Slug()
+ slug := dec.Slug()
s5l.Printf("pipe: new connection: %s\n", slug)
defer func() {
s5l.Printf("pipe(%s): connection closed\n", slug)
}()
for {
- value, err := decoder.Decode()
+ update, err := dec.Decode()
if err != nil {
if err == io.EOF {
break
@@ -78,7 +78,7 @@ func (srv Server) pipeHandle(conn net.Conn) {
}
}
- if err = srv.Append(value); err != nil {
+ if err = srv.Append(update); err != nil {
s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err)
// TODO: send NACK?
break
@@ -120,13 +120,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) {
}
data := buffer[0:n]
- value, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
+ update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode()
if err != nil {
s5l.Printf("pipegram: failed to decode data message: %v\n", err)
continue
}
- if err = srv.Append(value); err != nil {
+ if err = srv.Append(update); err != nil {
s5l.Printf("pipegram: failed to store data: %v\n", err)
}
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
index 29143c9..b9752ba 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go
@@ -157,7 +157,7 @@ func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) {
// get one update
func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) {
- upd, err := srv.store.GetUpdate(id)
+ update, err := srv.store.GetUpdate(id)
if err != nil {
status := http.StatusInternalServerError
if err == ErrNotFound {
@@ -167,7 +167,7 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) {
return
}
- sendWebResponse(w, http.StatusOK, upd)
+ sendWebResponse(w, http.StatusOK, update)
}
// get list of updates (as json array)
@@ -194,7 +194,7 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()})
}
- upds, err := srv.store.GetUpdatesAfter(after, limit)
+ updates, err := srv.store.GetUpdatesAfter(after, limit)
if err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
@@ -203,13 +203,13 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") // this is actually multiple json documents...
w.WriteHeader(http.StatusOK)
- encoder, err := NewStatefulEncoder(w)
+ enc, err := NewStatefulEncoder(w)
if err != nil {
s5l.Printf("Error while sending init: %v", err)
return
}
- for _, upd := range upds {
- if err := encoder.Encode(upd); err != nil {
+ for _, update := range updates {
+ if err := enc.Encode(update); err != nil {
s5l.Printf("Error while sending data: %v", err)
return
}
@@ -219,14 +219,14 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
// post one update
func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
- decoder := NewStatelessDecoder(r.Body)
- value, err := decoder.Decode()
+ dec := NewStatelessDecoder(r.Body)
+ update, err := dec.Decode()
if err != nil {
sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"error while decoding update: " + err.Error()})
return
}
- if err = srv.Append(value); err != nil {
+ if err = srv.Append(update); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
@@ -237,15 +237,15 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) {
// post multiple updates in bulk mode
func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
- decoder, err := NewStatefulDecoder(r.Body)
+ dec, err := NewStatefulDecoder(r.Body)
if err != nil && err != io.EOF {
sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode init message: " + err.Error()})
return
}
- values := []UpdateFull{}
+ updates := []UpdateFull{}
for {
- value, err := decoder.Decode()
+ update, err := dec.Decode()
if err != nil {
if err != io.EOF {
sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode data message: " + err.Error()})
@@ -253,20 +253,20 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) {
}
break
}
- values = append(values, value)
+ updates = append(updates, update)
}
- numValues := len(values)
- if numValues < 1 {
+ numUpdates := len(updates)
+ if numUpdates < 1 {
sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"got no data messages"})
return
}
- if err = srv.AppendMany(values); err != nil {
+ if err = srv.AppendMany(updates); err != nil {
sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()})
return
}
- sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numValues})
+ sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numUpdates})
}
//
@@ -348,8 +348,7 @@ func sendInvalidMethod(w http.ResponseWriter, method string) {
func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
- encoder := json.NewEncoder(w)
- if err := encoder.Encode(respdata); err != nil {
+ if err := json.NewEncoder(w).Encode(respdata); err != nil {
s5l.Printf("Error while sending data: %v", err)
}
}