summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-11-16 19:31:56 +0100
committerMarkus Grüneis <gimpf@gimpf.org>2014-11-16 19:31:56 +0100
commitd7db5c4cf7c538481065cbcf741de7fc16d0d470 (patch)
tree6730d3a74865fd807db999ceda5c6e3d5a71e2b5 /src
parentadded first version for the logo (diff)
hub: add basic elastic-search forwarding
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go11
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go63
-rwxr-xr-xsrc/hub/test-fwd-es5
3 files changed, 79 insertions, 0 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 4541b2e..a062029 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -20,6 +20,7 @@ func main() {
startGramPipe := flag.Bool("start-pipegram-server", true, "start a datagram oriented pipe server; see option pipegram")
startWeb := flag.Bool("start-web-server", true, "start a webserver")
forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url")
+ forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http")
vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application")
help := flag.Bool("help", false, "show usage")
@@ -79,6 +80,16 @@ func main() {
}()
}
+ if *forwardES != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s5hl.Println("start elastic-search forward")
+ server.RunForwardingToElasticSearch(*forwardES)
+ s5hl.Println("elastic-search forward finished")
+ }()
+ }
+
alldone := make(chan bool)
go func() {
defer func() { alldone <- true }()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 49de873..bc7a634 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -61,6 +61,65 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
+func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
+ url := baseurl + "/dataupdate"
+ // tryResync:
+ for {
+ // TODO get last known update from elasticsearch
+ lastId := -1
+
+ s5l.Printf("fwd-es: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-es: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-es: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ for _, update := range updates {
+ data, err := json.Marshal(update)
+ if err != nil {
+ s5l.Panicf("fwd-es: encode failed: %v\n", err)
+ }
+
+ s5l.Printf("fwd-es: marshal OK")
+ resp, err := client.Post(url, "application/json", bytes.NewBuffer(data))
+
+ if err != nil {
+ s5l.Printf("fwd-es: post failed: %v\n", err)
+ continue nextBatch
+ // TODO retry etc.
+ }
+
+ resp.Body.Close()
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
+ continue nextBatch
+ }
+
+ // TODO should never be nil
+ if update.SourceHubDataUpdateId != nil {
+ lastId = *update.SourceHubDataUpdateId
+ }
+ }
+
+ s5l.Printf("fwd-es: all posts OK")
+ s5l.Printf("fwd-es: new lastid: %d", lastId)
+ time.Sleep(1 * time.Second)
+ }
+ }
+}
+
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
@@ -124,3 +183,7 @@ tryResync:
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
self.handleForwarding(forwardBaseUrl, http.DefaultClient)
}
+
+func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
+ self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
+}
diff --git a/src/hub/test-fwd-es b/src/hub/test-fwd-es
new file mode 100755
index 0000000..e54b0c2
--- /dev/null
+++ b/src/hub/test-fwd-es
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+rm -f /run/sfive/pipe /run/sfive/pipegram
+./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-es-url="http://localhost:9200/test"
+