From 5720bc8b21f771e27a7f8871cb89347a08317c6a Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 20 Oct 2014 03:03:10 +0200 Subject: daq: nginx-lua fetch looper works now --- src/daq/nginx-lua/s5-nginx-lua-fetch.py | 43 ++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py index 4401e53..f5721d3 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py @@ -32,7 +32,7 @@ # from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS -from twisted.internet import protocol, reactor, unix +from twisted.internet import protocol, unix, reactor, task from twisted.internet.protocol import Protocol from twisted.internet.defer import Deferred from twisted.web.client import Agent @@ -43,6 +43,7 @@ from time import sleep import re import simplejson as json import datetime +import time import dateutil.parser _MAX_PACKET_SIZE = 65536 # in bytes @@ -173,12 +174,33 @@ class NGXLuaFetcher(): self._conn = None self._connected = False self._url_re = None + self._looper = None + self._start_time = None def run(self): if self._initFetch(): + # try to be aligned with current time + # this will eventually get out of sync but for now this is good enough + offset = self._duration - (time.time() % self._duration) + print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset) + self._looper = task.LoopingCall(self._sendUpdate) + reactor.callLater(offset, self._startLooper) reactor.callWhenRunning(self._initSocket) reactor.run() + def _startLooper(self): + self._start_time = datetime.datetime.utcnow().replace(microsecond=0) + self._looper.start(self._duration, False) + print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T')) + + def _sendUpdate(self): + if self._connected: + self._sendDataset(self._start_time, self._duration, ClientList()) + else: + print 'SFive: not connected - just clearing stats' + + self._start_time = datetime.datetime.utcnow().replace(microsecond=0) + def _initFetch(self): try: self._url_re = re.compile(self._nameformat) @@ -211,19 +233,22 @@ class NGXLuaFetcher(): print 'SFive: regex error: %s' % (e) try: - print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'], - streamerid['content'], streamerid['format'], streamerid['quality']) - # self._clients.update(data) - if self._conn and self._connected: + ts = dateutil.parser.parse(data['time']) + ts = (ts - ts.utcoffset()).replace(tzinfo=None) + if self._start_time and ts >= self._start_time: + print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'], + streamerid['content'], streamerid['format'], streamerid['quality']) print " --> %(time)s: from %(client)s/%(port)d (%(ua)s) %(status)d: %(bytes-sent)d bytes" % data - self._sendDataset(dateutil.parser.parse(data['time']), 1, ClientList()) + # self._clients.update(data) except KeyError as e: - print 'SFive: %s' % (e) + print 'SFive: %s - ignoring dataset' % (e) except ValueError as e: - print 'SFive: %s' % (e) + print 'SFive: %s - ignoring dataset' % (e) except AttributeError as e: - print 'SFive: %s' % (e) + print 'SFive: %s - ignoring dataset' % (e) + except TypeError as e: + print 'SFive: %s - ignoring dataset' % (e) def close(self): if self._conn and self._connected: -- cgit v1.2.3