From 1758d25b9e6f22b6d8a0116e68962d510078f60f Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sat, 23 Aug 2014 22:59:08 +0200 Subject: flumotion-plug: opening unix socket and sending test data works now --- src/daq/flumotion-plug/sfive.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) (limited to 'src/daq/flumotion-plug/sfive.py') diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py index a192f92..6292ad0 100644 --- a/src/daq/flumotion-plug/sfive.py +++ b/src/daq/flumotion-plug/sfive.py @@ -36,6 +36,8 @@ from flumotion.component.plugs import base from flumotion.common import messages, i18n, log from flumotion.common.poller import Poller +from twisted.internet import protocol, reactor + from flumotion.common.i18n import N_ T_ = i18n.gettexter() @@ -43,6 +45,26 @@ _DEFAULT_POLL_INTERVAL = 5 # in seconds __version__ = "$Rev$" +class SFiveProto(protocol.ConnectedDatagramProtocol): + plug = None + + def __init__(self, plug): + self.plug = plug + + def stopProtocol(self): + self.plug._socketError() + + def startProtocol(self): + self.plug._socketReady() + + def datagramReceived(self, data): + self.plug.debug('SFive: received datagram: "%s" (will get ignored)', data) + + def connectionFailed(self, failure): + self.plug.warning('SFive: "%s"', failure.getErrorMessage()) + self.plug._socketError() + + class ComponentSFivePlug(base.ComponentPlug): """Class to send statistics to the spreadspace streaming statistic suite""" @@ -76,12 +98,13 @@ class ComponentSFivePlug(base.ComponentPlug): def _initSocket(self): self.debug('SFive: connecting to %s...', self._socket) - ## TODO: init unix socket and call _socketReady when socket is ready - ## TODO: how handle connection drop??? - self._socketReady() + self._cp = SFiveProto(self) + c = reactor.connectUNIXDatagram(self._socket, self._cp) def _socketError(self): - self.warning('SFive: connection lost... trying reconnect') + self.warning('SFive: connection error ... trying reconnect') + if self._sfivepoller: + self._sfivepoller.stop() ## TODO: try reconnect... def _socketReady(self): @@ -109,6 +132,7 @@ class ComponentSFivePlug(base.ComponentPlug): self._old_bytes_sent = bytes_sent self.debug('SFive: sending data (client-count: %d, bytes_received: %d, bytes_sent: %d)', client_count, bytes_received_diff, bytes_sent_diff) + self._cp.transport.write('client-count: %d, bytes_received: %d, bytes_sent: %d\n' % (client_count, bytes_received_diff, bytes_sent_diff)); # TODO: create json and send it out # { # "start-time": "2014-08-03Z12:34:56.123", -- cgit v1.2.3