summaryrefslogtreecommitdiff
path: root/src/hub
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-11-29 18:01:30 +0100
committerMarkus Grüneis <gimpf@gimpf.org>2014-11-29 18:01:30 +0100
commit9434e0ebaf5ea4f15f2a539b9545aea5d5fb9400 (patch)
tree2049d3eb8826174c1da7868979651eef8541b131 /src/hub
parenthub: fixed distclean target at Makefile (diff)
hub: fwd-es tries bulk-api
Diffstat (limited to 'src/hub')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go47
1 files changed, 26 insertions, 21 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index bc7a634..cf385b9 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -61,9 +61,13 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
+type esUpdate struct {
+ Data StatisticsData `json:"update"`
+}
+
func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
- url := baseurl + "/dataupdate"
- // tryResync:
+ url := baseurl + "/dataupdate/_bulk"
+tryResync:
for {
// TODO get last known update from elasticsearch
lastId := -1
@@ -86,34 +90,37 @@ func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, clie
continue nextBatch
}
+ postData := make([]byte, 4096)
+
for _, update := range updates {
- data, err := json.Marshal(update)
+ data, err := json.Marshal(esUpdate{Data: update})
if err != nil {
s5l.Panicf("fwd-es: encode failed: %v\n", err)
}
+ postData = append(postData, ([]byte)("\n")...)
+ postData = append(postData, data...)
+ }
- s5l.Printf("fwd-es: marshal OK")
- resp, err := client.Post(url, "application/json", bytes.NewBuffer(data))
+ s5tl.Printf("fwd-es: marshalled: %v", (string)(postData))
- if err != nil {
- s5l.Printf("fwd-es: post failed: %v\n", err)
- continue nextBatch
- // TODO retry etc.
- }
+ s5l.Printf("fwd-es: marshal OK")
+ resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData))
- resp.Body.Close()
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
- s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
- continue nextBatch
- }
+ if err != nil {
+ s5l.Printf("fwd-es: post failed: %v\n", err)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
- // TODO should never be nil
- if update.SourceHubDataUpdateId != nil {
- lastId = *update.SourceHubDataUpdateId
- }
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
+ time.Sleep(1 * time.Second)
+ continue tryResync
}
s5l.Printf("fwd-es: all posts OK")
+ lastId = findMaxId(updates)
s5l.Printf("fwd-es: new lastid: %d", lastId)
time.Sleep(1 * time.Second)
}
@@ -169,10 +176,8 @@ tryResync:
s5l.Printf("fwd: post failed: %s\n", resp.Status)
continue tryResync
}
- // }
s5l.Printf("fwd: post OK")
-
lastId = findMaxId(updates)
s5l.Printf("fwd: new lastid: %d", lastId)
time.Sleep(1 * time.Second)