summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-24 15:18:31 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-24 15:18:31 +0200
commitaa8d42077c3daa14f81a16f38781b2b8225528cb (patch)
tree41ee3f7d5cac41d3a2a7596824b9136880ca00ba /src
parenthub: Fix http response for lastupdate/:id (diff)
hub: Implement basic forward support.
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go95
1 files changed, 70 insertions, 25 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 6ae0f99..475027a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -3,53 +3,98 @@ package sfive
import (
"bytes"
"encoding/json"
+ "io/ioutil"
"net/http"
+ "strconv"
"time"
)
-func findMaxId(values []dataUpdateDb) int {
+func findMaxId(values []StatisticsData) int {
maxId := -1
for i := range values {
- if values[i].Id > maxId {
- maxId = values[i].Id
+ id := values[i].SourceHubDataUpdateId
+ if id != nil && *id > maxId {
+ maxId = *id
}
}
return maxId
}
-func (self StatsSinkServer) handleForwarding(url string, client *http.Client) {
- latestId := 0 // TODO retrieve latest known from server
-next:
- // TODO forwarding
- for {
- time.Sleep(500)
+func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string) {
+ storeId, err := self.store.GetStoreId()
+ if err != nil {
+ s5l.Panicf("fwd: failed to get own hubid: %v\n", err)
+ }
+
+ resp, err := client.Get(baseurl + "/lastupdate/" + storeId)
+ if err != nil {
+ s5l.Panicf("fwd: failed to query for lastupdate: %v\n", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ s5l.Panicf("fwd: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode)
+ }
+
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ s5l.Panicf("fwd: failed to read lastupdate response: %v\n", err)
+ }
- updates, err := self.store.GetUpdatesAfter(latestId)
+ if len(body) == 0 {
+ latestId = -1
+ } else {
+ tid, err := strconv.ParseInt(string(body), 10, 32)
if err != nil {
- s5l.Printf("fwd: failed reading updates: %v\n", err)
- continue next
+ s5l.Panicf("fwd: invalid lastupdate response: %v\n", err)
}
+ latestId = int(tid)
+ }
+
+ return
+}
+
+func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) {
+ url := baseurl + "/updates"
+tryResync:
+ for {
+ lastId, _ := self.getLastUpdate(baseurl, client)
+
+ nextBatch:
+ for {
+ updates, err := self.store.GetUpdatesAfter(lastId)
+ if err != nil {
+ s5l.Printf("fwd: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ for i := range updates {
+ data, err := json.Marshal(updates[i])
+ if err != nil {
+ s5l.Panicf("fwd: encode failed: %v\n", err)
+ // TODO retry etc.
+ }
- for i := range updates {
- data, err := json.Marshal(updates[i])
- if err == nil {
- s5l.Printf("fwd: encode failed: %v\n", err)
- // TODO retry etc.
- continue
+ _, err = client.Post(url, "application/json", bytes.NewBuffer(data))
+ if err != nil {
+ s5l.Printf("fwd: post failed: %v\n", err)
+ continue tryResync
+ // TODO retry etc.
+ }
}
- _, err = client.Post(url, "application/json", bytes.NewBuffer(data))
- if err == nil {
- latestId = findMaxId(updates)
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
} else {
- s5l.Printf("fwd: post failed: %v\n", err)
- // TODO retry etc.
+ lastId = findMaxId(updates)
+ s5tl.Printf("fwd: new lastid: %d", lastId)
+ time.Sleep(100 * time.Millisecond)
}
}
}
}
func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
- forwardUrl := forwardBaseUrl + "/updates"
- self.handleForwarding(forwardUrl, http.DefaultClient)
+ self.handleForwarding(forwardBaseUrl, http.DefaultClient)
}