summaryrefslogtreecommitdiff
path: root/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq/flumotion-rrd/sfive-flumotion-rrd.py')
-rwxr-xr-xsrc/daq/flumotion-rrd/sfive-flumotion-rrd.py193
1 files changed, 193 insertions, 0 deletions
diff --git a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
new file mode 100755
index 0000000..f749929
--- /dev/null
+++ b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py
@@ -0,0 +1,193 @@
+#!/usr/bin/python
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistic collection tool for streaming server infrastuctures.
+# The system collects and stores meta data like number of views
+# and throughput from a number of streaming servers and stores
+# it in a global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+# Markus Grueneis <gimpf@gimpf.org>
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from twisted.internet import protocol, reactor
+from socket import error as socket_error
+import simplejson as json
+import datetime
+import rrdtool
+
+_MAX_PACKET_SIZE = 8192 # in bytes
+
+__version__ = "$Rev$"
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+ def __init__(self, importer):
+ self._importer = importer
+
+ def startProtocol(self):
+ self._importer._socketReady()
+
+ def datagramReceived(self, data):
+ print 'SFive: received datagram: "%s" (will get ignored)' % (data)
+
+ def connectionFailed(self, failure):
+ print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+ self._importer._socketError()
+
+ def sendDatagram(self, data):
+ try:
+ return self.transport.write(data)
+ except socket_error as err:
+ print 'SFive: sending datagram failed: %s' % (err)
+ self._importer._socketError()
+
+
+class FlumotionRRD():
+ """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite"""
+
+ def __init__(self, properties):
+ print 'SFive: flumotion rrd file importer loaded'
+
+ self._socket = properties['socket']
+ self._hostname = properties['hostname']
+ self._content_id = properties['content-id']
+ self._format = properties['format']
+ self._quality = properties['quality']
+ self._tags = properties['tags']
+ self._bytes_rrdfile = properties['bytes-rrdfile']
+ self._clients_rrdfile = properties['clients-rrdfile']
+
+ self._proto = None
+ self._conn = None
+
+ def run(self):
+ reactor.callWhenRunning(self._initSocket)
+ reactor.run()
+
+ def _initSocket(self):
+ print 'SFive: trying to connect to %s...' % (self._socket)
+ self._proto = SFiveProto(self)
+ self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE)
+
+ def _socketError(self):
+ if self._conn:
+ self._conn.stopListening()
+ reactor.stop()
+
+ def _socketReady(self):
+ print 'SFive: connection to sfive hub established'
+ self._checkRRD()
+ self._sendInit()
+ cnt = self._sendRRD()
+ print 'SFive: sent %d datasets' % (cnt)
+
+ reactor.stop()
+
+ def _checkRRD(self):
+ info_bytes = rrdtool.info(self._bytes_rrdfile)
+ step_bytes = info_bytes['step']
+ lastupdate_bytes = info_bytes['last_update']
+ info_clients = rrdtool.info(self._clients_rrdfile)
+ step_clients = info_clients['step']
+ lastupdate_clients = info_clients['last_update']
+
+ self._duration = step_bytes
+ if step_bytes != step_clients:
+ print 'SFive: ERROR step size of the RRD Files don\'t match'
+ reactor.stop()
+
+ self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients
+ if lastupdate_bytes != lastupdate_clients:
+ print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp'
+ reactor.stop()
+
+ print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % (
+ datetime.datetime.utcfromtimestamp(self._end).isoformat('T'), self._duration)
+
+ def _sendInit(self):
+ initdata = { "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
+ def _sendRRD(self):
+ data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
+ data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
+
+ ts = data_bytes[0][0]
+ max = len(data_bytes[2])
+ if max < 1:
+ return 0
+
+ cnt = 0
+ for idx in range(0, max - 1):
+ try:
+ self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration,
+ data_clients[2][idx][0], data_bytes[2][idx][0])
+ ts += self._duration
+ cnt += 1
+ except ValueError as err:
+ print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err)
+
+ return cnt
+
+ def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
+ data = { "start-time": timestamp.isoformat('T'),
+ "duration-ms": duration * 1000,
+ "data": {
+ "client-count": client_count,
+ "bytes-sent": bytes_sent
+ }
+ }
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+
+
+
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer')
+ parser.add_argument('--socket', '-s', dest='socket', required=True,
+ help='the path to the data socket of the local SFive hub')
+ parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+ help='the hostname of the machine')
+ parser.add_argument('--content-id', '-c', dest='content-id', required=True,
+ help='the content-id (i.e. av)')
+ parser.add_argument('--format', '-f', dest='format', required=True,
+ help='the format (i.e. webm)')
+ parser.add_argument('--quality', '-q', dest='quality', required=True,
+ help='the quality (i.e. high)')
+ parser.add_argument('--tag', '-t', dest='tags', action='append',
+ help='tag to be added to the statistic data, can be invoked several times')
+ parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True,
+ help='path to the RRD File containing the bytes sent')
+ parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True,
+ help='path to the RRD File containing the number of clients')
+ args = vars(parser.parse_args())
+ if not args['tags']:
+ args['tags'] = []
+ importer = FlumotionRRD(args)
+ importer.run()