From 9434e0ebaf5ea4f15f2a539b9545aea5d5fb9400 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 29 Nov 2014 18:01:30 +0100 Subject: hub: fwd-es tries bulk-api --- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 47 +++++++++++++---------- 1 file changed, 26 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index bc7a634..cf385b9 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -61,9 +61,13 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } +type esUpdate struct { + Data StatisticsData `json:"update"` +} + func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { - url := baseurl + "/dataupdate" - // tryResync: + url := baseurl + "/dataupdate/_bulk" +tryResync: for { // TODO get last known update from elasticsearch lastId := -1 @@ -86,34 +90,37 @@ func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, clie continue nextBatch } + postData := make([]byte, 4096) + for _, update := range updates { - data, err := json.Marshal(update) + data, err := json.Marshal(esUpdate{Data: update}) if err != nil { s5l.Panicf("fwd-es: encode failed: %v\n", err) } + postData = append(postData, ([]byte)("\n")...) + postData = append(postData, data...) + } - s5l.Printf("fwd-es: marshal OK") - resp, err := client.Post(url, "application/json", bytes.NewBuffer(data)) + s5tl.Printf("fwd-es: marshalled: %v", (string)(postData)) - if err != nil { - s5l.Printf("fwd-es: post failed: %v\n", err) - continue nextBatch - // TODO retry etc. - } + s5l.Printf("fwd-es: marshal OK") + resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData)) - resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - s5l.Printf("fwd-es: post failed: %s\n", resp.Status) - continue nextBatch - } + if err != nil { + s5l.Printf("fwd-es: post failed: %v\n", err) + time.Sleep(1 * time.Second) + continue tryResync + } - // TODO should never be nil - if update.SourceHubDataUpdateId != nil { - lastId = *update.SourceHubDataUpdateId - } + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + s5l.Printf("fwd-es: post failed: %s\n", resp.Status) + time.Sleep(1 * time.Second) + continue tryResync } s5l.Printf("fwd-es: all posts OK") + lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) time.Sleep(1 * time.Second) } @@ -169,10 +176,8 @@ tryResync: s5l.Printf("fwd: post failed: %s\n", resp.Status) continue tryResync } - // } s5l.Printf("fwd: post OK") - lastId = findMaxId(updates) s5l.Printf("fwd: new lastid: %d", lastId) time.Sleep(1 * time.Second) -- cgit v1.2.3