From aa8d42077c3daa14f81a16f38781b2b8225528cb Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Fri, 24 Oct 2014 15:18:31 +0200 Subject: hub: Implement basic forward support. --- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 95 +++++++++++++++++------ 1 file changed, 70 insertions(+), 25 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 6ae0f99..475027a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -3,53 +3,98 @@ package sfive import ( "bytes" "encoding/json" + "io/ioutil" "net/http" + "strconv" "time" ) -func findMaxId(values []dataUpdateDb) int { +func findMaxId(values []StatisticsData) int { maxId := -1 for i := range values { - if values[i].Id > maxId { - maxId = values[i].Id + id := values[i].SourceHubDataUpdateId + if id != nil && *id > maxId { + maxId = *id } } return maxId } -func (self StatsSinkServer) handleForwarding(url string, client *http.Client) { - latestId := 0 // TODO retrieve latest known from server -next: - // TODO forwarding - for { - time.Sleep(500) +func (self StatsSinkServer) getLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string) { + storeId, err := self.store.GetStoreId() + if err != nil { + s5l.Panicf("fwd: failed to get own hubid: %v\n", err) + } + + resp, err := client.Get(baseurl + "/lastupdate/" + storeId) + if err != nil { + s5l.Panicf("fwd: failed to query for lastupdate: %v\n", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + s5l.Panicf("fwd: server failed to fulfill query for lastupdate: %v\n", resp.StatusCode) + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + s5l.Panicf("fwd: failed to read lastupdate response: %v\n", err) + } - updates, err := self.store.GetUpdatesAfter(latestId) + if len(body) == 0 { + latestId = -1 + } else { + tid, err := strconv.ParseInt(string(body), 10, 32) if err != nil { - s5l.Printf("fwd: failed reading updates: %v\n", err) - continue next + s5l.Panicf("fwd: invalid lastupdate response: %v\n", err) } + latestId = int(tid) + } + + return +} + +func (self StatsSinkServer) handleForwarding(baseurl string, client *http.Client) { + url := baseurl + "/updates" +tryResync: + for { + lastId, _ := self.getLastUpdate(baseurl, client) + + nextBatch: + for { + updates, err := self.store.GetUpdatesAfter(lastId) + if err != nil { + s5l.Printf("fwd: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + for i := range updates { + data, err := json.Marshal(updates[i]) + if err != nil { + s5l.Panicf("fwd: encode failed: %v\n", err) + // TODO retry etc. + } - for i := range updates { - data, err := json.Marshal(updates[i]) - if err == nil { - s5l.Printf("fwd: encode failed: %v\n", err) - // TODO retry etc. - continue + _, err = client.Post(url, "application/json", bytes.NewBuffer(data)) + if err != nil { + s5l.Printf("fwd: post failed: %v\n", err) + continue tryResync + // TODO retry etc. + } } - _, err = client.Post(url, "application/json", bytes.NewBuffer(data)) - if err == nil { - latestId = findMaxId(updates) + if len(updates) == 0 { + time.Sleep(1 * time.Second) } else { - s5l.Printf("fwd: post failed: %v\n", err) - // TODO retry etc. + lastId = findMaxId(updates) + s5tl.Printf("fwd: new lastid: %d", lastId) + time.Sleep(100 * time.Millisecond) } } } } func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { - forwardUrl := forwardBaseUrl + "/updates" - self.handleForwarding(forwardUrl, http.DefaultClient) + self.handleForwarding(forwardBaseUrl, http.DefaultClient) } -- cgit v1.2.3