summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarkus Grüneis <gimpf@gimpf.org>2014-10-22 21:25:20 +0200
committerMarkus Grüneis <gimpf@gimpf.org>2014-10-22 21:25:20 +0200
commitd036c0a11c08d225296b0fe0118c34b6a0104048 (patch)
tree341189740bad70a0a56ca13cac2fe498c1176b51 /src
parenthub: use command-line options for paths etc. (diff)
hub: add stub for forwarding
Diffstat (limited to 'src')
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go2
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5fwd.go55
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go14
3 files changed, 70 insertions, 1 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 3368d37..40cfcc6 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -54,7 +54,7 @@ func main() {
go func() {
defer wg.Done()
s5hl.Println("start forward")
- // TODO
+ server.RunForwarding(*forward)
s5hl.Println("forward finished")
}()
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5fwd.go b/src/hub/src/spreadspace.org/sfive/s5fwd.go
new file mode 100644
index 0000000..6ae0f99
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5fwd.go
@@ -0,0 +1,55 @@
+package sfive
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "time"
+)
+
+func findMaxId(values []dataUpdateDb) int {
+ maxId := -1
+ for i := range values {
+ if values[i].Id > maxId {
+ maxId = values[i].Id
+ }
+ }
+ return maxId
+}
+
+func (self StatsSinkServer) handleForwarding(url string, client *http.Client) {
+ latestId := 0 // TODO retrieve latest known from server
+next:
+ // TODO forwarding
+ for {
+ time.Sleep(500)
+
+ updates, err := self.store.GetUpdatesAfter(latestId)
+ if err != nil {
+ s5l.Printf("fwd: failed reading updates: %v\n", err)
+ continue next
+ }
+
+ for i := range updates {
+ data, err := json.Marshal(updates[i])
+ if err == nil {
+ s5l.Printf("fwd: encode failed: %v\n", err)
+ // TODO retry etc.
+ continue
+ }
+
+ _, err = client.Post(url, "application/json", bytes.NewBuffer(data))
+ if err == nil {
+ latestId = findMaxId(updates)
+ } else {
+ s5l.Printf("fwd: post failed: %v\n", err)
+ // TODO retry etc.
+ }
+ }
+ }
+}
+
+func (self StatsSinkServer) RunForwarding(forwardBaseUrl string) {
+ forwardUrl := forwardBaseUrl + "/updates"
+ self.handleForwarding(forwardUrl, http.DefaultClient)
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 82ac2d1..198069c 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -366,6 +366,20 @@ func (s sqliteStore) GetUpdate(id int) (res dataUpdateDb, err error) {
return
}
+func (s sqliteStore) GetUpdatesAfter(id int) (res []dataUpdateDb, err error) {
+ updates, err := s.db.Select(
+ dataUpdateDb{},
+ "select * from "+dataUpdatesTn+" where Id > ?",
+ id)
+ if err == nil {
+ res = make([]dataUpdateDb, len(updates))
+ for i := range updates {
+ res[i] = *updates[i].(*dataUpdateDb)
+ }
+ }
+ return
+}
+
func (s sqliteStore) GetUpdates(filter *StatsFilter) (res []dataUpdateDb, err error) {
sourceSql, parameters := getFilteredDataUpdateSelect(filter)
updates, err := s.db.Select(