summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-05-07 22:24:20 +0200
committerChristian Pointner <equinox@spreadspace.org>2017-05-07 22:24:20 +0200
commit13333a768cd3d7818d8705474eaf7924cd754387 (patch)
tree5d8dd15fe76cf023fc0f07d421acc9f4d0586fb8
parentcleaned up graphite forwarder (diff)
some cleanup @ forwarder
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForward.go3
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go28
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go3
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go22
4 files changed, 19 insertions, 37 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForward.go b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
index 0914308..19dbf16 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForward.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForward.go
@@ -122,7 +122,6 @@ tryResync:
time.Sleep(5 * time.Second)
continue tryResync
}
-
s5l.Printf("fwd: lastupdate: %d", lastId)
nextBatch:
@@ -133,8 +132,6 @@ tryResync:
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
-
- s5l.Printf("fwd: got %d updates", len(updates))
if len(updates) == 0 {
time.Sleep(1 * time.Second)
continue nextBatch
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
index 4297bbf..8cf3af5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardEs.go
@@ -47,12 +47,10 @@ const forwardEsLastUpdateJson = `{
"aggregations": { "last-id" : { "max" : { "field": "SourceHubDataUpdateId" } } }
}`
-func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (latestId int, storeId string, err error) {
+func fwdEsGetLastUpdate(baseurl string, client *http.Client, hubUuid string) (lastId int, err error) {
url := baseurl + "/dataupdate/_search?search_type=count"
- storeId = srv.GetHubUuid()
-
- queryJson := fmt.Sprintf(forwardEsLastUpdateJson, storeId)
+ queryJson := fmt.Sprintf(forwardEsLastUpdateJson, hubUuid)
s5dl.Printf("fwd-es: query: %s", queryJson)
var resp *http.Response
@@ -69,7 +67,7 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l
}
var body []byte
- body, err = ioutil.ReadAll(resp.Body)
+ body, err = ioutil.ReadAll(resp.Body) // TODO: read from Body using json.Decoder
if err != nil {
s5l.Printf("fwd-es: failed to read lastupdate response: %v\n", err)
return
@@ -78,7 +76,7 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l
s5dl.Printf("fwd-es: lastupdate response: %s\n", body)
if len(body) == 0 {
- latestId = -1
+ lastId = -1
} else {
var value interface{}
err = json.Unmarshal(body, &value)
@@ -90,11 +88,11 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l
idstrcntr := value.(map[string]interface{})["aggregations"].(map[string]interface{})["last-id"].(map[string]interface{})["value"]
if idstrcntr == nil {
s5l.Printf("fwd-es: we are new here\n")
- latestId = -1
+ lastId = -1
return
}
tid := idstrcntr.(float64)
- latestId = int(tid)
+ lastId = int(tid)
}
return
@@ -102,9 +100,10 @@ func (srv Server) forwardEsGetLastUpdate(baseurl string, client *http.Client) (l
func (srv Server) forwardEsRun(baseurl string, client *http.Client) {
url := baseurl + "/_bulk"
+ hubUuid := srv.GetHubUuid()
tryResync:
for {
- lastId, _, err := srv.forwardEsGetLastUpdate(baseurl, client)
+ lastId, err := fwdEsGetLastUpdate(baseurl, client, hubUuid)
if err != nil {
s5l.Printf("fwd-es: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -120,9 +119,6 @@ tryResync:
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
-
- s5l.Printf("fwd-es: got %d updates", len(updates))
-
if len(updates) == 0 {
time.Sleep(1 * time.Second)
continue nextBatch
@@ -131,7 +127,7 @@ tryResync:
postData := bytes.Buffer{}
for _, update := range updates {
- data, err := json.Marshal(update)
+ data, err := json.Marshal(update) // TODO: move to seperate function, use StatelessEncoder and write to io.PipeWriter
if err != nil {
s5l.Panicf("fwd-es: encode failed: %v\n", err)
}
@@ -141,9 +137,8 @@ tryResync:
postData.WriteRune('\n')
}
- s5l.Printf("fwd-es: marshal OK")
+ // TODO: move to seperate function and use io.PipeReader as body
resp, err := client.Post(url, "application/json", bytes.NewReader(postData.Bytes()))
-
if err != nil {
s5l.Printf("fwd-es: post failed: %v\n", err)
time.Sleep(1 * time.Second)
@@ -156,9 +151,8 @@ tryResync:
time.Sleep(1 * time.Second)
continue tryResync
}
- resp.Body.Close()
+ resp.Body.Close() // TODO: check result from elasticsearch
- s5l.Printf("fwd-es: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-es: new lastid: %d", lastId)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
index fc825d9..b4ab8b2 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -66,8 +66,6 @@ tryResync:
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
-
- s5l.Printf("fwd-graphite: got %d updates", len(updates))
if len(updates) == 0 {
time.Sleep(1 * time.Second)
continue nextBatch
@@ -92,7 +90,6 @@ tryResync:
continue tryResync
}
- s5l.Printf("fwd-graphite: all metrics sent")
lastId = findMaxId(updates)
s5l.Printf("fwd-graphite: new lastid: %d", lastId)
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
index 3d44500..9e7817a 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -49,22 +49,18 @@ type forwardPiwikBulkRequest struct {
TokenAuth string `json:"token_auth"`
}
-func (srv Server) forwardPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) {
+func fwdPiwikGetLastUpdate(piwikURL, siteURL string, siteID uint, token string, client *http.Client, hubUuid string) (lastId int, err error) {
// TODO: ask piwik what the last update was...
-
- latestId, err = 0, nil //srv.getLastUpdateIdInvoke()
- if err != nil {
- s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err)
- return
- }
-
+ lastId = 0
return
}
func (srv Server) forwardPiwikRun(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
+ // hubUuid := srv.GetHubUuid()
tryResync:
for {
- lastId, _, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client)
+ // lastId, err := srv.forwardPiwikGetLastUpdate(piwikURL, siteURL, siteID, token, client, hubUuid)
+ lastId, err := srv.GetLastUpdateId()
if err != nil {
s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
time.Sleep(5 * time.Second)
@@ -80,14 +76,12 @@ tryResync:
time.Sleep(500 * time.Millisecond)
continue nextBatch
}
-
- s5l.Printf("fwd-piwik: got %d updates", len(updates))
-
if len(updates) == 0 {
time.Sleep(1 * time.Second)
continue nextBatch
}
+ // TODO: move this to seperate function and write to io.PipeWriter
req := forwardPiwikBulkRequest{TokenAuth: token}
for _, update := range updates {
if len(update.Data.Clients) == 0 {
@@ -116,6 +110,7 @@ tryResync:
s5l.Panicf("fwd-piwik: encode failed: %v\n", err)
}
+ // TODO: move this to seperate function and read from io.PipeReader
resp, err := client.Post(piwikURL, "application/json", &postData)
if err != nil {
s5l.Printf("fwd-piwik: post failed: %v\n", err)
@@ -128,9 +123,8 @@ tryResync:
time.Sleep(1 * time.Second)
continue tryResync
}
- resp.Body.Close()
+ resp.Body.Close() // TODO: check result from Piwik
- s5l.Printf("fwd-piwik: all posts OK")
lastId = findMaxId(updates)
s5l.Printf("fwd-piwik: new lastid: %d", lastId)
}