summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go28
1 files changed, 14 insertions, 14 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index d61c5fc..71313aa 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -51,29 +51,29 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
url := baseUrl + "/dataupdate/_search?search_type=count"
queryJson := fmt.Sprintf(forwardEsLastUpdateIDJson, hubUUID)
- s5dl.Printf("fwd-es: query: %s", queryJson)
+ s5dl.Printf("srv|fwd-es: query: %s", queryJson)
var resp *http.Response
resp, err = client.Post(url, "application/json", strings.NewReader(queryJson))
if err != nil {
- s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err)
+ s5l.Printf("srv|fwd-es: querying for lastupdate failed: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
- s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode)
+ s5l.Printf("srv|fwd-es: elasticsearch failed to fulfill query for lastupdate: %v", resp.StatusCode)
return
}
var body []byte
body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder
if err != nil {
- s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err)
+ s5l.Printf("srv|fwd-es: reading lastupdate response failed: %v", err)
return
}
- s5dl.Printf("fwd-es: lastupdate response: %s\n", body)
+ s5dl.Printf("srv|fwd-es: lastupdate response: %s", body)
if len(body) == 0 {
lastID = -1
@@ -81,13 +81,13 @@ func fwdEsGetLastUpdateID(baseUrl string, client *http.Client, hubUUID string) (
var value interface{}
err = json.Unmarshal(body, &value)
if err != nil {
- s5l.Printf("fwd-es: invalid lastupdate response: %v\n", err)
+ s5l.Printf("srv|fwd-es: invalid lastupdate response: %v", err)
return
}
idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"]
if idstrcntr == nil {
- s5l.Printf("fwd-es: we are new here\n")
+ s5l.Printf("srv|fwd-es: we are new here")
lastID = -1
return
}
@@ -105,17 +105,17 @@ tryResync:
for {
lastID, err := fwdEsGetLastUpdateID(baseUrl, client, hubUUID)
if err != nil {
- s5l.Printf("fwd-es: lastupdate returned err: %v", err)
+ s5l.Printf("srv|fwd-es: fetching lastupdate failed: %v", err)
time.Sleep(5 * time.Second)
continue tryResync
}
- s5l.Printf("fwd-es: lastupdate: %d", lastID)
+ s5l.Printf("srv|fwd-es: new lastupdate: %d", lastID)
nextBatch:
for {
updates, err := srv.store.GetUpdatesAfter(lastID, 5000)
if err != nil {
- s5l.Printf("fwd-es: failed reading updates: %v\n", err)
+ s5l.Printf("srv|fwd-es: reading updates failed: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -129,7 +129,7 @@ tryResync:
for _, update := range updates {
data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter
if err != nil {
- s5l.Panicf("fwd-es: encode failed: %v\n", err)
+ s5l.Panicf("srv|fwd-es: encoding updates failed: %v", err)
}
postData.WriteString(`{"index":{"_type":"dataupdate"}}`)
postData.WriteRune('\n')
@@ -140,21 +140,21 @@ tryResync:
// TODO: move to seperate function and use io.PipeReader as body
resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
if err != nil {
- s5l.Printf("fwd-es: post failed: %v\n", err)
+ s5l.Printf("srv|fwd-es: posting updates failed: %v", err)
time.Sleep(1 * time.Second)
continue tryResync
}
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(resp.Body)
- s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body)
+ s5l.Printf("srv|fwd-es: posting updates failed: %s\n%s", resp.Status, body)
time.Sleep(1 * time.Second)
continue tryResync
}
resp.Body.Close() // TODO: check result from elasticsearch
lastID = findMaxID(updates)
- s5l.Printf("fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
+ s5l.Printf("srv|fwd-es: successfully forwarded %d updates, new lastid: %d", len(updates), lastID)
}
}
}