From d7db5c4cf7c538481065cbcf741de7fc16d0d470 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Sun, 16 Nov 2014 19:31:56 +0100 Subject: hub: add basic elastic-search forwarding --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 11 ++++ src/hub/src/spreadspace.org/sfive/s5srvForward.go | 63 +++++++++++++++++++++++ src/hub/test-fwd-es | 5 ++ 3 files changed, 79 insertions(+) create mode 100755 src/hub/test-fwd-es (limited to 'src') diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 4541b2e..a062029 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -20,6 +20,7 @@ func main() { startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram") startWeb := flag.Bool("start-web-server", true, "start a webserver") forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url") + forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http") vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application") help := flag.Bool("help", false, "show usage") @@ -79,6 +80,16 @@ func main() { }() } + if *forwardES != "" { + wg.Add(1) + go func() { + defer wg.Done() + s5hl.Println("start elastic-search forward") + server.RunForwardingToElasticSearch(*forwardES) + s5hl.Println("elastic-search forward finished") + }() + } + alldone := make(chan bool) go func() { defer func() { alldone <- true }() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 49de873..bc7a634 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -61,6 +61,65 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } +func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { + url := baseurl + "/dataupdate" + // tryResync: + for { + // TODO get last known update from elasticsearch + lastId := -1 + + s5l.Printf("fwd-es: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-es: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-es: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + for _, update := range updates { + data, err := json.Marshal(update) + if err != nil { + s5l.Panicf("fwd-es: encode failed: %v\n", err) + } + + s5l.Printf("fwd-es: marshal OK") + resp, err := client.Post(url, "application/json", bytes.NewBuffer(data)) + + if err != nil { + s5l.Printf("fwd-es: post failed: %v\n", err) + continue nextBatch + // TODO retry etc. + } + + resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + s5l.Printf("fwd-es: post failed: %s\n", resp.Status) + continue nextBatch + } + + // TODO should never be nil + if update.SourceHubDataUpdateId != nil { + lastId = *update.SourceHubDataUpdateId + } + } + + s5l.Printf("fwd-es: all posts OK") + s5l.Printf("fwd-es: new lastid: %d", lastId) + time.Sleep(1 * time.Second) + } + } +} + func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: @@ -124,3 +183,7 @@ tryResync: func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) } + +func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { + self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) +} diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es new file mode 100755 index 0000000..e54b0c2 --- /dev/null +++ b/src/hub/test-fwd-es @@ -0,0 +1,5 @@ +#!/bin/sh + +rm -f /run/sfive/pipe /run/sfive/pipegram +./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-es-url="http://localhost:9200/test" + -- cgit v1.2.3