#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistic collection tool for streaming server infrastructures.
#  The system collects and stores meta data like number of views
#  and throughput from a number of streaming servers and stores
#  it in a global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014 Christian Pointner
#                     Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
#

from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
# `error` is required by SFivePort.write (error.MessageLengthError).
from twisted.internet import error, protocol, reactor, unix
import socket
from time import sleep

import simplejson as json
import datetime
import rrdtool

_MAX_PACKET_SIZE = 8192  # in bytes
__version__ = "$Rev$"


class SFivePort(unix.ConnectedDatagramPort):
    """Connected unix datagram port that waits and retries on a full send buffer.

    Only write() is overridden: when the socket reports EAGAIN/EWOULDBLOCK
    it sleeps briefly and tries again.
    """

    # Upper bound for EAGAIN retries of a single datagram (10ms sleep each).
    # Once exceeded, the socket.error is re-raised to the caller.
    _MAX_SEND_RETRIES = 1000

    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
        """Forward all arguments unchanged to unix.ConnectedDatagramPort."""
        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)

    def write(self, data):
        """Send one datagram over the connected socket.

        Returns whatever socket.send() returns, or None after the protocol
        has been notified of a refused connection.

        Raises error.MessageLengthError on EMSGSIZE, and re-raises the
        socket.error for any other errno or when the send buffer stays
        full for _MAX_SEND_RETRIES attempts.
        """
        retries = 0
        while True:
            try:
                return self.socket.send(data)
            except socket.error as se:
                no = se.args[0]
                if no == EINTR:
                    # interrupted by a signal - just try again
                    continue
                if no == EMSGSIZE:
                    raise error.MessageLengthError("message too long")
                if no == ECONNREFUSED:
                    self.protocol.connectionRefused()
                    return None
                if no in (EAGAIN, EWOULDBLOCK) and retries < self._MAX_SEND_RETRIES:
                    # the send buffer seems to be full - let's wait a little while...
                    # this is not really a good solution but better than the approach
                    # of twisted which just drops the datagram...
                    retries += 1
                    sleep(0.01)
                    continue
                raise


class SFiveProto(protocol.ConnectedDatagramProtocol):
    """Datagram protocol that reports socket events back to the importer."""

    def __init__(self, importer):
        """Remember the importer whose _socketReady/_socketError get called."""
        self._importer = importer

    def startProtocol(self):
        """Tell the importer that the socket is ready for sending."""
        self._importer._socketReady()

    def datagramReceived(self, data):
        """Log incoming datagrams; they are not processed any further."""
        print('SFive: received datagram: "%s" (will get ignored)' % (data))

    def connectionFailed(self, failure):
        """Log the failure and notify the importer of the socket error."""
        print('SFive: connection failed: %s' % (failure.getErrorMessage()))
        self._importer._socketError()

    def sendDatagram(self, data):
        """Write data to the transport.

        Returns the transport's result; on socket.error the error is
        logged, the importer is notified and None is returned.
        """
        try:
            return self.transport.write(data)
        except socket.error as err:
            print('SFive: sending datagram failed: %s' % (err))
            self._importer._socketError()


