diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 2 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5fwd.go | 55 | ||||
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5store.go | 14 |
3 files changed, 70 insertions, 1 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 3368d37..40cfcc6 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -54,7 +54,7 @@ func main() { go func() { defer wg.Done() s5hl.Println("start forward") - // TODO + server.RunForwarding(*forward) s5hl.Println("forward finished") }() } diff --git a/src/hub/src/spreadspace.org/sfive/s5fwd.go b/src/hub/src/spreadspace.org/sfive/s5fwd.go new file mode 100644 index 0000000..6ae0f99 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5fwd.go @@ -0,0 +1,55 @@ +package sfive + +import ( + "bytes" + "encoding/json" + "net/http" + "time" +) + +func findMaxId(values []dataUpdateDb) int { + maxId := -1 + for i := range values { + if values[i].Id > maxId { + maxId = values[i].Id + } + } + return maxId +} + +func (self StatsSinkServer) handleForwarding(url string, client *http.Client) { + latestId := 0 // TODO retrieve latest known from server +next: + // TODO forwarding + for { + time.Sleep(500) + + updates, err := self.store.GetUpdatesAfter(latestId) + if err != nil { + s5l.Printf("fwd: failed reading updates: %v\n", err) + continue next + } + + for i := range updates { + data, err := json.Marshal(updates[i]) + if err == nil { + s5l.Printf("fwd: encode failed: %v\n", err) + // TODO retry etc. + continue + } + + _, err = client.Post(url, "application/json", bytes.NewBuffer(data)) + if err == nil { + latestId = findMaxId(updates) + } else { + s5l.Printf("fwd: post failed: %v\n", err) + // TODO retry etc. + } + } + } +} + +func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) { + forwardUrl := forwardBaseUrl + "/updates" + self.handleForwarding(forwardUrl, http.DefaultClient) +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 82ac2d1..198069c 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -366,6 +366,20 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) { return } +func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) { + updates, err := s.db.Select( + dataUpdateDb{}, + "select * from "+dataUpdatesTn+" where Id > ?", + id) + if err == nil { + res = make([]dataUpdateDb, len(updates)) + for i := range updates { + res[i] = *updates[i].(*dataUpdateDb) + } + } + return +} + func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []dataUpdateDb, err error) { sourceSql, parameters := getFilteredDataUpdateSelect(filter) updates, err := s.db.Select( |