From 5204b48aca924c2de303090680622b0ed03dca80 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 20 Oct 2014 02:32:46 +0200 Subject: daq: ngin-lua fetcher reconnection handling works now --- src/daq/nginx-lua/s5-nginx-lua-fetch.py | 98 ++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 38 deletions(-) (limited to 'src/daq/nginx-lua/s5-nginx-lua-fetch.py') diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py index 87c7bd1..4401e53 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py @@ -43,17 +43,18 @@ from time import sleep import re import simplejson as json import datetime +import dateutil.parser -_MAX_PACKET_SIZE = 8192 # in bytes +_MAX_PACKET_SIZE = 65536 # in bytes __version__ = "$Rev$" class SFiveNGXluaProto(Protocol): - def __init__(self, finished, importer): + def __init__(self, finished, fetcher): self.data = '' self.finished = finished - self._importer = importer + self._fetcher = fetcher print 'SFive: started receiving log data' def dataReceived(self, bytes): @@ -68,11 +69,11 @@ class SFiveNGXluaProto(Protocol): if len(lines) > 0: for line in lines: - self._importer.updateData(json.loads(line)) + self._fetcher.updateData(json.loads(line)) def connectionLost(self, reason): print 'SFive: finished receiving log data:', reason.getErrorMessage() - self._importer._socketError() + self._fetcher.close() class SFivePort(unix.ConnectedDatagramPort): @@ -102,25 +103,25 @@ class SFivePort(unix.ConnectedDatagramPort): class SFiveProto(protocol.ConnectedDatagramProtocol): - def __init__(self, importer): - self._importer = importer + def __init__(self, fetcher): + self._fetcher = fetcher def startProtocol(self): - self._importer._socketReady() + self._fetcher.socketReady() def datagramReceived(self, data): print 'SFive: received datagram: "%s" (will get ignored)' % (data) def connectionFailed(self, failure): print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() + self._fetcher.socketError() def sendDatagram(self, data): try: return self.transport.write(data) except socket.error as err: print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() + self._fetcher.socketError() class ClientList: @@ -170,23 +171,23 @@ class NGXLuaFetcher(): self._proto = None self._conn = None + self._connected = False self._url_re = None - self._reactor = reactor def run(self): if self._initFetch(): - self._reactor.callWhenRunning(self._initSocket) - self._reactor.run() + reactor.callWhenRunning(self._initSocket) + reactor.run() def _initFetch(self): try: self._url_re = re.compile(self._nameformat) print "SFive: trying to fetch from '%s'" % self._url - agent = Agent(self._reactor) + agent = Agent(reactor) d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None) d.addCallback(self._httpResponse) - d.addBoth(self._socketError) + d.addBoth(self.close) except re.error as e: print 'SFive: regex error: %s' % (e) @@ -200,28 +201,6 @@ class NGXLuaFetcher(): response.deliverBody(SFiveNGXluaProto(finished, self)) return finished - def _initSocket(self): - print 'SFive: trying to connect to %s...' % (self._socket) - self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, self._reactor) - self._conn.startListening() - - def _socketError(self): - if self._conn: - self._conn.stopListening() - self._reactor.stop() - - def _socketReady(self): - print 'SFive: connection to sfive hub established' - self._sendInit() - # TODO: start callback every self._duration seconds - - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - def updateData(self, data): try: m = re.match(self._url_re, data['url']) @@ -234,9 +213,52 @@ class NGXLuaFetcher(): try: print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'], streamerid['content'], streamerid['format'], streamerid['quality']) - #self._clients.update(data) + # self._clients.update(data) + if self._conn and self._connected: + print " --> %(time)s: from %(client)s/%(port)d (%(ua)s) %(status)d: %(bytes-sent)d bytes" % data + self._sendDataset(dateutil.parser.parse(data['time']), 1, ClientList()) + except KeyError as e: print 'SFive: %s' % (e) + except ValueError as e: + print 'SFive: %s' % (e) + except AttributeError as e: + print 'SFive: %s' % (e) + + def close(self): + if self._conn and self._connected: + self._conn.stopListening() + self._connected = False + if reactor.running: + reactor.stop() + + + + def _initSocket(self): + print 'SFive: trying to connect to %s...' % (self._socket) + self._connected = False + self._proto = SFiveProto(self) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn.startListening() + + def socketReady(self): + print 'SFive: connection to sfive hub established' + self._connected = True + self._sendInit() + # TODO: start callback every self._duration seconds + + def socketError(self): + print 'SFive: connection to sfive hub lost - trying reconnect' + if self._conn and self._connected: + self._conn.stopListening() + self._connected = False + reactor.callLater(3, self._initSocket) + + def _sendInit(self): + initdata = { "version": 1, "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); def _sendDataset(self, timestamp, duration, clients): data = { "start-time": timestamp.isoformat('T') + 'Z', -- cgit v1.2.3