#!/usr/bin/python
#
#  sfive
#
#  sfive - spreadspace streaming statistics suite is a generic
#  statistic collection tool for streaming server infrastructures.
#  The system collects and stores meta data like number of views
#  and throughput from a number of streaming servers and stores
#  it in a global data store.
#  The data acquisition is designed to be generic and extensible in
#  order to support different streaming software.
#  sfive also contains tools and applications to filter and visualize
#  live and recorded data.
#
#
#  Copyright (C) 2014 Christian Pointner
#                     Markus Grueneis
#
#  This file is part of sfive.
#
#  sfive is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License version 3
#  as published by the Free Software Foundation.
#
#  sfive is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with sfive. If not, see <http://www.gnu.org/licenses/>.
#

from twisted.internet import protocol, reactor
from twisted.python import log
from socket import error as socket_error

import simplejson as json
import datetime

_MAX_PACKET_SIZE = 8192 # in bytes

__version__ = "$Rev$"


class SFiveProto(protocol.ConnectedDatagramProtocol):
    """Connected datagram protocol that reports socket events to its importer.

    The importer passed to the constructor must provide ``_socketReady()``
    and ``_socketError()``; they are invoked when the protocol starts and
    when connecting or sending fails, respectively.
    """

    def __init__(self, importer):
        self._importer = importer

    def startProtocol(self):
        """Tell the importer that the socket can be used for sending."""
        self._importer._socketReady()

    def datagramReceived(self, data):
        """Log and discard incoming data; this importer only sends."""
        print('SFive: received datagram: "%s" (will get ignored)' % (data,))

    def connectionFailed(self, failure):
        """Log the connection failure and notify the importer."""
        print('SFive: connection failed: %s' % (failure.getErrorMessage()))
        self._importer._socketError()

    def sendDatagram(self, data):
        """Write ``data`` to the transport.

        Returns whatever ``transport.write`` returns. If a socket error is
        raised, it is logged, the importer is notified and ``None`` is
        returned.
        """
        try:
            return self.transport.write(data)
        except socket_error as err:
            print('SFive: sending datagram failed: %s' % (err))
            self._importer._socketError()


class FlumotionRRD():
    """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite"""

    def __init__(self, properties):
        """Store the importer configuration.

        ``properties`` is a dict that must contain the keys ``socket``,
        ``hostname``, ``content-id``, ``format``, ``quality``, ``tags`` and
        ``duration``. Any further keys are ignored by this class.
        """
        print('SFive: flumotion rrd file importer loaded')

        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties['content-id']
        self._format = properties['format']
        self._quality = properties['quality']
        self._tags = properties['tags']
        self._duration = properties['duration']

        self._proto = None
        self._conn = None
        # Set once shutdown has been requested so that the reactor is never
        # stopped twice (a second reactor.stop() raises ReactorNotRunning).
        self._stopped = False

    def run(self):
        """Connect to the socket as soon as the reactor runs, then run it."""
        reactor.callWhenRunning(self._initSocket)
        reactor.run()

    def _initSocket(self):
        """Create the protocol and connect it to the configured UNIX socket."""
        print('SFive: trying to connect to %s...' % (self._socket))
        self._proto = SFiveProto(self)
        self._conn = reactor.connectUNIXDatagram(self._socket, self._proto,
                                                 maxPacketSize=_MAX_PACKET_SIZE)

    def _shutdown(self):
        """Close the connection (if any) and stop the reactor, at most once."""
        if self._stopped:
            return
        self._stopped = True
        if self._conn:
            self._conn.stopListening()
        reactor.stop()

    def _socketError(self):
        """Handle a connect/send failure by shutting the importer down."""
        self._shutdown()

    def _socketReady(self):
        """Send the init message and the datasets, then stop the reactor.

        If any send fails, ``_socketError()`` has already requested shutdown;
        in that case the remaining work is skipped instead of sending further
        data and stopping the reactor a second time.
        """
        print('SFive: connection to sfive hub established')
        self._sendInit()
        if self._stopped:
            return

        # TODO: start data import
        self._sendDataset(17, 1214, 14214)
        if self._stopped:
            return

        print('SFive: sent %d datasets' % (1))
        self._shutdown()

    def _sendInit(self):
        """Send the newline-terminated JSON init message describing this stream."""
        initdata = {
            "hostname": self._hostname,
            "streamer-id": {
                "content-id": self._content_id,
                "format": self._format,
                "quality": self._quality,
            },
            "tags": self._tags,
        }
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def _sendDataset(self, client_count, bytes_received, bytes_sent):
        """Send one newline-terminated JSON dataset stamped with the current UTC time."""
        # isoformat()'s argument is the date/time separator, not a suffix:
        # use 'T' as separator and append the 'Z' (UTC) designator explicitly.
        start_time = datetime.datetime.utcnow().isoformat('T') + 'Z'
        data = {
            "start-time": start_time,
            "duration-ms": self._duration,
            "data": {
                "client-count": client_count,
                "bytes-received": bytes_received,
                "bytes-sent": bytes_sent,
            },
        }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    importer = FlumotionRRD({
        'socket': '/tmp/sfive.sock',
        'hostname': 'localhost',
        'content-id': 'av',
        'format': 'webm',
        'quality': 'high',
        'tags': [ 'elevate', '2013' ],
        'duration': 5000,
        'bytes-rrdfile': '/tmp/av-webm-high_bytes.rrd',
        'clients-rrdfile': '/tmp/av-webm-high_clients.rrd',
    })
    importer.run()