author     Christian Pointner <equinox@spreadspace.org>  2014-10-14 02:47:38 +0200
committer  Christian Pointner <equinox@spreadspace.org>  2014-10-14 02:47:38 +0200
commit     84e77bf2ac0dfe34d152f2fa27fd31633bd3eadd (patch)
tree       89d2059e4c2c4f7054575d52c8c92a4021a467bd /src/daq/accesslog/sfive-accesslog.py
parent     Merge branch 'master' of gitspread:sfive (diff)
renamed daq plugins
Diffstat (limited to 'src/daq/accesslog/sfive-accesslog.py')
-rwxr-xr-x  src/daq/accesslog/sfive-accesslog.py | 325
1 file changed, 0 insertions(+), 325 deletions(-)
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py
deleted file mode 100755
index 7e74090..0000000
--- a/src/daq/accesslog/sfive-accesslog.py
+++ /dev/null
@@ -1,325 +0,0 @@
-#!/usr/bin/python
-#
-# sfive
-#
-# sfive - spreadspace streaming statistics suite is a generic
-# statistics collection tool for streaming server infrastructures.
-# The system collects metadata such as viewer counts and throughput
-# from a number of streaming servers and stores it in a global
-# data store.
-# The data acquisition is designed to be generic and extensible in
-# order to support different streaming software.
-# sfive also contains tools and applications to filter and visualize
-# live and recorded data.
-#
-#
-# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
-# Markus Grueneis <gimpf@gimpf.org>
-#
-# This file is part of sfive.
-#
-# sfive is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3
-# as published by the Free Software Foundation.
-#
-# sfive is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sfive. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import error, protocol, reactor, unix
-import socket
-from time import sleep
-
-import sys
-import re
-import simplejson as json
-import datetime
-import dateutil.parser
-
-_MAX_PACKET_SIZE = 8192 # in bytes
-
-__version__ = "$Rev$"
-
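-# unix datagram port that keeps retrying on EINTR and EAGAIN instead of
-# silently dropping datagrams like twisted's stock implementation does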
-class SFivePort(unix.ConnectedDatagramPort):
-
-    def __init__(self, addr, proto, maxPacketSize=8192, mode=0o666, bindAddress=None, reactor=None):
- unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
-
- def write(self, data):
- try:
- return self.socket.send(data)
-        except socket.error as se:
- no = se.args[0]
- if no == EINTR:
- return self.write(data)
- elif no == EMSGSIZE:
-                raise error.MessageLengthError("message too long")
- elif no == ECONNREFUSED:
- self.protocol.connectionRefused()
- elif no == EAGAIN:
- # the send buffer seems to be full - let's wait a little while...
-            # this is not really a good solution but better than the approach
- # of twisted which just drops the datagram...
- sleep(0.01)
- return self.write(data)
- else:
- raise
-
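-# minimal datagram protocol: notifies the importer once the socket is
-# ready and reports connection/send errors back to it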
-class SFiveProto(protocol.ConnectedDatagramProtocol):
-
- def __init__(self, importer):
- self._importer = importer
-
- def startProtocol(self):
- self._importer._socketReady()
-
- def datagramReceived(self, data):
- print 'SFive: received datagram: "%s" (will get ignored)' % (data)
-
- def connectionFailed(self, failure):
- print 'SFive: connection failed: %s' % (failure.getErrorMessage())
- self._importer._socketError()
-
- def sendDatagram(self, data):
- try:
- return self.transport.write(data)
- except socket.error as err:
- print 'SFive: sending datagram failed: %s' % (err)
- self._importer._socketError()
-
-
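-# aggregates per-client 'bytes-sent' counters for successful (200/206)
-# GET requests whose URL matches the stream's filename regex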
-class ClientList:
-
- def __init__(self, file_re):
- self._clients = { }
- self._file_re = file_re
-
- def clear(self):
- self._clients = { }
-
- def getCnt(self):
- return len(self._clients)
-
-    def getBytesSent(self):
-        total = 0
-        for val in self._clients.itervalues():
-            total += val['bytes-sent']
-
-        return total
-
- def values(self):
- return self._clients.itervalues()
-
- def update(self, linedata):
- if linedata['status'] != 200 and linedata['status'] != 206:
- return
- if linedata['req']['method'] != 'GET':
- return
-
- try:
- if re.match(self._file_re, linedata['req']['url']):
-                if linedata['client'] in self._clients:
- self._clients[linedata['client']]['bytes-sent'] += linedata['size']
- else:
- self._clients[linedata['client']] = { 'ip': linedata['client'],
- 'bytes-sent': linedata['size'] }
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
-
-
-class AccessLog():
- """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite"""
-
- def __init__(self, properties):
- print 'SFive: accesslog file importer loaded'
-
- self._socket = properties['socket']
- self._hostname = properties['hostname']
- self._content_id = properties['content-id']
- self._format = properties['format']
- self._quality = properties['quality']
- self._duration = properties['duration']
- self._tags = properties['tags']
- self._logfile = properties['logfile']
- self._nameformat = properties['nameformat']
-
- self._proto = None
- self._conn = None
-
- def run(self):
- if self._initLog():
- reactor.callWhenRunning(self._initSocket)
- reactor.run()
-
- def _initLog(self):
- try:
- if self._logfile:
-                print 'SFive: will batch import from %s' % (self._logfile if self._logfile != '-' else 'standard input')
- self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin
- else:
- print 'SFive: live mode enabled'
-
- regex = self._nameformat % { 'hostname': self._hostname,
- 'content-id': self._content_id,
- 'format': self._format,
- 'quality': self._quality }
- self._file_re = re.compile(regex)
- print "SFive: will be looking for files like '%s'" % regex
- except IOError as e:
- print 'SFive: error opening logfile: %s' % (e.strerror)
- return False
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
- return False
-
- return True
-
- def _initSocket(self):
- print 'SFive: trying to connect to %s...' % (self._socket)
- self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
- self._conn.startListening()
-
- def _socketError(self):
- if self._conn:
- self._conn.stopListening()
- reactor.stop()
-
- def _socketReady(self):
- print 'SFive: connection to sfive hub established'
- self._sendInit()
- if hasattr(self, '_fd'):
- cnt = self._sendLogData(self._fd, None)
- print 'SFive: imported %d datasets' % (cnt)
- self._fd.close()
- else:
- print 'SFive: live mode is not yet implemented!'
-            # TODO: wait self._duration seconds and call _sendLogData with
-            # all lines received since the last update
-
- reactor.stop()
-
- def _sendInit(self):
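-        # announces this source to the hub as a single JSON line, e.g.
-        # (using the example values from the command line help below):
-        # {"hostname": ..., "streamer-id": {"content-id": "av",
-        #  "format": "webm", "quality": "high"}, "tags": []}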
- initdata = { "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
-        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
-
- def _prepareLineRegex(self):
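-        # builds a regex for the apache/nginx combined log format, e.g.
-        # 203.0.113.1 - - [14/Oct/2014:02:47:38 +0200] "GET /av-high.webm HTTP/1.1" 200 12345 "-" "Mozilla/5.0"
-        # (client IP and URL above are made-up examples)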
- parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
- r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"']
- return re.compile(r'\s+'.join(parts)+r'\s*\Z')
-
- def _parseRequest(self, reqstr):
- req = { 'method': None, 'url': None, 'proto': None }
- try:
- parts = reqstr.split()
- req['method'] = parts[0] if parts[0] != '-' else None
- req['url'] = parts[1]
- req['proto'] = parts[2]
- except IndexError:
- pass
-
- return req
-
- def _parseDatetime(self, datetimestr):
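-        # timestamps look like '14/Oct/2014:02:47:38 +0200'; replacing the
-        # colon after the date (index 11) with a space makes the string
-        # parseable by dateutil; the result is normalized to naive UTC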
- try:
- ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
- ts = (ts - ts.utcoffset()).replace(tzinfo=None)
- return ts
- except ValueError as e:
- return None
-
-    def _parseLine(self, regex, line):
-        match = regex.match(line)
-        if not match:
-            return None
-        linedata = match.groupdict()
-
- for part in ("client", "ref", "ua"):
- if linedata[part] == "-":
- linedata[part] = None
-
- linedata["status"] = int(linedata["status"])
- linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"])
- linedata['req'] = self._parseRequest(linedata['req'])
- linedata['ts'] = self._parseDatetime(linedata['ts'])
- return linedata
-
- def _sendLogData(self, data, lastts):
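-        # groups the parsed lines into consecutive windows of
-        # self._duration seconds (anchored at the first line's timestamp
-        # if lastts is None) and emits one aggregated dataset per window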
- cnt = 0
- nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
- clients = ClientList(self._file_re)
- try:
- regex = self._prepareLineRegex()
- for line in data:
-                linedata = self._parseLine(regex, line)
-                if not linedata:
-                    # skip lines that do not match the access log format
-                    continue
- if not lastts:
- lastts = linedata['ts']
- nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients.clear()
- clients.update(linedata)
- else:
- while linedata['ts'] >= nextts:
- self._sendDataset(lastts, self._duration, clients)
- cnt += 1
- lastts = nextts
- nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients.clear()
-
- clients.update(linedata)
-
- # send remaining data
- if nextts:
- self._sendDataset(lastts, self._duration, clients)
- cnt += 1
-
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
-
- return cnt
-
- def _sendDataset(self, timestamp, duration, clients):
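-        # emits one aggregated dataset as a single JSON line, e.g.:
-        # {"start-time": "2014-10-14T02:47:38", "duration-ms": 5000,
-        #  "data": {"clients": [...], "client-count": 2, "bytes-sent": 4096}}
-        # (timestamp and counters above are made-up examples)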
- data = { "start-time": timestamp.isoformat('T'),
- "duration-ms": duration * 1000,
- "data": {
- "clients": list(clients.values()),
- "client-count": clients.getCnt(),
- "bytes-sent": clients.getBytesSent()
- }
- }
-        self._proto.sendDatagram('%s\n' % (json.dumps(data)))
-
-
-if __name__ == '__main__':
- import argparse
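-
-    # hypothetical example invocation:
-    #   ./sfive-accesslog.py -s /var/run/sfive.sock -n streamer1 -c av \
-    #       -f webm -q high -F '/%(content-id)s-%(quality)s\.%(format)s' \
-    #       -l access.log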
-
- parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
- parser.add_argument('--socket', '-s', dest='socket', required=True,
- help='the path to the data socket of the local SFive hub')
- parser.add_argument('--hostname', '-n', dest='hostname', required=True,
- help='the hostname of the machine')
-    parser.add_argument('--content-id', '-c', dest='content-id', required=True,
-                        help='the content-id (e.g. av)')
-    parser.add_argument('--format', '-f', dest='format', required=True,
-                        help='the format (e.g. webm)')
-    parser.add_argument('--quality', '-q', dest='quality', required=True,
-                        help='the quality (e.g. high)')
-    parser.add_argument('--duration', '-d', dest='duration', required=False, type=int,
-                        help='time (in seconds) between updates; defaults to 5')
- parser.add_argument('--tag', '-t', dest='tags', action='append',
- help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--logfile', '-l', dest='logfile', required=False,
- help='path to the logfile or \'-\' for standard input')
- parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
-                        help='the format for filenames which are part of this stream; may contain python %%-style format expressions (hostname, content-id, format, quality)')
- args = vars(parser.parse_args())
- if not args['tags']:
- args['tags'] = []
- if not args['duration']:
- args['duration'] = 5
- importer = AccessLog(args)
- importer.run()