diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForward.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 63 |
1 files changed, 63 insertions, 0 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 49de873..bc7a634 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -61,6 +61,65 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } +func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { + url := baseurl + "/dataupdate" + // tryResync: + for { + // TODO get last known update from elasticsearch + lastId := -1 + + s5l.Printf("fwd-es: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-es: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-es: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + for _, update := range updates { + data, err := json.Marshal(update) + if err != nil { + s5l.Panicf("fwd-es: encode failed: %v\n", err) + } + + s5l.Printf("fwd-es: marshal OK") + resp, err := client.Post(url, "application/json", bytes.NewBuffer(data)) + + if err != nil { + s5l.Printf("fwd-es: post failed: %v\n", err) + continue nextBatch + // TODO retry etc. + } + + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + s5l.Printf("fwd-es: post failed: %s\n", resp.Status) + continue nextBatch + } + + // TODO should never be nil + if update.SourceHubDataUpdateId != nil { + lastId = *update.SourceHubDataUpdateId + } + } + + s5l.Printf("fwd-es: all posts OK") + s5l.Printf("fwd-es: new lastid: %d", lastId) + time.Sleep(1 * time.Second) + } + } +} + func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: @@ -124,3 +183,7 @@ tryResync: func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) } + +func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { + self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) +} |