diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index d61c5fc..71313aa 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -51,29 +51,29 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( url := baseUrl + "/dataupdate/_search?search_type=count" queryJson := fmt.Sprintf(forwardEsLastUpdateIDJson, hubUUID) - s5dl.Printf("fwd-es: query: %s", queryJson) + s5dl.Printf("srv|fwd-es: query: %s", queryJson) var resp *http.Response resp, err = client.Post(url, "application/json", strings.NewReader(queryJson)) if err != nil { - s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err) + s5l.Printf("srv|fwd-es: querying for lastupdate failed: %v", err) return } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) + s5l.Printf("srv|fwd-es: elasticsearch failed to fulfill query for lastupdate: %v", resp.StatusCode) return } var body []byte body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder if err != nil { - s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err) + s5l.Printf("srv|fwd-es: reading lastupdate response failed: %v", err) return } - s5dl.Printf("fwd-es: lastupdate response: %s\n", body) + s5dl.Printf("srv|fwd-es: lastupdate response: %s", body) if len(body) == 0 { lastID = -1 @@ -81,13 +81,13 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) ( var value interface{} err = json.Unmarshal(body, &value) if err != nil { - s5l.Printf("fwd-es: invalid lastupdate response: %v\n", err) + s5l.Printf("srv|fwd-es: invalid lastupdate response: %v", err) return } idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] if idstrcntr == nil { - s5l.Printf("fwd-es: we are new here\n") + s5l.Printf("srv|fwd-es: we are new here") lastID = -1 return } @@ -105,17 +105,17 @@ tryResync: for { lastID, err := fwdEsGetLastUpdateID(baseUrl, client, hubUUID) if err != nil { - s5l.Printf("fwd-es: lastupdate returned err: %v", err) + s5l.Printf("srv|fwd-es: fetching lastupdate failed: %v", err) time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd-es: lastupdate: %d", lastID) + s5l.Printf("srv|fwd-es: new lastupdate: %d", lastID) nextBatch: for { updates, err := srv.store.GetUpdatesAfter(lastID, 5000) if err != nil { - s5l.Printf("fwd-es: failed reading updates: %v\n", err) + s5l.Printf("srv|fwd-es: reading updates failed: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -129,7 +129,7 @@ tryResync: for _, update := range updates { data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter if err != nil { - s5l.Panicf("fwd-es: encode failed: %v\n", err) + s5l.Panicf("srv|fwd-es: encoding updates failed: %v", err) } postData.WriteString(`{"index":{"_type":"dataupdate"}}`) postData.WriteRune('\n') @@ -140,21 +140,21 @@ tryResync: // TODO: move to seperate function and use io.PipeReader as body resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) if err != nil { - s5l.Printf("fwd-es: post failed: %v\n", err) + s5l.Printf("srv|fwd-es: posting updates failed: %v", err) time.Sleep(1 * time.Second) continue tryResync } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { body, _ := ioutil.ReadAll(resp.Body) - s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body) + s5l.Printf("srv|fwd-es: posting updates failed: %s\n%s", resp.Status, body) time.Sleep(1 * time.Second) continue tryResync } resp.Body.Close() // TODO: check result from elasticsearch lastID = findMaxID(updates) - s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) + s5l.Printf("srv|fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID) } } } |