From c957124e7faf0971cd09d6c46e36a93934f8e5f9 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 11 Mar 2017 01:05:07 +0100 Subject: added forwarding for piwik --- src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 18 +++- .../src/spreadspace.org/sfive/s5srvForwardPiwik.go | 111 +++++++++++++++++++++ src/hub/test-fwd-piwik | 4 + 3 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go create mode 100755 src/hub/test-fwd-piwik diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index 85274cf..8897e42 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -22,7 +22,11 @@ func main() { forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url") forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http") forwardGraphite := flag.String("forward-graphite", "", "forward to Graphite at this host") - GraphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite") + graphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite") + forwardPiwik := flag.String("forward-piwik-url", "", "forward to Piwik at this host") + piwikSiteURL := flag.String("piwik-site-url", "", "use this base url for the site") + piwikSiteID := flag.Uint("piwik-site-id", 1, "use this site-id for piwik") + piwikToken := flag.String("piwik-token", "", "the auth token for piwik") vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application") help := flag.Bool("help", false, "show usage") @@ -97,11 +101,21 @@ func main() { go func() { defer wg.Done() s5hl.Println("start graphite forward") - server.RunForwardingToGraphite(*forwardGraphite, *GraphiteBasePath) + server.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath) s5hl.Println("graphite forward finished") }() } + if *forwardPiwik != "" { + wg.Add(1) + go func() { + defer wg.Done() + s5hl.Println("start piwik forward") + server.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken) + s5hl.Println("piwik forward finished") + }() + } + alldone := make(chan bool) go func() { defer func() { alldone <- true }() diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go new file mode 100644 index 0000000..4235c50 --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go @@ -0,0 +1,111 @@ +package sfive + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "strconv" + "time" +) + +type PiwikBulkRequest struct { + Requests []string `json:"requests"` + TokenAuth string `json:"token_auth"` +} + +func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) { + // TODO: ask piwik what the last update was... + + latestId, err = 0, nil //self.getLastUpdateIdInvoke() + if err != nil { + s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err) + return + } + + return +} + +func (self StatsSinkServer) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) { +tryResync: + for { + lastId, _, err := self.getLastUpdatePiwik(piwikURL, siteURL, siteID, token, client) + if err != nil { + s5l.Printf("fwd-piwik: lastupdate returned err: %v", err) + time.Sleep(5 * time.Second) + continue tryResync + } + s5l.Printf("fwd-piwik: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-piwik: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-piwik: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + req := PiwikBulkRequest{TokenAuth: token} + for _, update := range updates { + if len(update.Data.Clients) == 0 { + continue + } + + for _, client := range update.Data.Clients { + ip, _, err := net.SplitHostPort(client.Ip) + if err != nil { + ip = client.Ip + } + + q := make(url.Values) + q.Add("rec", "1") + q.Add("idsite", strconv.FormatUint(uint64(siteID), 10)) + q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Hostname, update.StreamId.ContentId, update.StreamId.Format, update.StreamId.Quality)) + q.Add("cip", ip) + q.Add("cdt", strconv.FormatInt(update.StartTime.Unix(), 10)) + q.Add("ua", client.UserAgent) + req.Requests = append(req.Requests, "?"+q.Encode()) + } + } + + postData := bytes.Buffer{} + if err := json.NewEncoder(&postData).Encode(req); err != nil { + s5l.Panicf("fwd-piwik: encode failed: %v\n", err) + } + + resp, err := client.Post(piwikURL, "application/json", &postData) + if err != nil { + s5l.Printf("fwd-piwik: post failed: %v\n", err) + time.Sleep(1 * time.Second) + continue tryResync + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := ioutil.ReadAll(resp.Body) + s5l.Printf("fwd-piwik: post failed: %s\n%s\n", resp.Status, body) + time.Sleep(1 * time.Second) + continue tryResync + } + resp.Body.Close() + + s5l.Printf("fwd-piwik: all posts OK") + lastId = findMaxId(updates) + s5l.Printf("fwd-piwik: new lastid: %d", lastId) + //time.Sleep(1 * time.Second) + } + } +} + +func (self StatsSinkServer) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) { + self.handleForwardingToPiwik(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient) +} diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik new file mode 100755 index 0000000..1d45219 --- /dev/null +++ b/src/hub/test-fwd-piwik @@ -0,0 +1,4 @@ +#!/bin/sh + +rm -f /run/sfive/pipe /run/sfive/pipegram +./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at" -- cgit v1.2.3