diff options
Diffstat (limited to 'src/daq/flumotion-plug/sfive.py')
-rw-r--r-- | src/daq/flumotion-plug/sfive.py | 50 |
1 files changed, 33 insertions, 17 deletions
diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py index 6292ad0..4253f7c 100644 --- a/src/daq/flumotion-plug/sfive.py +++ b/src/daq/flumotion-plug/sfive.py @@ -37,6 +37,7 @@ from flumotion.common import messages, i18n, log from flumotion.common.poller import Poller from twisted.internet import protocol, reactor +from socket import error as socket_error from flumotion.common.i18n import N_ T_ = i18n.gettexter() @@ -46,23 +47,29 @@ _DEFAULT_POLL_INTERVAL = 5 # in seconds __version__ = "$Rev$" class SFiveProto(protocol.ConnectedDatagramProtocol): - plug = None def __init__(self, plug): - self.plug = plug + self._plug = plug def stopProtocol(self): - self.plug._socketError() + self._plug.debug('SFive: protcol stopped') def startProtocol(self): - self.plug._socketReady() + self._plug._socketReady() def datagramReceived(self, data): - self.plug.debug('SFive: received datagram: "%s" (will get ignored)', data) + self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) def connectionFailed(self, failure): - self.plug.warning('SFive: "%s"', failure.getErrorMessage()) - self.plug._socketError() + self._plug.warning('SFive: "%s"', failure.getErrorMessage()) + self._plug._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket_error: + self._plug._socketError() + class ComponentSFivePlug(base.ComponentPlug): @@ -78,7 +85,7 @@ class ComponentSFivePlug(base.ComponentPlug): properties = self.args['properties'] self._socket = properties['socket'] - self._hostename = properties['hostname'] + self._hostname = properties['hostname'] self._content_id = properties.get('content-id') self._format = properties.get('format') self._quality = properties.get('quality') @@ -86,29 +93,36 @@ class ComponentSFivePlug(base.ComponentPlug): self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) - self._old_bytes_received = -1 - self._old_bytes_sent = -1 + self._proto = None + self._conn = None self._initSocket() def stop(self, component): if self._sfivepoller: self._sfivepoller.stop() - ## TODO: close unix socket + if self._conn: + self._conn.stopListening() def _initSocket(self): - self.debug('SFive: connecting to %s...', self._socket) - self._cp = SFiveProto(self) - c = reactor.connectUNIXDatagram(self._socket, self._cp) + self.debug('SFive: trying to connect to %s...', self._socket) + self._proto = SFiveProto(self) + self._conn = reactor.connectUNIXDatagram(self._socket, self._proto) def _socketError(self): self.warning('SFive: connection error ... trying reconnect') if self._sfivepoller: self._sfivepoller.stop() - ## TODO: try reconnect... + if self._conn: + self._conn.stopListening() + + reactor.callLater(5, self._initSocket) def _socketReady(self): self.info('SFive: connection to sfive hub established') + self._old_bytes_received = -1 + self._old_bytes_sent = -1 + self._sendInit() if self._sfivepoller: self._sfivepoller.start() @@ -120,6 +134,7 @@ class ComponentSFivePlug(base.ComponentPlug): # "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" }, # "tags": [ "elevate", "2014", "discourse" ] # } + self._proto.sendDatagram('hi! this is %s speaking\n' % (self._hostname)); def _updateSFive(self): client_count = self._component.getClients() @@ -131,8 +146,6 @@ class ComponentSFivePlug(base.ComponentPlug): bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0; self._old_bytes_sent = bytes_sent - self.debug('SFive: sending data (client-count: %d, bytes_received: %d, bytes_sent: %d)', client_count, bytes_received_diff, bytes_sent_diff) - self._cp.transport.write('client-count: %d, bytes_received: %d, bytes_sent: %d\n' % (client_count, bytes_received_diff, bytes_sent_diff)); # TODO: create json and send it out # { # "start-time": "2014-08-03Z12:34:56.123", @@ -147,3 +160,6 @@ class ComponentSFivePlug(base.ComponentPlug): # "bytes-sent": 921734098, # .... # } + data_str = 'client-count: %d, bytes_received: %d, bytes_sent: %d\n' % (client_count, bytes_received_diff, bytes_sent_diff) + self.debug(data_str) + self._proto.sendDatagram(data_str); |