summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-07 21:55:39 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-07 21:55:39 +0200
commit03cccc2f8f1a03aadf639da168798ba08c1610dd (patch)
tree0627a242e1923295111ad805f53f07ffa25d82fa
parentadded stateful encoder (diff)
use reader/writer interface for fowarder
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go75
1 files changed, 45 insertions, 30 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index e88a79d..a479c2e 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -33,25 +33,24 @@
package sfive
import (
- "bytes"
"encoding/json"
+ "errors"
+ "io"
"net/http"
"time"
)
func findMaxId(values []DataUpdateFull) int {
maxId := -1
- for i := range values {
- id := values[i].SourceHubDataUpdateId
- if id > maxId {
+ for _, value := range values {
+ if id := value.SourceHubDataUpdateId; id > maxId {
maxId = id
}
}
return maxId
}
-func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lastId int, hubUuid string, err error) {
- hubUuid = srv.GetHubUuid()
+func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) {
lastId = -1
var resp *http.Response
@@ -78,11 +77,30 @@ func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (las
return
}
+func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, error) {
+ resp, err := client.Post(url, "application/json", pr)
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return 0, errors.New("")
+ }
+ dec := json.NewDecoder(resp.Body)
+ result := WebUpdatesPostResponse{}
+ if err = dec.Decode(&result); err != nil {
+ return 0, err
+ }
+ return result.NumUpdates, nil
+}
+
func (srv Server) forwardRun(baseurl string, client *http.Client) {
url := baseurl + "/updates/_bulk"
+ hubUuid := srv.GetHubUuid()
tryResync:
for {
- lastId, _, err := srv.forwardGetLastUpdate(baseurl, client)
+ lastId, err := fwdGetLastUpdate(baseurl, client, hubUuid)
if err != nil {
s5l.Printf("fwd: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -95,7 +113,7 @@ tryResync:
for {
updates, err := srv.GetUpdatesAfter(lastId, 5000)
if err != nil {
- s5l.Printf("fwd: failed reading updates: %v\n", err)
+ s5l.Printf("fwd: failed reading updates: %v", err)
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
@@ -107,34 +125,31 @@ tryResync:
continue nextBatch
}
- // TODO: directly write to client body
- buf := &bytes.Buffer{}
- enc, err := NewStatefulEncoder(buf)
- if err != nil {
- s5l.Printf("fwd: failed encoding init message: %v\n", err)
- time.Sleep(500 * time.Millisecond)
- continue nextBatch
- }
- for _, upd := range updates {
- if err := enc.Encode(upd); err != nil {
- s5l.Printf("fwd: failed encoding updates: %v\n", err)
- time.Sleep(500 * time.Millisecond)
- continue nextBatch
+ pr, pw := io.Pipe()
+ go func(pw *io.PipeWriter) {
+ defer pw.Close()
+
+ enc, err := NewStatefulEncoder(pw)
+ if err != nil {
+ s5l.Printf("fwd: failed encoding/sending init message: %v", err)
+ return
}
- }
+ for _, upd := range updates {
+ if err := enc.Encode(upd); err != nil {
+ s5l.Printf("fwd: failed encoding/sending updates: %v", err)
+ return
+ }
+ }
+ }(pw)
- resp, err := client.Post(url, "application/json", buf)
- if err != nil {
- s5l.Printf("fwd: post failed: %v\n", err)
+ if num, err := fwdPostUpdates(client, url, pr); err != nil {
+ s5l.Printf("fwd: failed sending updates: %v", err)
continue tryResync
- }
- resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- s5l.Printf("fwd: post failed: %s\n", resp.Status)
+ } else if num != len(updates) {
+ s5l.Printf("fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num)
continue tryResync
}
- s5l.Printf("fwd: post OK")
lastId = findMaxId(updates)
s5l.Printf("fwd: new lastid: %d", lastId)
}