summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2015-10-17 06:03:26 +0200
committerChristian Pointner <equinox@spreadspace.org>2015-10-17 06:03:26 +0200
commit27e5a6a0b85f540d9657783a76bebf185f2a70f7 (patch)
tree4640420cf65e009395898249fbc8d8d7d495a7f4
parentdaq: increased max message size to create bigger send buffer (diff)
hub: add support for graphite forwarding
-rw-r--r--src/hub/Makefile1
-rw-r--r--src/hub/src/spreadspace.org/sfive-hub/s5hub.go12
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srv.go27
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go86
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5store.go11
5 files changed, 137 insertions, 0 deletions
diff --git a/src/hub/Makefile b/src/hub/Makefile
index 03ab4e5..2a25c93 100644
--- a/src/hub/Makefile
+++ b/src/hub/Makefile
@@ -41,6 +41,7 @@ getlibs:
$(GOCMD) get "github.com/mattn/go-sqlite3"
$(GOCMD) get "github.com/zenazn/goji"
$(GOCMD) get "code.google.com/p/go-uuid/uuid"
+ $(GOCMD) get "github.com/equinox0815/graphite-golang"
# $(GOCMD) get "github.com/go-sql-driver/mysql"
# $(GOCMD) get "github.com/ziutek/mymysql/godrv"
diff --git a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
index a062029..85274cf 100644
--- a/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
+++ b/src/hub/src/spreadspace.org/sfive-hub/s5hub.go
@@ -21,6 +21,8 @@ func main() {
startWeb := flag.Bool("start-web-server", true, "start a webserver")
forward := flag.String("forward-url", "", "forward to another sfive-server with http server at base-url")
forwardES := flag.String("forward-es-url", "", "forward to an ElasticSearch *index* via http")
+ forwardGraphite := flag.String("forward-graphite", "", "forward to Graphite at this host")
+ GraphiteBasePath := flag.String("graphite-base-path", "sfive", "use this as base for all paths on graphite")
vizAppDir := flag.String("viz-dir", "/usr/share/sfive/viz", "base-path to the viz application")
help := flag.Bool("help", false, "show usage")
@@ -90,6 +92,16 @@ func main() {
}()
}
+ if *forwardGraphite != "" {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ s5hl.Println("start graphite forward")
+ server.RunForwardingToGraphite(*forwardGraphite, *GraphiteBasePath)
+ s5hl.Println("graphite forward finished")
+ }()
+ }
+
alldone := make(chan bool)
go func() {
defer func() { alldone <- true }()
diff --git a/src/hub/src/spreadspace.org/sfive/s5srv.go b/src/hub/src/spreadspace.org/sfive/s5srv.go
index fd76f1b..dace0a5 100644
--- a/src/hub/src/spreadspace.org/sfive/s5srv.go
+++ b/src/hub/src/spreadspace.org/sfive/s5srv.go
@@ -41,6 +41,15 @@ type getHubIdToken struct {
response chan getHubIdResult
}
+type getLastUpdateIdResult struct {
+ id int
+ err error
+}
+
+type getLastUpdateIdToken struct {
+ response chan getLastUpdateIdResult
+}
+
type StatsSinkServer struct {
store sqliteStore
quit chan bool
@@ -51,6 +60,7 @@ type StatsSinkServer struct {
getUpdatesAfterChan chan getUpdatesAfterToken
getUpdatesChan chan getUpdatesToken
getHubIdChan chan getHubIdToken
+ getLastUpdateIdChan chan getLastUpdateIdToken
}
func (self StatsSinkServer) appendActor() {
@@ -92,6 +102,13 @@ func (self StatsSinkServer) appendActor() {
case token := <-self.getHubIdChan:
storeId, err := self.store.GetStoreId()
token.response <- getHubIdResult{storeId, err}
+ case token := <-self.getLastUpdateIdChan:
+ lastUpdateId, err := self.store.GetLastUpdateId()
+ if(lastUpdateId != nil) {
+ token.response <- getLastUpdateIdResult{*lastUpdateId, err}
+ } else {
+ token.response <- getLastUpdateIdResult{0, err}
+ }
}
}
}
@@ -132,6 +149,14 @@ func (self StatsSinkServer) getHubIdInvoke() (string, error) {
return res.id, res.err
}
+func (self StatsSinkServer) getLastUpdateIdInvoke() (int, error) {
+ token := getLastUpdateIdToken{response: make(chan getLastUpdateIdResult, 1)}
+ defer close(token.response)
+ self.getLastUpdateIdChan <- token
+ res := <-token.response
+ return res.id, res.err
+}
+
func (self StatsSinkServer) Close() {
self.quit <- true
<-self.done
@@ -143,6 +168,7 @@ func (self StatsSinkServer) Close() {
close(self.getUpdatesAfterChan)
close(self.getUpdatesChan)
close(self.getHubIdChan)
+ close(self.getLastUpdateIdChan)
self.store.Close()
}
@@ -162,6 +188,7 @@ func NewServer(mysql bool, dbPath string) (server *StatsSinkServer, err error) {
server.getUpdatesAfterChan = make(chan getUpdatesAfterToken, 1)
server.getUpdatesChan = make(chan getUpdatesToken, 3)
server.getHubIdChan = make(chan getHubIdToken, 1)
+ server.getLastUpdateIdChan = make(chan getLastUpdateIdToken, 1)
go server.appendActor()
return
}
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
new file mode 100644
index 0000000..1f5d16c
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -0,0 +1,86 @@
+package sfive
+
+import (
+ "time"
+ "fmt"
+
+ "github.com/equinox0815/graphite-golang"
+)
+
+func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) {
+ latestId, err = self.getLastUpdateIdInvoke()
+ if err != nil {
+ s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err)
+ return
+ }
+
+ return
+}
+
+func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) {
+ client := &graphite.Graphite{Address: forwardHost}
+
+tryResync:
+ for {
+ err := client.Connect()
+ if err != nil {
+ s5l.Printf("fwd-graphite: connect returned err: %v", err)
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+
+ lastId, _, err := self.getLastUpdateGraphite(client)
+ if err != nil {
+ s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
+ client.Disconnect()
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+ s5l.Printf("fwd-graphite: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-graphite: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ metrics := make([]graphite.Metric, len(updates) * 3)
+
+ for i, update := range updates {
+ path := basePath + "." + update.StreamId.ContentId
+ path = path + "." + update.StreamId.Format
+ path = path + "." + update.StreamId.Quality
+
+ metrics[i*3] = graphite.NewMetric(path + ".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix())
+ metrics[i*3 + 1] = graphite.NewMetric(path + ".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix())
+ metrics[i*3 + 2] = graphite.NewMetric(path + ".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), update.StartTime.Unix())
+ }
+
+ err = client.SendMetrics(metrics)
+ if err != nil {
+ s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+
+ s5l.Printf("fwd-graphite: all metrics sent")
+ lastId = findMaxId(updates)
+ s5l.Printf("fwd-graphite: new lastid: %d", lastId)
+ //time.Sleep(1 * time.Second)
+ }
+ }
+}
+
+func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) {
+ self.handleForwardingToGraphite(forwardHost, basePath)
+}
diff --git a/src/hub/src/spreadspace.org/sfive/s5store.go b/src/hub/src/spreadspace.org/sfive/s5store.go
index 3fbcf39..d30e365 100644
--- a/src/hub/src/spreadspace.org/sfive/s5store.go
+++ b/src/hub/src/spreadspace.org/sfive/s5store.go
@@ -510,6 +510,17 @@ func (s sqliteStore) GetLastUpdateForUuid(uuid string) (updateId *int, err error
return
}
+func (s sqliteStore) GetLastUpdateId() (updateId *int, err error) {
+ result := lastUpdateQueryResult{}
+ err = s.db.SelectOne(&result, "select max(Id) as MaxDataUpdateId from "+dataUpdatesTn,)
+ if err == nil {
+ updateId = result.MaxDataUpdateId
+ } else {
+ s5l.Printf("db: failed to find max DataUpdateId: %v", err)
+ }
+ return
+}
+
type statsResult struct {
UpdateCount *int
HubCount *int