summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-11-29 19:55:42 +0100
committerMarkus Grüneis <gimpf@gimpf.org>2014-11-29 19:55:42 +0100
commitd3369605b96b077d2fa98bec40befd143565942a (patch)
tree596bd19cd9eaba55a4948029623ffe7eb2dfcf57 /src
parenthub: store duration in milliseconds (diff)
hub: fix fwd-es
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go37
1 files changed, 14 insertions, 23 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 8fb547b..b80e111 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -6,7 +6,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
- "strconv"
"strings"
"time"
)
@@ -66,26 +65,15 @@ func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client)
latestId = -1
return
}
- idstr := idstrcntr.(*string)
-
- tid, errl := strconv.ParseInt(*idstr, 10, 32)
- if errl != nil {
- s5l.Printf("fwd-es: invalid value: %v\n", errl)
- err = errl
- return
- }
+ tid := idstrcntr.(float64)
latestId = int(tid)
}
return
}
-type esUpdate struct {
- Data StatisticsData `json:"update"`
-}
-
func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
- url := baseurl + "/dataupdate/_bulk"
+ url := baseurl + "/_bulk"
tryResync:
for {
lastId, _, err := self.getLastUpdateEs(baseurl, client)
@@ -112,21 +100,23 @@ tryResync:
continue nextBatch
}
- postData := make([]byte, 4096)
+ postData := bytes.Buffer{}
for _, update := range updates {
- data, err := json.Marshal(esUpdate{Data: update})
+ data, err := json.Marshal(update)
if err != nil {
s5l.Panicf("fwd-es: encode failed: %v\n", err)
}
- postData = append(postData, ([]byte)("\n")...)
- postData = append(postData, data...)
+ postData.WriteString(`{"index":{"_type":"dataupdate"}}`)
+ postData.WriteRune('\n')
+ postData.Write(data)
+ postData.WriteRune('\n')
}
- s5tl.Printf("fwd-es: marshalled: %v", (string)(postData))
+ //s5tl.Printf("fwd-es: marshalled:\n%v\n", (string)(postData.Bytes()))
s5l.Printf("fwd-es: marshal OK")
- resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData))
+ resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
if err != nil {
s5l.Printf("fwd-es: post failed: %v\n", err)
@@ -134,17 +124,18 @@ tryResync:
continue tryResync
}
- resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
- s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
+ body, _ := ioutil.ReadAll(resp.Body)
+ s5l.Printf("fwd-es: post failed: %s\n%s\n", resp.Status, body)
time.Sleep(1 * time.Second)
continue tryResync
}
+ resp.Body.Close()
s5l.Printf("fwd-es: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-es: new lastid: %d", lastId)
- time.Sleep(1 * time.Second)
+ //time.Sleep(1 * time.Second)
}
}
}