summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForward.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForward.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go72
1 files changed, 1 insertions, 71 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index cf385b9..8014ff2 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -51,7 +51,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
} else {
tid, errl := strconv.ParseInt(string(body), 10, 32)
if errl != nil {
- s5l.Printf("fwd: invalid lastupdate response: %v\n", err)
+ s5l.Printf("fwd: invalid lastupdate response: %v\n", errl)
err = errl
return
}
@@ -61,72 +61,6 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (
return
}
-type esUpdate struct {
- Data StatisticsData `json:"update"`
-}
-
-func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) {
- url := baseurl + "/dataupdate/_bulk"
-tryResync:
- for {
- // TODO get last known update from elasticsearch
- lastId := -1
-
- s5l.Printf("fwd-es: lastupdate: %d", lastId)
-
- nextBatch:
- for {
- updates, err := self.getUpdatesAfterInvoke(lastId)
- if err != nil {
- s5l.Printf("fwd-es: failed reading updates: %v\n", err)
- time.Sleep(500 * time.Millisecond)
- continue nextBatch
- }
-
- s5l.Printf("fwd-es: got %d updates", len(updates))
-
- if len(updates) == 0 {
- time.Sleep(1 * time.Second)
- continue nextBatch
- }
-
- postData := make([]byte, 4096)
-
- for _, update := range updates {
- data, err := json.Marshal(esUpdate{Data: update})
- if err != nil {
- s5l.Panicf("fwd-es: encode failed: %v\n", err)
- }
- postData = append(postData, ([]byte)("\n")...)
- postData = append(postData, data...)
- }
-
- s5tl.Printf("fwd-es: marshalled: %v", (string)(postData))
-
- s5l.Printf("fwd-es: marshal OK")
- resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData))
-
- if err != nil {
- s5l.Printf("fwd-es: post failed: %v\n", err)
- time.Sleep(1 * time.Second)
- continue tryResync
- }
-
- resp.Body.Close()
- if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
- s5l.Printf("fwd-es: post failed: %s\n", resp.Status)
- time.Sleep(1 * time.Second)
- continue tryResync
- }
-
- s5l.Printf("fwd-es: all posts OK")
- lastId = findMaxId(updates)
- s5l.Printf("fwd-es: new lastid: %d", lastId)
- time.Sleep(1 * time.Second)
- }
- }
-}
-
func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
url := baseurl + "/updates"
tryResync:
@@ -188,7 +122,3 @@ tryResync:
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
self.handleForwarding(forwardBaseUrl, http.DefaultClient)
}
-
-func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) {
- self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient)
-}