diff options
Diffstat (limited to 'src/daq')
-rwxr-xr-x | src/daq/accesslog/sfive-accesslog.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py new file mode 100755 index 0000000..03504cb --- /dev/null +++ b/src/daq/accesslog/sfive-accesslog.py @@ -0,0 +1,184 @@ +#!/usr/bin/python +# +# sfive +# +# sfive - spreadspace streaming statistics suite is a generic +# statistic collection tool for streaming server infrastructures. +# The system collects and stores meta data like number of views +# and throughput from a number of streaming servers and stores +# it in a global data store. +# The data acquisition is designed to be generic and extensible in +# order to support different streaming software. +# sfive also contains tools and applications to filter and visualize +# live and recorded data. +# +# +# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org> +# Markus Grueneis <gimpf@gimpf.org> +# +# This file is part of sfive. +# +# sfive is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 +# as published by the Free Software Foundation. +# +# sfive is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with sfive. If not, see <http://www.gnu.org/licenses/>. 
#

from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
from twisted.internet import error, protocol, reactor, unix
import socket
from time import sleep

import simplejson as json
import datetime

_MAX_PACKET_SIZE = 8192 # in bytes

__version__ = "$Rev$"

class SFivePort(unix.ConnectedDatagramPort):
    """Connected unix datagram port that retries sends instead of dropping them.

    twisted's stock implementation silently drops a datagram when the send
    buffer is full; this subclass sleeps briefly and retries instead.
    """

    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)

    def write(self, data):
        """Send data on the socket, retrying on EINTR and on a full send buffer.

        Raises twisted.internet.error.MessageLengthError if the datagram is
        too long for the socket; re-raises any other socket error.
        """
        try:
            return self.socket.send(data)
        except socket.error as se:
            no = se.args[0]
            if no == EINTR:
                # interrupted by a signal - just try again
                return self.write(data)
            elif no == EMSGSIZE:
                # NOTE: the original raised error.MessageLengthError without
                # ever importing 'error' (NameError) - fixed by importing
                # twisted.internet.error above
                raise error.MessageLengthError("message too long")
            elif no == ECONNREFUSED:
                self.protocol.connectionRefused()
            elif no in (EAGAIN, EWOULDBLOCK, ENOBUFS):
                # the send buffer seems to be full - let's wait a little while...
                # this is not really a good solution but better than the approach
                # of twisted which just drops the datagram...
                # (EWOULDBLOCK/ENOBUFS were imported but unused; on some
                # platforms they report the same full-buffer condition)
                sleep(0.01)
                return self.write(data)
            else:
                raise

class SFiveProto(protocol.ConnectedDatagramProtocol):
    """Datagram protocol that relays socket state changes to the importer."""

    def __init__(self, importer):
        self._importer = importer

    def startProtocol(self):
        # transport is ready - tell the importer it may start sending
        self._importer._socketReady()

    def datagramReceived(self, data):
        # the hub is not expected to talk back; log and ignore
        print('SFive: received datagram: "%s" (will get ignored)' % (data))

    def connectionFailed(self, failure):
        print('SFive: connection failed: %s' % (failure.getErrorMessage()))
        self._importer._socketError()

    def sendDatagram(self, data):
        """Write one datagram; on socket errors notify the importer and stop."""
        try:
            return self.transport.write(data)
        except socket.error as err:
            print('SFive: sending datagram failed: %s' % (err))
            self._importer._socketError()


class AccessLog(object):
    """Batch-import nginx/apache access logs into the spreadspace streaming statistic suite."""

    def __init__(self, properties):
        # properties: dict produced by the argparse section below
        print('SFive: accesslog file importer loaded')

        self._socket = properties['socket']
        self._hostname = properties['hostname']
        self._content_id = properties['content-id']
        self._format = properties['format']
        self._quality = properties['quality']
        self._tags = properties['tags']
        self._logfile = properties['logfile']

        self._initLog()

        self._proto = None
        self._conn = None

    def _initLog(self):
        # TODO: open file and init parser
        return

    def run(self):
        """Connect to the hub and run the reactor until the import finishes."""
        reactor.callWhenRunning(self._initSocket)
        reactor.run()

    def _initSocket(self):
        print('SFive: trying to connect to %s...' % (self._socket))
        self._proto = SFiveProto(self)
        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
        self._conn.startListening()

    def _socketError(self):
        # tear down the port (if any) and stop the reactor so run() returns
        if self._conn:
            self._conn.stopListening()
        reactor.stop()

    def _socketReady(self):
        """Called by the protocol once the hub connection is up; runs the import."""
        print('SFive: connection to sfive hub established')
        self._sendInit()
        cnt = self._sendLogData()
        print('SFive: sent %d datasets' % (cnt))

        reactor.stop()

    def _sendInit(self):
        # announce this streamer to the hub before any datasets
        initdata = { "hostname": self._hostname,
                     "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
                     "tags": self._tags }
        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))

    def _sendLogData(self):
        """Parse the logfile and send one dataset per interval; returns the count."""
        cnt = 0
        # TODO: create datasets using parser and call _sendDataset()
        return cnt

    def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
        # timestamp: datetime of the interval start; duration in seconds
        data = { "start-time": timestamp.isoformat('T'),
                 "duration-ms": duration * 1000,
                 "data": {
                     "client-count": client_count,
                     "bytes-sent": bytes_sent
                 }
               }
        self._proto.sendDatagram('%s\n' % (json.dumps(data)))


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
    parser.add_argument('--socket', '-s', dest='socket', required=True,
                        help='the path to the data socket of the local SFive hub')
    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
                        help='the hostname of the machine')
    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
                        help='the content-id (i.e. av)')
    parser.add_argument('--format', '-f', dest='format', required=True,
                        help='the format (i.e. webm)')
    parser.add_argument('--quality', '-q', dest='quality', required=True,
                        help='the quality (i.e. high)')
    parser.add_argument('--tag', '-t', dest='tags', action='append',
                        help='tag to be added to the statistic data, can be invoked several times')
    parser.add_argument('--logfile', '-l', dest='logfile', required=True,
                        help='path to the logfile')
    args = vars(parser.parse_args())
    if not args['tags']:
        args['tags'] = []
    importer = AccessLog(args)
    importer.run()