#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistics collection tool for streaming server infrastructures.
#  The system collects metadata such as the number of viewers and
#  the throughput from a number of streaming servers and stores
#  it in a global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014 Christian Pointner
#                     Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
#

from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
from twisted.internet import protocol, reactor, unix, error
import socket
from time import sleep

import sys
import re
import simplejson as json
import datetime
import dateutil.parser

_MAX_PACKET_SIZE = 8192 # in bytes

__version__ = "$Rev$"


class SFivePort(unix.ConnectedDatagramPort):

    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)

    def write(self, data):
        try:
            return self.socket.send(data)
        except socket.error as se:
            no = se.args[0]
            if no == EINTR:
                # the system call was interrupted - just try again
                return self.write(data)
            elif no == EMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no == ECONNREFUSED:
                self.protocol.connectionRefused()
            elif no == EAGAIN or no == EWOULDBLOCK:
                # the send buffer seems to be full - let's wait a little while...
                # this is not really a good solution but better than the approach
                # of twisted which just drops the datagram...
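                # (note: sleeping here blocks the whole reactor for a moment,
                # which is tolerable for a batch import with no other pending
                # events; a busy live source would need an event-driven retry,
                # e.g. via reactor.callLater)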
                sleep(0.01)
                return self.write(data)
            else:
                raise


class SFiveProto(protocol.ConnectedDatagramProtocol):

    def __init__(self, importer):
        self._importer = importer

    def startProtocol(self):
        self._importer._socketReady()

    def datagramReceived(self, data):
        print 'SFive: received datagram: "%s" (will get ignored)' % (data)

    def connectionFailed(self, failure):
        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
        self._importer._socketError()

    def sendDatagram(self, data):
        try:
            return self.transport.write(data)
        except socket.error as err:
            print 'SFive: sending datagram failed: %s' % (err)
            self._importer._socketError()


class ClientList:

    def __init__(self, file_re):
        self._clients = { }
        self._file_re = file_re

    def clear(self):
        self._clients = { }

    def getCnt(self):
        return len(self._clients)

    def getBytesSent(self):
        return sum(self._clients.itervalues())

    def update(self, linedata):
        # only successful GET requests for files belonging to this stream
        # are counted
        if linedata['status'] != 200 and linedata['status'] != 206:
            return
        if linedata['req']['cmd'] != 'GET':
            return

        try:
            if re.match(self._file_re, linedata['req']['url']):
                if linedata['client'] in self._clients:
                    self._clients[linedata['client']] += linedata['size']
                else:
                    self._clients[linedata['client']] = linedata['size']
        except re.error as e:
            print 'SFive: regex error: %s' % (e)


class AccessLog():
    """Class to batch import nginx/apache access logs into the spreadspace streaming statistics suite"""

    def __init__(self, properties):
        print 'SFive: accesslog file importer loaded'

        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties['content-id']
        self._format = properties['format']
        self._quality = properties['quality']
        self._duration = properties['duration']
        self._tags = properties['tags']
        self._logfile = properties['logfile']
        self._nameformat = properties['nameformat']

        self._proto = None
        self._conn = None

    def run(self):
        if self._initLog():
            reactor.callWhenRunning(self._initSocket)
            reactor.run()

    def _initLog(self):
        try:
            # the filename regex is needed for batch import as well as for
            # live mode since it decides which requests belong to this stream
            regex = self._nameformat % { 'hostname': self._hostname,
                                         'content-id': self._content_id,
                                         'format': self._format,
                                         'quality': self._quality }
            self._file_re = re.compile(regex)
            print "SFive: will be looking for files like '%s'" % regex

            if self._logfile:
                print 'SFive: will batch import from %s' % (self._logfile if self._logfile != '-' else 'standard input')
                self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin
            else:
                print 'SFive: live mode enabled'
        except IOError as e:
            print 'SFive: error opening logfile: %s' % (e.strerror)
            return False
        except re.error as e:
            print 'SFive: regex error: %s' % (e)
            return False

        return True

    def _initSocket(self):
        print 'SFive: trying to connect to %s...' % (self._socket)
        self._proto = SFiveProto(self)
        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
        self._conn.startListening()

    def _socketError(self):
        if self._conn:
            self._conn.stopListening()
        reactor.stop()

    def _socketReady(self):
        print 'SFive: connection to sfive hub established'
        self._sendInit()
        if hasattr(self, '_fd'):
            cnt = self._sendLogData(self._fd, None)
            print 'SFive: imported %d datasets' % (cnt)
            self._fd.close()
        else:
            print 'SFive: live mode is not yet implemented!'
            # TODO: wait self._duration seconds and call _sendDataset with
            # all lines received since the last update
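            # a minimal sketch of what live mode could look like, assuming a
            # hypothetical _collectNewLines() helper that returns the lines
            # appended to the logfile since its last call:
            #
            #   from twisted.internet import task
            #
            #   def _update():
            #       clients = ClientList(self._file_re)
            #       regex = self._prepareLineRegex()
            #       for line in self._collectNewLines():
            #           linedata = self._parseLine(regex, line)
            #           if linedata:
            #               clients.update(linedata)
            #       self._sendDataset(datetime.datetime.utcnow(), self._duration, clients)
            #
            #   task.LoopingCall(_update).start(self._duration)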
        reactor.stop()

    def _sendInit(self):
        initdata = { "hostname": self._hostname,
                     "streamer-id": { "content-id": self._content_id,
                                      "format": self._format,
                                      "quality": self._quality },
                     "tags": self._tags }
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def _prepareLineRegex(self):
        # combined log format of apache and nginx:
        #   client identd user [date] "request" status size "referer" "user-agent"
        parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
                  r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"' ]
        return re.compile(r'\s+'.join(parts) + r'\s*\Z')

    def _parseRequest(self, reqstr):
        req = { 'cmd': None, 'url': None, 'proto': None }
        try:
            parts = reqstr.split()
            req['cmd'] = parts[0] if parts[0] != '-' else None
            req['url'] = parts[1]
            req['proto'] = parts[2]
        except IndexError:
            pass
        return req

    def _parseDatetime(self, datetimestr):
        try:
            # replace the ':' between date and time with a space so that
            # dateutil can parse the timestamp
            ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
            ts = (ts - ts.utcoffset()).replace(tzinfo=None)
            return ts
        except ValueError:
            return None

    def _parseLine(self, regex, line):
        match = regex.match(line)
        if not match:
            # skip lines which don't look like an access log entry
            return None
        linedata = match.groupdict()

        for part in ("client", "ref", "ua"):
            if linedata[part] == "-":
                linedata[part] = None
        linedata["status"] = int(linedata["status"])
        linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"])
        linedata['req'] = self._parseRequest(linedata['req'])
        linedata['ts'] = self._parseDatetime(linedata['ts'])
        return linedata

    def _sendLogData(self, data, lastts):
        cnt = 0
        nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
        clients = ClientList(self._file_re)
        try:
            regex = self._prepareLineRegex()
            for line in data:
                linedata = self._parseLine(regex, line)
                if not linedata:
                    continue
                if not lastts:
                    lastts = linedata['ts']
                    nextts = lastts + datetime.timedelta(seconds=self._duration)
                    clients.clear()
                    clients.update(linedata)
                else:
                    while linedata['ts'] > nextts:
                        self._sendDataset(nextts, self._duration, clients)
                        cnt += 1
                        lastts = nextts
                        nextts = lastts + datetime.timedelta(seconds=self._duration)
                        clients.clear()
                    clients.update(linedata)

            # send remaining data
            if nextts:
                self._sendDataset(nextts, self._duration, clients)
                cnt += 1
        except re.error as e:
            print 'SFive: regex error: %s' % (e)

        return cnt

    def _sendDataset(self, timestamp, duration, clients):
        data = { "start-time": timestamp.isoformat('T'),
                 "duration-ms": duration * 1000,
                 "data": { "client-count": clients.getCnt(),
                           "bytes-sent": clients.getBytesSent() } }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
    parser.add_argument('--socket', '-s', dest='socket', required=True,
                        help='the path to the data socket of the local SFive hub')
    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
                        help='the hostname of the machine')
    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
                        help='the content-id (i.e. av)')
    parser.add_argument('--format', '-f', dest='format', required=True,
                        help='the format (i.e. webm)')
    parser.add_argument('--quality', '-q', dest='quality', required=True,
                        help='the quality (i.e. high)')
    parser.add_argument('--duration', '-d', dest='duration', required=False, type=int,
                        help='time (in seconds) between updates; defaults to 5')
    parser.add_argument('--tag', '-t', dest='tags', action='append',
                        help='tag to be added to the statistic data, can be invoked several times')
    parser.add_argument('--logfile', '-l', dest='logfile', required=False,
                        help="path to the logfile or '-' for standard input")
    parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
                        help='the format for filenames which are part of this stream, this may include python string expressions')
    args = vars(parser.parse_args())

    if not args['tags']:
        args['tags'] = []
    if not args['duration']:
        args['duration'] = 5
    importer = AccessLog(args)
    importer.run()
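
# example invocation for a batch import, assuming the script is saved as
# accesslog.py (the socket path, hostname and logfile below are made-up
# values, not defaults of the tool):
#
#   ./accesslog.py -s /var/run/sfive/sfive.sock -n streamer1 -c av \
#       -f webm -q high -d 5 -l /var/log/nginx/access.log \
#       -F '.*/stream-%(format)s-%(quality)s.*'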