summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-12 19:53:21 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-12 19:53:21 +0200
commit786303ed3cbc6c3923dc6f25ca97cca50225425e (patch)
tree13d0f566bde496b899019aacb2a35d06a37f1850
parentrenamed flumotion rrd importer, fixed dataset cnt (diff)
fixed lost messages for batch importer
-rwxr-xr-xsrc/daq/flumotion-rrd/sfive-flumotion-rrd.py39
1 files changed, 33 insertions, 6 deletions
diff --git a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
index f749929..86422fa 100755
--- a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
+++ b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
@@ -31,8 +31,11 @@
# along with sfive. If not, see <http://www.gnu.org/licenses/>.
#
-from twisted.internet import protocol, reactor
-from socket import error as socket_error
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, reactor, unix
+import socket
+from time import sleep
+
import simplejson as json
import datetime
import rrdtool
@@ -41,6 +44,31 @@ _MAX_PACKET_SIZE = 8192 # in bytes
__version__ = "$Rev$"
+class SFivePort(unix.ConnectedDatagramPort):
+
+ def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
+ unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+ def write(self, data):
+ try:
+ return self.socket.send(data)
+ except socket.error, se:
+ no = se.args[0]
+ if no == EINTR:
+ return self.write(data)
+ elif no == EMSGSIZE:
+ raise error.MessageLengthError, "message too long"
+ elif no == ECONNREFUSED:
+ self.protocol.connectionRefused()
+ elif no == EAGAIN:
+ # the send buffer seems to be full - let's wait a little while...
+ # this is not really a good solution but better than the aproach
+ # of twisted which just drops the datagram...
+ sleep(0.01)
+ return self.write(data)
+ else:
+ raise
+
class SFiveProto(protocol.ConnectedDatagramProtocol):
def __init__(self, importer):
@@ -59,7 +87,7 @@ class SFiveProto(protocol.ConnectedDatagramProtocol):
def sendDatagram(self, data):
try:
return self.transport.write(data)
- except socket_error as err:
+ except socket.error as err:
print 'SFive: sending datagram failed: %s' % (err)
self._importer._socketError()
@@ -89,7 +117,8 @@ class FlumotionRRD():
def _initSocket(self):
print 'SFive: trying to connect to %s...' % (self._socket)
self._proto = SFiveProto(self)
- self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn.startListening()
def _socketError(self):
if self._conn:
@@ -164,8 +193,6 @@ class FlumotionRRD():
self._proto.sendDatagram('%s\n' % (json.dumps(data)));
-
-
if __name__ == '__main__':
import argparse