summaryrefslogtreecommitdiff
path: root/src/daq/flumotion-plug/s5.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq/flumotion-plug/s5.py')
-rw-r--r--src/daq/flumotion-plug/s5.py180
1 files changed, 180 insertions, 0 deletions
diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py
new file mode 100644
index 0000000..0f6abd5
--- /dev/null
+++ b/src/daq/flumotion-plug/s5.py
@@ -0,0 +1,180 @@
+#
+# sfive
+#
+# sfive - spreadspace streaming statistics suite is a generic
+# statistic collection tool for streaming server infrastuctures.
+# The system collects and stores meta data like number of views
+# and throughput from a number of streaming servers and stores
+# it in a global data store.
+# The data acquisition is designed to be generic and extensible in
+# order to support different streaming software.
+# sfive also contains tools and applications to filter and visualize
+# live and recorded data.
+#
+#
+# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
+# Markus Grueneis <gimpf@gimpf.org>
+#
+# This file is part of sfive.
+#
+# sfive is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3
+# as published by the Free Software Foundation.
+#
+# sfive is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with sfive. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+
+try:
+ import simplejson as json
+except ImportError:
+ json = None
+
+from flumotion.component.plugs import base
+from flumotion.common import messages, i18n, log
+from flumotion.common.poller import Poller
+
+from twisted.internet import protocol, reactor
+from socket import error as socket_error
+import datetime
+
+from flumotion.common.i18n import N_
+T_ = i18n.gettexter()
+
+_DEFAULT_POLL_INTERVAL = 5 # in seconds
+_RECONNECT_TIMEOUT = 2 # in seconds
+_MAX_PACKET_SIZE = 8192 # in bytes
+
+__version__ = "$Rev$"
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+ def __init__(self, plug):
+ self._plug = plug
+
+ def stopProtocol(self):
+ self._plug.debug('SFive: protocol stopped')
+
+ def startProtocol(self):
+ self._plug.debug('SFive: protocol started')
+ self._plug._socketReady()
+
+ def datagramReceived(self, data):
+ self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data)
+
+ def connectionFailed(self, failure):
+ self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage())
+ self._plug._socketError()
+
+ def sendDatagram(self, data):
+ try:
+ return self.transport.write(data)
+ except socket_error as err:
+ self._plug.warning('SFive: sending datagram failed: %s', err)
+ self._plug._socketError()
+
+
+
+class ComponentSFivePlug(base.ComponentPlug):
+ """Class to send statistics to the spreadspace streaming statistic suite"""
+
+ ### ComponentPlug methods
+
+ def start(self, component):
+ self.debug('SFive: plug loaded')
+
+ self._sfivepoller = None
+ self._component = component
+
+ if not self._hasImport():
+ return
+
+ properties = self.args['properties']
+ self._socket = properties['socket']
+ self._hostname = properties['hostname']
+ self._content_id = properties.get('content-id')
+ self._format = properties.get('format')
+ self._quality = properties.get('quality')
+ tagstring = properties.get('tags', '')
+ self._tags = [x.strip() for x in tagstring.split(',')]
+
+ self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL)
+ self._sfivepoller = Poller(self._updateSFive, self._duration, start=False)
+
+ self._proto = None
+ self._conn = None
+
+ self._initSocket()
+
+ def _hasImport(self):
+ """Check simplejson availability"""
+ if not json:
+ m = messages.Warning(T_(N_(
+ "Cannot import module '%s'.\n"), 'simplejson'),
+ mid='simplejson-import-error')
+ m.add(T_(N_(
+ "The SFive plug for this component is disabled.")))
+ self._component.addMessage(m)
+ return False
+
+ return True
+
+ def stop(self, component):
+ if self._sfivepoller:
+ self._sfivepoller.stop()
+ if self._conn:
+ self._conn.stopListening()
+
+ def _initSocket(self):
+ self.info('SFive: trying to (re)connect to %s...', self._socket)
+ self._proto = SFiveProto(self)
+ self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE)
+
+ def _socketError(self):
+ if self._sfivepoller:
+ self._sfivepoller.stop()
+ if self._conn:
+ self._conn.stopListening()
+
+ reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket)
+
+ def _socketReady(self):
+ self.info('SFive: connection to sfive hub established')
+ self._old_bytes_received = -1
+ self._old_bytes_sent = -1
+ self._sendInit()
+ if self._sfivepoller:
+ self._sfivepoller.start()
+
+ def _sendInit(self):
+ initdata = { "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
+ def _updateSFive(self):
+ client_count = self._component.getClients()
+ bytes_received = self._component.getBytesReceived()
+ bytes_sent = self._component.getBytesSent()
+
+ bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0;
+ self._old_bytes_received = bytes_received
+ bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0;
+ self._old_bytes_sent = bytes_sent
+
+ data = { "start-time": datetime.datetime.utcnow().isoformat('T'),
+ "duration-ms": self._duration * 1000,
+ "data": {
+ "client-count": client_count,
+ "bytes-received": bytes_received_diff,
+ "bytes-sent": bytes_sent_diff
+ }
+ }
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)));