summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go')
-rw-r--r--src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go86
1 files changed, 86 insertions, 0 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
new file mode 100644
index 0000000..1f5d16c
--- /dev/null
+++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
@@ -0,0 +1,86 @@
+package sfive
+
+import (
+ "time"
+ "fmt"
+
+ "github.com/equinox0815/graphite-golang"
+)
+
+func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) {
+ latestId, err = self.getLastUpdateIdInvoke()
+ if err != nil {
+ s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err)
+ return
+ }
+
+ return
+}
+
+func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) {
+ client := &graphite.Graphite{Address: forwardHost}
+
+tryResync:
+ for {
+ err := client.Connect()
+ if err != nil {
+ s5l.Printf("fwd-graphite: connect returned err: %v", err)
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+
+ lastId, _, err := self.getLastUpdateGraphite(client)
+ if err != nil {
+ s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
+ client.Disconnect()
+ time.Sleep(5 * time.Second)
+ continue tryResync
+ }
+ s5l.Printf("fwd-graphite: lastupdate: %d", lastId)
+
+ nextBatch:
+ for {
+ updates, err := self.getUpdatesAfterInvoke(lastId)
+ if err != nil {
+ s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
+ time.Sleep(500 * time.Millisecond)
+ continue nextBatch
+ }
+
+ s5l.Printf("fwd-graphite: got %d updates", len(updates))
+
+ if len(updates) == 0 {
+ time.Sleep(1 * time.Second)
+ continue nextBatch
+ }
+
+ metrics := make([]graphite.Metric, len(updates) * 3)
+
+ for i, update := range updates {
+ path := basePath + "." + update.StreamId.ContentId
+ path = path + "." + update.StreamId.Format
+ path = path + "." + update.StreamId.Quality
+
+ metrics[i*3] = graphite.NewMetric(path + ".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix())
+ metrics[i*3 + 1] = graphite.NewMetric(path + ".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix())
+ metrics[i*3 + 2] = graphite.NewMetric(path + ".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), update.StartTime.Unix())
+ }
+
+ err = client.SendMetrics(metrics)
+ if err != nil {
+ s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err)
+ time.Sleep(1 * time.Second)
+ continue tryResync
+ }
+
+ s5l.Printf("fwd-graphite: all metrics sent")
+ lastId = findMaxId(updates)
+ s5l.Printf("fwd-graphite: new lastid: %d", lastId)
+ //time.Sleep(1 * time.Second)
+ }
+ }
+}
+
+func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) {
+ self.handleForwardingToGraphite(forwardHost, basePath)
+}