Diffstat (limited to 'src/daq/accesslog/s5-accesslog')
-rwxr-xr-x | src/daq/accesslog/s5-accesslog | 414
1 file changed, 414 insertions, 0 deletions
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog
new file mode 100755
index 0000000..e3d846f
--- /dev/null
+++ b/src/daq/accesslog/s5-accesslog
@@ -0,0 +1,414 @@
+#!/usr/bin/python
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistics collection tool for streaming server infrastructures.
+# The system collects metadata, such as the number of views and
+# the throughput, from a number of streaming servers and stores
+# it in a global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+#                    Markus Grueneis <gimpf@gimpf.org>
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, unix, error, reactor, task
+import socket
+from time import sleep
+
+import re
+import simplejson as json
+import datetime
+import dateutil.parser
+
+_MAX_PACKET_SIZE = 65536 # in bytes
+
+__version__ = "$Rev$"
+
+
+class SFivePort(unix.ConnectedDatagramPort):
+
+    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
+        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+    def write(self, data):
+        try:
+            return self.socket.send(data)
+        except socket.error as se:
+            no = se.args[0]
+            if no == EINTR:
+                return self.write(data)
+            elif no == EMSGSIZE:
+                raise error.MessageLengthError("message too long")
+            elif no == ECONNREFUSED:
+                self.protocol.connectionRefused()
+            elif no == EAGAIN:
+                # the send buffer seems to be full - let's wait a little while...
+                # this is not really a good solution but better than the approach
+                # of twisted which just drops the datagram...
+                sleep(0.01)
+                return self.write(data)
+            else:
+                raise
+
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+    def __init__(self, importer):
+        self._importer = importer
+
+    def startProtocol(self):
+        self._importer.socketReady()
+
+    def datagramReceived(self, data):
+        print 'SFive: received datagram: "%s" (will get ignored)' % (data)
+
+    def connectionFailed(self, failure):
+        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+        self._importer.socketError()
+
+    def sendDatagram(self, data):
+        try:
+            return self.transport.write(data)
+        except socket.error as err:
+            print 'SFive: sending datagram failed: %s' % (err)
+            self._importer.socketError()
+
+
+class ClientList:
+
+    def __init__(self):
+        self._clients = { }
+
+    def clear(self):
+        self._clients = { }
+
+    def getCnt(self):
+        return len(self._clients)
+
+    def getBytesSent(self):
+        total = 0
+        for val in self._clients.itervalues():
+            total += val['bytes-sent']
+
+        return total
+
+    def values(self):
+        return self._clients.itervalues()
+
+    def update(self, logdata):
+        key = '%(client)s/%(ua)s' % logdata
+        if key in self._clients:
+            self._clients[key]['bytes-sent'] += logdata['bytes-sent']
+        else:
+            self._clients[key] = { 'ip': logdata['client'],
+                                   'user-agent': logdata['ua'],
+                                   'bytes-sent': logdata['bytes-sent'] }
+
+
+class AccessLog():
+    """Class to batch import nginx/apache access logs into the spreadspace streaming statistics suite"""
+
+    def __init__(self, properties):
+        print 'SFive: accesslog file importer loaded'
+
+        self._socket = properties['socket']
+        self._hostname = properties['hostname']
+        self._content_id = None
+        self._format = None
+        self._quality = None
+        self._duration = properties['duration']
+        self._tags = properties['tags']
+        self._logfile = properties['logfile']
+        self._initStreamerIds(properties['streamer-ids'])
+
+        self._proto = None
+        self._conn = None
+        self._connected = False
+        self._url_re = re.compile(properties['nameformat'])
+
+
+    def _initStreamerIds(self, streamer):
+        print 'SFive: will look for the following streamer ids:'
+        self._streamer = {}
+        cs = {}
+        fs = {}
+        qs = {}
+        for s in streamer:
+            parts = s.split('/')
+            if len(parts) != 3:
+                raise ValueError('invalid streamer description "%s": must consist of 3 parts separated by a /' % s)
+            scs = parts[0].split(',')
+            sfs = parts[1].split(',')
+            sqs = parts[2].split(',')
+            # build the content/format/quality tree from the cross product
+            # of the three comma-separated lists
+            for c in scs:
+                cs[c] = 1
+                if c not in self._streamer:
+                    self._streamer[c] = {}
+                for f in sfs:
+                    fs[f] = 1
+                    if f not in self._streamer[c]:
+                        self._streamer[c][f] = {}
+                    for q in sqs:
+                        qs[q] = 1
+                        if q not in self._streamer[c][f]:
+                            self._streamer[c][f][q] = ClientList()
+                        print ' %s / %s / %s' % (c, f, q)
+
+        if len(cs.keys()) == 1:
+            self._content_id = cs.keys()[0]
+            print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+
+        if len(fs.keys()) == 1:
+            self._format = fs.keys()[0]
+            print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
+
+        if len(qs.keys()) == 1:
+            self._quality = qs.keys()[0]
+            print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
+
+
+    def run(self):
+        if self._initLog():
+            reactor.callWhenRunning(self._initSocket)
+            reactor.run()
+
+    def _initLog(self):
+        try:
+            print 'SFive: will batch import from %s' % (self._logfile)
+            self._fd = open(self._logfile, 'r')
+
+        except IOError as e:
+            print 'SFive: error opening logfile: %s' % (e.strerror)
+            return False
+
+        return True
+
+    def _prepareLineRegex(self):
+        parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
+                  r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"' ]
+        return re.compile(r'\s+'.join(parts) + r'\s*\Z')
+
+    def _parseRequest(self, reqstr):
+        req = { 'method': None, 'url': None, 'proto': None }
+        try:
+            parts = reqstr.split()
+            req['method'] = parts[0] if parts[0] != '-' else None
+            req['url'] = parts[1]
+            req['proto'] = parts[2]
+        except IndexError:
+            pass
+
+        return req
+
+    def _parseDatetime(self, datetimestr):
+        try:
+            ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
+            ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+            return ts
+        except ValueError:
+            return None
+
+    def _parseLine(self, regex, line):
+        linedata = regex.match(line).groupdict()
+
+        for part in ('client', 'ref', 'ua'):
+            if linedata[part] == '-':
+                linedata[part] = None
+
+        linedata['status'] = int(linedata['status'])
+        linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size'])
+        linedata['req'] = self._parseRequest(linedata['req'])
+        linedata['ts'] = self._parseDatetime(linedata['ts'])
+        return linedata
+
+    def _getTsFromLogDataAligned(self, ts):
+        try:
+            tsi = int(ts.strftime('%s'))
+            tsi = tsi - (tsi % self._duration)
+            return datetime.datetime.fromtimestamp(tsi)
+        except ValueError:
+            return ts
+
+    def _sendLogData(self):
+        linecnt = 0
+        updatecnt = 0
+        lastts = None
+        nextts = None
+        try:
+            regex = self._prepareLineRegex()
+            for line in self._fd:
+                linecnt += 1
+                linedata = self._parseLine(regex, line)
+                if not lastts:
+                    lastts = self._getTsFromLogDataAligned(linedata['ts'])
+                    nextts = lastts + datetime.timedelta(seconds=self._duration)
+                    self._clearClients()
+                    self._updateClients(linedata)
+                else:
+                    # close every window the new timestamp has passed
+                    while linedata['ts'] >= nextts:
+                        updatecnt += self._sendUpdates(lastts)
+                        lastts = nextts
+                        nextts = lastts + datetime.timedelta(seconds=self._duration)
+                        self._clearClients()
+                    self._updateClients(linedata)
+
+            # send remaining data
+            if nextts:
+                updatecnt += self._sendUpdates(lastts)
+
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+
+        return updatecnt, linecnt
+
+    def _updateClients(self, linedata):
+        if linedata['status'] != 200 and linedata['status'] != 206:
+            return
+        if linedata['req']['method'] != 'GET':
+            return
+
+        try:
+            m = re.match(self._url_re, linedata['req']['url'])
+            if not m:
+                return
+            streamerid = m.groupdict()
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+            return
+
+        try:
+            self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata)
+        except KeyError:
+            pass
+
+    def _sendUpdates(self, ts):
+        cnt = 0
+        for c in self._streamer.keys():
+            for f in self._streamer[c].keys():
+                for q in self._streamer[c][f].keys():
+                    self._sendDataset(ts, self._duration, c, f, q)
+                    cnt += 1
+        return cnt
+
+    def _clearClients(self):
+        for c in self._streamer.keys():
+            for f in self._streamer[c].keys():
+                for q in self._streamer[c][f].keys():
+                    self._streamer[c][f][q].clear()
+
+
+    def _initSocket(self):
+        print 'SFive: trying to connect to %s...' % (self._socket)
+        self._connected = False
+        self._proto = SFiveProto(self)
+        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+        self._conn.startListening()
+
+    def socketReady(self):
+        print 'SFive: connection to sfive hub established'
+        self._connected = True
+        self._sendInit()
+        updatecnt, linecnt = self._sendLogData()
+        print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt)
+        self._fd.close()
+        if reactor.running:
+            reactor.stop()
+
+    def socketError(self):
+        print 'SFive: connection to sfive hub lost'
+        if self._conn and self._connected:
+            self._conn.stopListening()
+        self._connected = False
+        if reactor.running:
+            reactor.stop()
+
+    def _sendInit(self):
+        initdata = { "version": 1, "hostname": self._hostname,
+                     "streamer-id": { },
+                     "tags": self._tags }
+        if self._content_id:
+            initdata["streamer-id"]["content-id"] = self._content_id
+        if self._format:
+            initdata["streamer-id"]["format"] = self._format
+        if self._quality:
+            initdata["streamer-id"]["quality"] = self._quality
+        if len(initdata["streamer-id"].keys()) == 0:
+            del initdata["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
+
+    def _sendDataset(self, timestamp, duration, content_id, format, quality):
+        clients = self._streamer[content_id][format][quality]
+        data = { "start-time": timestamp.isoformat('T') + 'Z',
+                 "duration-ms": duration * 1000,
+                 "streamer-id": { },
+                 "data": {
+                     "clients": list(clients.values()),
+                     "client-count": clients.getCnt(),
+                     "bytes-sent": clients.getBytesSent()
+                 }
+               }
+        if not self._content_id:
+            data["streamer-id"]["content-id"] = content_id
+        if not self._format:
+            data["streamer-id"]["format"] = format
+        if not self._quality:
+            data["streamer-id"]["quality"] = quality
+        if len(data["streamer-id"].keys()) == 0:
+            del data["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(data)))
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog importer')
+    parser.add_argument('--socket', '-s', dest='socket', required=True,
+                        help='the path to the data socket of the local SFive hub')
+    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+                        help='the hostname of the machine')
+    parser.add_argument('--duration', '-d', dest='duration', required=False,
+                        help='time (in seconds) between updates; defaults to 15')
+    parser.add_argument('--tag', '-t', dest='tags', action='append',
+                        help='tag to be added to the statistics data, can be invoked several times')
+    parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
+                        help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality1>[,<quality2>[,<quality3>]], can be invoked several times')
+    parser.add_argument('--logfile', '-l', dest='logfile', required=True,
+                        help='path to the logfile or \'-\' for standard input')
+    parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
+                        help='a regular expression (must contain named matches for content, format and quality)')
+    args = vars(parser.parse_args())
+    if not args['tags']:
+        args['tags'] = []
+    if not args['duration']:
+        args['duration'] = 15
+    else:
+        args['duration'] = int(args['duration'])
+    if not args['nameformat']:
+        args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
+    if not args['streamer-ids']:
+        print 'SFive: you have to specify at least one streamer-id!'
+        exit(-1)
+    importer = AccessLog(args)
+    importer.run()
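
For readers skimming the diff, a minimal sketch (not part of the file above) of how the default --nameformat regular expression maps a request URL to a streamer id; the URL and the names in it are made-up examples:

    import re

    # the importer's default pattern; 'content', 'format' and 'quality' are
    # the named groups _updateClients() looks up in the streamer tree
    nameformat = r'/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'

    m = re.match(nameformat, '/live/hls/av-high/segment42.ts')  # hypothetical URL
    if m:
        print(m.groupdict())  # content='av', format='hls', quality='high'

A request only counts towards a dataset if its content/format/quality triple was announced via --streamer-id, so the regular expression and the streamer descriptions have to agree.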
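The per-interval aggregation hinges on _getTsFromLogDataAligned(): every log timestamp is floored to the start of its --duration window, so all lines of one window end up in the same dataset. A small worked example of the same arithmetic (the values are illustrative):

    import datetime

    duration = 15  # seconds, the importer's default update interval
    ts = datetime.datetime(2014, 6, 1, 12, 0, 13)

    tsi = int(ts.strftime('%s'))   # seconds since the epoch (glibc extension, as in the importer)
    tsi = tsi - (tsi % duration)   # floor to the window start
    print(datetime.datetime.fromtimestamp(tsi))  # 2014-06-01 12:00:00

_sendLogData() then emits one update per window boundary crossed, so idle windows in the middle of a log still produce (empty) datasets.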
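On the wire, the importer speaks newline-delimited JSON over the hub's unix datagram socket: one init message, then one dataset per streamer id and interval. A sketch of the two message shapes built from the code above, with made-up values (hostname, address and user agent are invented; the stdlib json module stands in for simplejson):

    import json

    # init: sent once after the socket is up; streamer-id carries only the
    # parts that are unique across all configured streamer ids
    init = {"version": 1, "hostname": "streamer01", "tags": ["example"],
            "streamer-id": {"content-id": "av"}}

    # dataset: one per streamer id and update interval; streamer-id holds
    # the parts that were omitted from the init message
    dataset = {"start-time": "2014-06-01T12:00:00Z",
               "duration-ms": 15000,
               "streamer-id": {"format": "hls", "quality": "high"},
               "data": {"clients": [{"ip": "192.0.2.1",
                                     "user-agent": "ExamplePlayer/1.0",
                                     "bytes-sent": 1234}],
                        "client-count": 1,
                        "bytes-sent": 1234}}

    for msg in (init, dataset):
        print(json.dumps(msg))  # the importer appends '\n' and sends each as one datagram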