# # sfive # # sfive - spreadspace streaming statistics suite is a generic # statistic collection tool for streaming server infrastuctures. # The system collects and stores meta data like number of views # and throughput from a number of streaming servers and stores # it in a global data store. # The data acquisition is designed to be generic and extensible in # order to support different streaming software. # sfive also contains tools and applications to filter and visualize # live and recorded data. # # # Copyright (C) 2014 Christian Pointner # Markus Grueneis # # This file is part of sfive. # # sfive is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # # sfive is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with sfive. If not, see . # import os try: import simplejson as json except ImportError: json = None from flumotion.component.plugs import base from flumotion.common import messages, i18n, log from flumotion.common.poller import Poller from twisted.internet import protocol, reactor from socket import error as socket_error import datetime from flumotion.common.i18n import N_ T_ = i18n.gettexter() _DEFAULT_POLL_INTERVAL = 5 # in seconds _RECONNECT_TIMEOUT = 2 # in seconds _MAX_PACKET_SIZE = 8192 # in bytes __version__ = "$Rev$" class SFiveProto(protocol.ConnectedDatagramProtocol): def __init__(self, plug): self._plug = plug def stopProtocol(self): self._plug.debug('SFive: protocol stopped') def startProtocol(self): self._plug.debug('SFive: protocol started') self._plug._socketReady() def datagramReceived(self, data): self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data) def connectionFailed(self, failure): self._plug.warning('SFive: connection failed: %s', failure.getErrorMessage()) self._plug._socketError() def sendDatagram(self, data): try: return self.transport.write(data) except socket_error as err: self._plug.warning('SFive: sending datagram failed: %s', err) self._plug._socketError() class ComponentSFivePlug(base.ComponentPlug): """Class to send statistics to the spreadspace streaming statistic suite""" ### ComponentPlug methods def start(self, component): self.debug('SFive: plug loaded') self._sfivepoller = None self._component = component if not self._hasImport(): return properties = self.args['properties'] self._socket = properties['socket'] self._hostname = properties['hostname'] self._content_id = properties.get('content-id') self._format = properties.get('format') self._quality = properties.get('quality') tagstring = properties.get('tags', '') self._tags = [x.strip() for x in tagstring.split(',')] self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) self._proto = None self._conn = None reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) def _hasImport(self): """Check simplejson availability""" if not json: m = messages.Warning(T_(N_( "Cannot import module '%s'.\n"), 'simplejson'), mid='simplejson-import-error') m.add(T_(N_( "The SFive plug for this component is disabled."))) self._component.addMessage(m) return False return True def stop(self, component): if self._sfivepoller: self._sfivepoller.stop() if self._conn: self._conn.stopListening() def _initSocket(self): self.info('SFive: trying to (re)connect to %s...', self._socket) self._proto = SFiveProto(self) self._conn = reactor.connectUNIXDatagram(self._socket, self._proto, maxPacketSize=_MAX_PACKET_SIZE) def _socketError(self): if self._sfivepoller: self._sfivepoller.stop() if self._conn: self._conn.stopListening() reactor.callLater(_RECONNECT_TIMEOUT, self._initSocket) def _socketReady(self): self.info('SFive: connection to sfive hub established') self._sendInit() if self._sfivepoller: self._old_bytes_received = self._component.getBytesReceived() self._old_bytes_sent = self._component.getBytesSent() self._start_time = datetime.datetime.utcnow().replace(microsecond=0) self._sfivepoller.start() def _sendInit(self): initdata = { "hostname": self._hostname, "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality }, "tags": self._tags } self._proto.sendDatagram('%s\n' % (json.dumps(initdata))) def _updateSFive(self): client_count = self._component.getClients() bytes_received = self._component.getBytesReceived() bytes_sent = self._component.getBytesSent() bytes_received_diff = bytes_received - self._old_bytes_received self._old_bytes_received = bytes_received bytes_sent_diff = bytes_sent - self._old_bytes_sent self._old_bytes_sent = bytes_sent data = { "start-time": self._start_time.isoformat('T'), "duration-ms": self._duration * 1000, "data": { "client-count": client_count, "bytes-received": bytes_received_diff, "bytes-sent": bytes_sent_diff } } self._proto.sendDatagram('%s\n' % (json.dumps(data))) self._start_time = datetime.datetime.utcnow().replace(microsecond=0)