From 419ec2ae02f6426bcce7822607827c2a44722fa3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 22 Oct 2014 23:27:20 +0200 Subject: daq: flumotion-rrd now uses stateless protocol --- src/daq/flumotion-rrd/s5-flumotion-rrd | 38 ++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 9 deletions(-) (limited to 'src/daq') diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd index bef9488..c2e82eb 100755 --- a/src/daq/flumotion-rrd/s5-flumotion-rrd +++ b/src/daq/flumotion-rrd/s5-flumotion-rrd @@ -156,18 +156,13 @@ class FlumotionRRD(): def _socketReady(self): print 'SFive: connection to sfive hub established' - self._sendInit() +# we are using datagram sockets for now -> must use stateless protocol +# self._sendInit() cnt = self._sendRRDData() print 'SFive: sent %d datasets' % (cnt) reactor.stop() - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - def _sendRRDData(self): try: data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) @@ -184,8 +179,8 @@ class FlumotionRRD(): cnt = 0 for idx in range(0, max - 1): try: - self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration, - data_clients[2][idx][0], data_bytes[2][idx][0]) + self._sendDatasetFull(datetime.datetime.utcfromtimestamp(ts), self._duration, + data_clients[2][idx][0], data_bytes[2][idx][0]) ts += self._duration cnt += 1 except ValueError as err: @@ -193,6 +188,31 @@ class FlumotionRRD(): return cnt + def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent): + client_count = int(round(client_count)) if client_count else 0 + bytes_sent = int(round(bytes_sent)) if bytes_sent else 0 + data = { "version": 1, "hostname": self._hostname, + "streamer-id": { + "content-id": self._content_id, + "format": self._format, + "quality": self._quality + }, + "tags": self._tags, + "start-time": timestamp.isoformat('T') + 'Z', + "duration-ms": duration * 1000, + "data": { + "client-count": client_count, + "bytes-sent": bytes_sent + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); + + def _sendInit(self): + initdata = { "version": 1, "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + def _sendDataset(self, timestamp, duration, client_count, bytes_sent): client_count = int(round(client_count)) if client_count else 0 bytes_sent = int(round(bytes_sent)) if bytes_sent else 0 -- cgit v1.2.3