#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistic collection tool for streaming server infrastructures.
#  The system collects and stores metadata like number of views
#  and throughput from a number of streaming servers and stores
#  it in a global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014 Christian Pointner
#                     Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
#
from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
from twisted.internet import protocol, reactor, unix
from twisted.internet import error
from twisted.internet.protocol import Protocol
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import socket
from time import sleep
import re
import simplejson as json
import datetime

_MAX_PACKET_SIZE = 8192  # in bytes

__version__ = "$Rev$"


class SFiveNGXluaProto(Protocol):
    """Body protocol that splits the streamed nginx-lua log output into lines.

    Every complete, non-blank line is decoded as JSON and handed to
    ``importer.updateData``. A trailing fragment without a line terminator
    is buffered until the next chunk arrives.
    """

    def __init__(self, finished, importer):
        # finished: Deferred that is fired from connectionLost once the body ends.
        # importer: object providing updateData() and _socketError().
        self.data = ''
        self.finished = finished
        self._importer = importer
        print('SFive: started receiving log data')

    def dataReceived(self, bytes):
        """Buffer *bytes* and process every complete line received so far."""
        self.data += bytes
        lines = self.data.splitlines(True)
        # Keep an unterminated trailing fragment for the next chunk.
        if lines and not lines[-1].endswith(('\n', '\r')):
            self.data = lines.pop()
        else:
            self.data = ''
        for line in lines:
            line = line.strip()
            if not line:
                # e.g. the '\n' half of a '\r\n' that was split across chunks
                continue
            print("SFive: '%s'" % line)
            try:
                logdata = json.loads(line)
            except ValueError as e:
                # one bad record must not abort the whole stream
                print('SFive: ignoring malformed log line: %s' % e)
                continue
            self._importer.updateData(logdata)

    def connectionLost(self, reason):
        """Report the end of the body, shut the importer down, fire `finished`."""
        print('SFive: finished receiving log data: %s' % reason.getErrorMessage())
        self._importer._socketError()
        # Complete the Deferred returned by NGXLuaFetcher._httpResponse so the
        # request's callback chain does not stay pending forever.
        if not self.finished.called:
            self.finished.callback(None)


class SFivePort(unix.ConnectedDatagramPort):
    """Connected unix datagram port that retries instead of dropping data.

    According to the original authors, Twisted's own write() drops the
    datagram when the send buffer is full (EAGAIN); this version waits
    briefly and tries again.
    """

    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)

    def write(self, data):
        """Send one datagram; returns the result of socket.send().

        Raises error.MessageLengthError if the datagram is too large.
        On ECONNREFUSED the protocol's connectionRefused() is called and
        None is returned. Other socket errors are re-raised.
        """
        # Loop instead of recursing so that a long run of EINTR/EAGAIN
        # cannot exhaust the interpreter's recursion limit.
        while True:
            try:
                return self.socket.send(data)
            except socket.error as se:
                no = se.args[0]
                if no == EINTR:
                    continue
                elif no == EMSGSIZE:
                    raise error.MessageLengthError("message too long")
                elif no == ECONNREFUSED:
                    self.protocol.connectionRefused()
                    return None
                elif no in (EAGAIN, EWOULDBLOCK):
                    # the send buffer seems to be full - let's wait a little while...
                    # this is not really a good solution but better than the approach
                    # of twisted which just drops the datagram...
                    sleep(0.01)
                    continue
                raise


class SFiveProto(protocol.ConnectedDatagramProtocol):
    """Datagram protocol for the connection to the local sfive hub."""

    def __init__(self, importer):
        # importer: object providing _socketReady() and _socketError().
        self._importer = importer

    def startProtocol(self):
        self._importer._socketReady()

    def datagramReceived(self, data):
        print('SFive: received datagram: "%s" (will get ignored)' % (data))

    def connectionFailed(self, failure):
        print('SFive: connection failed: %s' % (failure.getErrorMessage()))
        self._importer._socketError()

    def connectionRefused(self):
        # Called by SFivePort.write on ECONNREFUSED. The inherited Twisted
        # default does nothing, so shut down the same way connectionFailed does.
        print('SFive: connection refused by sfive hub')
        self._importer._socketError()

    def sendDatagram(self, data):
        """Write *data* to the hub; a socket error shuts the importer down."""
        try:
            return self.transport.write(data)
        except socket.error as err:
            print('SFive: sending datagram failed: %s' % (err))
            self._importer._socketError()


class ClientList(object):
    """Per-client byte counters built from nginx-lua log records."""

    def __init__(self, re):
        # `re` is the compiled nameformat regex (the parameter shadows the
        # module name inside this method only); it is stored but not used yet.
        self._re = re
        self._clients = {}

    def clear(self):
        """Forget all clients."""
        self._clients = {}

    def getCnt(self):
        """Return the number of distinct clients seen."""
        return len(self._clients)

    def getBytesSent(self):
        """Return the sum of 'bytes-sent' over all clients (0 if none)."""
        return sum(client['bytes-sent'] for client in self._clients.values())

    def values(self):
        """Return an iterator over the per-client dicts ('ip', 'bytes-sent')."""
        return iter(self._clients.values())

    def update(self, logdata):
        """Add one log record; reads logdata['client'] and logdata['bytes-sent']."""
        ## TODO: parse logdata['uri'] !!!
        client = logdata['client']
        entry = self._clients.get(client)
        if entry is None:
            self._clients[client] = {'ip': client, 'bytes-sent': logdata['bytes-sent']}
        else:
            entry['bytes-sent'] += logdata['bytes-sent']


