From f992b2073a675dbe8ee230f2b9b71deef5159afc Mon Sep 17 00:00:00 2001
From: Christian Pointner
Date: Tue, 21 Oct 2014 06:54:28 +0200
Subject: daq: removed .py extension for scripts

---
 src/daq/accesslog/s5-accesslog.py | 414 --------------------------------------
 1 file changed, 414 deletions(-)
 delete mode 100755 src/daq/accesslog/s5-accesslog.py

diff --git a/src/daq/accesslog/s5-accesslog.py b/src/daq/accesslog/s5-accesslog.py
deleted file mode 100755
index e3d846f..0000000
--- a/src/daq/accesslog/s5-accesslog.py
+++ /dev/null
@@ -1,414 +0,0 @@
-#!/usr/bin/python
-#
-# sfive
-#
-# sfive - spreadspace streaming statistics suite is a generic
-# statistic collection tool for streaming server infrastructures.
-# The system collects and stores meta data like number of views
-# and throughput from a number of streaming servers and stores
-# it in a global data store.
-# The data acquisition is designed to be generic and extensible in
-# order to support different streaming software.
-# sfive also contains tools and applications to filter and visualize
-# live and recorded data.
-#
-#
-# Copyright (C) 2014 Christian Pointner
-#                    Markus Grueneis
-#
-# This file is part of sfive.
-#
-# sfive is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3
-# as published by the Free Software Foundation.
-#
-# sfive is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sfive. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import protocol, unix, reactor, task, error
-import socket
-from time import sleep
-
-import re
-import simplejson as json
-import datetime
-import dateutil.parser
-
-_MAX_PACKET_SIZE = 65536 # in bytes
-
-__version__ = "$Rev$"
-
-class SFivePort(unix.ConnectedDatagramPort):
-
-    def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
-        unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
-
-    def write(self, data):
-        try:
-            return self.socket.send(data)
-        except socket.error as se:
-            no = se.args[0]
-            if no == EINTR:
-                return self.write(data)
-            elif no == EMSGSIZE:
-                raise error.MessageLengthError, "message too long"
-            elif no == ECONNREFUSED:
-                self.protocol.connectionRefused()
-            elif no == EAGAIN:
-                # the send buffer seems to be full - let's wait a little while...
-                # this is not really a good solution but better than the approach
-                # of twisted which just drops the datagram...
-                sleep(0.01)
-                return self.write(data)
-            else:
-                raise
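
The EAGAIN branch above retries after a short pause instead of dropping the datagram. A minimal stand-alone sketch of the same retry policy, outside Twisted, against a plain AF_UNIX datagram socket (the socket path is a made-up placeholder):

    import errno
    import socket
    from time import sleep

    def send_with_retry(sock, data, pause=0.01):
        # keep retrying while the call was interrupted (EINTR) or the
        # kernel send buffer is full (EAGAIN/EWOULDBLOCK); re-raise
        # everything else, analogous to SFivePort.write
        while True:
            try:
                return sock.send(data)
            except socket.error as err:
                if err.args[0] in (errno.EINTR, errno.EAGAIN, errno.EWOULDBLOCK):
                    sleep(pause)
                    continue
                raise

    # usage, assuming an sfive hub listens on this (hypothetical) path:
    # s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    # s.connect('/var/run/sfive/hub.sock')
    # send_with_retry(s, 'hello sfive\n')
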
-
-class SFiveProto(protocol.ConnectedDatagramProtocol):
-
-    def __init__(self, importer):
-        self._importer = importer
-
-    def startProtocol(self):
-        self._importer.socketReady()
-
-    def datagramReceived(self, data):
-        print 'SFive: received datagram: "%s" (will get ignored)' % (data)
-
-    def connectionFailed(self, failure):
-        print 'SFive: connection failed: %s' % (failure.getErrorMessage())
-        self._importer.socketError()
-
-    def sendDatagram(self, data):
-        try:
-            return self.transport.write(data)
-        except socket.error as err:
-            print 'SFive: sending datagram failed: %s' % (err)
-            self._importer.socketError()
-
-
-class ClientList:
-
-    def __init__(self):
-        self._clients = { }
-
-    def clear(self):
-        self._clients = { }
-
-    def getCnt(self):
-        return len(self._clients)
-
-    def getBytesSent(self):
-        sum = 0
-        for val in self._clients.itervalues():
-            sum += val['bytes-sent']
-
-        return sum
-
-    def values(self):
-        return self._clients.itervalues()
-
-    def update(self, logdata):
-        key = '%(client)s/%(ua)s' % logdata
-        if key in self._clients.keys():
-            self._clients[key]['bytes-sent'] += logdata['bytes-sent']
-        else:
-            self._clients[key] = { 'ip': logdata['client'],
-                                   'user-agent': logdata['ua'],
-                                   'bytes-sent': logdata['bytes-sent'] }
-
-
-class AccessLog():
-    """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite"""
-
-    def __init__(self, properties):
-        print 'SFive: accesslog file importer loaded'
-
-        self._socket = properties['socket']
-        self._hostname = properties['hostname']
-        self._content_id = None
-        self._format = None
-        self._quality = None
-        self._duration = properties['duration']
-        self._tags = properties['tags']
-        self._logfile = properties['logfile']
-        self._initStreamerIds(properties['streamer-ids'])
-
-        self._proto = None
-        self._conn = None
-        self._connected = False
-        self._url_re = re.compile(properties['nameformat'])
-
-
-    def _initStreamerIds(self, streamer):
-        print 'SFive: will look for the following streamer ids:'
-        self._streamer = {}
-        cs = {}
-        fs = {}
-        qs = {}
-        for s in streamer:
-            parts = s.split('/')
-            if len(parts) != 3:
-                raise ValueError('invalid streamer description "%s": must consist of 3 parts separated by a /' % s)
-            scs = parts[0].split(',')
-            sfs = parts[1].split(',')
-            sqs = parts[2].split(',')
-            for c in scs:
-                cs[c] = 1
-                if c not in self._streamer:
-                    self._streamer[c] = {}
-                for f in sfs:
-                    fs[f] = 1
-                    if f not in self._streamer[c]:
-                        self._streamer[c][f] = {}
-                    for q in sqs:
-                        qs[q] = 1
-                        if q not in self._streamer[c][f]:
-                            self._streamer[c][f][q] = ClientList()
-                            print '  %s / %s / %s' % (c, f, q)
-
-        if len(cs.keys()) == 1:
-            self._content_id = cs.keys()[0]
-            print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
-
-        if len(fs.keys()) == 1:
-            self._format = fs.keys()[0]
-            print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
-
-        if len(qs.keys()) == 1:
-            self._quality = qs.keys()[0]
-            print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
-
-
-    def run(self):
-        if self._initLog():
-            reactor.callWhenRunning(self._initSocket)
-            reactor.run()
-
-    def _initLog(self):
-        try:
-            print 'SFive: will batch import from %s' % (self._logfile)
-            self._fd = open(self._logfile, 'r')
-
-        except IOError as e:
-            print 'SFive: error opening logfile: %s' % (e.strerror)
-            return False
-
-        return True
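
To see what _initStreamerIds builds: every combination of the comma-separated content ids, formats, and qualities gets its own ClientList. A stand-alone sketch of the same cross-product (Python 2, like the script; the ids are invented examples):

    def expand(desc):
        # desc has the form 'content[,content]/format[,format]/quality[,quality]'
        content, formats, qualities = desc.split('/')
        return [(c, f, q) for c in content.split(',')
                          for f in formats.split(',')
                          for q in qualities.split(',')]

    print expand('av/webm,flash/high,low')
    # [('av', 'webm', 'high'), ('av', 'webm', 'low'),
    #  ('av', 'flash', 'high'), ('av', 'flash', 'low')]
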
-
-    def _prepareLineRegex(self):
-        parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
-                  r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"' ]
-        return re.compile(r'\s+'.join(parts)+r'\s*\Z')
-
-    def _parseRequest(self, reqstr):
-        req = { 'method': None, 'url': None, 'proto': None }
-        try:
-            parts = reqstr.split()
-            req['method'] = parts[0] if parts[0] != '-' else None
-            req['url'] = parts[1]
-            req['proto'] = parts[2]
-        except IndexError:
-            pass
-
-        return req
-
-    def _parseDatetime(self, datetimestr):
-        try:
-            ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
-            ts = (ts - ts.utcoffset()).replace(tzinfo=None)
-            return ts
-        except ValueError as e:
-            return None
-
-    def _parseLine(self, regex, line):
-        linedata = regex.match(line).groupdict()
-
-        for part in ('client', 'ref', 'ua'):
-            if linedata[part] == '-':
-                linedata[part] = None
-
-        linedata['status'] = int(linedata['status'])
-        linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size'])
-        linedata['req'] = self._parseRequest(linedata['req'])
-        linedata['ts'] = self._parseDatetime(linedata['ts'])
-        return linedata
-
-    def _getTsFromLogDataAligned(self, ts):
-        try:
-            tsi = int(ts.strftime('%s'))
-            tsi = tsi - (tsi % self._duration)
-            return datetime.datetime.fromtimestamp(tsi)
-        except ValueError:
-            return ts
-
-    def _sendLogData(self):
-        linecnt = 0
-        updatecnt = 0
-        lastts = None
-        nextts = None
-        try:
-            regex = self._prepareLineRegex()
-            for line in self._fd:
-                linecnt += 1
-                linedata = self._parseLine(regex, line)
-                if not lastts:
-                    lastts = self._getTsFromLogDataAligned(linedata['ts'])
-                    nextts = lastts + datetime.timedelta(seconds=self._duration)
-                    self._clearClients()
-                    self._updateClients(linedata)
-                else:
-                    while linedata['ts'] >= nextts:
-                        updatecnt += self._sendUpdates(lastts)
-                        lastts = nextts
-                        nextts = lastts + datetime.timedelta(seconds=self._duration)
-                        self._clearClients()
-                    self._updateClients(linedata)
-
-            # send remaining data
-            if nextts:
-                updatecnt += self._sendUpdates(lastts)
-
-        except re.error as e:
-            print 'SFive: regex error: %s' % (e)
-
-        return updatecnt, linecnt
-
-    def _updateClients(self, linedata):
-        if linedata['status'] != 200 and linedata['status'] != 206:
-            return
-        if linedata['req']['method'] != 'GET':
-            return
-
-        try:
-            m = re.match(self._url_re, linedata['req']['url'])
-            if not m:
-                return
-            streamerid = m.groupdict()
-        except re.error as e:
-            print 'SFive: regex error: %s' % (e)
-
-        try:
-            self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata)
-        except KeyError as e:
-            pass
-
-    def _sendUpdates(self, ts):
-        cnt = 0
-        for c in self._streamer.keys():
-            for f in self._streamer[c].keys():
-                for q in self._streamer[c][f].keys():
-                    self._sendDataset(ts, self._duration, c, f, q)
-                    cnt += 1
-        return cnt
-
-    def _clearClients(self):
-        for c in self._streamer.keys():
-            for f in self._streamer[c].keys():
-                for q in self._streamer[c][f].keys():
-                    self._streamer[c][f][q].clear()
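
The pattern built by _prepareLineRegex targets the common nginx/apache "combined" log format. A quick sanity check with a fabricated log line (Python 2 again; the IP, URL, and user agent are invented):

    import re

    parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
              r'(?P<status>[0-9]+)', r'(?P<size>\S+)', r'"(?P<ref>.*)"', r'"(?P<ua>.*)"' ]
    regex = re.compile(r'\s+'.join(parts)+r'\s*\Z')

    line = '203.0.113.7 - - [21/Oct/2014:06:54:28 +0200] "GET /av/webm/high HTTP/1.1" 200 1234 "-" "curl/7.35.0"'
    print regex.match(line).groupdict()
    # {'client': '203.0.113.7', 'ts': '21/Oct/2014:06:54:28 +0200',
    #  'req': 'GET /av/webm/high HTTP/1.1', 'status': '200', 'size': '1234',
    #  'ref': '-', 'ua': 'curl/7.35.0'}
    # _parseLine then converts status and size to ints, turns '-' fields
    # into None, and parses the timestamp
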
-
-
-    def _initSocket(self):
-        print 'SFive: trying to connect to %s...' % (self._socket)
-        self._connected = False
-        self._proto = SFiveProto(self)
-        self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
-        self._conn.startListening()
-
-    def socketReady(self):
-        print 'SFive: connection to sfive hub established'
-        self._connected = True
-        self._sendInit()
-        updatecnt, linecnt = self._sendLogData()
-        print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt)
-        self._fd.close()
-        if reactor.running:
-            reactor.stop()
-
-    def socketError(self):
-        print 'SFive: connection to sfive hub lost'
-        if self._conn and self._connected:
-            self._conn.stopListening()
-        self._connected = False
-        if reactor.running:
-            reactor.stop()
-
-    def _sendInit(self):
-        initdata = { "version": 1, "hostname": self._hostname,
-                     "streamer-id": { },
-                     "tags": self._tags }
-        if self._content_id:
-            initdata["streamer-id"]["content-id"] = self._content_id
-        if self._format:
-            initdata["streamer-id"]["format"] = self._format
-        if self._quality:
-            initdata["streamer-id"]["quality"] = self._quality
-        if len(initdata["streamer-id"].keys()) == 0:
-            del initdata["streamer-id"]
-
-        self._proto.sendDatagram('%s\n' % (json.dumps(initdata)))
-
-    def _sendDataset(self, timestamp, duration, content_id, format, quality):
-        clients = self._streamer[content_id][format][quality]
-        data = { "start-time": timestamp.isoformat('T') + 'Z',
-                 "duration-ms": duration * 1000,
-                 "streamer-id": { },
-                 "data": {
-                     "clients": list(clients.values()),
-                     "client-count": clients.getCnt(),
-                     "bytes-sent": clients.getBytesSent()
-                 }
-               }
-        if not self._content_id:
-            data["streamer-id"]["content-id"] = content_id
-        if not self._format:
-            data["streamer-id"]["format"] = format
-        if not self._quality:
-            data["streamer-id"]["quality"] = quality
-        if len(data["streamer-id"].keys()) == 0:
-            del data["streamer-id"]
-
-        self._proto.sendDatagram('%s\n' % (json.dumps(data)))
-
-
-if __name__ == '__main__':
-    import argparse
-
-    parser = argparse.ArgumentParser(description='SFive nginx/apache accesslog Importer')
-    parser.add_argument('--socket', '-s', dest='socket', required=True,
-                        help='the path to the data socket of the local SFive hub')
-    parser.add_argument('--hostname', '-n', dest='hostname', required=True,
-                        help='the hostname of the machine')
-    parser.add_argument('--duration', '-d', dest='duration', required=False,
-                        help='time (in seconds) between updates; defaults to 15')
-    parser.add_argument('--tag', '-t', dest='tags', action='append',
-                        help='tag to be added to the statistic data, can be invoked several times')
-    parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
-                        help='a streamer description like <content-id>[,<content-id>]/<format>[,<format>]/<quality>[,<quality>[,...]]')
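
The hub protocol used by _sendInit and _sendDataset is newline-terminated JSON over the unix datagram socket. For reference, a sketch of what one dataset datagram carries; the field values are invented, the layout follows _sendDataset above:

    import datetime
    import simplejson as json   # the script uses simplejson; the stdlib json module works too

    ts = datetime.datetime(2014, 10, 21, 6, 54, 15)
    data = { "start-time": ts.isoformat('T') + 'Z',
             "duration-ms": 15 * 1000,
             "streamer-id": { "content-id": "av", "format": "webm", "quality": "high" },
             "data": { "clients": [ { "ip": "203.0.113.7",
                                      "user-agent": "curl/7.35.0",
                                      "bytes-sent": 1234 } ],
                       "client-count": 1,
                       "bytes-sent": 1234 } }
    print json.dumps(data)

A possible invocation, using only the options visible above (paths and ids are placeholders; the remaining arguments are elided here):

    # ./s5-accesslog -s /var/run/sfive/hub.sock -n streamer01 -d 15 \
    #                -S av/webm,flash/high,low ...
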