class FlumotionRRD(object):
    """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite"""

    def __init__(self, properties):
        """Read the importer configuration from the `properties` mapping.

        Keys read: 'socket', 'hostname', 'content-id', 'format', 'quality',
        'tags', 'bytes-rrdfile' and 'clients-rrdfile'.
        """
        print('SFive: flumotion rrd file importer loaded')

        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties['content-id']
        self._format = properties['format']
        self._quality = properties['quality']
        self._tags = properties['tags']
        self._bytes_rrdfile = properties['bytes-rrdfile']
        self._clients_rrdfile = properties['clients-rrdfile']

        self._proto = None
        self._conn = None

        # set by _initRRD()
        self._duration = None
        self._end = None

        # shutdown bookkeeping: reactor.stop() must not be called twice
        self._failed = False
        self._stopping = False

    def run(self):
        """Check the RRD files and, if they are usable, run the reactor until the import is done."""
        if self._initRRD():
            reactor.callWhenRunning(self._initSocket)
            reactor.run()

    def _initRRD(self):
        """Read step size and last update time from both RRD files.

        Sets self._duration (step size) and self._end (the smaller of the
        two last-update timestamps).

        Returns False if rrdtool reports an error or the step sizes
        differ, True otherwise.
        """
        try:
            info_bytes = rrdtool.info(self._bytes_rrdfile)
            step_bytes = info_bytes['step']
            lastupdate_bytes = info_bytes['last_update']
            info_clients = rrdtool.info(self._clients_rrdfile)
            step_clients = info_clients['step']
            lastupdate_clients = info_clients['last_update']

            self._duration = step_bytes
            if step_bytes != step_clients:
                print('SFive: ERROR step size of the RRD Files don\'t match')
                return False

            self._end = min(lastupdate_bytes, lastupdate_clients)
            if lastupdate_bytes != lastupdate_clients:
                # This is only a warning: carry on with the smaller timestamp,
                # as the message says, instead of aborting the import.
                print('SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp')

            print('SFive: will use 7 days of data ending with %s using a step size of %d seconds' % (
                datetime.datetime.utcfromtimestamp(self._end).isoformat('T') + 'Z', self._duration))

        except rrdtool.error as e:
            print('SFive: rrdtool-error: %s' % (e))
            return False

        return True

    def _initSocket(self):
        """Create protocol and port for the hub socket and start listening."""
        print('SFive: trying to connect to %s...' % (self._socket))
        self._proto = SFiveProto(self)
        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
        self._conn.startListening()

    def _stopReactor(self):
        """Stop the reactor exactly once, however many callers ask for it."""
        if self._stopping:
            return
        self._stopping = True
        reactor.stop()

    def _socketError(self):
        """Handle a socket failure: close the port and shut down.

        Safe to call repeatedly; only the first call has an effect.
        """
        if self._failed:
            return
        self._failed = True
        if self._conn:
            self._conn.stopListening()
        self._stopReactor()

    def _socketReady(self):
        """Send the init message and all datasets, then stop the reactor."""
        print('SFive: connection to sfive hub established')
        self._sendInit()
        cnt = 0
        if not self._failed:
            # no point in sending data if the init message already failed
            cnt = self._sendRRDData()
        print('SFive: sent %d datasets' % (cnt))
        self._stopReactor()

    def _sendInit(self):
        """Send the init message (hostname, streamer-id, tags) as one JSON line."""
        initdata = {
            "hostname": self._hostname,
            "streamer-id": {
                "content-id": self._content_id,
                "format": self._format,
                "quality": self._quality
            },
            "tags": self._tags
        }
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def _sendRRDData(self):
        """Fetch 7 days of AVERAGE data from both RRD files and send it.

        Returns the number of datasets handed to the socket (0 if rrdtool
        fails or no rows are available).
        """
        try:
            data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
            data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
        except rrdtool.error as e:
            print('SFive: rrdtool-error: %s' % (e))
            return 0

        start = data_bytes[0][0]
        rows_bytes = data_bytes[2]
        rows_clients = data_clients[2]
        # Only rows present in both result sets can be combined.
        rows = min(len(rows_bytes), len(rows_clients))
        if rows < 1:
            return 0

        # NOTE(review): timestamps advance by the step size read in _initRRD();
        # this assumes rrdtool.fetch returned rows at that same resolution - confirm.
        cnt = 0
        # The last row is skipped - presumably because it is still
        # incomplete at the last update time; TODO confirm.
        for idx in range(rows - 1):
            # Derive the timestamp from the row index so that one bad value
            # does not stall the timestamps of all following rows.
            ts = start + idx * self._duration
            try:
                self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration,
                                  rows_clients[idx][0], rows_bytes[idx][0])
            except ValueError as err:
                print('SFive: timestamp "%d" seems wrong: %s' % (ts, err))
                continue
            if self._failed:
                # the socket is gone - stop instead of failing for every row
                break
            cnt += 1

        return cnt

    def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
        """Send one dataset as a JSON line.

        `timestamp` is a datetime used as start-time, `duration` is given
        in seconds and sent as milliseconds. Falsy values (None, 0) for
        `client_count` and `bytes_sent` are sent as 0, anything else is
        rounded to an integer.
        """
        client_count = int(round(client_count)) if client_count else 0
        bytes_sent = int(round(bytes_sent)) if bytes_sent else 0
        data = {
            "start-time": timestamp.isoformat('T') + 'Z',
            "duration-ms": duration * 1000,
            "data": {
                "client-count": client_count,
                "bytes-sent": bytes_sent
            }
        }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer')
    parser.add_argument('--socket', '-s', dest='socket', required=True,
                        help='the path to the data socket of the local SFive hub')
    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
                        help='the hostname of the machine')
    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
                        help='the content-id (i.e. av)')
    parser.add_argument('--format', '-f', dest='format', required=True,
                        help='the format (i.e. webm)')
    parser.add_argument('--quality', '-q', dest='quality', required=True,
                        help='the quality (i.e. high)')
    parser.add_argument('--tag', '-t', dest='tags', action='append',
                        help='tag to be added to the statistic data, can be invoked several times')
    parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True,
                        help='path to the RRD File containing the bytes sent')
    parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True,
                        help='path to the RRD File containing the number of clients')
    # The dest names contain dashes, so the values are read through vars().
    args = vars(parser.parse_args())
    if not args['tags']:
        # action='append' leaves None when --tag was never given
        args['tags'] = []
    importer = FlumotionRRD(args)
    importer.run()