From d068936f934e2447904803ac7bda5a127a0ddb0d Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sat, 29 Nov 2014 18:49:09 +0100 Subject: hub: query last-id from elasticsearch in fwd-es --- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 72 +--------- .../src/spreadspace.org/sfive/s5srvForwardEs.go | 154 +++++++++++++++++++++ 2 files changed, 155 insertions(+), 71 deletions(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index cf385b9..8014ff2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -51,7 +51,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( } else { tid, errl := strconv.ParseInt(string(body), 10, 32) if errl != nil { - s5l.Printf("fwd: invalid lastupdate response: %v\n", err) + s5l.Printf("fwd: invalid lastupdate response: %v\n", errl) err = errl return } @@ -61,72 +61,6 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } -type esUpdate struct { - Data StatisticsData `json:"update"` -} - -func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { - url := baseurl + "/dataupdate/_bulk" -tryResync: - for { - // TODO get last known update from elasticsearch - lastId := -1 - - s5l.Printf("fwd-es: lastupdate: %d", lastId) - - nextBatch: - for { - updates, err := self.getUpdatesAfterInvoke(lastId) - if err != nil { - s5l.Printf("fwd-es: failed reading updates: %v\n", err) - time.Sleep(500 * time.Millisecond) - continue nextBatch - } - - s5l.Printf("fwd-es: got %d updates", len(updates)) - - if len(updates) == 0 { - time.Sleep(1 * time.Second) - continue nextBatch - } - - postData := make([]byte, 4096) - - for _, update := range updates { - data, err := json.Marshal(esUpdate{Data: update}) - if err != nil { - s5l.Panicf("fwd-es: encode failed: %v\n", err) - } - postData = append(postData, ([]byte)("\n")...) - postData = append(postData, data...) - } - - s5tl.Printf("fwd-es: marshalled: %v", (string)(postData)) - - s5l.Printf("fwd-es: marshal OK") - resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData)) - - if err != nil { - s5l.Printf("fwd-es: post failed: %v\n", err) - time.Sleep(1 * time.Second) - continue tryResync - } - - resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - s5l.Printf("fwd-es: post failed: %s\n", resp.Status) - time.Sleep(1 * time.Second) - continue tryResync - } - - s5l.Printf("fwd-es: all posts OK") - lastId = findMaxId(updates) - s5l.Printf("fwd-es: new lastid: %d", lastId) - time.Sleep(1 * time.Second) - } - } -} - func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: @@ -188,7 +122,3 @@ tryResync: func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) } - -func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { - self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) -} diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go new file mode 100644 index 0000000..8fb547b --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -0,0 +1,154 @@ +package sfive + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "strings" + "time" +) + +const lastUpdateJson = `{ + "query": {"match": { "SourceHubUuid": "%s" } }, + "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } +}` + +func (self StatsSinkServer) getLastUpdateEs(baseurl string, client *http.Client) (latestId int, storeId string, err error) { + url := baseurl + "/dataupdate/_search?pretty" + + storeId, err = self.getHubIdInvoke() + + if err != nil { + s5l.Printf("fwd-es: failed to get own hubid: %v\n", err) + return + } + + queryJson := fmt.Sprintf(lastUpdateJson, storeId) + s5tl.Printf("fwd-es: query: %s", queryJson) + + resp, err := client.Post(url, "application/json", strings.NewReader(queryJson)) + if err != nil { + s5l.Printf("fwd-es: failed to query for lastupdate: %v\n", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + s5l.Printf("fwd-es: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) + return + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err) + return + } + + s5tl.Printf("fwd-es: lastupdate response: %s\n", body) + + if len(body) == 0 { + latestId = -1 + } else { + var value interface{} + errl := json.Unmarshal(body, &value) + if errl != nil { + s5l.Printf("fwd-es: invalid lastupdate response: %v\n", errl) + err = errl + return + } + + idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] + if idstrcntr == nil { + s5l.Printf("fwd-es: we are new here\n") + latestId = -1 + return + } + idstr := idstrcntr.(*string) + + tid, errl := strconv.ParseInt(*idstr, 10, 32) + if errl != nil { + s5l.Printf("fwd-es: invalid value: %v\n", errl) + err = errl + return + } + latestId = int(tid) + } + + return +} + +type esUpdate struct { + Data StatisticsData `json:"update"` +} + +func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { + url := baseurl + "/dataupdate/_bulk" +tryResync: + for { + lastId, _, err := self.getLastUpdateEs(baseurl, client) + if err != nil { + s5l.Printf("fwd-es: lastupdate returned err: %v", err) + time.Sleep(5 * time.Second) + continue tryResync + } + s5l.Printf("fwd-es: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-es: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-es: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + postData := make([]byte, 4096) + + for _, update := range updates { + data, err := json.Marshal(esUpdate{Data: update}) + if err != nil { + s5l.Panicf("fwd-es: encode failed: %v\n", err) + } + postData = append(postData, ([]byte)("\n")...) + postData = append(postData, data...) + } + + s5tl.Printf("fwd-es: marshalled: %v", (string)(postData)) + + s5l.Printf("fwd-es: marshal OK") + resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData)) + + if err != nil { + s5l.Printf("fwd-es: post failed: %v\n", err) + time.Sleep(1 * time.Second) + continue tryResync + } + + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + s5l.Printf("fwd-es: post failed: %s\n", resp.Status) + time.Sleep(1 * time.Second) + continue tryResync + } + + s5l.Printf("fwd-es: all posts OK") + lastId = findMaxId(updates) + s5l.Printf("fwd-es: new lastid: %d", lastId) + time.Sleep(1 * time.Second) + } + } +} + +func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { + self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) +} -- cgit v1.2.3