diff options
Diffstat (limited to 'src/daq/nginx-lua')
-rwxr-xr-x | src/daq/nginx-lua/s5-nginx-lua-fetch.py | 242 | ||||
-rw-r--r-- | src/daq/nginx-lua/s5-nginx.lua | 2 |
2 files changed, 216 insertions, 28 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py index ef13d02..78f5f73 100755 --- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py +++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py @@ -31,20 +31,30 @@ # along with sfive. If not, see <http://www.gnu.org/licenses/>. # -from twisted.internet import reactor -from twisted.internet.defer import Deferred +from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS +from twisted.internet import protocol, reactor, unix from twisted.internet.protocol import Protocol +from twisted.internet.defer import Deferred from twisted.web.client import Agent from twisted.web.http_headers import Headers +import socket +from time import sleep +import re import simplejson as json import datetime +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + + class SFiveNGXluaProto(Protocol): - def __init__(self, finished): + def __init__(self, finished, importer): self.data = '' self.finished = finished - print 'Started receiving log data' + self._importer = importer + print 'SFive: started receiving log data' def dataReceived(self, bytes): self.data += bytes @@ -57,29 +67,207 @@ class SFiveNGXluaProto(Protocol): self.data = '' if len(lines) > 0: - print '\nlog data received:' for line in lines: - print json.loads(line) + print "SFive: '%s'" % line.strip() + self._importer.updateData(json.loads(line)) def connectionLost(self, reason): - print 'Finished receiving log data:', reason.getErrorMessage() - self.finished.callback(None) - -agent = Agent(reactor) -d = agent.request( - 'GET', - 'http://calypso.spreadspace.org/sfive', - Headers({'User-Agent': ['SFive nginx-lua fetcher']}), - None) - -def cbRequest(response): - finished = Deferred() - response.deliverBody(SFiveNGXluaProto(finished)) - return finished -d.addCallback(cbRequest) - -def cbShutdown(ignored): - reactor.stop() -d.addBoth(cbShutdown) - -reactor.run() + print 'SFive: finished receiving log data:', reason.getErrorMessage() + self._importer._socketError() + + +class SFivePort(unix.ConnectedDatagramPort): + + def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): + unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) + + def write(self, data): + try: + return self.socket.send(data) + except socket.error, se: + no = se.args[0] + if no == EINTR: + return self.write(data) + elif no == EMSGSIZE: + raise error.MessageLengthError, "message too long" + elif no == ECONNREFUSED: + self.protocol.connectionRefused() + elif no == EAGAIN: + # the send buffer seems to be full - let's wait a little while... + # this is not really a good solution but better than the aproach + # of twisted which just drops the datagram... + sleep(0.01) + return self.write(data) + else: + raise + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, importer): + self._importer = importer + + def startProtocol(self): + self._importer._socketReady() + + def datagramReceived(self, data): + print 'SFive: received datagram: "%s" (will get ignored)' % (data) + + def connectionFailed(self, failure): + print 'SFive: connection failed: %s' % (failure.getErrorMessage()) + self._importer._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket.error as err: + print 'SFive: sending datagram failed: %s' % (err) + self._importer._socketError() + + +class ClientList: + + def __init__(self, re): + self._re = re + self._clients = { } + + def clear(self): + self._clients = { } + + def getCnt(self): + return len(self._clients) + + def getBytesSent(self): + sum = 0 + for val in self._clients.itervalues(): + sum += val['bytes-sent'] + + return sum + + def values(self): + return self._clients.itervalues() + + def update(self, logdata): + ## TODO: parse logdata['uri'] !!! + if logdata['client'] in self._clients.keys(): + self._clients[logdata['client']]['bytes-sent'] += logdata['bytes-sent'] + else: + self._clients[logdata['client']] = { 'ip': logdata['client'], + 'bytes-sent': logdata['bytes-sent'] } + + +class NGXLuaFetcher(): + """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite""" + + def __init__(self, properties): + print 'SFive: nginx-lua fetcher loaded' + + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties['content-id'] + self._format = properties['format'] + self._quality = properties['quality'] + self._duration = properties['duration'] + self._tags = properties['tags'] + self._url = properties['url'] + self._nameformat = properties['nameformat'] + + self._proto = None + self._conn = None + self._clients = None + self._reactor = reactor + + def run(self): + if self._initFetch(): + self._reactor.callWhenRunning(self._initSocket) + self._reactor.run() + + def _initFetch(self): + try: + self._clients = ClientList(re.compile(self._nameformat)) # TODO: check for named matches + + print "SFive: trying to fetch from '%s'" % self._url + agent = Agent(self._reactor) + d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None) + d.addCallback(self._httpResponse) + d.addBoth(self._socketError) + + except re.error as e: + print 'SFive: regex error: %s' % (e) + return False + + return True + + def _httpResponse(self, response): + print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase) + finished = Deferred() + response.deliverBody(SFiveNGXluaProto(finished, self)) + return finished + + def _initSocket(self): + print 'SFive: trying to connect to %s...' % (self._socket) + self._proto = SFiveProto(self) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, self._reactor) + self._conn.startListening() + + def _socketError(self): + if self._conn: + self._conn.stopListening() + self._reactor.stop() + + def _socketReady(self): + print 'SFive: connection to sfive hub established' + self._sendInit() + # TODO: start callback every self._duration seconds + + def _sendInit(self): + initdata = { "version": 1, "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def updateData(self, data): + self._clients.update(data) + + def _sendDataset(self, timestamp, duration, clients): + data = { "start-time": timestamp.isoformat('T') + 'Z', + "duration-ms": duration * 1000, + "data": { + "clients": list(clients.values()), + "client-count": clients.getCnt(), + "bytes-sent": clients.getBytesSent() + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher') + parser.add_argument('--socket', '-s', dest='socket', required=True, + help='the path to the data socket of the local SFive hub') + parser.add_argument('--hostname', '-n', dest='hostname', required=True, + help='the hostname of the machine') + parser.add_argument('--duration', '-d', dest='duration', required=False, + help='time (in seconds) between updates; defaults to 15') + parser.add_argument('--tag', '-t', dest='tags', action='append', + help='tag to be added to the statistic data, can be invoked several times') + parser.add_argument('--content-id', '-c', dest='content-id', required=False, + help='the content-id (i.e. av) - only used if nameformat doesn\'t contain a named match') + parser.add_argument('--format', '-f', dest='format', required=False, + help='the format (i.e. webm) - only used if nameformat doesn\'t contain a named match') + parser.add_argument('--quality', '-q', dest='quality', required=False, + help='the quality (i.e. high) - only used if nameformat doesn\'t contain a named match') + parser.add_argument('--url', '-u', dest='url', required=False, + help='the url of the nginx sfive exporert, default http://localhost/sfive') + parser.add_argument('--nameformat', '-F', dest='nameformat', required=True, + help='a regular expression (containing named matches for content, format, quality)') + args = vars(parser.parse_args()) + if not args['tags']: + args['tags'] = [] + if not args['duration']: + args['duration'] = 15 + if not args['url']: + args['url'] = 'http://localhost/sfive' + fetcher = NGXLuaFetcher(args) + fetcher.run() diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua index ac46224..2ae7e69 100644 --- a/src/daq/nginx-lua/s5-nginx.lua +++ b/src/daq/nginx-lua/s5-nginx.lua @@ -92,7 +92,7 @@ function _SFIVE.log() json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",' json = json .. '"uri": "' .. ngx.var.uri .. '",' json = json .. '"status": ' .. status .. ',' - json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent + json = json .. '"bytes-sent": ' .. ngx.var.bytes_sent json = json .. '}' local ok, err, force = sfive_data:add(idx, json, config.log_exptime) |