From 27e5a6a0b85f540d9657783a76bebf185f2a70f7 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 17 Oct 2015 06:03:26 +0200 Subject: hub: add support for graphite forwarding --- src/hub/Makefile | 1 + src/hub/src/spreadspace.org/sfive-hub/s5hub.go | 12 +++ src/hub/src/spreadspace.org/sfive/s5srv.go | 27 +++++++ .../spreadspace.org/sfive/s5srvForwardGraphite.go | 86 ++++++++++++++++++++++ src/hub/src/spreadspace.org/sfive/s5store.go | 11 +++ 5 files changed, 137 insertions(+) create mode 100644 src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go diff --git a/src/hub/Makefile b/src/hub/Makefile index 03ab4e5..2a25c93 100644 --- a/src/hub/Makefile +++ b/src/hub/Makefile @@ -41,6 +41,7 @@ getlibs: $(GOCMD) get "github.com/mattn/go-sqlite3" $(GOCMD) get "github.com/zenazn/goji" $(GOCMD) get "code.google.com/p/go-uuid/uuid" + $(GOCMD) get "github.com/equinox0815/graphite-golang" # $(GOCMD) get "github.com/go-sql-driver/mysql" # $(GOCMD) get "github.com/ziutek/mymysql/godrv" diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go index a062029..85274cf 100644 --- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go +++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go @@ -21,6 +21,8 @@ func main() { startWeb := flag.Bool("start-web-server", true, "start a webserver") forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url") forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http") + forwardGraphite := flag.String("forward-graphite", "", "forward to Graphite at this host") + GraphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite") vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application") help := flag.Bool("help", false, "show usage") @@ -90,6 +92,16 @@ func main() { }() } + if *forwardGraphite != "" { + wg.Add(1) + go func() { + defer wg.Done() + s5hl.Println("start graphite forward") + server.RunForwardingToGraphite(*forwardGraphite, *GraphiteBasePath) + s5hl.Println("graphite forward finished") + }() + } + alldone := make(chan bool) go func() { defer func() { alldone <- true }() diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go index fd76f1b..dace0a5 100644 --- a/src/hub/src/spreadspace.org/sfive/s5srv.go +++ b/src/hub/src/spreadspace.org/sfive/s5srv.go @@ -41,6 +41,15 @@ type getHubIdToken struct { response chan getHubIdResult } +type getLastUpdateIdResult struct { + id int + err error +} + +type getLastUpdateIdToken struct { + response chan getLastUpdateIdResult +} + type StatsSinkServer struct { store sqliteStore quit chan bool @@ -51,6 +60,7 @@ type StatsSinkServer struct { getUpdatesAfterChan chan getUpdatesAfterToken getUpdatesChan chan getUpdatesToken getHubIdChan chan getHubIdToken + getLastUpdateIdChan chan getLastUpdateIdToken } func (self StatsSinkServer) appendActor() { @@ -92,6 +102,13 @@ func (self StatsSinkServer) appendActor() { case token := <-self.getHubIdChan: storeId, err := self.store.GetStoreId() token.response <- getHubIdResult{storeId, err} + case token := <-self.getLastUpdateIdChan: + lastUpdateId, err := self.store.GetLastUpdateId() + if(lastUpdateId != nil) { + token.response <- getLastUpdateIdResult{*lastUpdateId, err} + } else { + token.response <- getLastUpdateIdResult{0, err} + } } } } @@ -132,6 +149,14 @@ func (self StatsSinkServer) getHubIdInvoke() (string, error) { return res.id, res.err } +func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) { + token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)} + defer close(token.response) + self.getLastUpdateIdChan <- token + res := <-token.response + return res.id, res.err +} + func (self StatsSinkServer) Close() { self.quit <- true <-self.done @@ -143,6 +168,7 @@ func (self StatsSinkServer) Close() { close(self.getUpdatesAfterChan) close(self.getUpdatesChan) close(self.getHubIdChan) + close(self.getLastUpdateIdChan) self.store.Close() } @@ -162,6 +188,7 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) { server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1) server.getUpdatesChan = make(chan getUpdatesToken, 3) server.getHubIdChan = make(chan getHubIdToken, 1) + server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1) go server.appendActor() return } diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go new file mode 100644 index 0000000..1f5d16c --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -0,0 +1,86 @@ +package sfive + +import ( + "time" + "fmt" + + "github.com/equinox0815/graphite-golang" +) + +func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) { + latestId, err = self.getLastUpdateIdInvoke() + if err != nil { + s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err) + return + } + + return +} + +func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) { + client := &graphite.Graphite{Address: forwardHost} + +tryResync: + for { + err := client.Connect() + if err != nil { + s5l.Printf("fwd-graphite: connect returned err: %v", err) + time.Sleep(5 * time.Second) + continue tryResync + } + + lastId, _, err := self.getLastUpdateGraphite(client) + if err != nil { + s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) + client.Disconnect() + time.Sleep(5 * time.Second) + continue tryResync + } + s5l.Printf("fwd-graphite: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-graphite: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-graphite: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + metrics := make([]graphite.Metric, len(updates) * 3) + + for i, update := range updates { + path := basePath + "." + update.StreamId.ContentId + path = path + "." + update.StreamId.Format + path = path + "." + update.StreamId.Quality + + metrics[i*3] = graphite.NewMetric(path + ".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix()) + metrics[i*3 + 1] = graphite.NewMetric(path + ".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix()) + metrics[i*3 + 2] = graphite.NewMetric(path + ".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), update.StartTime.Unix()) + } + + err = client.SendMetrics(metrics) + if err != nil { + s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err) + time.Sleep(1 * time.Second) + continue tryResync + } + + s5l.Printf("fwd-graphite: all metrics sent") + lastId = findMaxId(updates) + s5l.Printf("fwd-graphite: new lastid: %d", lastId) + //time.Sleep(1 * time.Second) + } + } +} + +func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) { + self.handleForwardingToGraphite(forwardHost, basePath) +} diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go index 3fbcf39..d30e365 100644 --- a/src/hub/src/spreadspace.org/sfive/s5store.go +++ b/src/hub/src/spreadspace.org/sfive/s5store.go @@ -510,6 +510,17 @@ func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error return } +func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) { + result := lastUpdateQueryResult{} + err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn,) + if err == nil { + updateId = result.MaxDataUpdateId + } else { + s5l.Printf("db: failed to find max DataUpdateId: %v", err) + } + return +} + type statsResult struct { UpdateCount *int HubCount *int -- cgit v1.2.3