summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua/s5-nginx-lua-fetch.py
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-20 03:03:10 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-20 03:03:10 +0200
commit5720bc8b21f771e27a7f8871cb89347a08317c6a (patch)
tree6113d01ec316d13824c68b1d5e901770c3df4ee3 /src/daq/nginx-lua/s5-nginx-lua-fetch.py
parentdaq: ngin-lua fetcher reconnection handling works now (diff)
daq: nginx-lua fetch looper works now
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx-lua-fetch.py')
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch.py43
1 files changed, 34 insertions, 9 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
index 4401e53..f5721d3 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
@@ -32,7 +32,7 @@
#
from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import protocol, reactor, unix
+from twisted.internet import protocol, unix, reactor, task
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.web.client import Agent
@@ -43,6 +43,7 @@ from time import sleep
import re
import simplejson as json
import datetime
+import time
import dateutil.parser
_MAX_PACKET_SIZE = 65536 # in bytes
@@ -173,12 +174,33 @@ class NGXLuaFetcher():
self._conn = None
self._connected = False
self._url_re = None
+ self._looper = None
+ self._start_time = None
def run(self):
if self._initFetch():
+ # try to be aligned with current time
+ # this will eventually get out of sync but for now this is good enough
+ offset = self._duration - (time.time() % self._duration)
+ print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset)
+ self._looper = task.LoopingCall(self._sendUpdate)
+ reactor.callLater(offset, self._startLooper)
reactor.callWhenRunning(self._initSocket)
reactor.run()
+ def _startLooper(self):
+ self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
+ self._looper.start(self._duration, False)
+ print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T'))
+
+ def _sendUpdate(self):
+ if self._connected:
+ self._sendDataset(self._start_time, self._duration, ClientList())
+ else:
+ print 'SFive: not connected - just clearing stats'
+
+ self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
+
def _initFetch(self):
try:
self._url_re = re.compile(self._nameformat)
@@ -211,19 +233,22 @@ class NGXLuaFetcher():
print 'SFive: regex error: %s' % (e)
try:
- print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'],
- streamerid['content'], streamerid['format'], streamerid['quality'])
- # self._clients.update(data)
- if self._conn and self._connected:
+ ts = dateutil.parser.parse(data['time'])
+ ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+ if self._start_time and ts >= self._start_time:
+ print "url='%s' -> contend-id: '%s', format: '%s', quality: '%s'" % (data['url'],
+ streamerid['content'], streamerid['format'], streamerid['quality'])
print " --> %(time)s: from %(client)s/%(port)d (%(ua)s) %(status)d: %(bytes-sent)d bytes" % data
- self._sendDataset(dateutil.parser.parse(data['time']), 1, ClientList())
+ # self._clients.update(data)
except KeyError as e:
- print 'SFive: %s' % (e)
+ print 'SFive: %s - ignoring dataset' % (e)
except ValueError as e:
- print 'SFive: %s' % (e)
+ print 'SFive: %s - ignoring dataset' % (e)
except AttributeError as e:
- print 'SFive: %s' % (e)
+ print 'SFive: %s - ignoring dataset' % (e)
+ except TypeError as e:
+ print 'SFive: %s - ignoring dataset' % (e)
def close(self):
if self._conn and self._connected: