summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2017-03-11 01:05:07 +0100
committerChristian Pointner <equinox@spreadspace.org>2017-03-11 01:05:07 +0100
commitc957124e7faf0971cd09d6c46e36a93934f8e5f9 (patch)
tree2dd6a0f99d3a7a63cabdbcd9ff667daf51cacf83
parentupdated changelog for release (diff)
added forwarding for piwik
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go18
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go111
-rwxr-xr-xsrc/hub/test-fwd-piwik4
3 files changed, 131 insertions, 2 deletions
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index 85274cf..8897e42 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -22,7 +22,11 @@ func main() {
forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url")
forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http")
forwardGraphite := flag.String("forward-graphite", "", "forward to Graphite at this host")
- GraphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite")
+ graphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite")
+ forwardPiwik := flag.String("forward-piwik-url", "", "forward to Piwik at this host")
+ piwikSiteURL := flag.String("piwik-site-url", "", "use this base url for the site")
+ piwikSiteID := flag.Uint("piwik-site-id", 1, "use this site-id for piwik")
+ piwikToken := flag.String("piwik-token", "", "the auth token for piwik")
vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application")
help := flag.Bool("help", false, "show usage")
@@ -97,11 +101,21 @@ func main() {
go func() {
defer wg.Done()
s5hl.Println("start graphite forward")
- server.RunForwardingToGraphite(*forwardGraphite, *GraphiteBasePath)
+ server.RunForwardingToGraphite(*forwardGraphite, *graphiteBasePath)
s5hl.Println("graphite forward finished")
}()
}
+ if *forwardPiwik != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s5hl.Println("start piwik forward")
+ server.RunForwardingToPiwik(*forwardPiwik, *piwikSiteURL, *piwikSiteID, *piwikToken)
+ s5hl.Println("piwik forward finished")
+ }()
+ }
+
alldone := make(chan bool)
go func() {
defer func() { alldone <- true }()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
new file mode 100644
index 0000000..4235c50
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardPiwik.go
@@ -0,0 +1,111 @@
+package sfive
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strconv"
+ "time"
+)
+
+type PiwikBulkRequest struct {
+ Requests []string `json:"requests"`
+ TokenAuth string `json:"token_auth"`
+}
+
+func (self StatsSinkServer) getLastUpdatePiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) (latestId int, storeId string, err error) {
+ // TODO: ask piwik what the last update was...
+
+ latestId, err = 0, nil //self.getLastUpdateIdInvoke()
+ if err != nil {
+ s5l.Printf("fwd-piwik: failed to get own hubid: %v\n", err)
+ return
+ }
+
+ return
+}
+
+func (self StatsSinkServer) handleForwardingToPiwik(piwikURL, siteURL string, siteID uint, token string, client *http.Client) {
+tryResync:
+ for {
+ lastId, _, err := self.getLastUpdatePiwik(piwikURL, siteURL, siteID, token, client)
+ if err != nil {
+ s5l.Printf("fwd-piwik: lastupdate returned err: %v", err)
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+ s5l.Printf("fwd-piwik: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-piwik: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-piwik: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ req := PiwikBulkRequest{TokenAuth: token}
+ for _, update := range updates {
+ if len(update.Data.Clients) == 0 {
+ continue
+ }
+
+ for _, client := range update.Data.Clients {
+ ip, _, err := net.SplitHostPort(client.Ip)
+ if err != nil {
+ ip = client.Ip
+ }
+
+ q := make(url.Values)
+ q.Add("rec", "1")
+ q.Add("idsite", strconv.FormatUint(uint64(siteID), 10))
+ q.Add("url", fmt.Sprintf("%s/%s/%s/%s/%s", siteURL, update.Hostname, update.StreamId.ContentId, update.StreamId.Format, update.StreamId.Quality))
+ q.Add("cip", ip)
+ q.Add("cdt", strconv.FormatInt(update.StartTime.Unix(), 10))
+ q.Add("ua", client.UserAgent)
+ req.Requests = append(req.Requests, "?"+q.Encode())
+ }
+ }
+
+ postData := bytes.Buffer{}
+ if err := json.NewEncoder(&postData).Encode(req); err != nil {
+ s5l.Panicf("fwd-piwik: encode failed: %v\n", err)
+ }
+
+ resp, err := client.Post(piwikURL, "application/json", &postData)
+ if err != nil {
+ s5l.Printf("fwd-piwik: post failed: %v\n", err)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
+ body, _ := ioutil.ReadAll(resp.Body)
+ s5l.Printf("fwd-piwik: post failed: %s\n%s\n", resp.Status, body)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+ resp.Body.Close()
+
+ s5l.Printf("fwd-piwik: all posts OK")
+ lastId = findMaxId(updates)
+ s5l.Printf("fwd-piwik: new lastid: %d", lastId)
+ //time.Sleep(1 * time.Second)
+ }
+ }
+}
+
+func (self StatsSinkServer) RunForwardingToPiwik(piwikURL, siteURL string, siteID uint, piwikToken string) {
+ self.handleForwardingToPiwik(piwikURL, siteURL, siteID, piwikToken, http.DefaultClient)
+}
diff --git a/src/hub/test-fwd-piwik b/src/hub/test-fwd-piwik
new file mode 100755
index 0000000..1d45219
--- /dev/null
+++ b/src/hub/test-fwd-piwik
@@ -0,0 +1,4 @@
+#!/bin/sh
+
+rm -f /run/sfive/pipe /run/sfive/pipegram
+./bin/sfive-hub -start-pipe-server=false -start-pipegram-server=false -start-web-server=false -db db.sqlite -forward-piwik-url="http://localhost/piwik.php" -piwik-token "asdfjlkasjdflk" -piwik-site-id 4 -piwik-site-url "https://stream.elevate.at"