From ff25fe32eccd388f4997a120c7f76e64645fe09a Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Fri, 16 Oct 2015 23:31:39 +0200 Subject: daq: flumotion plug now also retries after EAGAIN --- src/daq/accesslog/s5-accesslog | 8 +++---- src/daq/flumotion-plug/s5.py | 39 +++++++++++++++++++++++++++------- src/daq/flumotion-rrd/s5-flumotion-rrd | 8 +++---- src/daq/nginx-lua/s5-nginx-lua-fetch | 11 +++++----- 4 files changed, 44 insertions(+), 22 deletions(-) (limited to 'src/daq') diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog index 5026575..b89db22 100755 --- a/src/daq/accesslog/s5-accesslog +++ b/src/daq/accesslog/s5-accesslog @@ -321,7 +321,7 @@ class AccessLog(): print 'SFive: trying to connect to %s...' % (self._socket) self._connected = False self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor) self._conn.startListening() def socketReady(self): @@ -359,7 +359,7 @@ class AccessLog(): "bytes-sent": clients.getBytesSent() } } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, @@ -374,7 +374,7 @@ class AccessLog(): if len(initdata["streamer-id"].keys()) == 0: del initdata["streamer-id"] - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) def _sendDataset(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] @@ -396,7 +396,7 @@ class AccessLog(): if len(data["streamer-id"].keys()) == 0: del data["streamer-id"] - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) if __name__ == '__main__': diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py index f986d23..09119bd 100644 --- a/src/daq/flumotion-plug/s5.py +++ b/src/daq/flumotion-plug/s5.py @@ -41,7 +41,8 @@ from flumotion.component.plugs import base from flumotion.common import messages, i18n, log from flumotion.common.poller import Poller -from twisted.internet import protocol, reactor +from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS +from twisted.internet import protocol, reactor, unix from socket import error as socket_error import datetime import time @@ -55,6 +56,31 @@ _MAX_PACKET_SIZE = 8192 # in bytes __version__ = "$Rev$" +class SFivePort(unix.ConnectedDatagramPort): + + def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): + unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) + + def write(self, data): + try: + return self.socket.send(data) + except socket_error, se: + no = se.args[0] + if no == EINTR: + return self.write(data) + elif no == EMSGSIZE: + raise error.MessageLengthError, "message too long" + elif no == ECONNREFUSED: + self.protocol.connectionRefused() + elif no == EAGAIN: + # the send buffer seems to be full - let's wait a little while... + # this is not really a good solution but better than the aproach + # of twisted which just drops the datagram... + time.sleep(0.01) + return self.write(data) + else: + raise + class SFiveProto(protocol.ConnectedDatagramProtocol): def __init__(self, plug): @@ -76,12 +102,6 @@ class SFiveProto(protocol.ConnectedDatagramProtocol): def sendDatagram(self, data): try: - ## TODO: twisted will drop messages if the write buffer is full. - ## Some batch importer work around this issue by sleeping - ## and trying again. For live importer the fix is not applicable - ## but also not as common because unlike live sources batch - ## importer produce a lot of data in a very short period of time. - ## Anyway this issue needs to be addressed! return self.transport.write(data) except socket_error as err: self._plug.warning('SFive: sending datagram failed: %s', err) @@ -142,7 +162,8 @@ class ComponentSFivePlug(base.ComponentPlug): def _initSocket(self): self.info('SFive: trying to (re)connect to %s...', self._socket) self._proto = SFiveProto(self) - self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor) + self._conn.startListening() def _socketError(self): if self._sfivepoller: @@ -180,6 +201,8 @@ class ComponentSFivePlug(base.ComponentPlug): bytes_sent_diff = bytes_sent - self._old_bytes_sent self._old_bytes_sent = bytes_sent + self.debug('SFive: updating %s/%s/%s: %d clients, %d/%d bytes sent/received' % + (self._content_id, self._format, self._quality, client_count, bytes_sent_diff, bytes_received_diff)) self._sendDatasetFull(self._start_time, self._duration, client_count, bytes_sent_diff, bytes_received_diff) self._start_time = datetime.datetime.utcnow().replace(microsecond=0) diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd index c2e82eb..949456d 100755 --- a/src/daq/flumotion-rrd/s5-flumotion-rrd +++ b/src/daq/flumotion-rrd/s5-flumotion-rrd @@ -146,7 +146,7 @@ class FlumotionRRD(): def _initSocket(self): print 'SFive: trying to connect to %s...' % (self._socket) self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor) self._conn.startListening() def _socketError(self): @@ -205,13 +205,13 @@ class FlumotionRRD(): "bytes-sent": bytes_sent } } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) def _sendDataset(self, timestamp, duration, client_count, bytes_sent): client_count = int(round(client_count)) if client_count else 0 @@ -223,7 +223,7 @@ class FlumotionRRD(): "bytes-sent": bytes_sent } } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) if __name__ == '__main__': diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch index 7d13ba9..4691a1f 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch @@ -38,7 +38,6 @@ from twisted.internet.defer import Deferred from twisted.web.client import Agent from twisted.web.http_headers import Headers import socket -from time import sleep import re import simplejson as json @@ -97,7 +96,7 @@ class SFivePort(unix.ConnectedDatagramPort): # the send buffer seems to be full - let's wait a little while... # this is not really a good solution but better than the aproach # of twisted which just drops the datagram... - sleep(0.01) + time.sleep(0.01) return self.write(data) else: raise @@ -293,7 +292,7 @@ class NGXLuaFetcher(): print 'SFive: trying to connect to %s...' % (self._socket) self._connected = False self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor) self._conn.startListening() def socketReady(self): @@ -325,7 +324,7 @@ class NGXLuaFetcher(): "bytes-sent": clients.getBytesSent() } } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, @@ -340,7 +339,7 @@ class NGXLuaFetcher(): if len(initdata["streamer-id"].keys()) == 0: del initdata["streamer-id"] - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) def _sendDataset(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] @@ -362,7 +361,7 @@ class NGXLuaFetcher(): if len(data["streamer-id"].keys()) == 0: del data["streamer-id"] - self._proto.sendDatagram('%s\n' % (json.dumps(data))); + self._proto.sendDatagram('%s\n' % (json.dumps(data))) if __name__ == '__main__': -- cgit v1.2.3