From 5e63bae8462b0adad408633c9daff2479e98e0a4 Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 3 Aug 2014 21:15:49 +0200 Subject: added inital flumotion plug --- LICENSE | 2 +- README | 2 +- doc/protocol.md | 10 ++-- src/daq/flumotion-plug/sfive.py | 116 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 5 deletions(-) create mode 100644 src/daq/flumotion-plug/sfive.py diff --git a/LICENSE b/LICENSE index 92c6d74..3821868 100644 --- a/LICENSE +++ b/LICENSE @@ -8,7 +8,7 @@ # it in a global data store. # The data acquisition is designed to be generic and extensible in # order to support different streaming software. -# sfive also contains tools and applications to filter and visualize +# sfive also contains tools and applications to filter and visualize # live and recorded data. # # diff --git a/README b/README index e9ee652..30c3698 100644 --- a/README +++ b/README @@ -8,7 +8,7 @@ and throughput from a number of streaming servers and stores it in a global data store. The data acquisition is designed to be generic and extensible in order to support different streaming software. -sfive also contains tools and applications to filter and visualize +sfive also contains tools and applications to filter and visualize live and recorded data. diff --git a/doc/protocol.md b/doc/protocol.md index 282c863..73641b8 100644 --- a/doc/protocol.md +++ b/doc/protocol.md @@ -1,6 +1,8 @@ -# Messages +Messages +======== -## init +init +---- { "hostname": "myhostname", @@ -8,7 +10,9 @@ "tags": [ "elevate", "2014", "discourse" ] } -## data-update + +data-update +----------- { "start-time": "2014-08-03Z12:34:56.123", diff --git a/src/daq/flumotion-plug/sfive.py b/src/daq/flumotion-plug/sfive.py new file mode 100644 index 0000000..9875393 --- /dev/null +++ b/src/daq/flumotion-plug/sfive.py @@ -0,0 +1,116 @@ +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastuctures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner +# Markus Grüneis +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see . +# + +import os + +from flumotion.component.plugs import base +from flumotion.common import messages, i18n, log +from flumotion.common.poller import Poller + +from flumotion.common.i18n import N_ +T_ = i18n.gettexter() + +_DEFAULT_POLL_INTERVAL = 5 # in seconds + +__version__ = "$Rev$" + +class ComponentSFivePlug(base.ComponentPlug): + """Class to send statistics to the spreadspace streaming statistic suite""" + + ### ComponentPlug methods + + def start(self, component): + self.debug('SFive: plug loaded') + + self._sfivepoller = None + self._component = component + + properties = self.args['properties'] + self._socket = properties['socket'] + self._hostename = properties['hostname'] + self._content_id = properties.get('content-id') + self._format = properties.get('format') + self._quality = properties.get('quality') + + self._duration = properties.get('duration', _DEFAULT_POLL_INTERVAL) + self._sfivepoller = Poller(self._updateSFive, self._duration, start=False) + + self._initSocket() + + def stop(self, component): + if self._sfivepoller: + self._sfivepoller.stop() + ## TODO: close unix socket + + def _initSocket(self) + self.debug('SFive: connecting to %s...', self._socket) + ## TODO: init unix socket and call _socketReady when socket is ready + ## TODO: how handle connection drop??? + + def _socketError(self) + self.warning('SFive: connection lost... trying reconnect') + ## TODO: try reconnect... + + def _socketReady(self) + self.info('SFive: connection to sfive hub established') + if self._sfivepoller: + self._sfivepoller(start) + + def _sendInit(self) + # TODO: create json and send it out + # { + # "hostname": "myhostname", + # "streamer-id": { "content-id": "av-orig", "format": "flash", "quality": "medium" }, + # "tags": [ "elevate", "2014", "discourse" ] + # } + + + def _updateSFive(self): + """send data update""" + + client_count = self._component.getClients() + bytes_received = self._component.getBytesReceived() + bytes_sent = self._component.getBytesSent() + # TODO: create json and send it out + # { + # "start-time": "2014-08-03Z12:34:56.123", + # "duration-ms": 5000, + # "data": { + # "clients": [ + # { "ip": "127.0.0.1:2345", "bytes-transferred": 12094, "user-agent": "Mozilla Version 28", .... }, + # ..... + # ], + # "client-count": 12, + # "bytes-received": 12345, + # "bytes-sent": 921734098, + # .... + # } -- cgit v1.2.3