summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-22 23:27:20 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-22 23:56:03 +0200
commit419ec2ae02f6426bcce7822607827c2a44722fa3 (patch)
tree9165e7fa9024f8cde5488c17b90a220da6d904b5
parentdaq: nginx-lua-fetch now uses stateless protocol (diff)
daq: flumotion-rrd now uses stateless protocol
-rwxr-xr-xsrc/daq/flumotion-rrd/s5-flumotion-rrd38
1 files changed, 29 insertions, 9 deletions
diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd
index bef9488..c2e82eb 100755
--- a/src/daq/flumotion-rrd/s5-flumotion-rrd
+++ b/src/daq/flumotion-rrd/s5-flumotion-rrd
@@ -156,18 +156,13 @@ class FlumotionRRD():
def _socketReady(self):
print 'SFive: connection to sfive hub established'
- self._sendInit()
+# we are using datagram sockets for now -> must use stateless protocol
+# self._sendInit()
cnt = self._sendRRDData()
print 'SFive: sent %d datasets' % (cnt)
reactor.stop()
- def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
-
def _sendRRDData(self):
try:
data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
@@ -184,8 +179,8 @@ class FlumotionRRD():
cnt = 0
for idx in range(0, max - 1):
try:
- self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration,
- data_clients[2][idx][0], data_bytes[2][idx][0])
+ self._sendDatasetFull(datetime.datetime.utcfromtimestamp(ts), self._duration,
+ data_clients[2][idx][0], data_bytes[2][idx][0])
ts += self._duration
cnt += 1
except ValueError as err:
@@ -193,6 +188,31 @@ class FlumotionRRD():
return cnt
+ def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent):
+ client_count = int(round(client_count)) if client_count else 0
+ bytes_sent = int(round(bytes_sent)) if bytes_sent else 0
+ data = { "version": 1, "hostname": self._hostname,
+ "streamer-id": {
+ "content-id": self._content_id,
+ "format": self._format,
+ "quality": self._quality
+ },
+ "tags": self._tags,
+ "start-time": timestamp.isoformat('T') + 'Z',
+ "duration-ms": duration * 1000,
+ "data": {
+ "client-count": client_count,
+ "bytes-sent": bytes_sent
+ }
+ }
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
client_count = int(round(client_count)) if client_count else 0
bytes_sent = int(round(bytes_sent)) if bytes_sent else 0