summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-20 04:06:49 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-20 04:06:49 +0200
commita4523af62b3c6c9edcb9c6eaf6b9e08ca311378a (patch)
tree499a4376cb862f71933ba8e6573feb7c22bf5f65 /src/daq
parentdaq: nginx-lua fetch looper works now (diff)
definition of multiple streamer-ids on commandline works now
Diffstat (limited to 'src/daq')
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch.py97
1 files changed, 82 insertions, 15 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
index f5721d3..916bb3f 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
@@ -162,13 +162,14 @@ class NGXLuaFetcher():
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties['content-id']
- self._format = properties['format']
- self._quality = properties['quality']
+ self._content_id = None
+ self._format = None
+ self._quality = None
self._duration = properties['duration']
self._tags = properties['tags']
self._url = properties['url']
self._nameformat = properties['nameformat']
+ self._initStreamerIds(properties['streamer-ids'])
self._proto = None
self._conn = None
@@ -177,13 +178,54 @@ class NGXLuaFetcher():
self._looper = None
self._start_time = None
+ def _initStreamerIds(self, streamer):
+ print 'SFive: will look for the following streamer ids:'
+ self._streamer = {}
+ cs = {}
+ fs = {}
+ qs = {}
+ for s in streamer:
+ parts = s.split('/')
+ if len(parts) != 3:
+ raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s)
+ scs = parts[0].split(',')
+ sfs = parts[1].split(',')
+ sqs = parts[2].split(',')
+ for c in scs:
+ cs[c] = 1
+ if c not in self._streamer:
+ self._streamer[c] = {}
+ for f in sfs:
+ fs[f] = 1
+ if f not in self._streamer[c]:
+ self._streamer[c][f] = {}
+ for q in sqs:
+ qs[q] = 1
+ if q not in self._streamer[c][f]:
+ self._streamer[c][f][q] = ClientList()
+ print ' %s / %s / %s' % (c, f, q)
+
+ if len(cs.keys()) == 1:
+ self._content_id = cs.keys()[0]
+ print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+
+ if len(fs.keys()) == 1:
+ self._format = fs.keys()[0]
+ print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
+
+ if len(qs.keys()) == 1:
+ self._quality = qs.keys()[0]
+ print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
+
+
+
def run(self):
if self._initFetch():
# try to be aligned with current time
# this will eventually get out of sync but for now this is good enough
offset = self._duration - (time.time() % self._duration)
print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset)
- self._looper = task.LoopingCall(self._sendUpdate)
+ self._looper = task.LoopingCall(self._sendUpdates)
reactor.callLater(offset, self._startLooper)
reactor.callWhenRunning(self._initSocket)
reactor.run()
@@ -193,9 +235,12 @@ class NGXLuaFetcher():
self._looper.start(self._duration, False)
print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T'))
- def _sendUpdate(self):
+ def _sendUpdates(self):
if self._connected:
- self._sendDataset(self._start_time, self._duration, ClientList())
+ for c in self._streamer.keys():
+ for f in self._streamer[c].keys():
+ for q in self._streamer[c][f].keys():
+ self._sendDataset(self._start_time, self._duration, c, f, q)
else:
print 'SFive: not connected - just clearing stats'
@@ -281,19 +326,39 @@ class NGXLuaFetcher():
def _sendInit(self):
initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "streamer-id" : { },
"tags": self._tags }
+ if self._content_id:
+ initdata["streamer-id"]["content-id"] = self._content_id
+ if self._format:
+ initdata["streamer-id"]["format"] = self._format
+ if self._quality:
+ initdata["streamer-id"]["quality"] = self._quality
+ if len(initdata["streamer-id"].keys()) == 0:
+ del initdata["streamer-id"]
+
self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
- def _sendDataset(self, timestamp, duration, clients):
+ def _sendDataset(self, timestamp, duration, content_id, format, quality):
+ clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
+ "streamer-id": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
"bytes-sent": clients.getBytesSent()
}
}
+ if not self._content_id:
+ data["streamer-id"]["content-id"] = content_id
+ if not self._format:
+ data["streamer-id"]["format"] = format
+ if not self._quality:
+ data["streamer-id"]["quality"] = quality
+ if len(data["streamer-id"].keys()) == 0:
+ del data["streamer-id"]
+
self._proto.sendDatagram('%s\n' % (json.dumps(data)));
@@ -309,24 +374,26 @@ if __name__ == '__main__':
help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--content-id', '-c', dest='content-id', required=False,
- help='the content-id (i.e. av) - only used if nameformat doesn\'t contain a named match')
- parser.add_argument('--format', '-f', dest='format', required=False,
- help='the format (i.e. webm) - only used if nameformat doesn\'t contain a named match')
- parser.add_argument('--quality', '-q', dest='quality', required=False,
- help='the quality (i.e. high) - only used if nameformat doesn\'t contain a named match')
+ parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
+ help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
parser.add_argument('--url', '-u', dest='url', required=False,
help='the url of the nginx sfive exporert, default http://localhost/sfive')
parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
- help='a regular expression (containing named matches for content, format, quality)')
+ help='a regular expression (must contain named matches for content, format, quality)')
args = vars(parser.parse_args())
if not args['tags']:
args['tags'] = []
if not args['duration']:
args['duration'] = 15
+ else:
+ args['duration'] = int(args['duration'])
if not args['url']:
args['url'] = 'http://localhost/sfive'
if not args['nameformat']:
args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
+ if not args['streamer-ids']:
+ print 'SFive: you have to specify at least one streamer-id!'
+ exit(-1)
+
fetcher = NGXLuaFetcher(args)
fetcher.run()