From d3369605b96b077d2fa98bec40befd143565942a Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 29 Nov 2014 19:55:42 +0100 Subject: hub: fix fwd-es --- .../src/spreadspace.org/sfive/s5srvForwardEs.go | 37 ++++++++-------------- 1 file changed, 14 insertions(+), 23 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 8fb547b..b80e111 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -6,7 +6,6 @@ import ( "fmt" "io/ioutil" "net/http" - "strconv" "strings" "time" ) @@ -66,26 +65,15 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) latestId = -1 return } - idstr := idstrcntr.(*string) - - tid, errl := strconv.ParseInt(*idstr, 10, 32) - if errl != nil { - s5l.Printf("fwd-es: invalid value: %v\n", errl) - err = errl - return - } + tid := idstrcntr.(float64) latestId = int(tid) } return } -type esUpdate struct { - Data StatisticsData `json:"update"` -} - func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { - url := baseurl + "/dataupdate/_bulk" + url := baseurl + "/_bulk" tryResync: for { lastId, _, err := self.getLastUpdateEs(baseurl, client) @@ -112,21 +100,23 @@ tryResync: continue nextBatch } - postData := make([]byte, 4096) + postData := bytes.Buffer{} for _, update := range updates { - data, err := json.Marshal(esUpdate{Data: update}) + data, err := json.Marshal(update) if err != nil { s5l.Panicf("fwd-es: encode failed: %v\n", err) } - postData = append(postData, ([]byte)("\n")...) - postData = append(postData, data...) + postData.WriteString(`{"index":{"_type":"dataupdate"}}`) + postData.WriteRune('\n') + postData.Write(data) + postData.WriteRune('\n') } - s5tl.Printf("fwd-es: marshalled: %v", (string)(postData)) + //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes())) s5l.Printf("fwd-es: marshal OK") - resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData)) + resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) if err != nil { s5l.Printf("fwd-es: post failed: %v\n", err) @@ -134,17 +124,18 @@ tryResync: continue tryResync } - resp.Body.Close() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - s5l.Printf("fwd-es: post failed: %s\n", resp.Status) + body, _ := ioutil.ReadAll(resp.Body) + s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body) time.Sleep(1 * time.Second) continue tryResync } + resp.Body.Close() s5l.Printf("fwd-es: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) - time.Sleep(1 * time.Second) + //time.Sleep(1 * time.Second) } } } -- cgit v1.2.3