#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistics collection tool for streaming server infrastructures.
#  The system collects meta data like the number of viewers and the
#  throughput from a number of streaming servers and stores it in a
#  global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014-2015 Christian Pointner
#                          Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
#

from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
from twisted.internet import protocol, unix, error, reactor, task
import socket
from time import sleep

import re
import simplejson as json
import datetime
import dateutil.parser

_MAX_PACKET_SIZE = 262144   # in bytes

__version__ = "$Rev$"


class SFivePort(unix.ConnectedDatagramPort):

    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)

    def write(self, data):
        try:
            return self.socket.send(data)
        except socket.error as se:
            no = se.args[0]
            if no == EINTR:
                # interrupted by a signal - just try again
                return self.write(data)
            elif no == EMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no == ECONNREFUSED:
                self.protocol.connectionRefused()
            elif no == EAGAIN:
                # the send buffer seems to be full - let's wait a little while...
                # this is not really a good solution but better than the approach
                # of twisted which just drops the datagram...
                sleep(0.01)
                return self.write(data)
            else:
                raise
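
# A minimal sketch (not part of this importer, all names and paths are
# illustrative assumptions) of the receiving side SFivePort talks to: the
# SFive hub is assumed to listen on a unix SOCK_DGRAM socket and to read
# one newline-terminated JSON object per datagram.
#
#   import os
#   import socket
#   import simplejson as json
#
#   path = '/var/run/sfive/importer.sock'   # assumed hub socket path
#   if os.path.exists(path):
#       os.unlink(path)
#   sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
#   sock.bind(path)
#   while True:
#       datagram = sock.recv(262144)        # matches _MAX_PACKET_SIZE above
#       msg = json.loads(datagram)          # one JSON object per datagram
#       print msg['hostname'], msg['data']['client-count']
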
class SFiveProto(protocol.ConnectedDatagramProtocol):

    def __init__(self, importer):
        self._importer = importer

    def startProtocol(self):
        self._importer.socketReady()

    def datagramReceived(self, data):
        print 'SFive: received datagram: "%s" (will get ignored)' % (data)

    def connectionFailed(self, failure):
        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
        self._importer.socketError()

    def sendDatagram(self, data):
        try:
            return self.transport.write(data)
        except socket.error as err:
            print 'SFive: sending datagram failed: %s' % (err)
            self._importer.socketError()


class ClientList:

    def __init__(self):
        self._clients = { }

    def clear(self):
        self._clients = { }

    def getCnt(self):
        return len(self._clients)

    def getBytesSent(self):
        total = 0
        for val in self._clients.itervalues():
            total += val['bytes-sent']
        return total

    def values(self):
        return self._clients.itervalues()

    def update(self, logdata):
        # clients are identified by the combination of ip address and user agent
        key = '%(client)s/%(ua)s' % logdata
        if key in self._clients:
            self._clients[key]['bytes-sent'] += logdata['bytes-sent']
        else:
            self._clients[key] = { 'ip': logdata['client'],
                                   'user-agent': logdata['ua'],
                                   'bytes-sent': logdata['bytes-sent'] }


class AccessLog():
    """Class to batch import nginx/apache access logs into the spreadspace streaming statistics suite."""

    def __init__(self, properties):
        print 'SFive: accesslog file importer loaded'

        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = None
        self._format = None
        self._quality = None
        self._duration = properties['duration']
        self._tags = properties['tags']
        self._logfile = properties['logfile']
        self._initStreamerIds(properties['streamer-ids'])
        self._proto = None
        self._conn = None
        self._connected = False
        self._url_re = re.compile(properties['nameformat'])

    def _initStreamerIds(self, streamer):
        print 'SFive: will look for the following streamer ids:'
        self._streamer = {}
        cs = {}
        fs = {}
        qs = {}
        for s in streamer:
            parts = s.split('/')
            if len(parts) != 3:
                raise ValueError('invalid streamer description "%s": must consist of 3 parts separated by a /' % s)
            scs = parts[0].split(',')
            sfs = parts[1].split(',')
            sqs = parts[2].split(',')
            for c in scs:
                cs[c] = 1
                if c not in self._streamer:
                    self._streamer[c] = {}
                for f in sfs:
                    fs[f] = 1
                    if f not in self._streamer[c]:
                        self._streamer[c][f] = {}
                    for q in sqs:
                        qs[q] = 1
                        if q not in self._streamer[c][f]:
                            self._streamer[c][f][q] = ClientList()
                            print '  %s / %s / %s' % (c, f, q)

        if len(cs.keys()) == 1:
            self._content_id = cs.keys()[0]
            print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
        if len(fs.keys()) == 1:
            self._format = fs.keys()[0]
            print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
        if len(qs.keys()) == 1:
            self._quality = qs.keys()[0]
            print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality

    def run(self):
        if self._initLog():
            reactor.callWhenRunning(self._initSocket)
            reactor.run()

    def _initLog(self):
        try:
            print 'SFive: will batch import from %s' % (self._logfile)
            self._fd = open(self._logfile, 'r')
        except IOError as e:
            print 'SFive: error opening logfile: %s' % (e.strerror)
            return False
        return True

    def _prepareLineRegex(self):
        # combined log format: client identd user [ts] "request" status size "referer" "user-agent"
        parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
                  r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"' ]
        return re.compile(r'\s+'.join(parts)+r'\s*\Z')
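    # Illustrative example (made-up values): a combined-format line such as
    #   203.0.113.7 - - [16/Mar/2015:20:16:01 +0100] "GET /av/webm/high HTTP/1.1" 200 4711 "-" "VLC/2.1.5"
    # is matched by the regex above into the named groups
    #   client='203.0.113.7', ts='16/Mar/2015:20:16:01 +0100',
    #   req='GET /av/webm/high HTTP/1.1', status='200', size='4711', ref='-', ua='VLC/2.1.5'
    # _parseLine() below then converts '-' fields to None, casts status and
    # size to int and parses the request string and the timestamp.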
    def _parseRequest(self, reqstr):
        req = { 'method': None, 'url': None, 'proto': None }
        try:
            parts = reqstr.split()
            req['method'] = parts[0] if parts[0] != '-' else None
            req['url'] = parts[1]
            req['proto'] = parts[2]
        except IndexError:
            pass
        return req

    def _parseDatetime(self, datetimestr):
        try:
            # replace the colon between date and time with a space so dateutil
            # can parse it, then normalize the timestamp to naive UTC
            ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
            ts = (ts - ts.utcoffset()).replace(tzinfo=None)
            return ts
        except ValueError:
            return None

    def _parseLine(self, regex, line):
        linedata = regex.match(line).groupdict()
        for part in ('client', 'ref', 'ua'):
            if linedata[part] == '-':
                linedata[part] = None
        linedata['status'] = int(linedata['status'])
        linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size'])
        linedata['req'] = self._parseRequest(linedata['req'])
        linedata['ts'] = self._parseDatetime(linedata['ts'])
        return linedata

    def _getTsFromLogDataAligned(self, ts):
        try:
            # align the timestamp to the previous multiple of the update
            # interval, i.e. with duration=15 20:16:07 becomes 20:16:00
            tsi = int(ts.strftime('%s'))
            tsi = tsi - (tsi % self._duration)
            return datetime.datetime.fromtimestamp(tsi)
        except ValueError:
            return ts

    def _sendLogData(self):
        linecnt = 0
        updatecnt = 0
        lastts = None
        nextts = None
        try:
            regex = self._prepareLineRegex()
            for line in self._fd:
                linecnt += 1
                linedata = self._parseLine(regex, line)
                if not lastts:
                    lastts = self._getTsFromLogDataAligned(linedata['ts'])
                    nextts = lastts + datetime.timedelta(seconds=self._duration)
                    self._clearClients()
                    self._updateClients(linedata)
                else:
                    # roll forward over all intervals that end before this line
                    while linedata['ts'] >= nextts:
                        updatecnt += self._sendUpdates(lastts)
                        lastts = nextts
                        nextts = lastts + datetime.timedelta(seconds=self._duration)
                        self._clearClients()
                    self._updateClients(linedata)

            # send remaining data
            if nextts:
                updatecnt += self._sendUpdates(lastts)

        except re.error as e:
            print 'SFive: regex error: %s' % (e)

        return updatecnt, linecnt

    def _updateClients(self, linedata):
        if linedata['status'] != 200 and linedata['status'] != 206:
            return
        if linedata['req']['method'] != 'GET':
            return
        try:
            m = re.match(self._url_re, linedata['req']['url'])
            if not m:
                return
            streamerid = m.groupdict()
        except re.error as e:
            print 'SFive: regex error: %s' % (e)
            return
        try:
            self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata)
        except KeyError:
            # the request does not belong to any configured streamer id
            pass

    def _sendUpdates(self, ts):
        cnt = 0
        for c in self._streamer.keys():
            for f in self._streamer[c].keys():
                for q in self._streamer[c][f].keys():
                    self._sendDatasetFull(ts, self._duration, c, f, q)
                    cnt += 1
        return cnt

    def _clearClients(self):
        for c in self._streamer.keys():
            for f in self._streamer[c].keys():
                for q in self._streamer[c][f].keys():
                    self._streamer[c][f][q].clear()
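    # The nameformat regex is supplied by the user; the only requirement
    # implied by _updateClients() above is that it defines the named groups
    # 'content', 'format' and 'quality'. A hypothetical example matching
    # request urls like /av/webm/high:
    #   /(?P<content>[^/]+)/(?P<format>[^/]+)/(?P<quality>[^/]+)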
    def _initSocket(self):
        print 'SFive: trying to connect to %s...' % (self._socket)
        self._connected = False
        self._proto = SFiveProto(self)
        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0666, None, reactor)
        self._conn.startListening()

    def socketReady(self):
        print 'SFive: connection to sfive hub established'
        self._connected = True
        # we are using datagram sockets for now -> must use a stateless protocol
        # self._sendInit()
        updatecnt, linecnt = self._sendLogData()
        print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt)
        self._fd.close()
        if reactor.running:
            reactor.stop()

    def socketError(self):
        print 'SFive: connection to sfive hub lost'
        if self._conn and self._connected:
            self._conn.stopListening()
        self._connected = False
        if reactor.running:
            reactor.stop()

    def _sendDatasetFull(self, timestamp, duration, content_id, format, quality):
        clients = self._streamer[content_id][format][quality]
        data = { "version": 1,
                 "hostname": self._hostname,
                 "tags": self._tags,
                 "streamer-id": { "content-id": content_id,
                                  "format": format,
                                  "quality": quality },
                 "start-time": timestamp.isoformat('T') + 'Z',
                 "duration-ms": duration * 1000,
                 "data": { "clients": list(clients.values()),
                           "client-count": clients.getCnt(),
                           "bytes-sent": clients.getBytesSent() } }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))

    def _sendInit(self):
        initdata = { "version": 1,
                     "hostname": self._hostname,
                     "streamer-id": { },
                     "tags": self._tags }
        if self._content_id:
            initdata["streamer-id"]["content-id"] = self._content_id
        if self._format:
            initdata["streamer-id"]["format"] = self._format
        if self._quality:
            initdata["streamer-id"]["quality"] = self._quality
        if len(initdata["streamer-id"].keys()) == 0:
            del initdata["streamer-id"]
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def _sendDataset(self, timestamp, duration, content_id, format, quality):
        clients = self._streamer[content_id][format][quality]
        data = { "start-time": timestamp.isoformat('T') + 'Z',
                 "duration-ms": duration * 1000,
                 "streamer-id": { },
                 "data": { "clients": list(clients.values()),
                           "client-count": clients.getCnt(),
                           "bytes-sent": clients.getBytesSent() } }
        if not self._content_id:
            data["streamer-id"]["content-id"] = content_id
        if not self._format:
            data["streamer-id"]["format"] = format
        if not self._quality:
            data["streamer-id"]["quality"] = quality
        if len(data["streamer-id"].keys()) == 0:
            del data["streamer-id"]
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog importer')
    parser.add_argument('--socket', '-s', dest='socket', required=True,
                        help='the path to the data socket of the local SFive hub')
    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
                        help='the hostname of the machine')
    parser.add_argument('--duration', '-d', dest='duration', required=False, type=int, default=15,
                        help='time (in seconds) between updates; defaults to 15')
    parser.add_argument('--tag', '-t', dest='tags', action='append',
                        help='tag to be added to the statistic data, can be invoked several times')
    parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', required=True,
                        help='a streamer description consisting of comma-separated content-ids, '
                             'formats and qualities, i.e. content-id[,...]/format[,...]/quality[,...], '
                             'can be invoked several times')
    parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
                        help='a regular expression (python syntax) with the named groups '
                             'content, format and quality, used to extract the streamer id '
                             'from the request url')
    parser.add_argument('--logfile', '-l', dest='logfile', required=True,
                        help='path to the access log file to import')
    args = vars(parser.parse_args())
    if not args['tags']:
        args['tags'] = []

    importer = AccessLog(args)
    importer.run()
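
# A hypothetical invocation (all values are illustrative assumptions):
#
#   ./accesslog.py -s /var/run/sfive/importer.sock -n streamer01 -d 15 \
#       -t example -S av/webm,hls/low,high \
#       -F '/(?P<content>[^/]+)/(?P<format>[^/]+)/(?P<quality>[^/]+)' \
#       -l /var/log/nginx/access.log
#
# This reads the whole access log, aggregates matching GET requests into
# 15-second buckets per streamer id and sends one JSON dataset per bucket
# and streamer id to the hub socket, e.g.:
#
#   {"version": 1, "hostname": "streamer01", "tags": ["example"],
#    "streamer-id": {"content-id": "av", "format": "webm", "quality": "high"},
#    "start-time": "2015-03-16T20:16:00Z", "duration-ms": 15000,
#    "data": {"clients": [{"ip": "203.0.113.7", "user-agent": "VLC/2.1.5", "bytes-sent": 4711}],
#             "client-count": 1, "bytes-sent": 4711}}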