summaryrefslogtreecommitdiff
path: root/src/daq/flumotion-plug
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-08-24 00:19:21 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-08-24 00:19:21 +0200
commitdf6977ff53dfd844fd554032cd5b29df92d7fa11 (patch)
treeeb9835a3108d762e9079368c9c3f689f8ef2ae18 /src/daq/flumotion-plug
parentflumotion-plug: opening unix socket and sending test data works now (diff)
reconnect works now
Diffstat (limited to 'src/daq/flumotion-plug')
-rw-r--r--src/daq/flumotion-plug/sfive.py50
1 files changed, 33 insertions, 17 deletions
diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py
index 6292ad0..4253f7c 100644
--- a/src/daq/flumotion-plug/sfive.py
+++ b/src/daq/flumotion-plug/sfive.py
@@ -37,6 +37,7 @@ from flumotion.common import messages, i18n, log
from flumotion.common.poller import Poller
from twisted.internet import protocol, reactor
+from socket import error as socket_error
from flumotion.common.i18n import N_
T_ = i18n.gettexter()
@@ -46,23 +47,29 @@ _DEFAULT_POLL_INTERVAL = 5 # in seconds
__version__ = "$Rev$"
class SFiveProto(protocol.ConnectedDatagramProtocol):
- plug = None
def __init__(self, plug):
- self.plug = plug
+ self._plug = plug
def stopProtocol(self):
- self.plug._socketError()
+ self._plug.debug('SFive: protcol stopped')
def startProtocol(self):
- self.plug._socketReady()
+ self._plug._socketReady()
def datagramReceived(self, data):
- self.plug.debug('SFive: received datagram: "%s" (will get ignored)', data)
+ self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data)
def connectionFailed(self, failure):
- self.plug.warning('SFive: "%s"', failure.getErrorMessage())
- self.plug._socketError()
+ self._plug.warning('SFive: "%s"', failure.getErrorMessage())
+ self._plug._socketError()
+
+ def sendDatagram(self, data):
+ try:
+ return self.transport.write(data)
+ except socket_error:
+ self._plug._socketError()
+
class ComponentSFivePlug(base.ComponentPlug):
@@ -78,7 +85,7 @@ class ComponentSFivePlug(base.ComponentPlug):
properties = self.args['properties']
self._socket = properties['socket']
- self._hostename = properties['hostname']
+ self._hostname = properties['hostname']
self._content_id = properties.get('content-id')
self._format = properties.get('format')
self._quality = properties.get('quality')
@@ -86,29 +93,36 @@ class ComponentSFivePlug(base.ComponentPlug):
self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL)
self._sfivepoller = Poller(self._updateSFive, self._duration, start=False)
- self._old_bytes_received = -1
- self._old_bytes_sent = -1
+ self._proto = None
+ self._conn = None
self._initSocket()
def stop(self, component):
if self._sfivepoller:
self._sfivepoller.stop()
- ## TODO: close unix socket
+ if self._conn:
+ self._conn.stopListening()
def _initSocket(self):
- self.debug('SFive: connecting to %s...', self._socket)
- self._cp = SFiveProto(self)
- c = reactor.connectUNIXDatagram(self._socket, self._cp)
+ self.debug('SFive: trying to connect to %s...', self._socket)
+ self._proto = SFiveProto(self)
+ self._conn = reactor.connectUNIXDatagram(self._socket, self._proto)
def _socketError(self):
self.warning('SFive: connection error ... trying reconnect')
if self._sfivepoller:
self._sfivepoller.stop()
- ## TODO: try reconnect...
+ if self._conn:
+ self._conn.stopListening()
+
+ reactor.callLater(5, self._initSocket)
def _socketReady(self):
self.info('SFive: connection to sfive hub established')
+ self._old_bytes_received = -1
+ self._old_bytes_sent = -1
+ self._sendInit()
if self._sfivepoller:
self._sfivepoller.start()
@@ -120,6 +134,7 @@ class ComponentSFivePlug(base.ComponentPlug):
# "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" },
# "tags": [ "elevate", "2014", "discourse" ]
# }
+ self._proto.sendDatagram('hi! this is %s speaking\n' % (self._hostname));
def _updateSFive(self):
client_count = self._component.getClients()
@@ -131,8 +146,6 @@ class ComponentSFivePlug(base.ComponentPlug):
bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0;
self._old_bytes_sent = bytes_sent
- self.debug('SFive: sending data (client-count: %d, bytes_received: %d, bytes_sent: %d)', client_count, bytes_received_diff, bytes_sent_diff)
- self._cp.transport.write('client-count: %d, bytes_received: %d, bytes_sent: %d\n' % (client_count, bytes_received_diff, bytes_sent_diff));
# TODO: create json and send it out
# {
# "start-time": "2014-08-03Z12:34:56.123",
@@ -147,3 +160,6 @@ class ComponentSFivePlug(base.ComponentPlug):
# "bytes-sent": 921734098,
# ....
# }
+ data_str = 'client-count: %d, bytes_received: %d, bytes_sent: %d\n' % (client_count, bytes_received_diff, bytes_sent_diff)
+ self.debug(data_str)
+ self._proto.sendDatagram(data_str);