diff options
Diffstat (limited to 'src/daq/flumotion-rrd/s5-flumotion-rrd.py')
-rwxr-xr-x | src/daq/flumotion-rrd/s5-flumotion-rrd.py | 233 |
1 files changed, 0 insertions, 233 deletions
diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd.py b/src/daq/flumotion-rrd/s5-flumotion-rrd.py deleted file mode 100755 index bef9488..0000000 --- a/src/daq/flumotion-rrd/s5-flumotion-rrd.py +++ /dev/null @@ -1,233 +0,0 @@ -#!/usr/bin/python -# -# sfive -# -# sfive - spreadspace streaming statistics suite is a generic -# statistic collection tool for streaming server infrastuctures. -# The system collects and stores meta data like number of views -# and throughput from a number of streaming servers and stores -# it in a global data store. -# The data acquisition is designed to be generic and extensible in -# order to support different streaming software. -# sfive also contains tools and applications to filter and visualize -# live and recorded data. -# -# -# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> -# Markus Grueneis <gimpf@gimpf.org> -# -# This file is part of sfive. -# -# sfive is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 -# as published by the Free Software Foundation. -# -# sfive is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with sfive. If not, see <http://www.gnu.org/licenses/>. -# - -from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS -from twisted.internet import protocol, reactor, unix -import socket -from time import sleep - -import simplejson as json -import datetime -import rrdtool - -_MAX_PACKET_SIZE = 8192 # in bytes - -__version__ = "$Rev$" - -class SFivePort(unix.ConnectedDatagramPort): - - def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): - unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) - - def write(self, data): - try: - return self.socket.send(data) - except socket.error, se: - no = se.args[0] - if no == EINTR: - return self.write(data) - elif no == EMSGSIZE: - raise error.MessageLengthError, "message too long" - elif no == ECONNREFUSED: - self.protocol.connectionRefused() - elif no == EAGAIN: - # the send buffer seems to be full - let's wait a little while... - # this is not really a good solution but better than the aproach - # of twisted which just drops the datagram... - sleep(0.01) - return self.write(data) - else: - raise - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, importer): - self._importer = importer - - def startProtocol(self): - self._importer._socketReady() - - def datagramReceived(self, data): - print 'SFive: received datagram: "%s" (will get ignored)' % (data) - - def connectionFailed(self, failure): - print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket.error as err: - print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() - - -class FlumotionRRD(): - """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" - - def __init__(self, properties): - print 'SFive: flumotion rrd file importer loaded' - - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] - self._tags = properties['tags'] - self._bytes_rrdfile = properties['bytes-rrdfile'] - self._clients_rrdfile = properties['clients-rrdfile'] - - self._proto = None - self._conn = None - - def run(self): - if self._initRRD(): - reactor.callWhenRunning(self._initSocket) - reactor.run() - - def _initRRD(self): - try: - info_bytes = rrdtool.info(self._bytes_rrdfile) - step_bytes = info_bytes['step'] - lastupdate_bytes = info_bytes['last_update'] - info_clients = rrdtool.info(self._clients_rrdfile) - step_clients = info_clients['step'] - lastupdate_clients = info_clients['last_update'] - - self._duration = step_bytes - if step_bytes != step_clients: - print 'SFive: ERROR step size of the RRD Files don\'t match' - return False - - self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients - if lastupdate_bytes != lastupdate_clients: - print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp' - return False - - print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % ( - datetime.datetime.utcfromtimestamp(self._end).isoformat('T') + 'Z', self._duration) - - except rrdtool.error as e: - print 'SFive: rrdtool-error: %s' % (e) - return False - - return True - - def _initSocket(self): - print 'SFive: trying to connect to %s...' % (self._socket) - self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) - self._conn.startListening() - - def _socketError(self): - if self._conn: - self._conn.stopListening() - reactor.stop() - - def _socketReady(self): - print 'SFive: connection to sfive hub established' - self._sendInit() - cnt = self._sendRRDData() - print 'SFive: sent %d datasets' % (cnt) - - reactor.stop() - - def _sendInit(self): - initdata = { "version": 1, "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _sendRRDData(self): - try: - data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) - data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) - except rrdtool.error as e: - print 'SFive: rrdtool-error: %s' % (e) - return 0 - - ts = data_bytes[0][0] - max = len(data_bytes[2]) - if max < 1: - return 0 - - cnt = 0 - for idx in range(0, max - 1): - try: - self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration, - data_clients[2][idx][0], data_bytes[2][idx][0]) - ts += self._duration - cnt += 1 - except ValueError as err: - print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err) - - return cnt - - def _sendDataset(self, timestamp, duration, client_count, bytes_sent): - client_count = int(round(client_count)) if client_count else 0 - bytes_sent = int(round(bytes_sent)) if bytes_sent else 0 - data = { "start-time": timestamp.isoformat('T') + 'Z', - "duration-ms": duration * 1000, - "data": { - "client-count": client_count, - "bytes-sent": bytes_sent - } - } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer') - parser.add_argument('--socket', '-s', dest='socket', required=True, - help='the path to the data socket of the local SFive hub') - parser.add_argument('--hostname', '-n', dest='hostname', required=True, - help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') - parser.add_argument('--format', '-f', dest='format', required=True, - help='the format (i.e. webm)') - parser.add_argument('--quality', '-q', dest='quality', required=True, - help='the quality (i.e. high)') - parser.add_argument('--tag', '-t', dest='tags', action='append', - help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True, - help='path to the RRD File containing the bytes sent') - parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True, - help='path to the RRD File containing the number of clients') - args = vars(parser.parse_args()) - if not args['tags']: - args['tags'] = [] - importer = FlumotionRRD(args) - importer.run() |