From d036c0a11c08d225296b0fe0118c34b6a0104048 Mon Sep 17 00:00:00 2001 From: Markus Grüneis Date: Wed, 22 Oct 2014 21:25:20 +0200 Subject: hub: add stub for forwarding --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 2 +- src/hub/src/spreadspace.org/sfive/s5fwd.go | 55 ++++++++++++++++++++++++++ src/hub/src/spreadspace.org/sfive/s5store.go | 14 +++++++ 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5fwd.go (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 3368d37..40cfcc6 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -54,7 +54,7 @@ func main() { go func() { defer wg.Done() s5hl.Println("start forward") - // TODO + server.RunForwarding(*forward) s5hl.Println("forward finished") }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5fwd.go b/src/hub/src/spreadspace.org/sfive/s5fwd.go new file mode 100644 index 0000000..6ae0f99 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5fwd.go @@ -0,0 +1,55 @@ +package sfive + +import ( + "bytes" + "encoding/json" + "net/http" + "time" +) + +func findMaxId(values []dataUpdateDb) int { + maxId := -1 + for i := range values { + if values[i].Id > maxId { + maxId = values[i].Id + } + } + return maxId +} + +func (self StatsSinkServer) handleForwarding(url string, client *http.Client) { + latestId := 0 // TODO retrieve latest known from server +next: + // TODO forwarding + for { + time.Sleep(500) + + updates, err := self.store.GetUpdatesAfter(latestId) + if err != nil { + s5l.Printf("fwd: failed reading updates: %v\n", err) + continue next + } + + for i := range updates { + data, err := json.Marshal(updates[i]) + if err == nil { + s5l.Printf("fwd: encode failed: %v\n", err) + // TODO retry etc. + continue + } + + _, err = client.Post(url, "application/json", bytes.NewBuffer(data)) + if err == nil { + latestId = findMaxId(updates) + } else { + s5l.Printf("fwd: post failed: %v\n", err) + // TODO retry etc. + } + } + } +} + +func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { + forwardUrl := forwardBaseUrl + "/updates" + self.handleForwarding(forwardUrl, http.DefaultClient) +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 82ac2d1..198069c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -366,6 +366,20 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { return } +func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) { + updates, err := s.db.Select( + dataUpdateDb{}, + "select * from "+dataUpdatesTn+" where Id > ?", + id) + if err == nil { + res = make([]dataUpdateDb, len(updates)) + for i := range updates { + res[i] = *updates[i].(*dataUpdateDb) + } + } + return +} + func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []dataUpdateDb, err error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) updates, err := s.db.Select( -- cgit v1.2.3