diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForward.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForward.go | 72 |
1 files changed, 1 insertions, 71 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index cf385b9..8014ff2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -51,7 +51,7 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( } else { tid, errl := strconv.ParseInt(string(body), 10, 32) if errl != nil { - s5l.Printf("fwd: invalid lastupdate response: %v\n", err) + s5l.Printf("fwd: invalid lastupdate response: %v\n", errl) err = errl return } @@ -61,72 +61,6 @@ func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) ( return } -type esUpdate struct { - Data StatisticsData `json:"update"` -} - -func (self StatsSinkServer) handleForwardingToElasticSearch(baseurl string, client *http.Client) { - url := baseurl + "/dataupdate/_bulk" -tryResync: - for { - // TODO get last known update from elasticsearch - lastId := -1 - - s5l.Printf("fwd-es: lastupdate: %d", lastId) - - nextBatch: - for { - updates, err := self.getUpdatesAfterInvoke(lastId) - if err != nil { - s5l.Printf("fwd-es: failed reading updates: %v\n", err) - time.Sleep(500 * time.Millisecond) - continue nextBatch - } - - s5l.Printf("fwd-es: got %d updates", len(updates)) - - if len(updates) == 0 { - time.Sleep(1 * time.Second) - continue nextBatch - } - - postData := make([]byte, 4096) - - for _, update := range updates { - data, err := json.Marshal(esUpdate{Data: update}) - if err != nil { - s5l.Panicf("fwd-es: encode failed: %v\n", err) - } - postData = append(postData, ([]byte)("\n")...) - postData = append(postData, data...) - } - - s5tl.Printf("fwd-es: marshalled: %v", (string)(postData)) - - s5l.Printf("fwd-es: marshal OK") - resp, err := client.Post(url, "text/plain", bytes.NewBuffer(postData)) - - if err != nil { - s5l.Printf("fwd-es: post failed: %v\n", err) - time.Sleep(1 * time.Second) - continue tryResync - } - - resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - s5l.Printf("fwd-es: post failed: %s\n", resp.Status) - time.Sleep(1 * time.Second) - continue tryResync - } - - s5l.Printf("fwd-es: all posts OK") - lastId = findMaxId(updates) - s5l.Printf("fwd-es: new lastid: %d", lastId) - time.Sleep(1 * time.Second) - } - } -} - func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { url := baseurl + "/updates" tryResync: @@ -188,7 +122,3 @@ tryResync: func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { self.handleForwarding(forwardBaseUrl, http.DefaultClient) } - -func (self StatsSinkServer) RunForwardingToElasticSearch(forwardBaseUrl string) { - self.handleForwardingToElasticSearch(forwardBaseUrl, http.DefaultClient) -} |