From 786303ed3cbc6c3923dc6f25ca97cca50225425e Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 12 Oct 2014 19:53:21 +0200 Subject: fixed lost messages for batch importer --- src/daq/flumotion-rrd/sfive-flumotion-rrd.py | 39 +++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) (limited to 'src/daq/flumotion-rrd') diff --git a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py index f749929..86422fa 100755 --- a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py +++ b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py @@ -31,8 +31,11 @@ # along with sfive. If not, see . # -from twisted.internet import protocol, reactor -from socket import error as socket_error +from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS +from twisted.internet import protocol, reactor, unix +import socket +from time import sleep + import simplejson as json import datetime import rrdtool @@ -41,6 +44,31 @@ _MAX_PACKET_SIZE = 8192 # in bytes __version__ = "$Rev$" +class SFivePort(unix.ConnectedDatagramPort): + + def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): + unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) + + def write(self, data): + try: + return self.socket.send(data) + except socket.error, se: + no = se.args[0] + if no == EINTR: + return self.write(data) + elif no == EMSGSIZE: + raise error.MessageLengthError, "message too long" + elif no == ECONNREFUSED: + self.protocol.connectionRefused() + elif no == EAGAIN: + # the send buffer seems to be full - let's wait a little while... + # this is not really a good solution but better than the aproach + # of twisted which just drops the datagram... + sleep(0.01) + return self.write(data) + else: + raise + class SFiveProto(protocol.ConnectedDatagramProtocol): def __init__(self, importer): @@ -59,7 +87,7 @@ class SFiveProto(protocol.ConnectedDatagramProtocol): def sendDatagram(self, data): try: return self.transport.write(data) - except socket_error as err: + except socket.error as err: print 'SFive: sending datagram failed: %s' % (err) self._importer._socketError() @@ -89,7 +117,8 @@ class FlumotionRRD(): def _initSocket(self): print 'SFive: trying to connect to %s...' % (self._socket) self._proto = SFiveProto(self) - self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn.startListening() def _socketError(self): if self._conn: @@ -164,8 +193,6 @@ class FlumotionRRD(): self._proto.sendDatagram('%s\n' % (json.dumps(data))); - - if __name__ == '__main__': import argparse -- cgit v1.2.3