summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-22 23:55:24 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-22 23:56:03 +0200
commitb2479736c8ed08df83ae9436ec90024db59cad79 (patch)
treec1858c1d931c830b25ba1ae31b65c7ea775c651c /src/daq
parentdaq: flumotion-rrd now uses stateless protocol (diff)
daq: flumotion-plug now uses stateless protocol
Diffstat (limited to 'src/daq')
-rw-r--r--src/daq/flumotion-plug/s5.py47
1 files changed, 35 insertions, 12 deletions
diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py
index 643d0f6..f986d23 100644
--- a/src/daq/flumotion-plug/s5.py
+++ b/src/daq/flumotion-plug/s5.py
@@ -154,7 +154,8 @@ class ComponentSFivePlug(base.ComponentPlug):
def _socketReady(self):
self.info('SFive: connection to sfive hub established')
- self._sendInit()
+# we are using datagram sockets for now -> must use stateless protocol
+# self._sendInit()
if self._sfivepoller:
# try to be aligned with current time
# this will eventually get out of sync but for now this is good enough
@@ -169,12 +170,6 @@ class ComponentSFivePlug(base.ComponentPlug):
self._sfivepoller.start()
self.info('SFive: poller started at %sZ' % (self._start_time.isoformat('T')))
- def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
-
def _updateSFive(self):
client_count = self._component.getClients()
bytes_received = self._component.getBytesReceived()
@@ -185,13 +180,41 @@ class ComponentSFivePlug(base.ComponentPlug):
bytes_sent_diff = bytes_sent - self._old_bytes_sent
self._old_bytes_sent = bytes_sent
- data = { "start-time": self._start_time.isoformat('T') + 'Z',
- "duration-ms": self._duration * 1000,
+ self._sendDatasetFull(self._start_time, self._duration, client_count, bytes_sent_diff, bytes_received_diff)
+ self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
+
+
+ def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received):
+ data = { "version": 1, "hostname": self._hostname,
+ "streamer-id": {
+ "content-id": self._content_id,
+ "format": self._format,
+ "quality": self._quality
+ },
+ "tags": self._tags,
+ "start-time": timestamp.isoformat('T') + 'Z',
+ "duration-ms": duration * 1000,
"data": {
"client-count": client_count,
- "bytes-received": bytes_received_diff,
- "bytes-sent": bytes_sent_diff
+ "bytes-received": bytes_received,
+ "bytes-sent": bytes_sent
+ }
+ }
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
+
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
+
+ def _sendDataset(self, timestamp, duration, client_count, bytes_sent, bytes_received):
+ data = { "start-time": timestamp.isoformat('T') + 'Z',
+ "duration-ms": duration * 1000,
+ "data": {
+ "client-count": client_count,
+ "bytes-received": bytes_received,
+ "bytes-sent": bytes_sent
}
}
self._proto.sendDatagram('%s\n' % (json.dumps(data)))
- self._start_time = datetime.datetime.utcnow().replace(microsecond=0)