From b238bd8d4bb4102f82218125c537dce1394b09c0 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 12 Oct 2014 17:22:15 +0200 Subject: renamed flumotion rrd importer, fixed dataset cnt --- src/daq/flumotion-rrd/flumotion-rrd.py | 193 --------------------------- src/daq/flumotion-rrd/sfive-flumotion-rrd.py | 193 +++++++++++++++++++++++++++ 2 files changed, 193 insertions(+), 193 deletions(-) delete mode 100755 src/daq/flumotion-rrd/flumotion-rrd.py create mode 100755 src/daq/flumotion-rrd/sfive-flumotion-rrd.py diff --git a/src/daq/flumotion-rrd/flumotion-rrd.py b/src/daq/flumotion-rrd/flumotion-rrd.py deleted file mode 100755 index 59ba6f6..0000000 --- a/src/daq/flumotion-rrd/flumotion-rrd.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/python -# -# sfive -# -# sfive - spreadspace streaming statistics suite is a generic -# statistic collection tool for streaming server infrastuctures. -# The system collects and stores meta data like number of views -# and throughput from a number of streaming servers and stores -# it in a global data store. -# The data acquisition is designed to be generic and extensible in -# order to support different streaming software. -# sfive also contains tools and applications to filter and visualize -# live and recorded data. -# -# -# Copyright (C) 2014 Christian Pointner -# Markus Grueneis -# -# This file is part of sfive. -# -# sfive is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 -# as published by the Free Software Foundation. -# -# sfive is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with sfive. If not, see . -# - -from twisted.internet import protocol, reactor -from socket import error as socket_error -import simplejson as json -import datetime -import rrdtool - -_MAX_PACKET_SIZE = 8192 # in bytes - -__version__ = "$Rev$" - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, importer): - self._importer = importer - - def startProtocol(self): - self._importer._socketReady() - - def datagramReceived(self, data): - print 'SFive: received datagram: "%s" (will get ignored)' % (data) - - def connectionFailed(self, failure): - print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket_error as err: - print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() - - -class FlumotionRRD(): - """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" - - def __init__(self, properties): - print 'SFive: flumotion rrd file importer loaded' - - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] - self._tags = properties['tags'] - self._bytes_rrdfile = properties['bytes-rrdfile'] - self._clients_rrdfile = properties['clients-rrdfile'] - - self._proto = None - self._conn = None - - def run(self): - reactor.callWhenRunning(self._initSocket) - reactor.run() - - def _initSocket(self): - print 'SFive: trying to connect to %s...' % (self._socket) - self._proto = SFiveProto(self) - self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) - - def _socketError(self): - if self._conn: - self._conn.stopListening() - reactor.stop() - - def _socketReady(self): - print 'SFive: connection to sfive hub established' - self._checkRRD() - self._sendInit() - cnt = self._sendRRD() - print 'SFive: sent %d datasets' % (cnt) - - reactor.stop() - - def _checkRRD(self): - info_bytes = rrdtool.info(self._bytes_rrdfile) - step_bytes = info_bytes['step'] - lastupdate_bytes = info_bytes['last_update'] - info_clients = rrdtool.info(self._clients_rrdfile) - step_clients = info_clients['step'] - lastupdate_clients = info_clients['last_update'] - - self._duration = step_bytes - if step_bytes != step_clients: - print 'SFive: ERROR step size of the RRD Files don\'t match' - reactor.stop() - - self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients - if lastupdate_bytes != lastupdate_clients: - print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp' - reactor.stop() - - print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % ( - datetime.datetime.utcfromtimestamp(self._end).isoformat('T'), self._duration) - - def _sendInit(self): - initdata = { "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _sendRRD(self): - data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) - data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) - - ts = data_bytes[0][0] - max = len(data_bytes[2]) - if max < 1: - return 0 - - cnt = 0 - for idx in range(0, max - 1): - try: - self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration, - data_clients[2][idx][0], data_bytes[2][idx][0]) - ts += self._duration - cnt += 1 - except ValueError as err: - print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err) - - return max - - def _sendDataset(self, timestamp, duration, client_count, bytes_sent): - data = { "start-time": timestamp.isoformat('T'), - "duration-ms": duration * 1000, - "data": { - "client-count": client_count, - "bytes-sent": bytes_sent - } - } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); - - - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer') - parser.add_argument('--socket', '-s', dest='socket', required=True, - help='the path to the data socket of the local SFive hub') - parser.add_argument('--hostname', '-n', dest='hostname', required=True, - help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') - parser.add_argument('--format', '-f', dest='format', required=True, - help='the format (i.e. webm)') - parser.add_argument('--quality', '-q', dest='quality', required=True, - help='the quality (i.e. high)') - parser.add_argument('--tag', '-t', dest='tags', action='append', - help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True, - help='path to the RRD File containing the bytes sent') - parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True, - help='path to the RRD File containing the number of clients') - args = vars(parser.parse_args()) - if not args['tags']: - args['tags'] = [] - importer = FlumotionRRD(args) - importer.run() diff --git a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py new file mode 100755 index 0000000..f749929 --- /dev/null +++ b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py @@ -0,0 +1,193 @@ +#!/usr/bin/python +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grueneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . +# + +from twisted.internet import protocol, reactor +from socket import error as socket_error +import simplejson as json +import datetime +import rrdtool + +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, importer): + self._importer = importer + + def startProtocol(self): + self._importer._socketReady() + + def datagramReceived(self, data): + print 'SFive: received datagram: "%s" (will get ignored)' % (data) + + def connectionFailed(self, failure): + print 'SFive: connection failed: %s' % (failure.getErrorMessage()) + self._importer._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket_error as err: + print 'SFive: sending datagram failed: %s' % (err) + self._importer._socketError() + + +class FlumotionRRD(): + """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" + + def __init__(self, properties): + print 'SFive: flumotion rrd file importer loaded' + + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties['content-id'] + self._format = properties['format'] + self._quality = properties['quality'] + self._tags = properties['tags'] + self._bytes_rrdfile = properties['bytes-rrdfile'] + self._clients_rrdfile = properties['clients-rrdfile'] + + self._proto = None + self._conn = None + + def run(self): + reactor.callWhenRunning(self._initSocket) + reactor.run() + + def _initSocket(self): + print 'SFive: trying to connect to %s...' % (self._socket) + self._proto = SFiveProto(self) + self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + + def _socketError(self): + if self._conn: + self._conn.stopListening() + reactor.stop() + + def _socketReady(self): + print 'SFive: connection to sfive hub established' + self._checkRRD() + self._sendInit() + cnt = self._sendRRD() + print 'SFive: sent %d datasets' % (cnt) + + reactor.stop() + + def _checkRRD(self): + info_bytes = rrdtool.info(self._bytes_rrdfile) + step_bytes = info_bytes['step'] + lastupdate_bytes = info_bytes['last_update'] + info_clients = rrdtool.info(self._clients_rrdfile) + step_clients = info_clients['step'] + lastupdate_clients = info_clients['last_update'] + + self._duration = step_bytes + if step_bytes != step_clients: + print 'SFive: ERROR step size of the RRD Files don\'t match' + reactor.stop() + + self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients + if lastupdate_bytes != lastupdate_clients: + print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp' + reactor.stop() + + print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % ( + datetime.datetime.utcfromtimestamp(self._end).isoformat('T'), self._duration) + + def _sendInit(self): + initdata = { "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _sendRRD(self): + data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) + data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) + + ts = data_bytes[0][0] + max = len(data_bytes[2]) + if max < 1: + return 0 + + cnt = 0 + for idx in range(0, max - 1): + try: + self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration, + data_clients[2][idx][0], data_bytes[2][idx][0]) + ts += self._duration + cnt += 1 + except ValueError as err: + print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err) + + return cnt + + def _sendDataset(self, timestamp, duration, client_count, bytes_sent): + data = { "start-time": timestamp.isoformat('T'), + "duration-ms": duration * 1000, + "data": { + "client-count": client_count, + "bytes-sent": bytes_sent + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); + + + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer') + parser.add_argument('--socket', '-s', dest='socket', required=True, + help='the path to the data socket of the local SFive hub') + parser.add_argument('--hostname', '-n', dest='hostname', required=True, + help='the hostname of the machine') + parser.add_argument('--content-id', '-c', dest='content-id', required=True, + help='the content-id (i.e. av)') + parser.add_argument('--format', '-f', dest='format', required=True, + help='the format (i.e. webm)') + parser.add_argument('--quality', '-q', dest='quality', required=True, + help='the quality (i.e. high)') + parser.add_argument('--tag', '-t', dest='tags', action='append', + help='tag to be added to the statistic data, can be invoked several times') + parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True, + help='path to the RRD File containing the bytes sent') + parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True, + help='path to the RRD File containing the number of clients') + args = vars(parser.parse_args()) + if not args['tags']: + args['tags'] = [] + importer = FlumotionRRD(args) + importer.run() -- cgit v1.2.3