summaryrefslogtreecommitdiff
path: root/src/hub/src/spreadspace.org/sfive/s5srvForwardGraphite.go
blob: d865dacde297757862f4bb671210904fb8975dcc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package sfive

import (
	"fmt"
	"time"

	"github.com/equinox0815/graphite-golang"
)

// getLastUpdateGraphite determines the update id after which forwarding to
// graphite should start.
//
// latestId is whatever srv.getLastUpdateIdInvoke reports. conn is accepted for
// signature parity but is not used here, and storeId is never assigned, so it
// is always returned as the empty string. A failure of the lookup is logged
// and handed back to the caller unchanged.
func (srv Server) getLastUpdateGraphite(conn *graphite.Graphite) (latestId int, storeId string, err error) {
	latestId, err = srv.getLastUpdateIdInvoke()
	if err != nil {
		// The failing call fetches the last update id, not the hub id, so
		// the log line must say so.
		s5l.Printf("fwd-graphite: failed to get last update id: %v\n", err)
	}
	return latestId, storeId, err
}

// handleForwardingToGraphite pushes updates to the graphite server at
// forwardHost in an endless loop; it never returns.
//
// The outer loop (tryResync) opens a connection and asks
// getLastUpdateGraphite for the id to resume from. The inner loop (nextBatch)
// then repeatedly fetches up to 5000 updates newer than that id, converts each
// update into three metrics (client-count, bytes-received, bytes-sent) and
// sends them in one call. Every metric path has the form
//
//	basePath.<Hostname>.<ContentId>.<Format>.<Quality>.<metric-name>
//
// and is stamped with the update's StartTime as a Unix timestamp.
//
// Error handling:
//   - connect or last-id lookup failure: wait 5s and reconnect;
//   - failure reading updates: wait 500ms and retry on the same connection;
//   - no updates available: wait 1s and poll again;
//   - failure sending metrics: close the connection, wait 1s and reconnect.
//
// NOTE(review): after a reconnect the resume id is re-read through
// getLastUpdateGraphite instead of being carried over from the failed batch;
// confirm this is the intended resume point.
func (srv Server) handleForwardingToGraphite(forwardHost string, basePath string) {
tryResync:
	for {
		client, err := graphite.NewGraphiteFromAddress(forwardHost)
		if err != nil {
			s5l.Printf("fwd-graphite: connect returned err: %v", err)
			time.Sleep(5 * time.Second)
			continue tryResync
		}

		lastId, _, err := srv.getLastUpdateGraphite(client)
		if err != nil {
			s5l.Printf("fwd-graphite: lastupdate returned err: %v", err)
			client.Disconnect()
			time.Sleep(5 * time.Second)
			continue tryResync
		}
		s5l.Printf("fwd-graphite: lastupdate: %d", lastId)

	nextBatch:
		for {
			updates, err := srv.getUpdatesAfterInvoke(lastId, 5000)
			if err != nil {
				s5l.Printf("fwd-graphite: failed reading updates: %v\n", err)
				time.Sleep(500 * time.Millisecond)
				continue nextBatch
			}

			s5l.Printf("fwd-graphite: got %d updates", len(updates))

			if len(updates) == 0 {
				time.Sleep(1 * time.Second)
				continue nextBatch
			}

			// Three metrics are emitted per update.
			metrics := make([]graphite.Metric, 0, len(updates)*3)

			for _, update := range updates {
				path := basePath + "." + update.Hostname
				path = path + "." + update.StreamId.ContentId
				path = path + "." + update.StreamId.Format
				path = path + "." + update.StreamId.Quality

				// All three metrics of one update share the same timestamp.
				ts := update.StartTime.Unix()

				metrics = append(metrics,
					graphite.NewMetric(path+".client-count", fmt.Sprintf("%d", update.Data.ClientCount), ts),
					graphite.NewMetric(path+".bytes-received", fmt.Sprintf("%d", update.Data.BytesReceived), ts),
					graphite.NewMetric(path+".bytes-sent", fmt.Sprintf("%d", update.Data.BytesSent), ts),
				)
			}

			err = client.SendMetrics(metrics)
			if err != nil {
				s5l.Printf("fwd-graphite: sending metrics failed: %v\n", err)
				// Release the broken connection before reconnecting;
				// otherwise every failed send leaks one connection.
				client.Disconnect()
				time.Sleep(1 * time.Second)
				continue tryResync
			}

			s5l.Printf("fwd-graphite: all metrics sent")
			lastId = findMaxId(updates)
			s5l.Printf("fwd-graphite: new lastid: %d", lastId)
			//time.Sleep(1 * time.Second)
		}
	}
}

// RunForwardingToGraphite is the exported entry point for forwarding updates
// to graphite. It passes forwardHost and basePath straight through to
// handleForwardingToGraphite, which loops indefinitely, so this call does not
// return.
// NOTE(review): callers presumably start this in its own goroutine — verify.
func (srv Server) RunForwardingToGraphite(forwardHost string, basePath string) {
	srv.handleForwardingToGraphite(forwardHost, basePath)
}