From a4523af62b3c6c9edcb9c6eaf6b9e08ca311378a Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 20 Oct 2014 04:06:49 +0200 Subject: definition of multiple streamer-ids on commandline works now --- src/daq/nginx-lua/s5-nginx-lua-fetch.py | 97 ++++++++++++++++++++++++++++----- 1 file changed, 82 insertions(+), 15 deletions(-) (limited to 'src/daq') diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py index f5721d3..916bb3f 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py @@ -162,13 +162,14 @@ class NGXLuaFetcher(): self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] + self._content_id = None + self._format = None + self._quality = None self._duration = properties['duration'] self._tags = properties['tags'] self._url = properties['url'] self._nameformat = properties['nameformat'] + self._initStreamerIds(properties['streamer-ids']) self._proto = None self._conn = None @@ -177,13 +178,54 @@ class NGXLuaFetcher(): self._looper = None self._start_time = None + def _initStreamerIds(self, streamer): + print 'SFive: will look for the following streamer ids:' + self._streamer = {} + cs = {} + fs = {} + qs = {} + for s in streamer: + parts = s.split('/') + if len(parts) != 3: + raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s) + scs = parts[0].split(',') + sfs = parts[1].split(',') + sqs = parts[2].split(',') + for c in scs: + cs[c] = 1 + if c not in self._streamer: + self._streamer[c] = {} + for f in sfs: + fs[f] = 1 + if f not in self._streamer[c]: + self._streamer[c][f] = {} + for q in sqs: + qs[q] = 1 + if q not in self._streamer[c][f]: + self._streamer[c][f][q] = ClientList() + print ' %s / %s / %s' % (c, f, q) + + if len(cs.keys()) == 1: + self._content_id = cs.keys()[0] + print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id + + if len(fs.keys()) == 1: + self._format = fs.keys()[0] + print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format + + if len(qs.keys()) == 1: + self._quality = qs.keys()[0] + print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality + + + def run(self): if self._initFetch(): # try to be aligned with current time # this will eventually get out of sync but for now this is good enough offset = self._duration - (time.time() % self._duration) print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset) - self._looper = task.LoopingCall(self._sendUpdate) + self._looper = task.LoopingCall(self._sendUpdates) reactor.callLater(offset, self._startLooper) reactor.callWhenRunning(self._initSocket) reactor.run() @@ -193,9 +235,12 @@ class NGXLuaFetcher(): self._looper.start(self._duration, False) print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T')) - def _sendUpdate(self): + def _sendUpdates(self): if self._connected: - self._sendDataset(self._start_time, self._duration, ClientList()) + for c in self._streamer.keys(): + for f in self._streamer[c].keys(): + for q in self._streamer[c][f].keys(): + self._sendDataset(self._start_time, self._duration, c, f, q) else: print 'SFive: not connected - just clearing stats' @@ -281,19 +326,39 @@ class NGXLuaFetcher(): def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "streamer-id" : { }, "tags": self._tags } + if self._content_id: + initdata["streamer-id"]["content-id"] = self._content_id + if self._format: + initdata["streamer-id"]["format"] = self._format + if self._quality: + initdata["streamer-id"]["quality"] = self._quality + if len(initdata["streamer-id"].keys()) == 0: + del initdata["streamer-id"] + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - def _sendDataset(self, timestamp, duration, clients): + def _sendDataset(self, timestamp, duration, content_id, format, quality): + clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, + "streamer-id": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), "bytes-sent": clients.getBytesSent() } } + if not self._content_id: + data["streamer-id"]["content-id"] = content_id + if not self._format: + data["streamer-id"]["format"] = format + if not self._quality: + data["streamer-id"]["quality"] = quality + if len(data["streamer-id"].keys()) == 0: + del data["streamer-id"] + self._proto.sendDatagram('%s\n' % (json.dumps(data))); @@ -309,24 +374,26 @@ if __name__ == '__main__': help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--content-id', '-c', dest='content-id', required=False, - help='the content-id (i.e. av) - only used if nameformat doesn\'t contain a named match') - parser.add_argument('--format', '-f', dest='format', required=False, - help='the format (i.e. webm) - only used if nameformat doesn\'t contain a named match') - parser.add_argument('--quality', '-q', dest='quality', required=False, - help='the quality (i.e. high) - only used if nameformat doesn\'t contain a named match') + parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', + help='a streamer description like [,]/[,]/[,[,