summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2015-10-16 23:31:39 +0200
committerChristian Pointner <equinox@spreadspace.org>2015-10-16 23:32:19 +0200
commitff25fe32eccd388f4997a120c7f76e64645fe09a (patch)
tree41cd3fbb31f3372fa54e1a25f9ea4fded04eb658 /src/daq
parentremoved vet and format targets from default build (diff)
daq: flumotion plug now also retries after EAGAIN
Diffstat (limited to 'src/daq')
-rwxr-xr-xsrc/daq/accesslog/s5-accesslog8
-rw-r--r--src/daq/flumotion-plug/s5.py39
-rwxr-xr-xsrc/daq/flumotion-rrd/s5-flumotion-rrd8
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch11
4 files changed, 44 insertions, 22 deletions
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog
index 5026575..b89db22 100755
--- a/src/daq/accesslog/s5-accesslog
+++ b/src/daq/accesslog/s5-accesslog
@@ -321,7 +321,7 @@ class AccessLog():
print 'SFive: trying to connect to %s...' % (self._socket)
self._connected = False
self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor)
self._conn.startListening()
def socketReady(self):
@@ -359,7 +359,7 @@ class AccessLog():
"bytes-sent": clients.getBytesSent()
}
}
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
initdata = { "version": 1, "hostname": self._hostname,
@@ -374,7 +374,7 @@ class AccessLog():
if len(initdata["streamer-id"].keys()) == 0:
del initdata["streamer-id"]
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
def _sendDataset(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
@@ -396,7 +396,7 @@ class AccessLog():
if len(data["streamer-id"].keys()) == 0:
del data["streamer-id"]
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
if __name__ == '__main__':
diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py
index f986d23..09119bd 100644
--- a/src/daq/flumotion-plug/s5.py
+++ b/src/daq/flumotion-plug/s5.py
@@ -41,7 +41,8 @@ from flumotion.component.plugs import base
from flumotion.common import messages, i18n, log
from flumotion.common.poller import Poller
-from twisted.internet import protocol, reactor
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, reactor, unix
from socket import error as socket_error
import datetime
import time
@@ -55,6 +56,31 @@ _MAX_PACKET_SIZE = 8192 # in bytes
__version__ = "$Rev$"
+class SFivePort(unix.ConnectedDatagramPort):
+
+ def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
+ unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+ def write(self, data):
+ try:
+ return self.socket.send(data)
+ except socket_error, se:
+ no = se.args[0]
+ if no == EINTR:
+ return self.write(data)
+ elif no == EMSGSIZE:
+ raise error.MessageLengthError, "message too long"
+ elif no == ECONNREFUSED:
+ self.protocol.connectionRefused()
+ elif no == EAGAIN:
+ # the send buffer seems to be full - let's wait a little while...
+ # this is not really a good solution but better than the aproach
+ # of twisted which just drops the datagram...
+ time.sleep(0.01)
+ return self.write(data)
+ else:
+ raise
+
class SFiveProto(protocol.ConnectedDatagramProtocol):
def __init__(self, plug):
@@ -76,12 +102,6 @@ class SFiveProto(protocol.ConnectedDatagramProtocol):
def sendDatagram(self, data):
try:
- ## TODO: twisted will drop messages if the write buffer is full.
- ## Some batch importer work around this issue by sleeping
- ## and trying again. For live importer the fix is not applicable
- ## but also not as common because unlike live sources batch
- ## importer produce a lot of data in a very short period of time.
- ## Anyway this issue needs to be addressed!
return self.transport.write(data)
except socket_error as err:
self._plug.warning('SFive: sending datagram failed: %s', err)
@@ -142,7 +162,8 @@ class ComponentSFivePlug(base.ComponentPlug):
def _initSocket(self):
self.info('SFive: trying to (re)connect to %s...', self._socket)
self._proto = SFiveProto(self)
- self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor)
+ self._conn.startListening()
def _socketError(self):
if self._sfivepoller:
@@ -180,6 +201,8 @@ class ComponentSFivePlug(base.ComponentPlug):
bytes_sent_diff = bytes_sent - self._old_bytes_sent
self._old_bytes_sent = bytes_sent
+ self.debug('SFive: updating %s/%s/%s: %d clients, %d/%d bytes sent/received' %
+ (self._content_id, self._format, self._quality, client_count, bytes_sent_diff, bytes_received_diff))
self._sendDatasetFull(self._start_time, self._duration, client_count, bytes_sent_diff, bytes_received_diff)
self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd b/src/daq/flumotion-rrd/s5-flumotion-rrd
index c2e82eb..949456d 100755
--- a/src/daq/flumotion-rrd/s5-flumotion-rrd
+++ b/src/daq/flumotion-rrd/s5-flumotion-rrd
@@ -146,7 +146,7 @@ class FlumotionRRD():
def _initSocket(self):
print 'SFive: trying to connect to %s...' % (self._socket)
self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor)
self._conn.startListening()
def _socketError(self):
@@ -205,13 +205,13 @@ class FlumotionRRD():
"bytes-sent": bytes_sent
}
}
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
initdata = { "version": 1, "hostname": self._hostname,
"streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
"tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
client_count = int(round(client_count)) if client_count else 0
@@ -223,7 +223,7 @@ class FlumotionRRD():
"bytes-sent": bytes_sent
}
}
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
if __name__ == '__main__':
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch
index 7d13ba9..4691a1f 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch
@@ -38,7 +38,6 @@ from twisted.internet.defer import Deferred
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import socket
-from time import sleep
import re
import simplejson as json
@@ -97,7 +96,7 @@ class SFivePort(unix.ConnectedDatagramPort):
# the send buffer seems to be full - let's wait a little while...
# this is not really a good solution but better than the aproach
# of twisted which just drops the datagram...
- sleep(0.01)
+ time.sleep(0.01)
return self.write(data)
else:
raise
@@ -293,7 +292,7 @@ class NGXLuaFetcher():
print 'SFive: trying to connect to %s...' % (self._socket)
self._connected = False
self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor)
self._conn.startListening()
def socketReady(self):
@@ -325,7 +324,7 @@ class NGXLuaFetcher():
"bytes-sent": clients.getBytesSent()
}
}
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
def _sendInit(self):
initdata = { "version": 1, "hostname": self._hostname,
@@ -340,7 +339,7 @@ class NGXLuaFetcher():
if len(initdata["streamer-id"].keys()) == 0:
del initdata["streamer-id"]
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
def _sendDataset(self, timestamp, duration, content_id, format, quality):
clients = self._streamer[content_id][format][quality]
@@ -362,7 +361,7 @@ class NGXLuaFetcher():
if len(data["streamer-id"].keys()) == 0:
del data["streamer-id"]
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)))
if __name__ == '__main__':