From c6887323933e2b032d54a9708adc94dcf6c36ab3 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 24 Aug 2014 19:20:29 +0200 Subject: added inital flumotion RRD batch importer --- src/daq/flumotion-rrd/flumotion-rrd.py | 136 +++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100755 src/daq/flumotion-rrd/flumotion-rrd.py (limited to 'src/daq') diff --git a/src/daq/flumotion-rrd/flumotion-rrd.py b/src/daq/flumotion-rrd/flumotion-rrd.py new file mode 100755 index 0000000..1d9c560 --- /dev/null +++ b/src/daq/flumotion-rrd/flumotion-rrd.py @@ -0,0 +1,136 @@ +#!/usr/bin/python +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grueneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . +# + +from twisted.internet import protocol, reactor +from twisted.python import log +from socket import error as socket_error +import simplejson as json +import datetime + +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, importer): + self._importer = importer + + def startProtocol(self): + self._importer._socketReady() + + def datagramReceived(self, data): + print 'SFive: received datagram: "%s" (will get ignored)' % (data) + + def connectionFailed(self, failure): + print 'SFive: connection failed: %s' % (failure.getErrorMessage()) + self._importer._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket_error as err: + print 'SFive: sending datagram failed: %s' % (err) + self._importer._socketError() + + + +class FlumotionRRD(): + """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" + + def __init__(self, properties): + print 'SFive: flumotion rrd file importer loaded' + + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties['content-id'] + self._format = properties['format'] + self._quality = properties['quality'] + self._tags = properties['tags'] + self._duration = properties['duration'] + + self._proto = None + self._conn = None + + def run(self): + reactor.callWhenRunning(self._initSocket) + reactor.run() + + def _initSocket(self): + print 'SFive: trying to connect to %s...' % (self._socket) + self._proto = SFiveProto(self) + self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + + def _socketError(self): + if self._conn: + self._conn.stopListening() + reactor.stop() + + def _socketReady(self): + print 'SFive: connection to sfive hub established' + self._sendInit() + # TODO: start data import + self._sendDataset(17, 1214, 14214) + reactor.stop() + + def _sendInit(self): + initdata = { "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _sendDataset(self, client_count, bytes_received, bytes_sent): + data = { "start-time": datetime.datetime.utcnow().isoformat('Z'), + "duration-ms": self._duration, + "data": { + "client-count": client_count, + "bytes-received": bytes_received, + "bytes-sent": bytes_sent + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); + + + +if __name__ == '__main__': + importer = FlumotionRRD({ 'socket': '/tmp/sfive.sock', + 'hostname': 'localhost', + 'content-id': 'av', + 'format': 'webm', + 'quality': 'high', + 'tags': [ 'elevate', '2013' ], + 'duration': 5000, + 'bytes-rrdfile': '/tmp/av-webm-high_bytes.rrd', + 'clients-rrdfile': '/tmp/av-webm-high_clients.rrd', + }) + importer.run() -- cgit v1.2.3