summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-20 02:32:46 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-20 02:32:46 +0200
commit5204b48aca924c2de303090680622b0ed03dca80 (patch)
tree27701fb00abde78ee9a56cb2bf26e8e421dae5eb /src/daq/nginx-lua
parentdaq: nginx-lua now uses on_abort for faster recovery (diff)
daq: ngin-lua fetcher reconnection handling works now
Diffstat (limited to 'src/daq/nginx-lua')
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch.py98
1 files changed, 60 insertions, 38 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
index 87c7bd1..4401e53 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
@@ -43,17 +43,18 @@ from time import sleep
import re
import simplejson as json
import datetime
+import dateutil.parser
-_MAX_PACKET_SIZE = 8192 # in bytes
+_MAX_PACKET_SIZE = 65536 # in bytes
__version__ = "$Rev$"
class SFiveNGXluaProto(Protocol):
- def __init__(self, finished, importer):
+ def __init__(self, finished, fetcher):
self.data = ''
self.finished = finished
- self._importer = importer
+ self._fetcher = fetcher
print 'SFive: started receiving log data'
def dataReceived(self, bytes):
@@ -68,11 +69,11 @@ class SFiveNGXluaProto(Protocol):
if len(lines) > 0:
for line in lines:
- self._importer.updateData(json.loads(line))
+ self._fetcher.updateData(json.loads(line))
def connectionLost(self, reason):
print 'SFive: finished receiving log data:', reason.getErrorMessage()
- self._importer._socketError()
+ self._fetcher.close()
class SFivePort(unix.ConnectedDatagramPort):
@@ -102,25 +103,25 @@ class SFivePort(unix.ConnectedDatagramPort):
class SFiveProto(protocol.ConnectedDatagramProtocol):
- def __init__(self, importer):
- self._importer = importer
+ def __init__(self, fetcher):
+ self._fetcher = fetcher
def startProtocol(self):
- self._importer._socketReady()
+ self._fetcher.socketReady()
def datagramReceived(self, data):
print 'SFive: received datagram: "%s" (will get ignored)' % (data)
def connectionFailed(self, failure):
print 'SFive: connection failed: %s' % (failure.getErrorMessage())
- self._importer._socketError()
+ self._fetcher.socketError()
def sendDatagram(self, data):
try:
return self.transport.write(data)
except socket.error as err:
print 'SFive: sending datagram failed: %s' % (err)
- self._importer._socketError()
+ self._fetcher.socketError()
class ClientList:
@@ -170,23 +171,23 @@ class NGXLuaFetcher():
self._proto = None
self._conn = None
+ self._connected = False
self._url_re = None
- self._reactor = reactor
def run(self):
if self._initFetch():
- self._reactor.callWhenRunning(self._initSocket)
- self._reactor.run()
+ reactor.callWhenRunning(self._initSocket)
+ reactor.run()
def _initFetch(self):
try:
self._url_re = re.compile(self._nameformat)
print "SFive: trying to fetch from '%s'" % self._url
- agent = Agent(self._reactor)
+ agent = Agent(reactor)
d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None)
d.addCallback(self._httpResponse)
- d.addBoth(self._socketError)
+ d.addBoth(self.close)
except re.error as e:
print 'SFive: regex error: %s' % (e)
@@ -200,28 +201,6 @@ class NGXLuaFetcher():
response.deliverBody(SFiveNGXluaProto(finished, self))
return finished
- def _initSocket(self):
- print 'SFive: trying to connect to %s...' % (self._socket)
- self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, self._reactor)
- self._conn.startListening()
-
- def _socketError(self):
- if self._conn:
- self._conn.stopListening()
- self._reactor.stop()
-
- def _socketReady(self):
- print 'SFive: connection to sfive hub established'
- self._sendInit()
- # TODO: start callback every self._duration seconds
-
- def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
-
def updateData(self, data):
try:
m = re.match(self._url_re, data['url'])
@@ -234,9 +213,52 @@ class NGXLuaFetcher():
try:
print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'],
streamerid['content'], streamerid['format'], streamerid['quality'])
- #self._clients.update(data)
+ # self._clients.update(data)
+ if self._conn and self._connected:
+ print " --> %(time)s: from %(client)s/%(port)d (%(ua)s) %(status)d: %(bytes-sent)d bytes" % data
+ self._sendDataset(dateutil.parser.parse(data['time']), 1, ClientList())
+
except KeyError as e:
print 'SFive: %s' % (e)
+ except ValueError as e:
+ print 'SFive: %s' % (e)
+ except AttributeError as e:
+ print 'SFive: %s' % (e)
+
+ def close(self):
+ if self._conn and self._connected:
+ self._conn.stopListening()
+ self._connected = False
+ if reactor.running:
+ reactor.stop()
+
+
+
+ def _initSocket(self):
+ print 'SFive: trying to connect to %s...' % (self._socket)
+ self._connected = False
+ self._proto = SFiveProto(self)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn.startListening()
+
+ def socketReady(self):
+ print 'SFive: connection to sfive hub established'
+ self._connected = True
+ self._sendInit()
+ # TODO: start callback every self._duration seconds
+
+ def socketError(self):
+ print 'SFive: connection to sfive hub lost - trying reconnect'
+ if self._conn and self._connected:
+ self._conn.stopListening()
+ self._connected = False
+ reactor.callLater(3, self._initSocket)
+
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
def _sendDataset(self, timestamp, duration, clients):
data = { "start-time": timestamp.isoformat('T') + 'Z',