diff options
Diffstat (limited to 'src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go')
-rw-r--r-- | src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go new file mode 100644 index 0000000..1f5d16c --- /dev/null +++ b/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go @@ -0,0 +1,86 @@ +package sfive + +import ( + "time" + "fmt" + + "github.com/equinox0815/graphite-golang" +) + +func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) { + latestId, err = self.getLastUpdateIdInvoke() + if err != nil { + s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err) + return + } + + return +} + +func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) { + client := &graphite.Graphite{Address: forwardHost} + +tryResync: + for { + err := client.Connect() + if err != nil { + s5l.Printf("fwd-graphite: connect returned err: %v", err) + time.Sleep(5 * time.Second) + continue tryResync + } + + lastId, _, err := self.getLastUpdateGraphite(client) + if err != nil { + s5l.Printf("fwd-graphite: lastupdate returned err: %v", err) + client.Disconnect() + time.Sleep(5 * time.Second) + continue tryResync + } + s5l.Printf("fwd-graphite: lastupdate: %d", lastId) + + nextBatch: + for { + updates, err := self.getUpdatesAfterInvoke(lastId) + if err != nil { + s5l.Printf("fwd-graphite: failed reading updates: %v\n", err) + time.Sleep(500 * time.Millisecond) + continue nextBatch + } + + s5l.Printf("fwd-graphite: got %d updates", len(updates)) + + if len(updates) == 0 { + time.Sleep(1 * time.Second) + continue nextBatch + } + + metrics := make([]graphite.Metric, len(updates) * 3) + + for i, update := range updates { + path := basePath + "." + update.StreamId.ContentId + path = path + "." + update.StreamId.Format + path = path + "." + update.StreamId.Quality + + metrics[i*3] = graphite.NewMetric(path + ".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix()) + metrics[i*3 + 1] = graphite.NewMetric(path + ".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix()) + metrics[i*3 + 2] = graphite.NewMetric(path + ".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), update.StartTime.Unix()) + } + + err = client.SendMetrics(metrics) + if err != nil { + s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err) + time.Sleep(1 * time.Second) + continue tryResync + } + + s5l.Printf("fwd-graphite: all metrics sent") + lastId = findMaxId(updates) + s5l.Printf("fwd-graphite: new lastid: %d", lastId) + //time.Sleep(1 * time.Second) + } + } +} + +func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) { + self.handleForwardingToGraphite(forwardHost, basePath) +} |