summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForward.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForward.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go63
1 files changed, 63 insertions, 0 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 49de873..bc7a634 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -61,6 +61,65 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
+func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
+ url := baseurl + "/dataupdate"
+ // tryResync:
+ for {
+ // TODO get last known update from elasticsearch
+ lastId := -1
+
+ s5l.Printf("fwd-es: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-es: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-es: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ for _, update := range updates {
+ data, err := json.Marshal(update)
+ if err != nil {
+ s5l.Panicf("fwd-es: encode failed: %v\n", err)
+ }
+
+ s5l.Printf("fwd-es: marshal OK")
+ resp, err := client.Post(url, "application/json", bytes.NewBuffer(data))
+
+ if err != nil {
+ s5l.Printf("fwd-es: post failed: %v\n", err)
+ continue nextBatch
+ // TODO retry etc.
+ }
+
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
+ continue nextBatch
+ }
+
+ // TODO should never be nil
+ if update.SourceHubDataUpdateId != nil {
+ lastId = *update.SourceHubDataUpdateId
+ }
+ }
+
+ s5l.Printf("fwd-es: all posts OK")
+ s5l.Printf("fwd-es: new lastid: %d", lastId)
+ time.Sleep(1 * time.Second)
+ }
+ }
+}
+
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
@@ -124,3 +183,7 @@ tryResync:
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
self.handleForwarding(forwardBaseUrl, http.DefaultClient)
}
+
+func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
+ self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
+}