summaryrefslogtreecommitdiff
path: root/src/daq/nginx-lua/s5-nginx-lua-fetch.py
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-21 06:54:28 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-21 06:54:28 +0200
commitf992b2073a675dbe8ee230f2b9b71deef5159afc (patch)
tree03b538019189f9f5500fd815c4847e78990cc917 /src/daq/nginx-lua/s5-nginx-lua-fetch.py
parenthub: added clean target for Makefile (diff)
daq: removed .py extension for scripts
Diffstat (limited to 'src/daq/nginx-lua/s5-nginx-lua-fetch.py')
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch.py384
1 files changed, 0 insertions, 384 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
deleted file mode 100755
index 86ef940..0000000
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ /dev/null
@@ -1,384 +0,0 @@
-#!/usr/bin/python
-#
-# sfive
-#
-# sfive - spreadspace streaming statistics suite is a generic
-# statistic collection tool for streaming server infrastuctures.
-# The system collects and stores meta data like number of views
-# and throughput from a number of streaming servers and stores
-# it in a global data store.
-# The data acquisition is designed to be generic and extensible in
-# order to support different streaming software.
-# sfive also contains tools and applications to filter and visualize
-# live and recorded data.
-#
-#
-# Copyright (C) 2014 Christian Pointner <equinox@spreadspace.org>
-# Markus Grueneis <gimpf@gimpf.org>
-#
-# This file is part of sfive.
-#
-# sfive is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License version 3
-# as published by the Free Software Foundation.
-#
-# sfive is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with sfive. If not, see <http://www.gnu.org/licenses/>.
-#
-
-from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import protocol, unix, reactor, task
-from twisted.internet.protocol import Protocol
-from twisted.internet.defer import Deferred
-from twisted.web.client import Agent
-from twisted.web.http_headers import Headers
-import socket
-from time import sleep
-
-import re
-import simplejson as json
-import datetime
-import time
-import dateutil.parser
-
-_MAX_PACKET_SIZE = 65536 # in bytes
-
-__version__ = "$Rev$"
-
-
-class SFiveNGXluaProto(Protocol):
- def __init__(self, finished, fetcher):
- self.data = ''
- self.finished = finished
- self._fetcher = fetcher
- print 'SFive: started receiving log data'
-
- def dataReceived(self, bytes):
- self.data += bytes
- lines = self.data.splitlines(True)
- if len(lines) > 0:
- if '\n' not in lines[-1] and '\r' not in lines[-1]:
- self.data = lines[-1]
- del lines[-1]
- else:
- self.data = ''
-
- if len(lines) > 0:
- for line in lines:
- self._fetcher.updateData(json.loads(line))
-
- def connectionLost(self, reason):
- print 'SFive: finished receiving log data:', reason.getErrorMessage()
- self._fetcher.close()
-
-
-class SFivePort(unix.ConnectedDatagramPort):
-
- def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
- unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
-
- def write(self, data):
- try:
- return self.socket.send(data)
- except socket.error, se:
- no = se.args[0]
- if no == EINTR:
- return self.write(data)
- elif no == EMSGSIZE:
- raise error.MessageLengthError, "message too long"
- elif no == ECONNREFUSED:
- self.protocol.connectionRefused()
- elif no == EAGAIN:
- # the send buffer seems to be full - let's wait a little while...
- # this is not really a good solution but better than the aproach
- # of twisted which just drops the datagram...
- sleep(0.01)
- return self.write(data)
- else:
- raise
-
-class SFiveProto(protocol.ConnectedDatagramProtocol):
-
- def __init__(self, fetcher):
- self._fetcher = fetcher
-
- def startProtocol(self):
- self._fetcher.socketReady()
-
- def datagramReceived(self, data):
- print 'SFive: received datagram: "%s" (will get ignored)' % (data)
-
- def connectionFailed(self, failure):
- print 'SFive: connection failed: %s' % (failure.getErrorMessage())
- self._fetcher.socketError()
-
- def sendDatagram(self, data):
- try:
- return self.transport.write(data)
- except socket.error as err:
- print 'SFive: sending datagram failed: %s' % (err)
- self._fetcher.socketError()
-
-
-class ClientList:
-
- def __init__(self):
- self._clients = { }
-
- def clear(self):
- self._clients = { }
-
- def getCnt(self):
- return len(self._clients)
-
- def getBytesSent(self):
- sum = 0
- for val in self._clients.itervalues():
- sum += val['bytes-sent']
-
- return sum
-
- def values(self):
- return self._clients.itervalues()
-
- def update(self, logdata):
- key = '%(client)s/%(ua)s' % logdata
- if key in self._clients.keys():
- self._clients[key]['bytes-sent'] += logdata['bytes-sent']
- else:
- self._clients[key] = { 'ip': logdata['client'],
- 'user-agent': logdata['ua'],
- 'bytes-sent': logdata['bytes-sent'] }
-
-
-class NGXLuaFetcher():
- """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite"""
-
- def __init__(self, properties):
- print 'SFive: nginx-lua fetcher loaded'
-
- self._socket = properties['socket']
- self._hostname = properties['hostname']
- self._content_id = None
- self._format = None
- self._quality = None
- self._duration = properties['duration']
- self._tags = properties['tags']
- self._url = properties['url']
- self._initStreamerIds(properties['streamer-ids'])
-
- self._proto = None
- self._conn = None
- self._connected = False
- self._url_re = re.compile(properties['nameformat'])
- self._looper = None
- self._start_time = None
-
- def _initStreamerIds(self, streamer):
- print 'SFive: will look for the following streamer ids:'
- self._streamer = {}
- cs = {}
- fs = {}
- qs = {}
- for s in streamer:
- parts = s.split('/')
- if len(parts) != 3:
- raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s)
- scs = parts[0].split(',')
- sfs = parts[1].split(',')
- sqs = parts[2].split(',')
- for c in scs:
- cs[c] = 1
- if c not in self._streamer:
- self._streamer[c] = {}
- for f in sfs:
- fs[f] = 1
- if f not in self._streamer[c]:
- self._streamer[c][f] = {}
- for q in sqs:
- qs[q] = 1
- if q not in self._streamer[c][f]:
- self._streamer[c][f][q] = ClientList()
- print ' %s / %s / %s' % (c, f, q)
-
- if len(cs.keys()) == 1:
- self._content_id = cs.keys()[0]
- print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
-
- if len(fs.keys()) == 1:
- self._format = fs.keys()[0]
- print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
-
- if len(qs.keys()) == 1:
- self._quality = qs.keys()[0]
- print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
-
-
-
- def run(self):
- self._initFetch()
- # try to be aligned with current time
- # this will eventually get out of sync but for now this is good enough
- offset = self._duration - (time.time() % self._duration)
- print 'SFive: %sZ -> will wait %0.2f seconds before starting looper (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset)
- self._looper = task.LoopingCall(self._sendUpdates)
- reactor.callLater(offset, self._startLooper)
- reactor.callWhenRunning(self._initSocket)
- reactor.run()
-
- def _startLooper(self):
- self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
- self._looper.start(self._duration, False)
- print 'SFive: looper started at %sZ' % (self._start_time.isoformat('T'))
-
- def _sendUpdates(self):
- for c in self._streamer.keys():
- for f in self._streamer[c].keys():
- for q in self._streamer[c][f].keys():
- if self._connected:
- self._sendDataset(self._start_time, self._duration, c, f, q)
- self._streamer[c][f][q].clear()
-
- self._start_time = datetime.datetime.utcnow().replace(microsecond=0)
-
- def _initFetch(self):
- print "SFive: trying to fetch from '%s'" % self._url
- agent = Agent(reactor)
- d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None)
- d.addCallback(self._httpResponse)
- d.addBoth(self.close)
-
- def _httpResponse(self, response):
- print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase)
- finished = Deferred()
- response.deliverBody(SFiveNGXluaProto(finished, self))
- return finished
-
- def updateData(self, data):
- try:
- m = re.match(self._url_re, data['url'])
- if not m:
- return
- streamerid = m.groupdict()
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
-
- try:
- if data['status'] == 200 or data['status'] == 206:
- ts = dateutil.parser.parse(data['time'])
- ts = (ts - ts.utcoffset()).replace(tzinfo=None)
- if self._start_time and ts >= self._start_time:
- self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(data)
-
- except KeyError as e:
- print 'SFive: %s - ignoring dataset' % (e)
- except ValueError as e:
- print 'SFive: %s - ignoring dataset' % (e)
-
- def close(self):
- if self._conn and self._connected:
- self._conn.stopListening()
- self._connected = False
- if reactor.running:
- reactor.stop()
-
-
-
- def _initSocket(self):
- print 'SFive: trying to connect to %s...' % (self._socket)
- self._connected = False
- self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
- self._conn.startListening()
-
- def socketReady(self):
- print 'SFive: connection to sfive hub established'
- self._connected = True
- self._sendInit()
- # TODO: start callback every self._duration seconds
-
- def socketError(self):
- print 'SFive: connection to sfive hub lost - trying reconnect'
- if self._conn and self._connected:
- self._conn.stopListening()
- self._connected = False
- reactor.callLater(3, self._initSocket)
-
- def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id" : { },
- "tags": self._tags }
- if self._content_id:
- initdata["streamer-id"]["content-id"] = self._content_id
- if self._format:
- initdata["streamer-id"]["format"] = self._format
- if self._quality:
- initdata["streamer-id"]["quality"] = self._quality
- if len(initdata["streamer-id"].keys()) == 0:
- del initdata["streamer-id"]
-
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
-
- def _sendDataset(self, timestamp, duration, content_id, format, quality):
- clients = self._streamer[content_id][format][quality]
- data = { "start-time": timestamp.isoformat('T') + 'Z',
- "duration-ms": duration * 1000,
- "streamer-id": { },
- "data": {
- "clients": list(clients.values()),
- "client-count": clients.getCnt(),
- "bytes-sent": clients.getBytesSent()
- }
- }
- if not self._content_id:
- data["streamer-id"]["content-id"] = content_id
- if not self._format:
- data["streamer-id"]["format"] = format
- if not self._quality:
- data["streamer-id"]["quality"] = quality
- if len(data["streamer-id"].keys()) == 0:
- del data["streamer-id"]
-
- self._proto.sendDatagram('%s\n' % (json.dumps(data)));
-
-
-if __name__ == '__main__':
- import argparse
-
- parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher')
- parser.add_argument('--socket', '-s', dest='socket', required=True,
- help='the path to the data socket of the local SFive hub')
- parser.add_argument('--hostname', '-n', dest='hostname', required=True,
- help='the hostname of the machine')
- parser.add_argument('--duration', '-d', dest='duration', required=False,
- help='time (in seconds) between updates; defaults to 15')
- parser.add_argument('--tag', '-t', dest='tags', action='append',
- help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
- help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
- parser.add_argument('--url', '-u', dest='url', required=False,
- help='the url of the nginx sfive exporert, default http://localhost/sfive')
- parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
- help='a regular expression (must contain named matches for content, format, quality)')
- args = vars(parser.parse_args())
- if not args['tags']:
- args['tags'] = []
- if not args['duration']:
- args['duration'] = 15
- else:
- args['duration'] = int(args['duration'])
- if not args['url']:
- args['url'] = 'http://localhost/sfive'
- if not args['nameformat']:
- args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
- if not args['streamer-ids']:
- print 'SFive: you have to specify at least one streamer-id!'
- exit(-1)
-
- fetcher = NGXLuaFetcher(args)
- fetcher.run()