From b2479736c8ed08df83ae9436ec90024db59cad79 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Wed, 22 Oct 2014 23:55:24 +0200 Subject: daq: flumotion-plug now uses stateless protocol --- src/daq/flumotion-plug/s5.py | 47 +++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 12 deletions(-) (limited to 'src/daq/flumotion-plug/s5.py') diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py index 643d0f6..f986d23 100644 --- a/src/daq/flumotion-plug/s5.py +++ b/src/daq/flumotion-plug/s5.py @@ -154,7 +154,8 @@ class ComponentSFivePlug(base.ComponentPlug): def _socketReady(self): self.info('SFive: connection to sfive hub established') - self._sendInit() +# we are using datagram sockets for now -> must use stateless protocol +# self._sendInit() if self._sfivepoller: # try to be aligned with current time # this will eventually get out of sync but for now this is good enough @@ -169,12 +170,6 @@ class ComponentSFivePlug(base.ComponentPlug): self._sfivepoller.start() self.info('SFive: poller started at %sZ' % (self._start_time.isoformat('T'))) - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) - def _updateSFive(self): client_count = self._component.getClients() bytes_received = self._component.getBytesReceived() @@ -185,13 +180,41 @@ class ComponentSFivePlug(base.ComponentPlug): bytes_sent_diff = bytes_sent - self._old_bytes_sent self._old_bytes_sent = bytes_sent - data = { "start-time": self._start_time.isoformat('T') + 'Z', - "duration-ms": self._duration * 1000, + self._sendDatasetFull(self._start_time, self._duration, client_count, bytes_sent_diff, bytes_received_diff) + self._start_time = datetime.datetime.utcnow().replace(microsecond=0) + + + def _sendDatasetFull(self, timestamp, duration, client_count, bytes_sent, bytes_received): + data = { "version": 1, "hostname": self._hostname, + "streamer-id": { + "content-id": self._content_id, + "format": self._format, + "quality": self._quality + }, + "tags": self._tags, + "start-time": timestamp.isoformat('T') + 'Z', + "duration-ms": duration * 1000, "data": { "client-count": client_count, - "bytes-received": bytes_received_diff, - "bytes-sent": bytes_sent_diff + "bytes-received": bytes_received, + "bytes-sent": bytes_sent + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))) + + def _sendInit(self): + initdata = { "version": 1, "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) + + def _sendDataset(self, timestamp, duration, client_count, bytes_sent, bytes_received): + data = { "start-time": timestamp.isoformat('T') + 'Z', + "duration-ms": duration * 1000, + "data": { + "client-count": client_count, + "bytes-received": bytes_received, + "bytes-sent": bytes_sent } } self._proto.sendDatagram('%s\n' % (json.dumps(data))) - self._start_time = datetime.datetime.utcnow().replace(microsecond=0) -- cgit v1.2.3