From 03cccc2f8f1a03aadf639da168798ba08c1610dd Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 7 May 2017 21:55:39 +0200 Subject: use reader/writer interface for fowarder --- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 75 ++++++++++++++--------- 1 file changed, 45 insertions(+), 30 deletions(-) diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index e88a79d..a479c2e 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -33,25 +33,24 @@ package sfive import ( - "bytes" "encoding/json" + "errors" + "io" "net/http" "time" ) func findMaxId(values []DataUpdateFull) int { maxId := -1 - for i := range values { - id := values[i].SourceHubDataUpdateId - if id > maxId { + for _, value := range values { + if id := value.SourceHubDataUpdateId; id > maxId { maxId = id } } return maxId } -func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (lastId int, hubUuid string, err error) { - hubUuid = srv.GetHubUuid() +func fwdGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) { lastId = -1 var resp *http.Response @@ -78,11 +77,30 @@ func (srv Server) forwardGetLastUpdate(baseurl string, client *http.Client) (las return } +func fwdPostUpdates(client *http.Client, url string, pr *io.PipeReader) (int, error) { + resp, err := client.Post(url, "application/json", pr) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return 0, errors.New("") + } + dec := json.NewDecoder(resp.Body) + result := WebUpdatesPostResponse{} + if err = dec.Decode(&result); err != nil { + return 0, err + } + return result.NumUpdates, nil +} + func (srv Server) forwardRun(baseurl string, client *http.Client) { url := baseurl + "/updates/_bulk" + hubUuid := srv.GetHubUuid() tryResync: for { - lastId, _, err := srv.forwardGetLastUpdate(baseurl, client) + lastId, err := fwdGetLastUpdate(baseurl, client, hubUuid) if err != nil { s5l.Printf("fwd: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -95,7 +113,7 @@ tryResync: for { updates, err := srv.GetUpdatesAfter(lastId, 5000) if err != nil { - s5l.Printf("fwd: failed reading updates: %v\n", err) + s5l.Printf("fwd: failed reading updates: %v", err) time.Sleep(500 * time.Millisecond) continue nextBatch } @@ -107,34 +125,31 @@ tryResync: continue nextBatch } - // TODO: directly write to client body - buf := &bytes.Buffer{} - enc, err := NewStatefulEncoder(buf) - if err != nil { - s5l.Printf("fwd: failed encoding init message: %v\n", err) - time.Sleep(500 * time.Millisecond) - continue nextBatch - } - for _, upd := range updates { - if err := enc.Encode(upd); err != nil { - s5l.Printf("fwd: failed encoding updates: %v\n", err) - time.Sleep(500 * time.Millisecond) - continue nextBatch + pr, pw := io.Pipe() + go func(pw *io.PipeWriter) { + defer pw.Close() + + enc, err := NewStatefulEncoder(pw) + if err != nil { + s5l.Printf("fwd: failed encoding/sending init message: %v", err) + return } - } + for _, upd := range updates { + if err := enc.Encode(upd); err != nil { + s5l.Printf("fwd: failed encoding/sending updates: %v", err) + return + } + } + }(pw) - resp, err := client.Post(url, "application/json", buf) - if err != nil { - s5l.Printf("fwd: post failed: %v\n", err) + if num, err := fwdPostUpdates(client, url, pr); err != nil { + s5l.Printf("fwd: failed sending updates: %v", err) continue tryResync - } - resp.Body.Close() - if resp.StatusCode != http.StatusOK { - s5l.Printf("fwd: post failed: %s\n", resp.Status) + } else if num != len(updates) { + s5l.Printf("fwd: server acknowledged wrong number of updates: expected %d, got: %d", len(updates), num) continue tryResync } - s5l.Printf("fwd: post OK") lastId = findMaxId(updates) s5l.Printf("fwd: new lastid: %d", lastId) } -- cgit v1.2.3