summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-11-29 18:49:09 +0100
committerMarkus Grüneis <gimpf@gimpf.org>2014-11-29 18:49:23 +0100
commitd068936f934e2447904803ac7bda5a127a0ddb0d (patch)
tree0417b0a6dc8698bd7541a82821a1ede6617c9907 /src
parentes5: don't show hits for last update id query (diff)
hub: query last-id from elasticsearch in fwd-es
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go72
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go154
2 files changed, 155 insertions, 71 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index cf385b9..8014ff2 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -51,7 +51,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
} else {
tid, errl := strconv.ParseInt(string(body), 10, 32)
if errl != nil {
- s5l.Printf("fwd: invalid lastupdate response: %v\n", err)
+ s5l.Printf("fwd: invalid lastupdate response: %v\n", errl)
err = errl
return
}
@@ -61,72 +61,6 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
-type esUpdate struct {
- Data StatisticsData `json:"update"`
-}
-
-func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
- url := baseurl + "/dataupdate/_bulk"
-tryResync:
- for {
- // TODO get last known update from elasticsearch
- lastId := -1
-
- s5l.Printf("fwd-es: lastupdate: %d", lastId)
-
- nextBatch:
- for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
- if err != nil {
- s5l.Printf("fwd-es: failed reading updates: %v\n", err)
- time.Sleep(500 * time.Millisecond)
- continue nextBatch
- }
-
- s5l.Printf("fwd-es: got %d updates", len(updates))
-
- if len(updates) == 0 {
- time.Sleep(1 * time.Second)
- continue nextBatch
- }
-
- postData := make([]byte, 4096)
-
- for _, update := range updates {
- data, err := json.Marshal(esUpdate{Data: update})
- if err != nil {
- s5l.Panicf("fwd-es: encode failed: %v\n", err)
- }
- postData = append(postData, ([]byte)("\n")...)
- postData = append(postData, data...)
- }
-
- s5tl.Printf("fwd-es: marshalled: %v", (string)(postData))
-
- s5l.Printf("fwd-es: marshal OK")
- resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData))
-
- if err != nil {
- s5l.Printf("fwd-es: post failed: %v\n", err)
- time.Sleep(1 * time.Second)
- continue tryResync
- }
-
- resp.Body.Close()
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
- s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
- time.Sleep(1 * time.Second)
- continue tryResync
- }
-
- s5l.Printf("fwd-es: all posts OK")
- lastId = findMaxId(updates)
- s5l.Printf("fwd-es: new lastid: %d", lastId)
- time.Sleep(1 * time.Second)
- }
- }
-}
-
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
@@ -188,7 +122,3 @@ tryResync:
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
self.handleForwarding(forwardBaseUrl, http.DefaultClient)
}
-
-func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
- self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
-}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
new file mode 100644
index 0000000..8fb547b
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -0,0 +1,154 @@
+package sfive
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const lastUpdateJson = `{
+ "query": {"match": { "SourceHubUuid": "%s" } },
+ "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
+}`
+
+func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
+ url := baseurl + "/dataupdate/_search?pretty"
+
+ storeId, err = self.getHubIdInvoke()
+
+ if err != nil {
+ s5l.Printf("fwd-es: failed to get own hubid: %v\n", err)
+ return
+ }
+
+ queryJson := fmt.Sprintf(lastUpdateJson, storeId)
+ s5tl.Printf("fwd-es: query: %s", queryJson)
+
+ resp, err := client.Post(url, "application/json", strings.NewReader(queryJson))
+ if err != nil {
+ s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err)
+ return
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode)
+ return
+ }
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err)
+ return
+ }
+
+ s5tl.Printf("fwd-es: lastupdate response: %s\n", body)
+
+ if len(body) == 0 {
+ latestId = -1
+ } else {
+ var value interface{}
+ errl := json.Unmarshal(body, &value)
+ if errl != nil {
+ s5l.Printf("fwd-es: invalid lastupdate response: %v\n", errl)
+ err = errl
+ return
+ }
+
+ idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"]
+ if idstrcntr == nil {
+ s5l.Printf("fwd-es: we are new here\n")
+ latestId = -1
+ return
+ }
+ idstr := idstrcntr.(*string)
+
+ tid, errl := strconv.ParseInt(*idstr, 10, 32)
+ if errl != nil {
+ s5l.Printf("fwd-es: invalid value: %v\n", errl)
+ err = errl
+ return
+ }
+ latestId = int(tid)
+ }
+
+ return
+}
+
+type esUpdate struct {
+ Data StatisticsData `json:"update"`
+}
+
+func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
+ url := baseurl + "/dataupdate/_bulk"
+tryResync:
+ for {
+ lastId, _, err := self.getLastUpdateEs(baseurl, client)
+ if err != nil {
+ s5l.Printf("fwd-es: lastupdate returned err: %v", err)
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+ s5l.Printf("fwd-es: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-es: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-es: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ postData := make([]byte, 4096)
+
+ for _, update := range updates {
+ data, err := json.Marshal(esUpdate{Data: update})
+ if err != nil {
+ s5l.Panicf("fwd-es: encode failed: %v\n", err)
+ }
+ postData = append(postData, ([]byte)("\n")...)
+ postData = append(postData, data...)
+ }
+
+ s5tl.Printf("fwd-es: marshalled: %v", (string)(postData))
+
+ s5l.Printf("fwd-es: marshal OK")
+ resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData))
+
+ if err != nil {
+ s5l.Printf("fwd-es: post failed: %v\n", err)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+
+ s5l.Printf("fwd-es: all posts OK")
+ lastId = findMaxId(updates)
+ s5l.Printf("fwd-es: new lastid: %d", lastId)
+ time.Sleep(1 * time.Second)
+ }
+ }
+}
+
+func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
+ self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
+}