From f992b2073a675dbe8ee230f2b9b71deef5159afc Mon Sep 17 00:00:00 2001
From: Christian Pointner
Date: Tue, 21 Oct 2014 06:54:28 +0200
Subject: daq: removed .py extension for scripts

---
 src/daq/nginx-lua/s5-nginx-lua-fetch    | 384 ++++++++++++++++++++++++++++++++
 src/daq/nginx-lua/s5-nginx-lua-fetch.py | 384 --------------------------------
 2 files changed, 384 insertions(+), 384 deletions(-)
 create mode 100755 src/daq/nginx-lua/s5-nginx-lua-fetch
 delete mode 100755 src/daq/nginx-lua/s5-nginx-lua-fetch.py

diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch b/src/daq/nginx-lua/s5-nginx-lua-fetch
new file mode 100755
index 0000000..86ef940
--- /dev/null
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch
@@ -0,0 +1,384 @@
+#!/usr/bin/python
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistics collection tool for streaming server infrastructures.
+# The system collects meta data, such as the number of views and the
+# throughput, from a number of streaming servers and stores it in a
+# global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner
+#                    Markus Grueneis
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, unix, reactor, task, error
+from twisted.internet.protocol import Protocol
+from twisted.internet.defer import Deferred
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+import socket
+from time import sleep
+
+import re
+import simplejson as json
+import datetime
+import time
+import dateutil.parser
+
+_MAX_PACKET_SIZE = 65536 # in bytes
+
+__version__ = "$Rev$"
+
+
+class SFiveNGXluaProto(Protocol):
+    def __init__(self, finished, fetcher):
+        self.data = ''
+        self.finished = finished
+        self._fetcher = fetcher
+        print 'SFive: started receiving log data'
+
+    def dataReceived(self, bytes):
+        # append new data and split into lines; an incomplete trailing
+        # line stays in the buffer until the next call completes it
+        self.data += bytes
+        lines = self.data.splitlines(True)
+        if len(lines) > 0:
+            if '\n' not in lines[-1] and '\r' not in lines[-1]:
+                self.data = lines[-1]
+                del lines[-1]
+            else:
+                self.data = ''
+
+            for line in lines:
+                self._fetcher.updateData(json.loads(line))
+
+    def connectionLost(self, reason):
+        print 'SFive: finished receiving log data:', reason.getErrorMessage()
+        self._fetcher.close()
+
+
+class SFivePort(unix.ConnectedDatagramPort):
+
+    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
+        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+    def write(self, data):
+        try:
+            return self.socket.send(data)
+        except socket.error, se:
+            no = se.args[0]
+            if no == EINTR:
+                return self.write(data)
+            elif no == EMSGSIZE:
+                raise error.MessageLengthError, "message too long"
+            elif no == ECONNREFUSED:
+                self.protocol.connectionRefused()
+            elif no == EAGAIN:
+                # the send buffer seems to be full - let's wait a little while...
+                # this is not really a good solution but better than the approach
+                # of twisted which just drops the datagram...
+                sleep(0.01)
+                return self.write(data)
+            else:
+                raise
+
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+    def __init__(self, fetcher):
+        self._fetcher = fetcher
+
+    def startProtocol(self):
+        self._fetcher.socketReady()
+
+    def datagramReceived(self, data):
+        print 'SFive: received datagram: "%s" (will get ignored)' % (data)
+
+    def connectionFailed(self, failure):
+        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+        self._fetcher.socketError()
+
+    def sendDatagram(self, data):
+        try:
+            return self.transport.write(data)
+        except socket.error as err:
+            print 'SFive: sending datagram failed: %s' % (err)
+            self._fetcher.socketError()
+
+
+class ClientList:
+
+    def __init__(self):
+        self._clients = { }
+
+    def clear(self):
+        self._clients = { }
+
+    def getCnt(self):
+        return len(self._clients)
+
+    def getBytesSent(self):
+        sum = 0
+        for val in self._clients.itervalues():
+            sum += val['bytes-sent']
+
+        return sum
+
+    def values(self):
+        return self._clients.itervalues()
+
+    def update(self, logdata):
+        # clients are keyed by IP and user agent, so different players
+        # behind the same address are still counted separately
+        key = '%(client)s/%(ua)s' % logdata
+        if key in self._clients:
+            self._clients[key]['bytes-sent'] += logdata['bytes-sent']
+        else:
+            self._clients[key] = { 'ip': logdata['client'],
+                                   'user-agent': logdata['ua'],
+                                   'bytes-sent': logdata['bytes-sent'] }
+
+
+class NGXLuaFetcher():
+    """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistics suite"""
+
+    def __init__(self, properties):
+        print 'SFive: nginx-lua fetcher loaded'
+
+        self._socket = properties['socket']
+        self._hostname = properties['hostname']
+        self._content_id = None
+        self._format = None
+        self._quality = None
+        self._duration = properties['duration']
+        self._tags = properties['tags']
+        self._url = properties['url']
+        self._initStreamerIds(properties['streamer-ids'])
+
+        self._proto = None
+        self._conn = None
+        self._connected = False
+        self._url_re = re.compile(properties['nameformat'])
+        self._looper = None
+        self._start_time = None
+
+    def _initStreamerIds(self, streamer):
+        print 'SFive: will look for the following streamer ids:'
+        self._streamer = {}
+        cs = {}
+        fs = {}
+        qs = {}
+        for s in streamer:
+            parts = s.split('/')
+            if len(parts) != 3:
+                raise ValueError('invalid streamer description "%s": must consist of 3 parts separated by a /' % s)
+            scs = parts[0].split(',')
+            sfs = parts[1].split(',')
+            sqs = parts[2].split(',')
+            for c in scs:
+                cs[c] = 1
+                if c not in self._streamer:
+                    self._streamer[c] = {}
+                for f in sfs:
+                    fs[f] = 1
+                    if f not in self._streamer[c]:
+                        self._streamer[c][f] = {}
+                    for q in sqs:
+                        qs[q] = 1
+                        if q not in self._streamer[c][f]:
+                            self._streamer[c][f][q] = ClientList()
+                        print '  %s / %s / %s' % (c, f, q)
+
+        if len(cs.keys()) == 1:
+            self._content_id = cs.keys()[0]
+            print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+
+        if len(fs.keys()) == 1:
+            self._format = fs.keys()[0]
+            print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
+
+        if len(qs.keys()) == 1:
+            self._quality = qs.keys()[0]
+            print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
+
+    def run(self):
+        self._initFetch()
+        # try to be aligned with current time
+        # this will eventually get out of sync but for now this is good enough
+        offset = self._duration - (time.time() % self._duration)
+        print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % \
+            (datetime.datetime.utcnow().isoformat('T'), offset)
+        self._looper = task.LoopingCall(self._sendUpdates)
+        reactor.callLater(offset, self._startLooper)
+        reactor.callWhenRunning(self._initSocket)
+        reactor.run()
+
+    def _startLooper(self):
+        self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
+        self._looper.start(self._duration, False)
+        print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T'))
+
+    def _sendUpdates(self):
+        for c in self._streamer.keys():
+            for f in self._streamer[c].keys():
+                for q in self._streamer[c][f].keys():
+                    if self._connected:
+                        self._sendDataset(self._start_time, self._duration, c, f, q)
+                    self._streamer[c][f][q].clear()
+
+        self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
+
+    def _initFetch(self):
+        print "SFive: trying to fetch from '%s'" % self._url
+        agent = Agent(reactor)
+        d = agent.request('GET', self._url,
+                          Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None)
+        d.addCallback(self._httpResponse)
+        d.addBoth(self.close)
+
+    def _httpResponse(self, response):
+        print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase)
+        finished = Deferred()
+        response.deliverBody(SFiveNGXluaProto(finished, self))
+        return finished
+
+    def updateData(self, data):
+        try:
+            m = re.match(self._url_re, data['url'])
+            if not m:
+                return
+            streamerid = m.groupdict()
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+            return
+
+        try:
+            if data['status'] == 200 or data['status'] == 206:
+                ts = dateutil.parser.parse(data['time'])
+                ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+                if self._start_time and ts >= self._start_time:
+                    self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(data)
+
+        except KeyError as e:
+            print 'SFive: %s - ignoring dataset' % (e)
+        except ValueError as e:
+            print 'SFive: %s - ignoring dataset' % (e)
+
+    def close(self, reason=None):
+        # also usable as a Deferred callback, hence the optional argument
+        if self._conn and self._connected:
+            self._conn.stopListening()
+        self._connected = False
+        if reactor.running:
+            reactor.stop()
+
+    def _initSocket(self):
+        print 'SFive: trying to connect to %s...' % (self._socket)
+        self._connected = False
+        self._proto = SFiveProto(self)
+        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+        self._conn.startListening()
+
+    def socketReady(self):
+        print 'SFive: connection to sfive hub established'
+        self._connected = True
+        self._sendInit()
+        # TODO: start callback every self._duration seconds
+
+    def socketError(self):
+        print 'SFive: connection to sfive hub lost - trying reconnect'
+        if self._conn and self._connected:
+            self._conn.stopListening()
+        self._connected = False
+        reactor.callLater(3, self._initSocket)
+
+    def _sendInit(self):
+        initdata = { "version": 1, "hostname": self._hostname,
+                     "streamer-id": { },
+                     "tags": self._tags }
+        if self._content_id:
+            initdata["streamer-id"]["content-id"] = self._content_id
+        if self._format:
+            initdata["streamer-id"]["format"] = self._format
+        if self._quality:
+            initdata["streamer-id"]["quality"] = self._quality
+        if len(initdata["streamer-id"].keys()) == 0:
+            del initdata["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
+
+    def _sendDataset(self, timestamp, duration, content_id, format, quality):
+        clients = self._streamer[content_id][format][quality]
+        data = { "start-time": timestamp.isoformat('T') + 'Z',
+                 "duration-ms": duration * 1000,
+                 "streamer-id": { },
+                 "data": {
+                     "clients": list(clients.values()),
+                     "client-count": clients.getCnt(),
+                     "bytes-sent": clients.getBytesSent()
+                 }
+               }
+        if not self._content_id:
+            data["streamer-id"]["content-id"] = content_id
+        if not self._format:
+            data["streamer-id"]["format"] = format
+        if not self._quality:
+            data["streamer-id"]["quality"] = quality
+        if len(data["streamer-id"].keys()) == 0:
+            del data["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(data)))
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher')
+    parser.add_argument('--socket', '-s', dest='socket', required=True,
+                        help='the path to the data socket of the local SFive hub')
+    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+                        help='the hostname of the machine')
+    parser.add_argument('--duration', '-d', dest='duration', required=False, type=int,
+                        help='time (in seconds) between updates; defaults to 15')
+    parser.add_argument('--tag', '-t', dest='tags', action='append',
+                        help='tag to be added to the statistic data, can be invoked several times')
+    parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
+                        help='a streamer description like <content-id>[,<content-id>]/<format>[,<format>]/<quality>[,<quality>]')

[The rest of the file, presumably the options for the status-page URL and the
nameformat regex plus the code that builds the properties dict and starts
NGXLuaFetcher, was swallowed when this page was rendered: the '<...>'
placeholders in the help strings were parsed as HTML tags.]

diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
deleted file mode 100755
index 86ef940..0000000
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ /dev/null

[The 384 deleted lines are byte-for-byte identical to the new file above
(same blob 86ef940); the commit is a pure rename, so they are not repeated
here.]
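For reference, the fetcher consumes one JSON object per line from the nginx
status page. The exact output of s5-nginx.lua is not part of this commit, but
from the fields referenced in updateData() and ClientList.update() each line
must carry at least 'time', 'url', 'status', 'client', 'ua' and 'bytes-sent';
a hypothetical input line (all values made up) would look like:

    {"time": "2014-10-21T06:54:28+02:00", "url": "/live/av/hls/high/stream.m3u8", "status": 200, "client": "192.0.2.7", "ua": "VLC/2.1.5", "bytes-sent": 18432}

The 'url' value is matched against the regex supplied via the 'nameformat'
property, which therefore must define the named groups 'content', 'format'
and 'quality' so updateData() can select the right ClientList bucket.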
-# - -from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS -from twisted.internet import protocol, unix, reactor, task -from twisted.internet.protocol import Protocol -from twisted.internet.defer import Deferred -from twisted.web.client import Agent -from twisted.web.http_headers import Headers -import socket -from time import sleep - -import re -import simplejson as json -import datetime -import time -import dateutil.parser - -_MAX_PACKET_SIZE = 65536 # in bytes - -__version__ = "$Rev$" - - -class SFiveNGXluaProto(Protocol): - def __init__(self, finished, fetcher): - self.data = '' - self.finished = finished - self._fetcher = fetcher - print 'SFive: started receiving log data' - - def dataReceived(self, bytes): - self.data += bytes - lines = self.data.splitlines(True) - if len(lines) > 0: - if '\n' not in lines[-1] and '\r' not in lines[-1]: - self.data = lines[-1] - del lines[-1] - else: - self.data = '' - - if len(lines) > 0: - for line in lines: - self._fetcher.updateData(json.loads(line)) - - def connectionLost(self, reason): - print 'SFive: finished receiving log data:', reason.getErrorMessage() - self._fetcher.close() - - -class SFivePort(unix.ConnectedDatagramPort): - - def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): - unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) - - def write(self, data): - try: - return self.socket.send(data) - except socket.error, se: - no = se.args[0] - if no == EINTR: - return self.write(data) - elif no == EMSGSIZE: - raise error.MessageLengthError, "message too long" - elif no == ECONNREFUSED: - self.protocol.connectionRefused() - elif no == EAGAIN: - # the send buffer seems to be full - let's wait a little while... - # this is not really a good solution but better than the aproach - # of twisted which just drops the datagram... 
- sleep(0.01) - return self.write(data) - else: - raise - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, fetcher): - self._fetcher = fetcher - - def startProtocol(self): - self._fetcher.socketReady() - - def datagramReceived(self, data): - print 'SFive: received datagram: "%s" (will get ignored)' % (data) - - def connectionFailed(self, failure): - print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._fetcher.socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket.error as err: - print 'SFive: sending datagram failed: %s' % (err) - self._fetcher.socketError() - - -class ClientList: - - def __init__(self): - self._clients = { } - - def clear(self): - self._clients = { } - - def getCnt(self): - return len(self._clients) - - def getBytesSent(self): - sum = 0 - for val in self._clients.itervalues(): - sum += val['bytes-sent'] - - return sum - - def values(self): - return self._clients.itervalues() - - def update(self, logdata): - key = '%(client)s/%(ua)s' % logdata - if key in self._clients.keys(): - self._clients[key]['bytes-sent'] += logdata['bytes-sent'] - else: - self._clients[key] = { 'ip': logdata['client'], - 'user-agent': logdata['ua'], - 'bytes-sent': logdata['bytes-sent'] } - - -class NGXLuaFetcher(): - """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite""" - - def __init__(self, properties): - print 'SFive: nginx-lua fetcher loaded' - - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = None - self._format = None - self._quality = None - self._duration = properties['duration'] - self._tags = properties['tags'] - self._url = properties['url'] - self._initStreamerIds(properties['streamer-ids']) - - self._proto = None - self._conn = None - self._connected = False - self._url_re = re.compile(properties['nameformat']) - self._looper = None - self._start_time = None - - def _initStreamerIds(self, streamer): - print 'SFive: will look for the following streamer ids:' - self._streamer = {} - cs = {} - fs = {} - qs = {} - for s in streamer: - parts = s.split('/') - if len(parts) != 3: - raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s) - scs = parts[0].split(',') - sfs = parts[1].split(',') - sqs = parts[2].split(',') - for c in scs: - cs[c] = 1 - if c not in self._streamer: - self._streamer[c] = {} - for f in sfs: - fs[f] = 1 - if f not in self._streamer[c]: - self._streamer[c][f] = {} - for q in sqs: - qs[q] = 1 - if q not in self._streamer[c][f]: - self._streamer[c][f][q] = ClientList() - print ' %s / %s / %s' % (c, f, q) - - if len(cs.keys()) == 1: - self._content_id = cs.keys()[0] - print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id - - if len(fs.keys()) == 1: - self._format = fs.keys()[0] - print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format - - if len(qs.keys()) == 1: - self._quality = qs.keys()[0] - print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality - - - - def run(self): - self._initFetch() - # try to be aligned with current time - # this will eventually get out of sync but for now this is good enough - offset = self._duration - (time.time() % self._duration) - print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % 
(datetime.datetime.utcnow().isoformat('T'), offset) - self._looper = task.LoopingCall(self._sendUpdates) - reactor.callLater(offset, self._startLooper) - reactor.callWhenRunning(self._initSocket) - reactor.run() - - def _startLooper(self): - self._start_time = datetime.datetime.utcnow().replace(microsecond=0) - self._looper.start(self._duration, False) - print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T')) - - def _sendUpdates(self): - for c in self._streamer.keys(): - for f in self._streamer[c].keys(): - for q in self._streamer[c][f].keys(): - if self._connected: - self._sendDataset(self._start_time, self._duration, c, f, q) - self._streamer[c][f][q].clear() - - self._start_time = datetime.datetime.utcnow().replace(microsecond=0) - - def _initFetch(self): - print "SFive: trying to fetch from '%s'" % self._url - agent = Agent(reactor) - d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None) - d.addCallback(self._httpResponse) - d.addBoth(self.close) - - def _httpResponse(self, response): - print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase) - finished = Deferred() - response.deliverBody(SFiveNGXluaProto(finished, self)) - return finished - - def updateData(self, data): - try: - m = re.match(self._url_re, data['url']) - if not m: - return - streamerid = m.groupdict() - except re.error as e: - print 'SFive: regex error: %s' % (e) - - try: - if data['status'] == 200 or data['status'] == 206: - ts = dateutil.parser.parse(data['time']) - ts = (ts - ts.utcoffset()).replace(tzinfo=None) - if self._start_time and ts >= self._start_time: - self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(data) - - except KeyError as e: - print 'SFive: %s - ignoring dataset' % (e) - except ValueError as e: - print 'SFive: %s - ignoring dataset' % (e) - - def close(self): - if self._conn and self._connected: - self._conn.stopListening() - self._connected = False - if reactor.running: - reactor.stop() - - - - def _initSocket(self): - print 'SFive: trying to connect to %s...' 
% (self._socket) - self._connected = False - self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) - self._conn.startListening() - - def socketReady(self): - print 'SFive: connection to sfive hub established' - self._connected = True - self._sendInit() - # TODO: start callback every self._duration seconds - - def socketError(self): - print 'SFive: connection to sfive hub lost - trying reconnect' - if self._conn and self._connected: - self._conn.stopListening() - self._connected = False - reactor.callLater(3, self._initSocket) - - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id" : { }, - "tags": self._tags } - if self._content_id: - initdata["streamer-id"]["content-id"] = self._content_id - if self._format: - initdata["streamer-id"]["format"] = self._format - if self._quality: - initdata["streamer-id"]["quality"] = self._quality - if len(initdata["streamer-id"].keys()) == 0: - del initdata["streamer-id"] - - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _sendDataset(self, timestamp, duration, content_id, format, quality): - clients = self._streamer[content_id][format][quality] - data = { "start-time": timestamp.isoformat('T') + 'Z', - "duration-ms": duration * 1000, - "streamer-id": { }, - "data": { - "clients": list(clients.values()), - "client-count": clients.getCnt(), - "bytes-sent": clients.getBytesSent() - } - } - if not self._content_id: - data["streamer-id"]["content-id"] = content_id - if not self._format: - data["streamer-id"]["format"] = format - if not self._quality: - data["streamer-id"]["quality"] = quality - if len(data["streamer-id"].keys()) == 0: - del data["streamer-id"] - - self._proto.sendDatagram('%s\n' % (json.dumps(data))); - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher') - parser.add_argument('--socket', '-s', dest='socket', required=True, - help='the path to the data socket of the local SFive hub') - parser.add_argument('--hostname', '-n', dest='hostname', required=True, - help='the hostname of the machine') - parser.add_argument('--duration', '-d', dest='duration', required=False, - help='time (in seconds) between updates; defaults to 15') - parser.add_argument('--tag', '-t', dest='tags', action='append', - help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', - help='a streamer description like [,]/[,]/[,[,