From 84e77bf2ac0dfe34d152f2fa27fd31633bd3eadd Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Tue, 14 Oct 2014 02:47:38 +0200 Subject: renamed daq plugins --- src/daq/flumotion-plug/s5.py | 180 ++++++++++++++++++++++++++++++++++++++++ src/daq/flumotion-plug/sfive.py | 180 ---------------------------------------- 2 files changed, 180 insertions(+), 180 deletions(-) create mode 100644 src/daq/flumotion-plug/s5.py delete mode 100644 src/daq/flumotion-plug/sfive.py (limited to 'src/daq/flumotion-plug') diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py new file mode 100644 index 0000000..0f6abd5 --- /dev/null +++ b/src/daq/flumotion-plug/s5.py @@ -0,0 +1,180 @@ +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grueneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . +# + +import os + +try: + import simplejson as json +except ImportError: + json = None + +from flumotion.component.plugs import base +from flumotion.common import messages, i18n, log +from flumotion.common.poller import Poller + +from twisted.internet import protocol, reactor +from socket import error as socket_error +import datetime + +from flumotion.common.i18n import N_ +T_ = i18n.gettexter() + +_DEFAULT_POLL_INTERVAL = 5 # in seconds +_RECONNECT_TIMEOUT = 2 # in seconds +_MAX_PACKET_SIZE = 8192 # in bytes + +__version__ = "$Rev$" + +class SFiveProto(protocol.ConnectedDatagramProtocol): + + def __init__(self, plug): + self._plug = plug + + def stopProtocol(self): + self._plug.debug('SFive: protocol stopped') + + def startProtocol(self): + self._plug.debug('SFive: protocol started') + self._plug._socketReady() + + def datagramReceived(self, data): + self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) + + def connectionFailed(self, failure): + self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage()) + self._plug._socketError() + + def sendDatagram(self, data): + try: + return self.transport.write(data) + except socket_error as err: + self._plug.warning('SFive: sending datagram failed: %s', err) + self._plug._socketError() + + + +class ComponentSFivePlug(base.ComponentPlug): + """Class to send statistics to the spreadspace streaming statistic suite""" + + ### ComponentPlug methods + + def start(self, component): + self.debug('SFive: plug loaded') + + self._sfivepoller = None + self._component = component + + if not self._hasImport(): + return + + properties = self.args['properties'] + self._socket = properties['socket'] + self._hostname = properties['hostname'] + self._content_id = properties.get('content-id') + self._format = properties.get('format') + self._quality = properties.get('quality') + tagstring = properties.get('tags', '') + self._tags = [x.strip() for x in tagstring.split(',')] + + self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) + self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) + + self._proto = None + self._conn = None + + self._initSocket() + + def _hasImport(self): + """Check simplejson availability""" + if not json: + m = messages.Warning(T_(N_( + "Cannot import module '%s'.\n"), 'simplejson'), + mid='simplejson-import-error') + m.add(T_(N_( + "The SFive plug for this component is disabled."))) + self._component.addMessage(m) + return False + + return True + + def stop(self, component): + if self._sfivepoller: + self._sfivepoller.stop() + if self._conn: + self._conn.stopListening() + + def _initSocket(self): + self.info('SFive: trying to (re)connect to %s...', self._socket) + self._proto = SFiveProto(self) + self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) + + def _socketError(self): + if self._sfivepoller: + self._sfivepoller.stop() + if self._conn: + self._conn.stopListening() + + reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) + + def _socketReady(self): + self.info('SFive: connection to sfive hub established') + self._old_bytes_received = -1 + self._old_bytes_sent = -1 + self._sendInit() + if self._sfivepoller: + self._sfivepoller.start() + + def _sendInit(self): + initdata = { "hostname": self._hostname, + "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, + "tags": self._tags } + self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); + + def _updateSFive(self): + client_count = self._component.getClients() + bytes_received = self._component.getBytesReceived() + bytes_sent = self._component.getBytesSent() + + bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0; + self._old_bytes_received = bytes_received + bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0; + self._old_bytes_sent = bytes_sent + + data = { "start-time": datetime.datetime.utcnow().isoformat('T'), + "duration-ms": self._duration * 1000, + "data": { + "client-count": client_count, + "bytes-received": bytes_received_diff, + "bytes-sent": bytes_sent_diff + } + } + self._proto.sendDatagram('%s\n' % (json.dumps(data))); diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py deleted file mode 100644 index 0f6abd5..0000000 --- a/src/daq/flumotion-plug/sfive.py +++ /dev/null @@ -1,180 +0,0 @@ -# -# sfive -# -# sfive - spreadspace streaming statistics suite is a generic -# statistic collection tool for streaming server infrastuctures. -# The system collects and stores meta data like number of views -# and throughput from a number of streaming servers and stores -# it in a global data store. -# The data acquisition is designed to be generic and extensible in -# order to support different streaming software. -# sfive also contains tools and applications to filter and visualize -# live and recorded data. -# -# -# Copyright (C) 2014 Christian Pointner -# Markus Grueneis -# -# This file is part of sfive. -# -# sfive is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License version 3 -# as published by the Free Software Foundation. -# -# sfive is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with sfive. If not, see . -# - -import os - -try: - import simplejson as json -except ImportError: - json = None - -from flumotion.component.plugs import base -from flumotion.common import messages, i18n, log -from flumotion.common.poller import Poller - -from twisted.internet import protocol, reactor -from socket import error as socket_error -import datetime - -from flumotion.common.i18n import N_ -T_ = i18n.gettexter() - -_DEFAULT_POLL_INTERVAL = 5 # in seconds -_RECONNECT_TIMEOUT = 2 # in seconds -_MAX_PACKET_SIZE = 8192 # in bytes - -__version__ = "$Rev$" - -class SFiveProto(protocol.ConnectedDatagramProtocol): - - def __init__(self, plug): - self._plug = plug - - def stopProtocol(self): - self._plug.debug('SFive: protocol stopped') - - def startProtocol(self): - self._plug.debug('SFive: protocol started') - self._plug._socketReady() - - def datagramReceived(self, data): - self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) - - def connectionFailed(self, failure): - self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage()) - self._plug._socketError() - - def sendDatagram(self, data): - try: - return self.transport.write(data) - except socket_error as err: - self._plug.warning('SFive: sending datagram failed: %s', err) - self._plug._socketError() - - - -class ComponentSFivePlug(base.ComponentPlug): - """Class to send statistics to the spreadspace streaming statistic suite""" - - ### ComponentPlug methods - - def start(self, component): - self.debug('SFive: plug loaded') - - self._sfivepoller = None - self._component = component - - if not self._hasImport(): - return - - properties = self.args['properties'] - self._socket = properties['socket'] - self._hostname = properties['hostname'] - self._content_id = properties.get('content-id') - self._format = properties.get('format') - self._quality = properties.get('quality') - tagstring = properties.get('tags', '') - self._tags = [x.strip() for x in tagstring.split(',')] - - self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) - self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) - - self._proto = None - self._conn = None - - self._initSocket() - - def _hasImport(self): - """Check simplejson availability""" - if not json: - m = messages.Warning(T_(N_( - "Cannot import module '%s'.\n"), 'simplejson'), - mid='simplejson-import-error') - m.add(T_(N_( - "The SFive plug for this component is disabled."))) - self._component.addMessage(m) - return False - - return True - - def stop(self, component): - if self._sfivepoller: - self._sfivepoller.stop() - if self._conn: - self._conn.stopListening() - - def _initSocket(self): - self.info('SFive: trying to (re)connect to %s...', self._socket) - self._proto = SFiveProto(self) - self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) - - def _socketError(self): - if self._sfivepoller: - self._sfivepoller.stop() - if self._conn: - self._conn.stopListening() - - reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) - - def _socketReady(self): - self.info('SFive: connection to sfive hub established') - self._old_bytes_received = -1 - self._old_bytes_sent = -1 - self._sendInit() - if self._sfivepoller: - self._sfivepoller.start() - - def _sendInit(self): - initdata = { "hostname": self._hostname, - "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, - "tags": self._tags } - self._proto.sendDatagram('%s\n' % (json.dumps(initdata))); - - def _updateSFive(self): - client_count = self._component.getClients() - bytes_received = self._component.getBytesReceived() - bytes_sent = self._component.getBytesSent() - - bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0; - self._old_bytes_received = bytes_received - bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0; - self._old_bytes_sent = bytes_sent - - data = { "start-time": datetime.datetime.utcnow().isoformat('T'), - "duration-ms": self._duration * 1000, - "data": { - "client-count": client_count, - "bytes-received": bytes_received_diff, - "bytes-sent": bytes_sent_diff - } - } - self._proto.sendDatagram('%s\n' % (json.dumps(data))); -- cgit v1.2.3