class NGXLuaFetcher(object):
    """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite"""

    def __init__(self, properties):
        # properties: dict with the keys read below (as produced by the CLI).
        print('SFive: nginx-lua fetcher loaded')
        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties['content-id']
        self._format = properties['format']
        self._quality = properties['quality']
        self._duration = properties['duration']
        self._tags = properties['tags']
        self._url = properties['url']
        self._nameformat = properties['nameformat']
        self._proto = None
        self._conn = None
        self._clients = None
        self._reactor = reactor
        # Set by the first _socketError call; makes shutdown idempotent.
        self._shutdown = False

    def run(self):
        """Start fetching and run the reactor until shutdown."""
        if not self._initFetch():
            return
        if self._shutdown:
            # The HTTP request already failed synchronously; nothing to run.
            return
        self._reactor.callWhenRunning(self._initSocket)
        self._reactor.run()

    def _initFetch(self):
        """Compile the name format and issue the HTTP request.

        Returns False if the nameformat regex is invalid, True otherwise.
        """
        try:
            # TODO: check for named matches
            self._clients = ClientList(re.compile(self._nameformat))
        except re.error as e:
            print('SFive: regex error: %s' % (e))
            return False

        print("SFive: trying to fetch from '%s'" % self._url)
        agent = Agent(self._reactor)
        d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None)
        d.addCallback(self._httpResponse)
        # Whatever the outcome of the request/body, shut down afterwards.
        d.addBoth(self._socketError)
        return True

    def _httpResponse(self, response):
        """Stream the response body into SFiveNGXluaProto; returns its Deferred."""
        print('SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase))
        finished = Deferred()
        response.deliverBody(SFiveNGXluaProto(finished, self))
        return finished

    def _initSocket(self):
        """Open the datagram connection to the local sfive hub."""
        print('SFive: trying to connect to %s...' % (self._socket))
        self._proto = SFiveProto(self)
        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, self._reactor)
        self._conn.startListening()

    def _socketError(self, reason=None):
        """Shut the fetcher down: close the hub socket and stop the reactor.

        Also used as an addBoth handler on the HTTP request, so it accepts
        the Deferred result as *reason*; a Failure is reported. Only the
        first call has any effect. Returns None so an errback Failure is
        consumed instead of being logged as unhandled.
        """
        if isinstance(reason, Failure):
            print('SFive: fetch failed: %s' % reason.getErrorMessage())
        if self._shutdown:
            return None
        self._shutdown = True
        if self._conn:
            self._conn.stopListening()
            self._conn = None
        try:
            self._reactor.stop()
        except error.ReactorNotRunning:
            # reactor not started yet (run() checks _shutdown) or already stopping
            pass
        return None

    def _socketReady(self):
        print('SFive: connection to sfive hub established')
        self._sendInit()
        # TODO: start callback every self._duration seconds

    def _sendInit(self):
        """Send the init record (version, hostname, streamer-id, tags) to the hub."""
        initdata = {"version": 1, "hostname": self._hostname,
                    "streamer-id": {"content-id": self._content_id, "format": self._format, "quality": self._quality},
                    "tags": self._tags}
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def updateData(self, data):
        """Feed one decoded log record into the client list."""
        self._clients.update(data)

    def _sendDataset(self, timestamp, duration, clients):
        """Send one dataset record; *timestamp* is formatted with a 'Z' suffix,
        so it is presumably expected to be a naive UTC datetime (confirm)."""
        data = {"start-time": timestamp.isoformat('T') + 'Z',
                "duration-ms": duration * 1000,
                "data": {
                    "clients": list(clients.values()),
                    "client-count": clients.getCnt(),
                    "bytes-sent": clients.getBytesSent()
                }
        }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher')
    parser.add_argument('--socket', '-s', dest='socket', required=True,
                        help='the path to the data socket of the local SFive hub')
    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
                        help='the hostname of the machine')
    # type=int: the value is used arithmetically (duration * 1000)
    parser.add_argument('--duration', '-d', dest='duration', required=False, type=int,
                        help='time (in seconds) between updates; defaults to 15')
    parser.add_argument('--tag', '-t', dest='tags', action='append',
                        help='tag to be added to the statistic data, can be invoked several times')
    parser.add_argument('--content-id', '-c', dest='content-id', required=False,
                        help='the content-id (i.e. av) - only used if nameformat doesn\'t contain a named match')
    parser.add_argument('--format', '-f', dest='format', required=False,
                        help='the format (i.e. webm) - only used if nameformat doesn\'t contain a named match')
    parser.add_argument('--quality', '-q', dest='quality', required=False,
                        help='the quality (i.e. high) - only used if nameformat doesn\'t contain a named match')
    parser.add_argument('--url', '-u', dest='url', required=False,
                        help='the url of the nginx sfive exporter, default http://localhost/sfive')
    parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
                        help='a regular expression (containing named matches for content, format, quality)')
    args = vars(parser.parse_args())
    if not args['tags']:
        args['tags'] = []
    if not args['duration']:
        args['duration'] = 15
    if not args['url']:
        args['url'] = 'http://localhost/sfive'

    fetcher = NGXLuaFetcher(args)
    fetcher.run()