summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
blob: 1f5d16c473f7394577f6a83546b28d02888c8fc2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package sfive

import (
	"time"
	"fmt"

	"github.com/equinox0815/graphite-golang"
)

func (self StatsSinkServer) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) {
	latestId, err = self.getLastUpdateIdInvoke()
	if err != nil {
		s5l.Printf("fwd-graphite: failed to get own hubid: %v\n", err)
		return
	}

	return
}

func (self StatsSinkServer) handleForwardingToGraphite(forwardHost string, basePath string) {
  client := &graphite.Graphite{Address: forwardHost}

tryResync:
	for {
		err := client.Connect()
		if err != nil {
			s5l.Printf("fwd-graphite: connect returned err: %v", err)
			time.Sleep(5 * time.Second)
			continue tryResync
		}

		lastId, _, err := self.getLastUpdateGraphite(client)
		if err != nil {
			s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
			client.Disconnect()
			time.Sleep(5 * time.Second)
			continue tryResync
		}
		s5l.Printf("fwd-graphite: lastupdate: %d", lastId)

	nextBatch:
		for {
			updates, err := self.getUpdatesAfterInvoke(lastId)
			if err != nil {
				s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
				time.Sleep(500 * time.Millisecond)
				continue nextBatch
			}

			s5l.Printf("fwd-graphite: got %d updates", len(updates))

			if len(updates) == 0 {
				time.Sleep(1 * time.Second)
				continue nextBatch
			}

			metrics := make([]graphite.Metric, len(updates) * 3)

			for i, update := range updates {
				path := basePath + "." + update.StreamId.ContentId
				path = path + "." + update.StreamId.Format
				path = path + "." + update.StreamId.Quality

				metrics[i*3] = graphite.NewMetric(path + ".client-count", fmt.Sprintf("%d", update.Data.ClientCount), update.StartTime.Unix())
				metrics[i*3 + 1] = graphite.NewMetric(path + ".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), update.StartTime.Unix())
				metrics[i*3 + 2] = graphite.NewMetric(path + ".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), update.StartTime.Unix())
			}

			err = client.SendMetrics(metrics)
			if err != nil {
				s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err)
				time.Sleep(1 * time.Second)
				continue tryResync
			}

			s5l.Printf("fwd-graphite: all metrics sent")
			lastId = findMaxId(updates)
			s5l.Printf("fwd-graphite: new lastid: %d", lastId)
			//time.Sleep(1 * time.Second)
		}
	}
}

func (self StatsSinkServer) RunForwardingToGraphite(forwardHost string, basePath string) {
	self.handleForwardingToGraphite(forwardHost, basePath)
}