From 26fc9a24efb968da3399439db43b360b674004a9 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Mon, 20 Oct 2014 06:05:56 +0200 Subject: dag: accesslog importer can now import multiple streamerids in one pass major refactoring done --- src/daq/accesslog/s5-accesslog.py | 286 ++++++++++++++++++++++++-------------- 1 file changed, 184 insertions(+), 102 deletions(-) (limited to 'src') diff --git a/src/daq/accesslog/s5-accesslog.py b/src/daq/accesslog/s5-accesslog.py index e559041..44df905 100755 --- a/src/daq/accesslog/s5-accesslog.py +++ b/src/daq/accesslog/s5-accesslog.py @@ -32,17 +32,16 @@ # from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS -from twisted.internet import protocol, reactor, unix +from twisted.internet import protocol, unix, reactor, task import socket from time import sleep -import sys import re import simplejson as json import datetime import dateutil.parser -_MAX_PACKET_SIZE = 8192 # in bytes +_MAX_PACKET_SIZE = 65536 # in bytes __version__ = "$Rev$" @@ -77,28 +76,27 @@ class SFiveProto(protocol.ConnectedDatagramProtocol): self._importer = importer def startProtocol(self): - self._importer._socketReady() + self._importer.socketReady() def datagramReceived(self, data): print 'SFive: received datagram: "%s" (will get ignored)' % (data) def connectionFailed(self, failure): print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() + self._importer.socketError() def sendDatagram(self, data): try: return self.transport.write(data) except socket.error as err: print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() + self._importer.socketError() class ClientList: - def __init__(self, file_re): + def __init__(self): self._clients = { } - self._file_re = file_re def clear(self): self._clients = { } @@ -116,21 +114,14 @@ class ClientList: def values(self): return self._clients.itervalues() - def update(self, linedata): - if linedata['status'] != 200 and linedata['status'] != 206: - return - if linedata['req']['method'] != 'GET': - return - - try: - if re.match(self._file_re, linedata['req']['url']): - if linedata['client'] in self._clients.keys(): - self._clients[linedata['client']]['bytes-sent'] += linedata['size'] - else: - self._clients[linedata['client']] = { 'ip': linedata['client'], - 'bytes-sent': linedata['size'] } - except re.error as e: - print 'SFive: regex error: %s' % (e) + def update(self, logdata): + key = '%(client)s/%(ua)s' % logdata + if key in self._clients.keys(): + self._clients[key]['bytes-sent'] += logdata['bytes-sent'] + else: + self._clients[key] = { 'ip': logdata['client'], + 'user-agent': logdata['ua'], + 'bytes-sent': logdata['bytes-sent'] } class AccessLog(): @@ -141,16 +132,60 @@ class AccessLog(): self._socket = properties['socket'] self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] + self._content_id = None + self._format = None + self._quality = None self._duration = properties['duration'] self._tags = properties['tags'] self._logfile = properties['logfile'] - self._nameformat = properties['nameformat'] + self._initStreamerIds(properties['streamer-ids']) self._proto = None self._conn = None + self._connected = False + self._url_re = re.compile(properties['nameformat']) + + + def _initStreamerIds(self, streamer): + print 'SFive: will look for the following streamer ids:' + self._streamer = {} + cs = {} + fs = {} + qs = {} + for s in streamer: + parts = s.split('/') + if len(parts) != 3: + raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s) + scs = parts[0].split(',') + sfs = parts[1].split(',') + sqs = parts[2].split(',') + for c in scs: + cs[c] = 1 + if c not in self._streamer: + self._streamer[c] = {} + for f in sfs: + fs[f] = 1 + if f not in self._streamer[c]: + self._streamer[c][f] = {} + for q in sqs: + qs[q] = 1 + if q not in self._streamer[c][f]: + self._streamer[c][f][q] = ClientList() + print ' %s / %s / %s' % (c, f, q) + + if len(cs.keys()) == 1: + self._content_id = cs.keys()[0] + print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id + + if len(fs.keys()) == 1: + self._format = fs.keys()[0] + print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format + + if len(qs.keys()) == 1: + self._quality = qs.keys()[0] + print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality + + def run(self): if self._initLog(): @@ -159,57 +194,15 @@ class AccessLog(): def _initLog(self): try: - if self._logfile: - print 'SFive: will batch import form %s' % (self._logfile if self._logfile != '-' else 'standard input') - self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin - else: - print 'SFive: live mode enabled' - - regex = self._nameformat % { 'hostname': self._hostname, - 'content-id': self._content_id, - 'format': self._format, - 'quality': self._quality } - self._file_re = re.compile(regex) - print "SFive: will be looking for files like '%s'" % regex + print 'SFive: will batch import form %s' % (self._logfile) + self._fd = open(self._logfile, 'r') + except IOError as e: print 'SFive: error opening logfile: %s' % (e.strerror) return False - except re.error as e: - print 'SFive: regex error: %s' % (e) - return False return True - def _initSocket(self): - print 'SFive: trying to connect to %s...' % (self._socket) - self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) - self._conn.startListening() - - def _socketError(self): - if self._conn: - self._conn.stopListening() - reactor.stop() - - def _socketReady(self): - print 'SFive: connection to sfive hub established' - self._sendInit() - if hasattr(self, '_fd'): - cnt = self._sendLogData(self._fd, None) - print 'SFive: imported %d datasets' % (cnt) - self._fd.close() - else: - print 'SFive: live mode is not yet implemented!' - # TODO: wait self._duration seconds and call _sendData with - # all lines received lines since last update - - reactor.stop() - - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); def _prepareLineRegex(self): parts = [ r'(?P\S+)', r'\S+', r'\S+', r'\[(?P.+)\]', r'"(?P.+)"', @@ -239,58 +232,144 @@ class AccessLog(): def _parseLine(self, regex, line): linedata = regex.match(line).groupdict() - for part in ("client", "ref", "ua"): - if linedata[part] == "-": + for part in ('client', 'ref', 'ua'): + if linedata[part] == '-': linedata[part] = None - linedata["status"] = int(linedata["status"]) - linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"]) + linedata['status'] = int(linedata['status']) + linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size']) linedata['req'] = self._parseRequest(linedata['req']) linedata['ts'] = self._parseDatetime(linedata['ts']) return linedata - def _sendLogData(self, data, lastts): - cnt = 0 - nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration) - clients = ClientList(self._file_re) + def _sendLogData(self): + linecnt = 0 + updatecnt = 0 + lastts = None + nextts = None try: regex = self._prepareLineRegex() - for line in data: + for line in self._fd: + linecnt += 1 linedata = self._parseLine(regex, line) if not lastts: lastts = linedata['ts'] + # TODO: make this aligned to time! nextts = lastts + datetime.timedelta(seconds=self._duration) - clients.clear() - clients.update(linedata) + self._clearClients() + self._updateClients(linedata) else: while linedata['ts'] >= nextts: - self._sendDataset(lastts, self._duration, clients) - cnt += 1 + updatecnt += self._sendUpdates(lastts) lastts = nextts nextts = lastts + datetime.timedelta(seconds=self._duration) - clients.clear() - - clients.update(linedata) + self._clearClients() + self._updateClients(linedata) # send remaining data if nextts: - self._sendDataset(lastts, self._duration, clients) - cnt += 1 + updatecnt += self._sendUpdates(lastts) except re.error as e: print 'SFive: regex error: %s' % (e) + return updatecnt, linecnt + + def _updateClients(self, linedata): + if linedata['status'] != 200 and linedata['status'] != 206: + return + if linedata['req']['method'] != 'GET': + return + + try: + m = re.match(self._url_re, linedata['req']['url']) + if not m: + return + streamerid = m.groupdict() + except re.error as e: + print 'SFive: regex error: %s' % (e) + + try: + self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata) + except KeyError as e: + pass + + def _sendUpdates(self, ts): + cnt = 0 + for c in self._streamer.keys(): + for f in self._streamer[c].keys(): + for q in self._streamer[c][f].keys(): + self._sendDataset(ts, self._duration, c, f, q) + cnt +=1 return cnt - def _sendDataset(self, timestamp, duration, clients): + def _clearClients(self): + for c in self._streamer.keys(): + for f in self._streamer[c].keys(): + for q in self._streamer[c][f].keys(): + self._streamer[c][f][q].clear() + + + def _initSocket(self): + print 'SFive: trying to connect to %s...' % (self._socket) + self._connected = False + self._proto = SFiveProto(self) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn.startListening() + + def socketReady(self): + print 'SFive: connection to sfive hub established' + self._connected = True + self._sendInit() + updatecnt, linecnt = self._sendLogData() + print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt) + self._fd.close() + if reactor.running: + reactor.stop() + + def socketError(self): + print 'SFive: connection to sfive hub lost' + if self._conn and self._connected: + self._conn.stopListening() + self._connected = False + if reactor.running: + reactor.stop() + + def _sendInit(self): + initdata = { "version": 1, "hostname": self._hostname, + "streamer-id" : { }, + "tags": self._tags } + if self._content_id: + initdata["streamer-id"]["content-id"] = self._content_id + if self._format: + initdata["streamer-id"]["format"] = self._format + if self._quality: + initdata["streamer-id"]["quality"] = self._quality + if len(initdata["streamer-id"].keys()) == 0: + del initdata["streamer-id"] + + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _sendDataset(self, timestamp, duration, content_id, format, quality): + clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, + "streamer-id": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), "bytes-sent": clients.getBytesSent() } } + if not self._content_id: + data["streamer-id"]["content-id"] = content_id + if not self._format: + data["streamer-id"]["format"] = format + if not self._quality: + data["streamer-id"]["quality"] = quality + if len(data["streamer-id"].keys()) == 0: + del data["streamer-id"] + self._proto.sendDatagram('%s\n' % (json.dumps(data))); @@ -302,24 +381,27 @@ if __name__ == '__main__': help='the path to the data socket of the local SFive hub') parser.add_argument('--hostname', '-n', dest='hostname', required=True, help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') - parser.add_argument('--format', '-f', dest='format', required=True, - help='the format (i.e. webm)') - parser.add_argument('--quality', '-q', dest='quality', required=True, - help='the quality (i.e. high)') parser.add_argument('--duration', '-d', dest='duration', required=False, - help='time (in seconds) between updates; defaults to 5') + help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--logfile', '-l', dest='logfile', required=False, + parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', + help='a streamer description like [,]/[,]/[,[,