author:    Christian Pointner <equinox@spreadspace.org>  2014-10-21 06:54:28 +0200
committer: Christian Pointner <equinox@spreadspace.org>  2014-10-21 06:54:28 +0200
commit:    f992b2073a675dbe8ee230f2b9b71deef5159afc (patch)
tree:      03b538019189f9f5500fd815c4847e78990cc917 /src/daq/accesslog/s5-accesslog
parent:    hub: added clean target for Makefile (diff)

daq: removed .py extension for scripts
Diffstat (limited to 'src/daq/accesslog/s5-accesslog')
-rwxr-xr-x  src/daq/accesslog/s5-accesslog  414
1 file changed, 414 insertions(+), 0 deletions(-)
diff --git a/src/daq/accesslog/s5-accesslog b/src/daq/accesslog/s5-accesslog
new file mode 100755
index 0000000..e3d846f
--- /dev/null
+++ b/src/daq/accesslog/s5-accesslog
@@ -0,0 +1,414 @@
+#!/usr/bin/python
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistics collection tool for streaming server infrastructures.
+# The system collects metadata such as the number of viewers and
+# the throughput from a number of streaming servers and stores it
+# in a global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+# Markus Grueneis <gimpf@gimpf.org>
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, unix, reactor, error
+import socket
+from time import sleep
+
+import re
+import simplejson as json
+import datetime
+import dateutil.parser
+
+_MAX_PACKET_SIZE = 65536 # in bytes
+
+__version__ = "$Rev$"
+
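+# SFivePort subclasses twisted's ConnectedDatagramPort to override write():
+# instead of silently dropping datagrams when the send buffer is full
+# (twisted's default behaviour), it retries on EINTR and backs off briefly
+# on EAGAIN.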
+class SFivePort(unix.ConnectedDatagramPort):
+
+    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
+ unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+ def write(self, data):
+ try:
+ return self.socket.send(data)
+        except socket.error as se:
+ no = se.args[0]
+ if no == EINTR:
+ return self.write(data)
+ elif no == EMSGSIZE:
+                raise error.MessageLengthError("message too long")
+ elif no == ECONNREFUSED:
+ self.protocol.connectionRefused()
+ elif no == EAGAIN:
+                # the send buffer seems to be full - let's wait a little while...
+                # this is not really a good solution but better than the approach
+                # of twisted which just drops the datagram...
+ sleep(0.01)
+ return self.write(data)
+ else:
+ raise
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+ def __init__(self, importer):
+ self._importer = importer
+
+ def startProtocol(self):
+ self._importer.socketReady()
+
+ def datagramReceived(self, data):
+        print 'SFive: received datagram: "%s" (will be ignored)' % (data)
+
+ def connectionFailed(self, failure):
+ print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+ self._importer.socketError()
+
+ def sendDatagram(self, data):
+ try:
+ return self.transport.write(data)
+ except socket.error as err:
+ print 'SFive: sending datagram failed: %s' % (err)
+ self._importer.socketError()
+
+
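+# ClientList aggregates the bytes sent per client within one interval,
+# keyed by '<ip>/<user-agent>', e.g. '10.0.0.1/VLC/2.1.5' (a hypothetical
+# sample key).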
+class ClientList:
+
+ def __init__(self):
+ self._clients = { }
+
+ def clear(self):
+ self._clients = { }
+
+ def getCnt(self):
+ return len(self._clients)
+
+    def getBytesSent(self):
+        return sum(val['bytes-sent'] for val in self._clients.itervalues())
+
+ def values(self):
+ return self._clients.itervalues()
+
+ def update(self, logdata):
+ key = '%(client)s/%(ua)s' % logdata
+        if key in self._clients:
+ self._clients[key]['bytes-sent'] += logdata['bytes-sent']
+ else:
+ self._clients[key] = { 'ip': logdata['client'],
+ 'user-agent': logdata['ua'],
+ 'bytes-sent': logdata['bytes-sent'] }
+
+
+class AccessLog():
+ """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite"""
+
+ def __init__(self, properties):
+ print 'SFive: accesslog file importer loaded'
+
+ self._socket = properties['socket']
+ self._hostname = properties['hostname']
+ self._content_id = None
+ self._format = None
+ self._quality = None
+ self._duration = properties['duration']
+ self._tags = properties['tags']
+ self._logfile = properties['logfile']
+ self._initStreamerIds(properties['streamer-ids'])
+
+ self._proto = None
+ self._conn = None
+ self._connected = False
+ self._url_re = re.compile(properties['nameformat'])
+
+
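+    # A streamer-id spec names the content-ids, formats and qualities of one
+    # streamer, e.g. 'av,audio/hls,dash/high,low' (a hypothetical sample)
+    # expands to all combinations av/hls/high, av/hls/low, av/dash/high, ...
+    # each with its own ClientList.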
+ def _initStreamerIds(self, streamer):
+ print 'SFive: will look for the following streamer ids:'
+ self._streamer = {}
+ cs = {}
+ fs = {}
+ qs = {}
+ for s in streamer:
+ parts = s.split('/')
+ if len(parts) != 3:
+                raise ValueError('invalid streamer description "%s": must consist of 3 parts separated by a /' % s)
+ scs = parts[0].split(',')
+ sfs = parts[1].split(',')
+ sqs = parts[2].split(',')
+ for c in scs:
+ cs[c] = 1
+ if c not in self._streamer:
+ self._streamer[c] = {}
+ for f in sfs:
+ fs[f] = 1
+ if f not in self._streamer[c]:
+ self._streamer[c][f] = {}
+ for q in sqs:
+ qs[q] = 1
+ if q not in self._streamer[c][f]:
+ self._streamer[c][f][q] = ClientList()
+ print ' %s / %s / %s' % (c, f, q)
+
+ if len(cs.keys()) == 1:
+ self._content_id = cs.keys()[0]
+ print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+
+ if len(fs.keys()) == 1:
+ self._format = fs.keys()[0]
+ print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
+
+ if len(qs.keys()) == 1:
+ self._quality = qs.keys()[0]
+ print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
+
+
+
+ def run(self):
+ if self._initLog():
+ reactor.callWhenRunning(self._initSocket)
+ reactor.run()
+
+ def _initLog(self):
+ try:
+            print 'SFive: will batch import from %s' % (self._logfile)
+ self._fd = open(self._logfile, 'r')
+
+ except IOError as e:
+ print 'SFive: error opening logfile: %s' % (e.strerror)
+ return False
+
+ return True
+
+
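+    # The regex below parses one line in the combined log format used by
+    # nginx and apache, e.g. (a hypothetical sample line):
+    #   10.0.0.1 - - [21/Oct/2014:06:54:28 +0200] "GET /live/hls/av-high/seg1.ts HTTP/1.1" 200 123456 "-" "VLC/2.1.5"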
+ def _prepareLineRegex(self):
+ parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
+ r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"']
+ return re.compile(r'\s+'.join(parts)+r'\s*\Z')
+
+ def _parseRequest(self, reqstr):
+ req = { 'method': None, 'url': None, 'proto': None }
+ try:
+ parts = reqstr.split()
+ req['method'] = parts[0] if parts[0] != '-' else None
+ req['url'] = parts[1]
+ req['proto'] = parts[2]
+ except IndexError:
+ pass
+
+ return req
+
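+    # The timestamp in combined log format reads '21/Oct/2014:06:54:28 +0200';
+    # the slicing below replaces the colon between date and time with a space
+    # so dateutil can parse it; the result is then converted to naive UTC.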
+ def _parseDatetime(self, datetimestr):
+ try:
+ ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
+ ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+ return ts
+        except ValueError:
+ return None
+
+    def _parseLine(self, regex, line):
+        match = regex.match(line)
+        if not match:
+            return None
+        linedata = match.groupdict()
+
+ for part in ('client', 'ref', 'ua'):
+ if linedata[part] == '-':
+ linedata[part] = None
+
+ linedata['status'] = int(linedata['status'])
+ linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size'])
+ linedata['req'] = self._parseRequest(linedata['req'])
+ linedata['ts'] = self._parseDatetime(linedata['ts'])
+ return linedata
+
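+    # Aligns a timestamp to the start of its duration-sized interval, e.g.
+    # with duration=15 a timestamp of 06:54:28 is mapped to 06:54:15.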
+ def _getTsFromLogDataAligned(self, ts):
+ try:
+ tsi = int(ts.strftime('%s'))
+ tsi = tsi - (tsi % self._duration)
+ return datetime.datetime.fromtimestamp(tsi)
+ except ValueError:
+ return ts
+
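+    # Replays the logfile: lines are aggregated into the per-streamer
+    # ClientLists until a line's timestamp crosses the end of the current
+    # interval, then one dataset per streamer id is flushed to the hub.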
+ def _sendLogData(self):
+ linecnt = 0
+ updatecnt = 0
+ lastts = None
+ nextts = None
+ try:
+ regex = self._prepareLineRegex()
+ for line in self._fd:
+ linecnt += 1
+                linedata = self._parseLine(regex, line)
+                if not linedata:
+                    continue
+ if not lastts:
+ lastts = self._getTsFromLogDataAligned(linedata['ts'])
+ nextts = lastts + datetime.timedelta(seconds=self._duration)
+ self._clearClients()
+ self._updateClients(linedata)
+ else:
+ while linedata['ts'] >= nextts:
+ updatecnt += self._sendUpdates(lastts)
+ lastts = nextts
+ nextts = lastts + datetime.timedelta(seconds=self._duration)
+ self._clearClients()
+ self._updateClients(linedata)
+
+ # send remaining data
+ if nextts:
+ updatecnt += self._sendUpdates(lastts)
+
+ except re.error as e:
+ print 'SFive: regex error: %s' % (e)
+
+ return updatecnt, linecnt
+
+ def _updateClients(self, linedata):
+ if linedata['status'] != 200 and linedata['status'] != 206:
+ return
+ if linedata['req']['method'] != 'GET':
+ return
+
+        try:
+            m = self._url_re.match(linedata['req']['url'])
+            if not m:
+                return
+            streamerid = m.groupdict()
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+            return
+
+ try:
+ self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata)
+        except KeyError:
+            # the request did not match any of the configured streamer ids
+            pass
+
+ def _sendUpdates(self, ts):
+ cnt = 0
+ for c in self._streamer.keys():
+ for f in self._streamer[c].keys():
+ for q in self._streamer[c][f].keys():
+ self._sendDataset(ts, self._duration, c, f, q)
+                    cnt += 1
+ return cnt
+
+ def _clearClients(self):
+ for c in self._streamer.keys():
+ for f in self._streamer[c].keys():
+ for q in self._streamer[c][f].keys():
+ self._streamer[c][f][q].clear()
+
+
+ def _initSocket(self):
+ print 'SFive: trying to connect to %s...' % (self._socket)
+ self._connected = False
+ self._proto = SFiveProto(self)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn.startListening()
+
+ def socketReady(self):
+ print 'SFive: connection to sfive hub established'
+ self._connected = True
+ self._sendInit()
+ updatecnt, linecnt = self._sendLogData()
+ print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt)
+ self._fd.close()
+ if reactor.running:
+ reactor.stop()
+
+ def socketError(self):
+ print 'SFive: connection to sfive hub lost'
+ if self._conn and self._connected:
+ self._conn.stopListening()
+ self._connected = False
+ if reactor.running:
+ reactor.stop()
+
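+    # The init message announces this source to the hub as one JSON object
+    # per line, e.g. (with a hypothetical hostname and no tags):
+    #   {"version": 1, "hostname": "streamer1", "tags": []}
+    # streamer-id fields that are common to all streamers are included here
+    # instead of in every dataset.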
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id" : { },
+ "tags": self._tags }
+ if self._content_id:
+ initdata["streamer-id"]["content-id"] = self._content_id
+ if self._format:
+ initdata["streamer-id"]["format"] = self._format
+ if self._quality:
+ initdata["streamer-id"]["quality"] = self._quality
+        if not initdata["streamer-id"]:
+ del initdata["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
+
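+    # One dataset describes a single interval for a single streamer id, e.g.
+    # (values are a hypothetical sample):
+    #   {"start-time": "2014-10-21T06:54:15Z", "duration-ms": 15000,
+    #    "data": {"clients": [...], "client-count": 3, "bytes-sent": 123456}}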
+ def _sendDataset(self, timestamp, duration, content_id, format, quality):
+ clients = self._streamer[content_id][format][quality]
+ data = { "start-time": timestamp.isoformat('T') + 'Z',
+ "duration-ms": duration * 1000,
+ "streamer-id": { },
+ "data": {
+ "clients": list(clients.values()),
+ "client-count": clients.getCnt(),
+ "bytes-sent": clients.getBytesSent()
+ }
+ }
+ if not self._content_id:
+ data["streamer-id"]["content-id"] = content_id
+ if not self._format:
+ data["streamer-id"]["format"] = format
+ if not self._quality:
+ data["streamer-id"]["quality"] = quality
+        if not data["streamer-id"]:
+ del data["streamer-id"]
+
+        self._proto.sendDatagram('%s\n' % (json.dumps(data)))
+
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
+ parser.add_argument('--socket', '-s', dest='socket', required=True,
+ help='the path to the data socket of the local SFive hub')
+ parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+ help='the hostname of the machine')
+ parser.add_argument('--duration', '-d', dest='duration', required=False,
+ help='time (in seconds) between updates; defaults to 15')
+ parser.add_argument('--tag', '-t', dest='tags', action='append',
+ help='tag to be added to the statistic data, can be invoked several times')
+ parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
+                        help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality1>[,<quality2>], can be invoked several times')
+ parser.add_argument('--logfile', '-l', dest='logfile', required=True,
+ help='path to the logfile or \'-\' for standard input')
+ parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
+ help='a regular expression (must contain named matches for content, format, quality)')
+ args = vars(parser.parse_args())
+ if not args['tags']:
+ args['tags'] = []
+ if not args['duration']:
+ args['duration'] = 15
+ else:
+ args['duration'] = int(args['duration'])
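+    # the default --nameformat below matches URLs like
+    # /live/hls/av-high/segment1.ts (a hypothetical sample) and extracts
+    # format=hls, content=av and quality=high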
+ if not args['nameformat']:
+ args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
+ if not args['streamer-ids']:
+ print 'SFive: you have to specify at least one streamer-id!'
+ exit(-1)
+ importer = AccessLog(args)
+ importer.run()