#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistic collection tool for streaming server infrastructures.
#  The system collects and stores meta data like number of views
#  and throughput from a number of streaming servers and stores
#  it in a global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014 Christian Pointner
#                     Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
# from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS from twisted.internet import protocol, unix, reactor, task from twisted.internet.protocol import Protocol from twisted.internet.defer import Deferred from twisted.web.client import Agent from twisted.web.http_headers import Headers import socket from time import sleep import re import simplejson as json import datetime import time import dateutil.parser _MAX_PACKET_SIZE = 65536 # in bytes __version__ = "$Rev$" class SFiveNGXluaProto(Protocol): def __init__(self, finished, fetcher): self.data = '' self.finished = finished self._fetcher = fetcher print 'SFive: started receiving log data' def dataReceived(self, bytes): self.data += bytes lines = self.data.splitlines(True) if len(lines) > 0: if '\n' not in lines[-1] and '\r' not in lines[-1]: self.data = lines[-1] del lines[-1] else: self.data = '' if len(lines) > 0: for line in lines: self._fetcher.updateData(json.loads(line)) def connectionLost(self, reason): print 'SFive: finished receiving log data:', reason.getErrorMessage() self._fetcher.close() class SFivePort(unix.ConnectedDatagramPort): def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) def write(self, data): try: return self.socket.send(data) except socket.error, se: no = se.args[0] if no == EINTR: return self.write(data) elif no == EMSGSIZE: raise error.MessageLengthError, "message too long" elif no == ECONNREFUSED: self.protocol.connectionRefused() elif no == EAGAIN: # the send buffer seems to be full - let's wait a little while... # this is not really a good solution but better than the aproach # of twisted which just drops the datagram... 
sleep(0.01) return self.write(data) else: raise class SFiveProto(protocol.ConnectedDatagramProtocol): def __init__(self, fetcher): self._fetcher = fetcher def startProtocol(self): self._fetcher.socketReady() def datagramReceived(self, data): print 'SFive: received datagram: "%s" (will get ignored)' % (data) def connectionFailed(self, failure): print 'SFive: connection failed: %s' % (failure.getErrorMessage()) self._fetcher.socketError() def sendDatagram(self, data): try: return self.transport.write(data) except socket.error as err: print 'SFive: sending datagram failed: %s' % (err) self._fetcher.socketError() class ClientList: def __init__(self): self._clients = { } def clear(self): self._clients = { } def getCnt(self): return len(self._clients) def getBytesSent(self): sum = 0 for val in self._clients.itervalues(): sum += val['bytes-sent'] return sum def values(self): return self._clients.itervalues() def update(self, logdata): key = '%(client)s/%(ua)s' % logdata if key in self._clients.keys(): self._clients[key]['bytes-sent'] += logdata['bytes-sent'] else: self._clients[key] = { 'ip': logdata['client'], 'user-agent': logdata['ua'], 'bytes-sent': logdata['bytes-sent'] } class NGXLuaFetcher(): """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite""" def __init__(self, properties): print 'SFive: nginx-lua fetcher loaded' self._socket = properties['socket'] self._hostname = properties['hostname'] self._content_id = None self._format = None self._quality = None self._duration = properties['duration'] self._tags = properties['tags'] self._url = properties['url'] self._initStreamerIds(properties['streamer-ids']) self._proto = None self._conn = None self._connected = False self._url_re = re.compile(properties['nameformat']) self._looper = None self._start_time = None def _initStreamerIds(self, streamer): print 'SFive: will look for the following streamer ids:' self._streamer = {} cs = {} fs = {} qs = {} for s in streamer: 
parts = s.split('/') if len(parts) != 3: raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s) scs = parts[0].split(',') sfs = parts[1].split(',') sqs = parts[2].split(',') for c in scs: cs[c] = 1 if c not in self._streamer: self._streamer[c] = {} for f in sfs: fs[f] = 1 if f not in self._streamer[c]: self._streamer[c][f] = {} for q in sqs: qs[q] = 1 if q not in self._streamer[c][f]: self._streamer[c][f][q] = ClientList() print ' %s / %s / %s' % (c, f, q) if len(cs.keys()) == 1: self._content_id = cs.keys()[0] print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id if len(fs.keys()) == 1: self._format = fs.keys()[0] print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format if len(qs.keys()) == 1: self._quality = qs.keys()[0] print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality def run(self): self._initFetch() # try to be aligned with current time # this will eventually get out of sync but for now this is good enough offset = self._duration - (time.time() % self._duration) print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset) self._looper = task.LoopingCall(self._sendUpdates) reactor.callLater(offset, self._startLooper) reactor.callWhenRunning(self._initSocket) reactor.run() def _startLooper(self): self._start_time = datetime.datetime.utcnow().replace(microsecond=0) self._looper.start(self._duration, False) print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T')) def _sendUpdates(self): for c in self._streamer.keys(): for f in self._streamer[c].keys(): for q in self._streamer[c][f].keys(): if self._connected: self._sendDataset(self._start_time, self._duration, c, f, q) self._streamer[c][f][q].clear() self._start_time = datetime.datetime.utcnow().replace(microsecond=0) def 
_initFetch(self): print "SFive: trying to fetch from '%s'" % self._url agent = Agent(reactor) d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None) d.addCallback(self._httpResponse) d.addBoth(self.close) def _httpResponse(self, response): print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase) finished = Deferred() response.deliverBody(SFiveNGXluaProto(finished, self)) return finished def updateData(self, data): try: m = re.match(self._url_re, data['url']) if not m: return streamerid = m.groupdict() except re.error as e: print 'SFive: regex error: %s' % (e) try: if data['status'] == 200 or data['status'] == 206: ts = dateutil.parser.parse(data['time']) ts = (ts - ts.utcoffset()).replace(tzinfo=None) if self._start_time and ts >= self._start_time: self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(data) except KeyError as e: print 'SFive: %s - ignoring dataset' % (e) except ValueError as e: print 'SFive: %s - ignoring dataset' % (e) def close(self): if self._conn and self._connected: self._conn.stopListening() self._connected = False if reactor.running: reactor.stop() def _initSocket(self): print 'SFive: trying to connect to %s...' 
% (self._socket) self._connected = False self._proto = SFiveProto(self) self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) self._conn.startListening() def socketReady(self): print 'SFive: connection to sfive hub established' self._connected = True self._sendInit() # TODO: start callback every self._duration seconds def socketError(self): print 'SFive: connection to sfive hub lost - trying reconnect' if self._conn and self._connected: self._conn.stopListening() self._connected = False reactor.callLater(3, self._initSocket) def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, "streamer-id" : { }, "tags": self._tags } if self._content_id: initdata["streamer-id"]["content-id"] = self._content_id if self._format: initdata["streamer-id"]["format"] = self._format if self._quality: initdata["streamer-id"]["quality"] = self._quality if len(initdata["streamer-id"].keys()) == 0: del initdata["streamer-id"] self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); def _sendDataset(self, timestamp, duration, content_id, format, quality): clients = self._streamer[content_id][format][quality] data = { "start-time": timestamp.isoformat('T') + 'Z', "duration-ms": duration * 1000, "streamer-id": { }, "data": { "clients": list(clients.values()), "client-count": clients.getCnt(), "bytes-sent": clients.getBytesSent() } } if not self._content_id: data["streamer-id"]["content-id"] = content_id if not self._format: data["streamer-id"]["format"] = format if not self._quality: data["streamer-id"]["quality"] = quality if len(data["streamer-id"].keys()) == 0: del data["streamer-id"] self._proto.sendDatagram('%s\n' % (json.dumps(data))); if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher') parser.add_argument('--socket', '-s', dest='socket', required=True, help='the path to the data socket of the local SFive hub') parser.add_argument('--hostname', '-n', 
dest='hostname', required=True, help='the hostname of the machine') parser.add_argument('--duration', '-d', dest='duration', required=False, help='time (in seconds) between updates; defaults to 15') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append', help='a streamer description like [,]/[,]/[,[,