summaryrefslogtreecommitdiff
path: root/src/daq/flumotion-plug/sfive.py
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-14 02:47:38 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-14 02:47:38 +0200
commit84e77bf2ac0dfe34d152f2fa27fd31633bd3eadd (patch)
tree89d2059e4c2c4f7054575d52c8c92a4021a467bd /src/daq/flumotion-plug/sfive.py
parentMerge branch 'master' of gitspread:sfive (diff)
renamed daq plugins
Diffstat (limited to 'src/daq/flumotion-plug/sfive.py')
-rw-r--r--src/daq/flumotion-plug/sfive.py180
1 files changed, 0 insertions, 180 deletions
diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py
deleted file mode 100644
index 0f6abd5..0000000
--- a/src/daq/flumotion-plug/sfive.py
+++ /dev/null
@@ -1,180 +0,0 @@
-#
-# sfive
-#
-# sfive - spreadspace streaming statistics suite is a generic
-# statistic collection tool for streaming server infrastuctures.
-# The system collects and stores meta data like number of views
-# and throughput from a number of streaming servers and stores
-# it in a global data store.
-# The data acquisition is designed to be generic and extensible in
-# order to support different streaming software.
-# sfive also contains tools and applications to filter and visualize
-# live and recorded data.
-#
-#
-# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
-# Markus Grueneis <gimpf@gimpf.org>
-#
-# This file is part of sfive.
-#
-# sfive is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3
-# as published by the Free Software Foundation.
-#
-# sfive is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sfive. If not, see <http://www.gnu.org/licenses/>.
-#
-
-import os
-
-try:
- import simplejson as json
-except ImportError:
- json = None
-
-from flumotion.component.plugs import base
-from flumotion.common import messages, i18n, log
-from flumotion.common.poller import Poller
-
-from twisted.internet import protocol, reactor
-from socket import error as socket_error
-import datetime
-
-from flumotion.common.i18n import N_
-T_ = i18n.gettexter()
-
-_DEFAULT_POLL_INTERVAL = 5 # in seconds
-_RECONNECT_TIMEOUT = 2 # in seconds
-_MAX_PACKET_SIZE = 8192 # in bytes
-
-__version__ = "$Rev$"
-
-class SFiveProto(protocol.ConnectedDatagramProtocol):
-
- def __init__(self, plug):
- self._plug = plug
-
- def stopProtocol(self):
- self._plug.debug('SFive: protocol stopped')
-
- def startProtocol(self):
- self._plug.debug('SFive: protocol started')
- self._plug._socketReady()
-
- def datagramReceived(self, data):
- self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data)
-
- def connectionFailed(self, failure):
- self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage())
- self._plug._socketError()
-
- def sendDatagram(self, data):
- try:
- return self.transport.write(data)
- except socket_error as err:
- self._plug.warning('SFive: sending datagram failed: %s', err)
- self._plug._socketError()
-
-
-
-class ComponentSFivePlug(base.ComponentPlug):
- """Class to send statistics to the spreadspace streaming statistic suite"""
-
- ### ComponentPlug methods
-
- def start(self, component):
- self.debug('SFive: plug loaded')
-
- self._sfivepoller = None
- self._component = component
-
- if not self._hasImport():
- return
-
- properties = self.args['properties']
- self._socket = properties['socket']
- self._hostname = properties['hostname']
- self._content_id = properties.get('content-id')
- self._format = properties.get('format')
- self._quality = properties.get('quality')
- tagstring = properties.get('tags', '')
- self._tags = [x.strip() for x in tagstring.split(',')]
-
- self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL)
- self._sfivepoller = Poller(self._updateSFive, self._duration, start=False)
-
- self._proto = None
- self._conn = None
-
- self._initSocket()
-
- def _hasImport(self):
- """Check simplejson availability"""
- if not json:
- m = messages.Warning(T_(N_(
- "Cannot import module '%s'.\n"), 'simplejson'),
- mid='simplejson-import-error')
- m.add(T_(N_(
- "The SFive plug for this component is disabled.")))
- self._component.addMessage(m)
- return False
-
- return True
-
- def stop(self, component):
- if self._sfivepoller:
- self._sfivepoller.stop()
- if self._conn:
- self._conn.stopListening()
-
- def _initSocket(self):
- self.info('SFive: trying to (re)connect to %s...', self._socket)
- self._proto = SFiveProto(self)
- self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE)
-
- def _socketError(self):
- if self._sfivepoller:
- self._sfivepoller.stop()
- if self._conn:
- self._conn.stopListening()
-
- reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket)
-
- def _socketReady(self):
- self.info('SFive: connection to sfive hub established')
- self._old_bytes_received = -1
- self._old_bytes_sent = -1
- self._sendInit()
- if self._sfivepoller:
- self._sfivepoller.start()
-
- def _sendInit(self):
- initdata = { "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
-
- def _updateSFive(self):
- client_count = self._component.getClients()
- bytes_received = self._component.getBytesReceived()
- bytes_sent = self._component.getBytesSent()
-
- bytes_received_diff = bytes_received - self._old_bytes_received if self._old_bytes_received > 0 else 0;
- self._old_bytes_received = bytes_received
- bytes_sent_diff = bytes_sent - self._old_bytes_sent if self._old_bytes_sent > 0 else 0;
- self._old_bytes_sent = bytes_sent
-
- data = { "start-time": datetime.datetime.utcnow().isoformat('T'),
- "duration-ms": self._duration * 1000,
- "data": {
- "client-count": client_count,
- "bytes-received": bytes_received_diff,
- "bytes-sent": bytes_sent_diff
- }
- }
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));