From 13333a768cd3d7818d8705474eaf7924cd754387 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 7 May 2017 22:24:20 +0200 Subject: some cleanup @ forwarder --- src/hub/src/spreadspace.org/sfive/s5srvForward.go | 3 --- .../src/spreadspace.org/sfive/s5srvForwardEs.go | 28 +++++++++------------- .../spreadspace.org/sfive/s5srvForwardGraphite.go | 3 --- .../src/spreadspace.org/sfive/s5srvForwardPiwik.go | 22 +++++++---------- 4 files changed, 19 insertions(+), 37 deletions(-) (limited to 'src/hub') diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go index 0914308..19dbf16 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go @@ -122,7 +122,6 @@ tryResync: time.Sleep(5 * time.Second) continue tryResync } - s5l.Printf("fwd: lastupdate: %d", lastId) nextBatch: @@ -133,8 +132,6 @@ tryResync: time.Sleep(500 * time.Millisecond) continue nextBatch } - - s5l.Printf("fwd: got %d updates", len(updates)) if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go index 4297bbf..8cf3af5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go @@ -47,12 +47,10 @@ const forwardEsLastUpdateJson = `{ "aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } } }` -func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) { +func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) { url := baseurl + "/dataupdate/_search?search_type=count" - storeId = srv.GetHubUuid() - - queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId) + queryJson := fmt.Sprintf(forwardEsLastUpdateJson, hubUuid) s5dl.Printf("fwd-es: query: %s", queryJson) var resp *http.Response @@ -69,7 +67,7 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l } var body []byte - body, err = ioutil.ReadAll(resp.Body) + body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder if err != nil { s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err) return @@ -78,7 +76,7 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l s5dl.Printf("fwd-es: lastupdate response: %s\n", body) if len(body) == 0 { - latestId = -1 + lastId = -1 } else { var value interface{} err = json.Unmarshal(body, &value) @@ -90,11 +88,11 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"] if idstrcntr == nil { s5l.Printf("fwd-es: we are new here\n") - latestId = -1 + lastId = -1 return } tid := idstrcntr.(float64) - latestId = int(tid) + lastId = int(tid) } return @@ -102,9 +100,10 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l func (srv Server) forwardEsRun(baseurl string, client *http.Client) { url := baseurl + "/_bulk" + hubUuid := srv.GetHubUuid() tryResync: for { - lastId, _, err := srv.forwardEsGetLastUpdate(baseurl, client) + lastId, err := fwdEsGetLastUpdate(baseurl, client, hubUuid) if err != nil { s5l.Printf("fwd-es: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -120,9 +119,6 @@ tryResync: time.Sleep(500 * time.Millisecond) continue nextBatch } - - s5l.Printf("fwd-es: got %d updates", len(updates)) - if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch @@ -131,7 +127,7 @@ tryResync: postData := bytes.Buffer{} for _, update := range updates { - data, err := json.Marshal(update) + data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter if err != nil { s5l.Panicf("fwd-es: encode failed: %v\n", err) } @@ -141,9 +137,8 @@ tryResync: postData.WriteRune('\n') } - s5l.Printf("fwd-es: marshal OK") + // TODO: move to seperate function and use io.PipeReader as body resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes())) - if err != nil { s5l.Printf("fwd-es: post failed: %v\n", err) time.Sleep(1 * time.Second) @@ -156,9 +151,8 @@ tryResync: time.Sleep(1 * time.Second) continue tryResync } - resp.Body.Close() + resp.Body.Close() // TODO: check result from elasticsearch - s5l.Printf("fwd-es: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-es: new lastid: %d", lastId) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go index fc825d9..b4ab8b2 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -66,8 +66,6 @@ tryResync: time.Sleep(500 * time.Millisecond) continue nextBatch } - - s5l.Printf("fwd-graphite: got %d updates", len(updates)) if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch @@ -92,7 +90,6 @@ tryResync: continue tryResync } - s5l.Printf("fwd-graphite: all metrics sent") lastId = findMaxId(updates) s5l.Printf("fwd-graphite: new lastid: %d", lastId) } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go index 3d44500..9e7817a 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -49,22 +49,18 @@ type forwardPiwikBulkRequest struct { TokenAuth string `json:"token_auth"` } -func (srv Server) forwardPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { +func fwdPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) { // TODO: ask piwik what the last update was... - - latestId, err = 0, nil //srv.getLastUpdateIdInvoke() - if err != nil { - s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err) - return - } - + lastId = 0 return } func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { + // hubUuid := srv.GetHubUuid() tryResync: for { - lastId, _, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client) + // lastId, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client, hubUuid) + lastId, err := srv.GetLastUpdateId() if err != nil { s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) time.Sleep(5 * time.Second) @@ -80,14 +76,12 @@ tryResync: time.Sleep(500 * time.Millisecond) continue nextBatch } - - s5l.Printf("fwd-piwik: got %d updates", len(updates)) - if len(updates) == 0 { time.Sleep(1 * time.Second) continue nextBatch } + // TODO: move this to seperate function and write to io.PipeWriter req := forwardPiwikBulkRequest{TokenAuth: token} for _, update := range updates { if len(update.Data.Clients) == 0 { @@ -116,6 +110,7 @@ tryResync: s5l.Panicf("fwd-piwik: encode failed: %v\n", err) } + // TODO: move this to seperate function and read from io.PipeReader resp, err := client.Post(piwikURL, "application/json", &postData) if err != nil { s5l.Printf("fwd-piwik: post failed: %v\n", err) @@ -128,9 +123,8 @@ tryResync: time.Sleep(1 * time.Second) continue tryResync } - resp.Body.Close() + resp.Body.Close() // TODO: check result from Piwik - s5l.Printf("fwd-piwik: all posts OK") lastId = findMaxId(updates) s5l.Printf("fwd-piwik: new lastid: %d", lastId) } -- cgit v1.2.3