From 84e77bf2ac0dfe34d152f2fa27fd31633bd3eadd Mon Sep 17 00:00:00 2001
From: Christian Pointner
Date: Tue, 14 Oct 2014 02:47:38 +0200
Subject: renamed daq plugins

---
 src/daq/accesslog/s5-accesslog.py            | 325 +++++++++++++++++++++++++++
 src/daq/accesslog/sfive-accesslog.py         | 325 ---------------------------
 src/daq/flumotion-plug/s5.py                 | 180 +++++++++++++++
 src/daq/flumotion-plug/sfive.py              | 180 ---------------
 src/daq/flumotion-rrd/s5-flumotion-rrd.py    | 231 +++++++++++++++++++
 src/daq/flumotion-rrd/sfive-flumotion-rrd.py | 231 -------------------
 6 files changed, 736 insertions(+), 736 deletions(-)
 create mode 100755 src/daq/accesslog/s5-accesslog.py
 delete mode 100755 src/daq/accesslog/sfive-accesslog.py
 create mode 100644 src/daq/flumotion-plug/s5.py
 delete mode 100644 src/daq/flumotion-plug/sfive.py
 create mode 100755 src/daq/flumotion-rrd/s5-flumotion-rrd.py
 delete mode 100755 src/daq/flumotion-rrd/sfive-flumotion-rrd.py

diff --git a/src/daq/accesslog/s5-accesslog.py b/src/daq/accesslog/s5-accesslog.py
new file mode 100755
index 0000000..7e74090
--- /dev/null
+++ b/src/daq/accesslog/s5-accesslog.py
@@ -0,0 +1,325 @@
+#!/usr/bin/python
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistic collection tool for streaming server infrastuctures.
+# The system collects and stores meta data like number of views
+# and throughput from a number of streaming servers and stores
+# it in a global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner
+#                    Markus Grueneis
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, reactor, unix
+import socket
+from time import sleep
+
+import sys
+import re
+import simplejson as json
+import datetime
+import dateutil.parser
+
+_MAX_PACKET_SIZE = 8192 # in bytes
+
+__version__ = "$Rev$"
+
+class SFivePort(unix.ConnectedDatagramPort):
+
+    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
+        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+    def write(self, data):
+        try:
+            return self.socket.send(data)
+        except socket.error, se:
+            no = se.args[0]
+            if no == EINTR:
+                return self.write(data)
+            elif no == EMSGSIZE:
+                raise error.MessageLengthError, "message too long"
+            elif no == ECONNREFUSED:
+                self.protocol.connectionRefused()
+            elif no == EAGAIN:
+                # the send buffer seems to be full - let's wait a little while...
+                # this is not really a good solution but better than the aproach
+                # of twisted which just drops the datagram...
+                sleep(0.01)
+                return self.write(data)
+            else:
+                raise
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+    def __init__(self, importer):
+        self._importer = importer
+
+    def startProtocol(self):
+        self._importer._socketReady()
+
+    def datagramReceived(self, data):
+        print 'SFive: received datagram: "%s" (will get ignored)' % (data)
+
+    def connectionFailed(self, failure):
+        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+        self._importer._socketError()
+
+    def sendDatagram(self, data):
+        try:
+            return self.transport.write(data)
+        except socket.error as err:
+            print 'SFive: sending datagram failed: %s' % (err)
+            self._importer._socketError()
+
+
+class ClientList:
+
+    def __init__(self, file_re):
+        self._clients = { }
+        self._file_re = file_re
+
+    def clear(self):
+        self._clients = { }
+
+    def getCnt(self):
+        return len(self._clients)
+
+    def getBytesSent(self):
+        sum = 0
+        for val in self._clients.itervalues():
+            sum += val['bytes-sent']
+
+        return sum
+
+    def values(self):
+        return self._clients.itervalues()
+
+    def update(self, linedata):
+        if linedata['status'] != 200 and linedata['status'] != 206:
+            return
+        if linedata['req']['method'] != 'GET':
+            return
+
+        try:
+            if re.match(self._file_re, linedata['req']['url']):
+                if linedata['client'] in self._clients.keys():
+                    self._clients[linedata['client']]['bytes-sent'] += linedata['size']
+                else:
+                    self._clients[linedata['client']] = { 'ip': linedata['client'],
+                                                          'bytes-sent': linedata['size'] }
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+
+
+class AccessLog():
+    """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite"""
+
+    def __init__(self, properties):
+        print 'SFive: accesslog file importer loaded'
+
+        self._socket = properties['socket']
+        self._hostname = properties['hostname']
+        self._content_id = properties['content-id']
+        self._format = properties['format']
+        self._quality = properties['quality']
+        self._duration = properties['duration']
+        self._tags = properties['tags']
+        self._logfile = properties['logfile']
+        self._nameformat = properties['nameformat']
+
+        self._proto = None
+        self._conn = None
+
+    def run(self):
+        if self._initLog():
+            reactor.callWhenRunning(self._initSocket)
+            reactor.run()
+
+    def _initLog(self):
+        try:
+            if self._logfile:
+                print 'SFive: will batch import form %s' % (self._logfile if self._logfile != '-' else 'standard input')
+                self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin
+            else:
+                print 'SFive: live mode enabled'
+
+            regex = self._nameformat % { 'hostname': self._hostname,
+                                         'content-id': self._content_id,
+                                         'format': self._format,
+                                         'quality': self._quality }
+            self._file_re = re.compile(regex)
+            print "SFive: will be looking for files like '%s'" % regex
+        except IOError as e:
+            print 'SFive: error opening logfile: %s' % (e.strerror)
+            return False
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+            return False
+
+        return True
+
+    def _initSocket(self):
+        print 'SFive: trying to connect to %s...' % (self._socket)
+        self._proto = SFiveProto(self)
+        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+        self._conn.startListening()
+
+    def _socketError(self):
+        if self._conn:
+            self._conn.stopListening()
+        reactor.stop()
+
+    def _socketReady(self):
+        print 'SFive: connection to sfive hub established'
+        self._sendInit()
+        if hasattr(self, '_fd'):
+            cnt = self._sendLogData(self._fd, None)
+            print 'SFive: imported %d datasets' % (cnt)
+            self._fd.close()
+        else:
+            print 'SFive: live mode is not yet implemented!'
+            # TODO: wait self._duration seconds and call _sendData with
+            #       all lines received lines since last update
+
+        reactor.stop()
+
+    def _sendInit(self):
+        initdata = { "hostname": self._hostname,
+                     "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+                     "tags": self._tags }
+        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
+    def _prepareLineRegex(self):
+        parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
+                  r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"']
+        return re.compile(r'\s+'.join(parts)+r'\s*\Z')
+
+    def _parseRequest(self, reqstr):
+        req = { 'method': None, 'url': None, 'proto': None }
+        try:
+            parts = reqstr.split()
+            req['method'] = parts[0] if parts[0] != '-' else None
+            req['url'] = parts[1]
+            req['proto'] = parts[2]
+        except IndexError:
+            pass
+
+        return req
+
+    def _parseDatetime(self, datetimestr):
+        try:
+            ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
+            ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+            return ts
+        except ValueError as e:
+            return None
+
+    def _parseLine(self, regex, line):
+        linedata = regex.match(line).groupdict()
+
+        for part in ("client", "ref", "ua"):
+            if linedata[part] == "-":
+                linedata[part] = None
+
+        linedata["status"] = int(linedata["status"])
+        linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"])
+        linedata['req'] = self._parseRequest(linedata['req'])
+        linedata['ts'] = self._parseDatetime(linedata['ts'])
+        return linedata
+
+    def _sendLogData(self, data, lastts):
+        cnt = 0
+        nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
+        clients = ClientList(self._file_re)
+        try:
+            regex = self._prepareLineRegex()
+            for line in data:
+                linedata = self._parseLine(regex, line)
+                if not lastts:
+                    lastts = linedata['ts']
+                    nextts = lastts + datetime.timedelta(seconds=self._duration)
+                    clients.clear()
+                    clients.update(linedata)
+                else:
+                    while linedata['ts'] >= nextts:
+                        self._sendDataset(lastts, self._duration, clients)
+                        cnt += 1
+                        lastts = nextts
+                        nextts = lastts + datetime.timedelta(seconds=self._duration)
+                        clients.clear()
+
+                    clients.update(linedata)
+
+            # send remaining data
+            if nextts:
+                self._sendDataset(lastts, self._duration, clients)
+                cnt += 1
+
+        except re.error as e:
+            print 'SFive: regex error: %s' % (e)
+
+        return cnt
+
+    def _sendDataset(self, timestamp, duration, clients):
+        data = { "start-time": timestamp.isoformat('T'),
+                 "duration-ms": duration * 1000,
+                 "data": {
+                     "clients": list(clients.values()),
+                     "client-count": clients.getCnt(),
+                     "bytes-sent": clients.getBytesSent()
+                 }
+               }
+        self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+
+
+if __name__ == '__main__':
+    import argparse
+
+    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
+    parser.add_argument('--socket', '-s', dest='socket', required=True,
+                        help='the path to the data socket of the local SFive hub')
+    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+                        help='the hostname of the machine')
+    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
+                        help='the content-id (i.e. av)')
+    parser.add_argument('--format', '-f', dest='format', required=True,
+                        help='the format (i.e. webm)')
+    parser.add_argument('--quality', '-q', dest='quality', required=True,
+                        help='the quality (i.e. high)')
+    parser.add_argument('--duration', '-d', dest='duration', required=False,
+                        help='time (in seconds) between updates; defaults to 5')
+    parser.add_argument('--tag', '-t', dest='tags', action='append',
+                        help='tag to be added to the statistic data, can be invoked several times')
+    parser.add_argument('--logfile', '-l', dest='logfile', required=False,
+                        help='path to the logfile or \'-\' for standard input')
+    parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
+                        help='the format for filenames which are part of this stream, this may include python string expressions')
+    args = vars(parser.parse_args())
+    if not args['tags']:
+        args['tags'] = []
+    if not args['duration']:
+        args['duration'] = 5
+    importer = AccessLog(args)
+    importer.run()
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py
deleted file mode 100755
index 7e74090..0000000
--- a/src/daq/accesslog/sfive-accesslog.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#!/usr/bin/python
-#
-# sfive
-#
-# sfive - spreadspace streaming statistics suite is a generic
-# statistic collection tool for streaming server infrastuctures.
-# The system collects and stores meta data like number of views
-# and throughput from a number of streaming servers and stores
-# it in a global data store.
-# The data acquisition is designed to be generic and extensible in
-# order to support different streaming software.
-# sfive also contains tools and applications to filter and visualize
-# live and recorded data.
-#
-#
-# Copyright (C) 2014 Christian Pointner
-#                    Markus Grueneis
-#
-# This file is part of sfive.
-#
-# sfive is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3
-# as published by the Free Software Foundation.
-#
-# sfive is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sfive. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import protocol, reactor, unix
-import socket
-from time import sleep
-
-import sys
-import re
-import simplejson as json
-import datetime
-import dateutil.parser
-
-_MAX_PACKET_SIZE = 8192 # in bytes
-
-__version__ = "$Rev$"
-
-class SFivePort(unix.ConnectedDatagramPort):
-
-    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
-        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
-
-    def write(self, data):
-        try:
-            return self.socket.send(data)
-        except socket.error, se:
-            no = se.args[0]
-            if no == EINTR:
-                return self.write(data)
-            elif no == EMSGSIZE:
-                raise error.MessageLengthError, "message too long"
-            elif no == ECONNREFUSED:
-                self.protocol.connectionRefused()
-            elif no == EAGAIN:
-                # the send buffer seems to be full - let's wait a little while...
- # this is not really a good solution but better than the aproach - # of twisted which just drops the datagram... - sleep(0.01) - return self.write(data) - else: - raise - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, importer): - self._importer = importer - - def startProtocol(self): - self._importer._socketReady() - - def datagramReceived(self, data): - print 'SFive: received datagram: "%s" (will get ignored)' % (data) - - def connectionFailed(self, failure): - print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket.error as err: - print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() - - -class ClientList: - - def __init__(self, file_re): - self._clients = { } - self._file_re = file_re - - def clear(self): - self._clients = { } - - def getCnt(self): - return len(self._clients) - - def getBytesSent(self): - sum = 0 - for val in self._clients.itervalues(): - sum += val['bytes-sent'] - - return sum - - def values(self): - return self._clients.itervalues() - - def update(self, linedata): - if linedata['status'] != 200 and linedata['status'] != 206: - return - if linedata['req']['method'] != 'GET': - return - - try: - if re.match(self._file_re, linedata['req']['url']): - if linedata['client'] in self._clients.keys(): - self._clients[linedata['client']]['bytes-sent'] += linedata['size'] - else: - self._clients[linedata['client']] = { 'ip': linedata['client'], - 'bytes-sent': linedata['size'] } - except re.error as e: - print 'SFive: regex error: %s' % (e) - - -class AccessLog(): - """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite""" - - def __init__(self, properties): - print 'SFive: accesslog file importer loaded' - - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] - self._duration = properties['duration'] - self._tags = properties['tags'] - self._logfile = properties['logfile'] - self._nameformat = properties['nameformat'] - - self._proto = None - self._conn = None - - def run(self): - if self._initLog(): - reactor.callWhenRunning(self._initSocket) - reactor.run() - - def _initLog(self): - try: - if self._logfile: - print 'SFive: will batch import form %s' % (self._logfile if self._logfile != '-' else 'standard input') - self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin - else: - print 'SFive: live mode enabled' - - regex = self._nameformat % { 'hostname': self._hostname, - 'content-id': self._content_id, - 'format': self._format, - 'quality': self._quality } - self._file_re = re.compile(regex) - print "SFive: will be looking for files like '%s'" % regex - except IOError as e: - print 'SFive: error opening logfile: %s' % (e.strerror) - return False - except re.error as e: - print 'SFive: regex error: %s' % (e) - return False - - return True - - def _initSocket(self): - print 'SFive: trying to connect to %s...' 
% (self._socket) - self._proto = SFiveProto(self) - self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) - self._conn.startListening() - - def _socketError(self): - if self._conn: - self._conn.stopListening() - reactor.stop() - - def _socketReady(self): - print 'SFive: connection to sfive hub established' - self._sendInit() - if hasattr(self, '_fd'): - cnt = self._sendLogData(self._fd, None) - print 'SFive: imported %d datasets' % (cnt) - self._fd.close() - else: - print 'SFive: live mode is not yet implemented!' - # TODO: wait self._duration seconds and call _sendData with - # all lines received lines since last update - - reactor.stop() - - def _sendInit(self): - initdata = { "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _prepareLineRegex(self): - parts = [ r'(?P\S+)', r'\S+', r'\S+', r'\[(?P.+)\]', r'"(?P.+)"', - r'(?P[0-9]+)', r'(?P\S+)', r'"(?P.*)"', r'"(?P.*)"'] - return re.compile(r'\s+'.join(parts)+r'\s*\Z') - - def _parseRequest(self, reqstr): - req = { 'method': None, 'url': None, 'proto': None } - try: - parts = reqstr.split() - req['method'] = parts[0] if parts[0] != '-' else None - req['url'] = parts[1] - req['proto'] = parts[2] - except IndexError: - pass - - return req - - def _parseDatetime(self, datetimestr): - try: - ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True) - ts = (ts - ts.utcoffset()).replace(tzinfo=None) - return ts - except ValueError as e: - return None - - def _parseLine(self, regex, line): - linedata = regex.match(line).groupdict() - - for part in ("client", "ref", "ua"): - if linedata[part] == "-": - linedata[part] = None - - linedata["status"] = int(linedata["status"]) - linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"]) - linedata['req'] = self._parseRequest(linedata['req']) - linedata['ts'] = self._parseDatetime(linedata['ts']) - return linedata - - def _sendLogData(self, data, lastts): - cnt = 0 - nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration) - clients = ClientList(self._file_re) - try: - regex = self._prepareLineRegex() - for line in data: - linedata = self._parseLine(regex, line) - if not lastts: - lastts = linedata['ts'] - nextts = lastts + datetime.timedelta(seconds=self._duration) - clients.clear() - clients.update(linedata) - else: - while linedata['ts'] >= nextts: - self._sendDataset(lastts, self._duration, clients) - cnt += 1 - lastts = nextts - nextts = lastts + datetime.timedelta(seconds=self._duration) - clients.clear() - - clients.update(linedata) - - # send remaining data - if nextts: - self._sendDataset(lastts, self._duration, clients) - cnt += 1 - - except re.error as e: - print 'SFive: regex error: %s' % (e) - - return cnt - - def _sendDataset(self, timestamp, duration, clients): - data = { "start-time": timestamp.isoformat('T'), - "duration-ms": duration * 1000, - "data": { - "clients": list(clients.values()), - "client-count": clients.getCnt(), - "bytes-sent": clients.getBytesSent() - } - } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); - - -if __name__ == '__main__': - import argparse - - parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer') - parser.add_argument('--socket', '-s', dest='socket', required=True, - help='the path to the data socket of the local SFive hub') - 
parser.add_argument('--hostname', '-n', dest='hostname', required=True, - help='the hostname of the machine') - parser.add_argument('--content-id', '-c', dest='content-id', required=True, - help='the content-id (i.e. av)') - parser.add_argument('--format', '-f', dest='format', required=True, - help='the format (i.e. webm)') - parser.add_argument('--quality', '-q', dest='quality', required=True, - help='the quality (i.e. high)') - parser.add_argument('--duration', '-d', dest='duration', required=False, - help='time (in seconds) between updates; defaults to 5') - parser.add_argument('--tag', '-t', dest='tags', action='append', - help='tag to be added to the statistic data, can be invoked several times') - parser.add_argument('--logfile', '-l', dest='logfile', required=False, - help='path to the logfile or \'-\' for standard input') - parser.add_argument('--nameformat', '-F', dest='nameformat', required=True, - help='the format for filenames which are part of this stream, this may include python string expressions') - args = vars(parser.parse_args()) - if not args['tags']: - args['tags'] = [] - if not args['duration']: - args['duration'] = 5 - importer = AccessLog(args) - importer.run() diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py new file mode 100644 index 0000000..0f6abd5 --- /dev/null +++ b/src/daq/flumotion-plug/s5.py @@ -0,0 +1,180 @@ +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grueneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . 
+# + +import os + +try: + import simplejson as json +except ImportError: + json = None + +from flumotion.component.plugs import base +from flumotion.common import messages, i18n, log +from flumotion.common.poller import Poller + +from twisted.internet import protocol, reactor +from socket import error as socket_error +import datetime + +from flumotion.common.i18n import N_ +T_ = i18n.gettexter() + +_DEFAULT_POLL_INTERVAL = 5 # in seconds +_RECONNECT_TIMEOUT = 2 # in seconds +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, plug): + self._plug = plug + + def stopProtocol(self): + self._plug.debug('SFive: protocol stopped') + + def startProtocol(self): + self._plug.debug('SFive: protocol started') + self._plug._socketReady() + + def datagramReceived(self, data): + self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) + + def connectionFailed(self, failure): + self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage()) + self._plug._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket_error as err: + self._plug.warning('SFive: sending datagram failed: %s', err) + self._plug._socketError() + + + +class ComponentSFivePlug(base.ComponentPlug): + """Class to send statistics to the spreadspace streaming statistic suite""" + + ### ComponentPlug methods + + def start(self, component): + self.debug('SFive: plug loaded') + + self._sfivepoller = None + self._component = component + + if not self._hasImport(): + return + + properties = self.args['properties'] + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties.get('content-id') + self._format = properties.get('format') + self._quality = properties.get('quality') + tagstring = properties.get('tags', '') + self._tags = [x.strip() for x in tagstring.split(',')] + + self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) + self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) + + self._proto = None + self._conn = None + + self._initSocket() + + def _hasImport(self): + """Check simplejson availability""" + if not json: + m = messages.Warning(T_(N_( + "Cannot import module '%s'.\n"), 'simplejson'), + mid='simplejson-import-error') + m.add(T_(N_( + "The SFive plug for this component is disabled."))) + self._component.addMessage(m) + return False + + return True + + def stop(self, component): + if self._sfivepoller: + self._sfivepoller.stop() + if self._conn: + self._conn.stopListening() + + def _initSocket(self): + self.info('SFive: trying to (re)connect to %s...', self._socket) + self._proto = SFiveProto(self) + self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + + def _socketError(self): + if self._sfivepoller: + self._sfivepoller.stop() + if self._conn: + self._conn.stopListening() + + reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) + + def _socketReady(self): + self.info('SFive: connection to sfive hub established') + self._old_bytes_received = -1 + self._old_bytes_sent = -1 + self._sendInit() + if self._sfivepoller: + self._sfivepoller.start() + + def _sendInit(self): + initdata = { "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _updateSFive(self): + 
client_count = self._component.getClients() + bytes_received = self._component.getBytesReceived() + bytes_sent = self._component.getBytesSent() + + bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0; + self._old_bytes_received = bytes_received + bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0; + self._old_bytes_sent = bytes_sent + + data = { "start-time": datetime.datetime.utcnow().isoformat('T'), + "duration-ms": self._duration * 1000, + "data": { + "client-count": client_count, + "bytes-received": bytes_received_diff, + "bytes-sent": bytes_sent_diff + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py deleted file mode 100644 index 0f6abd5..0000000 --- a/src/daq/flumotion-plug/sfive.py +++ /dev/null @@ -1,180 +0,0 @@ -# -# sfive -# -# sfive - spreadspace streaming statistics suite is a generic -# statistic collection tool for streaming server infrastuctures. -# The system collects and stores meta data like number of views -# and throughput from a number of streaming servers and stores -# it in a global data store. -# The data acquisition is designed to be generic and extensible in -# order to support different streaming software. -# sfive also contains tools and applications to filter and visualize -# live and recorded data. -# -# -# Copyright (C) 2014 Christian Pointner -# Markus Grueneis -# -# This file is part of sfive. -# -# sfive is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 -# as published by the Free Software Foundation. -# -# sfive is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with sfive. If not, see . 
-# - -import os - -try: - import simplejson as json -except ImportError: - json = None - -from flumotion.component.plugs import base -from flumotion.common import messages, i18n, log -from flumotion.common.poller import Poller - -from twisted.internet import protocol, reactor -from socket import error as socket_error -import datetime - -from flumotion.common.i18n import N_ -T_ = i18n.gettexter() - -_DEFAULT_POLL_INTERVAL = 5 # in seconds -_RECONNECT_TIMEOUT = 2 # in seconds -_MAX_PACKET_SIZE = 8192 # in bytes - -__version__ = "$Rev$" - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, plug): - self._plug = plug - - def stopProtocol(self): - self._plug.debug('SFive: protocol stopped') - - def startProtocol(self): - self._plug.debug('SFive: protocol started') - self._plug._socketReady() - - def datagramReceived(self, data): - self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) - - def connectionFailed(self, failure): - self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage()) - self._plug._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket_error as err: - self._plug.warning('SFive: sending datagram failed: %s', err) - self._plug._socketError() - - - -class ComponentSFivePlug(base.ComponentPlug): - """Class to send statistics to the spreadspace streaming statistic suite""" - - ### ComponentPlug methods - - def start(self, component): - self.debug('SFive: plug loaded') - - self._sfivepoller = None - self._component = component - - if not self._hasImport(): - return - - properties = self.args['properties'] - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties.get('content-id') - self._format = properties.get('format') - self._quality = properties.get('quality') - tagstring = properties.get('tags', '') - self._tags = [x.strip() for x in tagstring.split(',')] - - self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) - self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) - - self._proto = None - self._conn = None - - self._initSocket() - - def _hasImport(self): - """Check simplejson availability""" - if not json: - m = messages.Warning(T_(N_( - "Cannot import module '%s'.\n"), 'simplejson'), - mid='simplejson-import-error') - m.add(T_(N_( - "The SFive plug for this component is disabled."))) - self._component.addMessage(m) - return False - - return True - - def stop(self, component): - if self._sfivepoller: - self._sfivepoller.stop() - if self._conn: - self._conn.stopListening() - - def _initSocket(self): - self.info('SFive: trying to (re)connect to %s...', self._socket) - self._proto = SFiveProto(self) - self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) - - def _socketError(self): - if self._sfivepoller: - self._sfivepoller.stop() - if self._conn: - self._conn.stopListening() - - reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) - - def _socketReady(self): - self.info('SFive: connection to sfive hub established') - self._old_bytes_received = -1 - self._old_bytes_sent = -1 - self._sendInit() - if self._sfivepoller: - self._sfivepoller.start() - - def _sendInit(self): - initdata = { "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _updateSFive(self): - 
client_count = self._component.getClients() - bytes_received = self._component.getBytesReceived() - bytes_sent = self._component.getBytesSent() - - bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0; - self._old_bytes_received = bytes_received - bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0; - self._old_bytes_sent = bytes_sent - - data = { "start-time": datetime.datetime.utcnow().isoformat('T'), - "duration-ms": self._duration * 1000, - "data": { - "client-count": client_count, - "bytes-received": bytes_received_diff, - "bytes-sent": bytes_sent_diff - } - } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); diff --git a/src/daq/flumotion-rrd/s5-flumotion-rrd.py b/src/daq/flumotion-rrd/s5-flumotion-rrd.py new file mode 100755 index 0000000..4b0a2d1 --- /dev/null +++ b/src/daq/flumotion-rrd/s5-flumotion-rrd.py @@ -0,0 +1,231 @@ +#!/usr/bin/python +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grueneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . +# + +from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS +from twisted.internet import protocol, reactor, unix +import socket +from time import sleep + +import simplejson as json +import datetime +import rrdtool + +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + +class SFivePort(unix.ConnectedDatagramPort): + + def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): + unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) + + def write(self, data): + try: + return self.socket.send(data) + except socket.error, se: + no = se.args[0] + if no == EINTR: + return self.write(data) + elif no == EMSGSIZE: + raise error.MessageLengthError, "message too long" + elif no == ECONNREFUSED: + self.protocol.connectionRefused() + elif no == EAGAIN: + # the send buffer seems to be full - let's wait a little while... + # this is not really a good solution but better than the aproach + # of twisted which just drops the datagram... 
+ sleep(0.01) + return self.write(data) + else: + raise + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, importer): + self._importer = importer + + def startProtocol(self): + self._importer._socketReady() + + def datagramReceived(self, data): + print 'SFive: received datagram: "%s" (will get ignored)' % (data) + + def connectionFailed(self, failure): + print 'SFive: connection failed: %s' % (failure.getErrorMessage()) + self._importer._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket.error as err: + print 'SFive: sending datagram failed: %s' % (err) + self._importer._socketError() + + +class FlumotionRRD(): + """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" + + def __init__(self, properties): + print 'SFive: flumotion rrd file importer loaded' + + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties['content-id'] + self._format = properties['format'] + self._quality = properties['quality'] + self._tags = properties['tags'] + self._bytes_rrdfile = properties['bytes-rrdfile'] + self._clients_rrdfile = properties['clients-rrdfile'] + + self._proto = None + self._conn = None + + def run(self): + if self._initRRD(): + reactor.callWhenRunning(self._initSocket) + reactor.run() + + def _initRRD(self): + try: + info_bytes = rrdtool.info(self._bytes_rrdfile) + step_bytes = info_bytes['step'] + lastupdate_bytes = info_bytes['last_update'] + info_clients = rrdtool.info(self._clients_rrdfile) + step_clients = info_clients['step'] + lastupdate_clients = info_clients['last_update'] + + self._duration = step_bytes + if step_bytes != step_clients: + print 'SFive: ERROR step size of the RRD Files don\'t match' + return False + + self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients + if lastupdate_bytes != lastupdate_clients: + print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp' + return False + + print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % ( + datetime.datetime.utcfromtimestamp(self._end).isoformat('T'), self._duration) + + except rrdtool.error as e: + print 'SFive: rrdtool-error: %s' % (e) + return False + + return True + + def _initSocket(self): + print 'SFive: trying to connect to %s...' 
% (self._socket) + self._proto = SFiveProto(self) + self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor) + self._conn.startListening() + + def _socketError(self): + if self._conn: + self._conn.stopListening() + reactor.stop() + + def _socketReady(self): + print 'SFive: connection to sfive hub established' + self._sendInit() + cnt = self._sendRRDData() + print 'SFive: sent %d datasets' % (cnt) + + reactor.stop() + + def _sendInit(self): + initdata = { "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _sendRRDData(self): + try: + data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) + data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end) + except rrdtool.error as e: + print 'SFive: rrdtool-error: %s' % (e) + return 0 + + ts = data_bytes[0][0] + max = len(data_bytes[2]) + if max < 1: + return 0 + + cnt = 0 + for idx in range(0, max - 1): + try: + self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration, + data_clients[2][idx][0], data_bytes[2][idx][0]) + ts += self._duration + cnt += 1 + except ValueError as err: + print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err) + + return cnt + + def _sendDataset(self, timestamp, duration, client_count, bytes_sent): + data = { "start-time": timestamp.isoformat('T'), + "duration-ms": duration * 1000, + "data": { + "client-count": client_count, + "bytes-sent": bytes_sent + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer') + parser.add_argument('--socket', '-s', dest='socket', required=True, + help='the path to the data socket of the local SFive hub') + parser.add_argument('--hostname', '-n', dest='hostname', required=True, + help='the hostname of the machine') + parser.add_argument('--content-id', '-c', dest='content-id', required=True, + help='the content-id (i.e. av)') + parser.add_argument('--format', '-f', dest='format', required=True, + help='the format (i.e. webm)') + parser.add_argument('--quality', '-q', dest='quality', required=True, + help='the quality (i.e. high)') + parser.add_argument('--tag', '-t', dest='tags', action='append', + help='tag to be added to the statistic data, can be invoked several times') + parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True, + help='path to the RRD File containing the bytes sent') + parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True, + help='path to the RRD File containing the number of clients') + args = vars(parser.parse_args()) + if not args['tags']: + args['tags'] = [] + importer = FlumotionRRD(args) + importer.run() diff --git a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py b/src/daq/flumotion-rrd/sfive-flumotion-rrd.py deleted file mode 100755 index 4b0a2d1..0000000 --- a/src/daq/flumotion-rrd/sfive-flumotion-rrd.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/python -# -# sfive -# -# sfive - spreadspace streaming statistics suite is a generic -# statistic collection tool for streaming server infrastuctures. -# The system collects and stores meta data like number of views -# and throughput from a number of streaming servers and stores -# it in a global data store. 
-# The data acquisition is designed to be generic and extensible in -# order to support different streaming software. -# sfive also contains tools and applications to filter and visualize -# live and recorded data. -# -# -# Copyright (C) 2014 Christian Pointner -# Markus Grueneis -# -# This file is part of sfive. -# -# sfive is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 -# as published by the Free Software Foundation. -# -# sfive is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with sfive. If not, see . -# - -from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS -from twisted.internet import protocol, reactor, unix -import socket -from time import sleep - -import simplejson as json -import datetime -import rrdtool - -_MAX_PACKET_SIZE = 8192 # in bytes - -__version__ = "$Rev$" - -class SFivePort(unix.ConnectedDatagramPort): - - def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None): - unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor) - - def write(self, data): - try: - return self.socket.send(data) - except socket.error, se: - no = se.args[0] - if no == EINTR: - return self.write(data) - elif no == EMSGSIZE: - raise error.MessageLengthError, "message too long" - elif no == ECONNREFUSED: - self.protocol.connectionRefused() - elif no == EAGAIN: - # the send buffer seems to be full - let's wait a little while... - # this is not really a good solution but better than the aproach - # of twisted which just drops the datagram... 
- sleep(0.01) - return self.write(data) - else: - raise - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, importer): - self._importer = importer - - def startProtocol(self): - self._importer._socketReady() - - def datagramReceived(self, data): - print 'SFive: received datagram: "%s" (will get ignored)' % (data) - - def connectionFailed(self, failure): - print 'SFive: connection failed: %s' % (failure.getErrorMessage()) - self._importer._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket.error as err: - print 'SFive: sending datagram failed: %s' % (err) - self._importer._socketError() - - -class FlumotionRRD(): - """Class to batch import flumotion rrd statistics into the spreadspace streaming statistic suite""" - - def __init__(self, properties): - print 'SFive: flumotion rrd file importer loaded' - - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties['content-id'] - self._format = properties['format'] - self._quality = properties['quality'] - self._tags = properties['tags'] - self._bytes_rrdfile = properties['bytes-rrdfile'] - self._clients_rrdfile = properties['clients-rrdfile'] - - self._proto = None - self._conn = None - - def run(self): - if self._initRRD(): - reactor.callWhenRunning(self._initSocket) - reactor.run() - - def _initRRD(self): - try: - info_bytes = rrdtool.info(self._bytes_rrdfile) - step_bytes = info_bytes['step'] - lastupdate_bytes = info_bytes['last_update'] - info_clients = rrdtool.info(self._clients_rrdfile) - step_clients = info_clients['step'] - lastupdate_clients = info_clients['last_update'] - - self._duration = step_bytes - if step_bytes != step_clients: - print 'SFive: ERROR step size of the RRD Files don\'t match' - return False - - self._end = lastupdate_bytes if lastupdate_bytes < lastupdate_clients else lastupdate_clients - if lastupdate_bytes != lastupdate_clients: - print 'SFive: WARNING the last update timestamp of the RRD Files don\'t match - will use smaller timestamp' - return False - - print 'SFive: will use 7 days of data ending with %s using a step size of %d seconds' % ( - datetime.datetime.utcfromtimestamp(self._end).isoformat('T'), self._duration) - - except rrdtool.error as e: - print 'SFive: rrdtool-error: %s' % (e) - return False - - return True - - def _initSocket(self): - print 'SFive: trying to connect to %s...' 
% (self._socket)
-        self._proto = SFiveProto(self)
-        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
-        self._conn.startListening()
-
-    def _socketError(self):
-        if self._conn:
-            self._conn.stopListening()
-        reactor.stop()
-
-    def _socketReady(self):
-        print 'SFive: connection to sfive hub established'
-        self._sendInit()
-        cnt = self._sendRRDData()
-        print 'SFive: sent %d datasets' % (cnt)
-
-        reactor.stop()
-
-    def _sendInit(self):
-        initdata = { "hostname": self._hostname,
-                     "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
-                     "tags": self._tags }
-        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
-
-    def _sendRRDData(self):
-        try:
-            data_bytes = rrdtool.fetch(self._bytes_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
-            data_clients = rrdtool.fetch(self._clients_rrdfile, 'AVERAGE', '-s', 'end-7d', '-e', '%d' % self._end)
-        except rrdtool.error as e:
-            print 'SFive: rrdtool-error: %s' % (e)
-            return 0
-
-        ts = data_bytes[0][0]
-        max = len(data_bytes[2])
-        if max < 1:
-            return 0
-
-        cnt = 0
-        for idx in range(0, max - 1):
-            try:
-                self._sendDataset(datetime.datetime.utcfromtimestamp(ts), self._duration,
-                                  data_clients[2][idx][0], data_bytes[2][idx][0])
-                ts += self._duration
-                cnt += 1
-            except ValueError as err:
-                print 'SFive: timestamp "%d" seems wrong: %s' % (ts, err)
-
-        return cnt
-
-    def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
-        data = { "start-time": timestamp.isoformat('T'),
-                 "duration-ms": duration * 1000,
-                 "data": {
-                     "client-count": client_count,
-                     "bytes-sent": bytes_sent
-                 }
-               }
-        self._proto.sendDatagram('%s\n' % (json.dumps(data)));
-
-
-if __name__ == '__main__':
-    import argparse
-
-    parser = argparse.ArgumentParser(description='SFive Flumotion RRD Importer')
-    parser.add_argument('--socket', '-s', dest='socket', required=True,
-                        help='the path to the data socket of the local SFive hub')
-    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
-                        help='the hostname of the machine')
-    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
-                        help='the content-id (i.e. av)')
-    parser.add_argument('--format', '-f', dest='format', required=True,
-                        help='the format (i.e. webm)')
-    parser.add_argument('--quality', '-q', dest='quality', required=True,
-                        help='the quality (i.e. high)')
-    parser.add_argument('--tag', '-t', dest='tags', action='append',
-                        help='tag to be added to the statistic data, can be invoked several times')
-    parser.add_argument('--bytes-rrdfile', '-b', dest='bytes-rrdfile', required=True,
-                        help='path to the RRD File containing the bytes sent')
-    parser.add_argument('--clients-rrdfile', '-N', dest='clients-rrdfile', required=True,
-                        help='path to the RRD File containing the number of clients')
-    args = vars(parser.parse_args())
-    if not args['tags']:
-        args['tags'] = []
-    importer = FlumotionRRD(args)
-    importer.run()
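
Note on the wire protocol shared by all three importers: every message to the SFive hub is one JSON object followed by a newline, sent as a single datagram over the Unix socket. The sketch below (plain Python, all values illustrative rather than taken from a real capture) mirrors what _sendInit() and _sendDataset() in s5-accesslog.py produce; the two flumotion importers send the same envelope but omit the per-client list, and the flumotion plug additionally reports "bytes-received".

    import json

    # first datagram after the socket becomes ready (cf. _sendInit)
    init = {"hostname": "streamer1",   # example hostname, not from the repository
            "streamer-id": {"content-id": "av", "format": "webm", "quality": "high"},
            "tags": ["example"]}

    # one datagram per accounting interval (cf. _sendDataset)
    dataset = {"start-time": "2014-10-14T00:45:00",  # timestamp.isoformat('T')
               "duration-ms": 5000,                  # duration * 1000
               "data": {"clients": [{"ip": "203.0.113.7", "bytes-sent": 1234567}],
                        "client-count": 1,
                        "bytes-sent": 1234567}}

    for msg in (init, dataset):
        datagram = '%s\n' % json.dumps(msg)  # exactly the string handed to sendDatagram()

A matching batch-import invocation could look like the following; the socket path, log path and name pattern are assumptions for illustration only:

    ./s5-accesslog.py -s /var/run/sfive/hub.sock -n streamer1 -c av -f webm -q high \
        -l /var/log/nginx/access.log -F '%(content-id)s-%(format)s-%(quality)s\.webm'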
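
The accesslog importer expects the standard combined log format. A self-contained sketch of the parsing step, using the same regular expression that _prepareLineRegex() builds (the log line itself is made up):

    import re

    parts = [r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
             r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"']
    regex = re.compile(r'\s+'.join(parts) + r'\s*\Z')

    line = ('203.0.113.7 - - [14/Oct/2014:02:47:38 +0200] '
            '"GET /av-webm-high.webm HTTP/1.1" 200 1234567 "-" "ExamplePlayer/1.0"')
    linedata = regex.match(line).groupdict()
    # linedata now maps client/ts/req/status/size/ref/ua to strings;
    # _parseLine() then converts status and size to int and ts to a naive UTC datetime.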