diff options
author | Christian Pointner <equinox@spreadspace.org> | 2017-05-10 22:16:06 +0200 |
---|---|---|
committer | Christian Pointner <equinox@spreadspace.org> | 2017-05-10 22:16:06 +0200 |
commit | b7bbe18d29d58b6c5bd9f4c0872ed1a0b76da5ef (patch) | |
tree | f7161e9143d4e6ddc91e4b2aaf83652dbc9747b9 | |
parent | some more variable renaming (diff) |
and some more variable renaming
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srv.go | 22 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 20 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 14 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go | 4 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvPipe.go | 12 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvWeb.go | 37 |
6 files changed, 53 insertions, 56 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index 91ab598..f99bffb 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -33,18 +33,18 @@ package sfive type appendToken struct { - data UpdateFull + update UpdateFull response chan error } type appendManyToken struct { - data []UpdateFull + updates []UpdateFull response chan error } type getUpdatesResult struct { - values []UpdateFull - err error + updates []UpdateFull + err error } type getUpdatesAfterToken struct { @@ -88,9 +88,9 @@ func (srv Server) appendActor() { case <-srv.quit: return case token := <-srv.appendChan: - token.response <- srv.store.Append(token.data) + token.response <- srv.store.Append(token.update) case token := <-srv.appendManyChan: - token.response <- srv.store.AppendMany(token.data) + token.response <- srv.store.AppendMany(token.updates) case token := <-srv.getUpdatesAfterChan: values, err := srv.store.GetUpdatesAfter(token.id, token.limit) token.response <- getUpdatesResult{values, err} @@ -103,15 +103,15 @@ func (srv Server) appendActor() { } } -func (srv Server) Append(data UpdateFull) error { - token := appendToken{data: data, response: make(chan error, 1)} +func (srv Server) Append(update UpdateFull) error { + token := appendToken{update: update, response: make(chan error, 1)} defer close(token.response) srv.appendChan <- token return <-token.response } -func (srv Server) AppendMany(data []UpdateFull) error { - token := appendManyToken{data: data, response: make(chan error, 1)} +func (srv Server) AppendMany(updates []UpdateFull) error { + token := appendManyToken{updates: updates, response: make(chan error, 1)} defer close(token.response) srv.appendManyChan <- token return <-token.response @@ -122,7 +122,7 @@ func (srv Server) GetUpdatesAfter(id, limit int) ([]UpdateFull, error) { defer close(token.response) srv.getUpdatesAfterChan <- token res := <-token.response - return res.values, res.err + return res.updates, res.err } func (srv Server) GetHubUuid() string { diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 11709a1..bd16c78 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -40,9 +40,9 @@ import ( "time" ) -func findMaxId(values []UpdateFull) int { +func findMaxId(updates []UpdateFull) int { maxId := -1 - for _, value := range values { + for _, value := range updates { if id := value.SourceHubUpdateId; id > maxId { maxId = id } @@ -50,11 +50,11 @@ func findMaxId(values []UpdateFull) int { return maxId } -func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) { +func fwdGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) { lastId = -1 var resp *http.Response - resp, err = client.Get(baseurl + "/lastupdate/" + hubUuid) + resp, err = client.Get(baseUrl + "/lastupdate/" + hubUuid) if err != nil { s5l.Printf("fwd: failed to query for lastupdate: %v\n", err) return @@ -66,9 +66,8 @@ func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (last return } - dec := json.NewDecoder(resp.Body) result := WebLastUpdateIdResponse{} - if err = dec.Decode(&result); err != nil { + if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { s5l.Printf("fwd: server failed to fulfill query for lastupdate: %v\n", err) return } @@ -103,20 +102,19 @@ func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, er if resp.StatusCode != http.StatusOK { return 0, errors.New("") } - dec := json.NewDecoder(resp.Body) result := WebUpdatesPostResponse{} - if err = dec.Decode(&result); err != nil { + if err = json.NewDecoder(resp.Body).Decode(&result); err != nil { return 0, err } return result.NumUpdates, nil } -func (srv Server) forwardRun(baseurl string, client *http.Client) { - url := baseurl + "/updates/_bulk" +func (srv Server) forwardRun(baseUrl string, client *http.Client) { + url := baseUrl + "/updates/_bulk" hubUuid := srv.GetHubUuid() tryResync: for { - lastId, err := fwdGetLastUpdate(baseurl, client, hubUuid) + lastId, err := fwdGetLastUpdateId(baseUrl, client, hubUuid) if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 8cf3af5..005e03d 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -42,15 +42,15 @@ import ( "time" ) -const forwardEsLastUpdateJson = `{ +const forwardEsLastUpdateIdJson = `{ "query": {"match": { "SourceHubUuid": "%s" } }, "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } }` -func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) { - url := baseurl + "/dataupdate/_search?search_type=count" +func fwdEsGetLastUpdateId(baseUrl string, client *http.Client, hubUuid string) (lastId int, err error) { + url := baseUrl + "/dataupdate/_search?search_type=count" - queryJson := fmt.Sprintf(forwardEsLastUpdateJson, hubUuid) + queryJson := fmt.Sprintf(forwardEsLastUpdateIdJson, hubUuid) s5dl.Printf("fwd-es: query: %s", queryJson) var resp *http.Response @@ -98,12 +98,12 @@ func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (la return } -func (srv Server) forwardEsRun(baseurl string, client *http.Client) { - url := baseurl + "/_bulk" +func (srv Server) forwardEsRun(baseUrl string, client *http.Client) { + url := baseUrl + "/_bulk" hubUuid := srv.GetHubUuid() tryResync: for { - lastId, err := fwdEsGetLastUpdate(baseurl, client, hubUuid) + lastId, err := fwdEsGetLastUpdateId(baseUrl, client, hubUuid) if err != nil { s5l.Printf("fwd-es: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 6caa26f..ff6d985 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -49,7 +49,7 @@ type forwardPiwikBulkRequest struct { TokenAuth string `json:"token_auth"` } -func fwdPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) { +func fwdPiwikGetLastUpdateId(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) { // TODO: ask piwik what the last update was... lastId = 0 return @@ -59,7 +59,7 @@ func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token s // hubUuid := srv.GetHubUuid() tryResync: for { - // lastId, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client, hubUuid) + // lastId, err := srv.forwardPiwikGetLastUpdateId(piwikURL, siteURL, siteID, token, client, hubUuid) lastId, err := srv.GetLastUpdateId() if err != nil { s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go index f94b2e7..bfabace 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvPipe.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvPipe.go @@ -49,20 +49,20 @@ const ( func (srv Server) pipeHandle(conn net.Conn) { defer conn.Close() - decoder, err := NewStatefulDecoder(conn) + dec, err := NewStatefulDecoder(conn) if err != nil { s5l.Printf("pipe: failed to read init message: %v\n", err) return } - slug := decoder.Slug() + slug := dec.Slug() s5l.Printf("pipe: new connection: %s\n", slug) defer func() { s5l.Printf("pipe(%s): connection closed\n", slug) }() for { - value, err := decoder.Decode() + update, err := dec.Decode() if err != nil { if err == io.EOF { break @@ -78,7 +78,7 @@ func (srv Server) pipeHandle(conn net.Conn) { } } - if err = srv.Append(value); err != nil { + if err = srv.Append(update); err != nil { s5l.Printf("pipe(%s): failed to store data: %v\n", slug, err) // TODO: send NACK? break @@ -120,13 +120,13 @@ func (srv Server) pipegramHandle(pconn net.PacketConn) { } data := buffer[0:n] - value, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() + update, err := NewStatelessDecoder(bytes.NewReader(data)).Decode() if err != nil { s5l.Printf("pipegram: failed to decode data message: %v\n", err) continue } - if err = srv.Append(value); err != nil { + if err = srv.Append(update); err != nil { s5l.Printf("pipegram: failed to store data: %v\n", err) } } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go index 29143c9..b9752ba 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvWeb.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvWeb.go @@ -157,7 +157,7 @@ func webUpdates(srv *Server, w http.ResponseWriter, r *http.Request) { // get one update func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) { - upd, err := srv.store.GetUpdate(id) + update, err := srv.store.GetUpdate(id) if err != nil { status := http.StatusInternalServerError if err == ErrNotFound { @@ -167,7 +167,7 @@ func webUpdateGet(srv *Server, id int, w http.ResponseWriter, r *http.Request) { return } - sendWebResponse(w, http.StatusOK, upd) + sendWebResponse(w, http.StatusOK, update) } // get list of updates (as json array) @@ -194,7 +194,7 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{err.Error()}) } - upds, err := srv.store.GetUpdatesAfter(after, limit) + updates, err := srv.store.GetUpdatesAfter(after, limit) if err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return @@ -203,13 +203,13 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") // this is actually multiple json documents... w.WriteHeader(http.StatusOK) - encoder, err := NewStatefulEncoder(w) + enc, err := NewStatefulEncoder(w) if err != nil { s5l.Printf("Error while sending init: %v", err) return } - for _, upd := range upds { - if err := encoder.Encode(upd); err != nil { + for _, update := range updates { + if err := enc.Encode(update); err != nil { s5l.Printf("Error while sending data: %v", err) return } @@ -219,14 +219,14 @@ func webUpdatesGetBulk(srv *Server, w http.ResponseWriter, r *http.Request) { // post one update func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { - decoder := NewStatelessDecoder(r.Body) - value, err := decoder.Decode() + dec := NewStatelessDecoder(r.Body) + update, err := dec.Decode() if err != nil { sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"error while decoding update: " + err.Error()}) return } - if err = srv.Append(value); err != nil { + if err = srv.Append(update); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } @@ -237,15 +237,15 @@ func webUpdatePost(srv *Server, w http.ResponseWriter, r *http.Request) { // post multiple updates in bulk mode func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { - decoder, err := NewStatefulDecoder(r.Body) + dec, err := NewStatefulDecoder(r.Body) if err != nil && err != io.EOF { sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode init message: " + err.Error()}) return } - values := []UpdateFull{} + updates := []UpdateFull{} for { - value, err := decoder.Decode() + update, err := dec.Decode() if err != nil { if err != io.EOF { sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"failed to read/decode data message: " + err.Error()}) @@ -253,20 +253,20 @@ func webUpdatesPostBulk(srv *Server, w http.ResponseWriter, r *http.Request) { } break } - values = append(values, value) + updates = append(updates, update) } - numValues := len(values) - if numValues < 1 { + numUpdates := len(updates) + if numUpdates < 1 { sendWebResponse(w, http.StatusBadRequest, WebErrorResponse{"got no data messages"}) return } - if err = srv.AppendMany(values); err != nil { + if err = srv.AppendMany(updates); err != nil { sendWebResponse(w, http.StatusInternalServerError, WebErrorResponse{err.Error()}) return } - sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numValues}) + sendWebResponse(w, http.StatusOK, WebUpdatesPostResponse{numUpdates}) } // @@ -348,8 +348,7 @@ func sendInvalidMethod(w http.ResponseWriter, method string) { func sendWebResponse(w http.ResponseWriter, status int, respdata interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) - encoder := json.NewEncoder(w) - if err := encoder.Encode(respdata); err != nil { + if err := json.NewEncoder(w).Encode(respdata); err != nil { s5l.Printf("Error while sending data: %v", err) } } |