# # sfive # # sfive - spreadspace streaming statistics suite is a generic # statistic collection tool for streaming server infrastructures. # The system collects and stores meta data like number of views # and throughput from a number of streaming servers and stores # it in a global data store. # The data acquisition is designed to be generic and extensible in # order to support different streaming software. # sfive also contains tools and applications to filter and visualize # live and recorded data. # # # Copyright (C) 2014 Christian Pointner # Markus Grueneis # # This file is part of sfive. # # sfive is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License version 3 # as published by the Free Software Foundation. # # sfive is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with sfive. If not, see <http://www.gnu.org/licenses/>.
#

import os

from flumotion.component.plugs import base
from flumotion.common import messages, i18n, log
from flumotion.common.poller import Poller
from twisted.internet import protocol, reactor
from socket import error as socket_error
import simplejson as json
import datetime

from flumotion.common.i18n import N_
T_ = i18n.gettexter()

_DEFAULT_POLL_INTERVAL = 5 # in seconds
_RECONNECT_DELAY = 5 # in seconds

__version__ = "$Rev$"


class SFiveProto(protocol.ConnectedDatagramProtocol):
    """Datagram protocol that forwards socket events to the owning plug."""

    def __init__(self, plug):
        # The plug receives the _socketReady/_socketError notifications and
        # provides the logging methods used below.
        self._plug = plug

    def stopProtocol(self):
        self._plug.debug('SFive: protcol stopped')

    def startProtocol(self):
        self._plug._socketReady()

    def datagramReceived(self, data):
        # Nothing is expected on this socket; incoming data is only logged.
        self._plug.debug('SFive: received datagram: "%s" (will get ignored)', data)

    def connectionFailed(self, failure):
        self._plug.warning('SFive: "%s"', failure.getErrorMessage())
        self._plug._socketError()

    def sendDatagram(self, data):
        """Write data to the transport.

        Returns whatever the transport's write() returns. A socket error is
        not propagated; it is reported to the plug via _socketError() and
        None is returned.
        """
        try:
            return self.transport.write(data)
        except socket_error:
            self._plug._socketError()


class ComponentSFivePlug(base.ComponentPlug):
    """Class to send statistics to the spreadspace streaming statistic suite"""

    ### ComponentPlug methods

    def start(self, component):
        """Read the plug properties and open the connection to the hub.

        'socket' and 'hostname' are required properties; 'content-id',
        'format' and 'quality' are optional and 'duration' falls back to
        _DEFAULT_POLL_INTERVAL.
        """
        self.debug('SFive: plug loaded')

        self._sfivepoller = None
        self._component = component

        properties = self.args['properties']
        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties.get('content-id')
        self._format = properties.get('format')
        self._quality = properties.get('quality')
        self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL)

        self._sfivepoller = Poller(self._updateSFive, self._duration, start=False)

        self._proto = None
        self._conn = None
        # Handle of the pending reconnect attempt (None when nothing is
        # scheduled), kept so that stop() can cancel it.
        self._reconnect = None
        # -1 marks "no previous sample"; see _counterDelta().
        self._old_bytes_received = -1
        self._old_bytes_sent = -1

        self._initSocket()

    def stop(self, component):
        """Stop polling, drop any pending reconnect and close the socket."""
        # Without this a reconnect scheduled by _socketError() would still
        # fire after the plug has been stopped and reopen the socket.
        self._cancelReconnect()

        if self._sfivepoller:
            self._sfivepoller.stop()
        if self._conn:
            self._conn.stopListening()

    def _cancelReconnect(self):
        """Cancel the scheduled reconnect attempt, if there is one."""
        pending = getattr(self, '_reconnect', None)
        self._reconnect = None
        if pending is not None and pending.active():
            pending.cancel()

    def _initSocket(self):
        """Create a fresh protocol instance and connect it to the hub socket."""
        # This is also the target of the delayed reconnect call, so once we
        # get here no reconnect is pending anymore.
        self._reconnect = None

        self.debug('SFive: trying to connect to %s...', self._socket)
        self._proto = SFiveProto(self)
        self._conn = reactor.connectUNIXDatagram(self._socket, self._proto)

    def _socketError(self):
        """Stop polling, close the socket and schedule a reconnect."""
        self.warning('SFive: connection error ... trying reconnect')
        if self._sfivepoller:
            self._sfivepoller.stop()
        if self._conn:
            self._conn.stopListening()

        # Several errors may be reported for the same broken connection;
        # schedule only one reconnect so that not every error opens a
        # socket of its own.
        pending = self._reconnect
        if pending is None or not pending.active():
            self._reconnect = reactor.callLater(_RECONNECT_DELAY, self._initSocket)

    def _socketReady(self):
        """Reset the byte counters, send the init message and start polling."""
        self.info('SFive: connection to sfive hub established')
        self._old_bytes_received = -1
        self._old_bytes_sent = -1
        self._sendInit()
        # If sending the init message failed, _socketError() has already
        # stopped the poller and scheduled a reconnect; do not restart it.
        if self._sfivepoller and self._reconnect is None:
            self._sfivepoller.start()

    def _sendInit(self):
        """Send the init message describing this streamer to the hub."""
        self.debug('SFive: sending init message')
        initdata = {
            "hostname": self._hostname,
            "streamer-id": {
                "content-id": self._content_id,
                "format": self._format,
                "quality": self._quality
            },
            "tags": []
        }
        self._proto.sendDatagram(json.dumps(initdata))

    @staticmethod
    def _counterDelta(current, previous):
        """Return current - previous, or 0 if there is no previous sample.

        A negative previous value is the "no sample yet" marker. A previous
        value of 0 is a valid sample and must not be mistaken for the marker,
        otherwise the traffic of that interval would be reported as 0.
        """
        if previous < 0:
            return 0
        return current - previous

    def _updateSFive(self):
        """Poller callback: send the statistics of the last interval."""
        client_count = self._component.getClients()
        bytes_received = self._component.getBytesReceived()
        bytes_sent = self._component.getBytesSent()

        bytes_received_diff = self._counterDelta(bytes_received, self._old_bytes_received)
        self._old_bytes_received = bytes_received
        bytes_sent_diff = self._counterDelta(bytes_sent, self._old_bytes_sent)
        self._old_bytes_sent = bytes_sent

        # TODO: create json and send it out
        # {
        #   "start-time": "2014-08-03Z12:34:56.123",
        #   "duration-ms": 5000,
        #   "data": {
        #     "clients": [
        #       { "ip": "127.0.0.1:2345", "bytes-transferred": 12094, "user-agent": "Mozilla Version 28", .... },
        #       .....
        #     ],
        #     "client-count": 12,
        #     "bytes-received": 12345,
        #     "bytes-sent": 921734098,
        #     ....
        # }

        # NOTE(review): 'Z' is passed to isoformat() as the separator between
        # date and time, which yields the form shown in the sample above -
        # confirm that this is what the hub expects.
        data = {
            "start-time": datetime.datetime.utcnow().isoformat('Z'),
            "duration-ms": self._duration * 1000,
            "client-count": client_count,
            "bytes-received": bytes_received_diff,
            "bytes-sent": bytes_sent_diff
        }
        self._proto.sendDatagram(json.dumps(data))