summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-19 03:03:16 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-19 03:04:20 +0200
commit54d96241d5c6c59556b81f03e8569575e855e42d (patch)
treed020dcbdf2fed5c287ed09edff9381585aaefe2c /src/daq
parentadded fetcher for nginx-lua (diff)
daq: nginx-lua fetch almost done!?
Diffstat (limited to 'src/daq')
-rwxr-xr-xsrc/daq/nginx-lua/s5-nginx-lua-fetch.py242
-rw-r--r--src/daq/nginx-lua/s5-nginx.lua2
2 files changed, 216 insertions, 28 deletions
diff --git a/src/daq/nginx-lua/s5-nginx-lua-fetch.py b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
index ef13d02..78f5f73 100755
--- a/src/daq/nginx-lua/s5-nginx-lua-fetch.py
+++ b/src/daq/nginx-lua/s5-nginx-lua-fetch.py
@@ -31,20 +31,30 @@
# along with sfive. If not, see <http://www.gnu.org/licenses/>.
#
-from twisted.internet import reactor
-from twisted.internet.defer import Deferred
+from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
+from twisted.internet import protocol, reactor, unix
from twisted.internet.protocol import Protocol
+from twisted.internet.defer import Deferred
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
+import socket
+from time import sleep
+import re
import simplejson as json
import datetime
+_MAX_PACKET_SIZE = 8192 # in bytes
+
+__version__ = "$Rev$"
+
+
class SFiveNGXluaProto(Protocol):
- def __init__(self, finished):
+ def __init__(self, finished, importer):
self.data = ''
self.finished = finished
- print 'Started receiving log data'
+ self._importer = importer
+ print 'SFive: started receiving log data'
def dataReceived(self, bytes):
self.data += bytes
@@ -57,29 +67,207 @@ class SFiveNGXluaProto(Protocol):
self.data = ''
if len(lines) > 0:
- print '\nlog data received:'
for line in lines:
- print json.loads(line)
+ print "SFive: '%s'" % line.strip()
+ self._importer.updateData(json.loads(line))
def connectionLost(self, reason):
- print 'Finished receiving log data:', reason.getErrorMessage()
- self.finished.callback(None)
-
-agent = Agent(reactor)
-d = agent.request(
- 'GET',
- 'http://calypso.spreadspace.org/sfive',
- Headers({'User-Agent': ['SFive nginx-lua fetcher']}),
- None)
-
-def cbRequest(response):
- finished = Deferred()
- response.deliverBody(SFiveNGXluaProto(finished))
- return finished
-d.addCallback(cbRequest)
-
-def cbShutdown(ignored):
- reactor.stop()
-d.addBoth(cbShutdown)
-
-reactor.run()
+ print 'SFive: finished receiving log data:', reason.getErrorMessage()
+ self._importer._socketError()
+
+
+class SFivePort(unix.ConnectedDatagramPort):
+
+ def __init__(self, addr, proto, maxPacketSize=8192, mode=0666, bindAddress=None, reactor=None):
+ unix.ConnectedDatagramPort.__init__(self, addr, proto, maxPacketSize, mode, bindAddress, reactor)
+
+ def write(self, data):
+ try:
+ return self.socket.send(data)
+ except socket.error, se:
+ no = se.args[0]
+ if no == EINTR:
+ return self.write(data)
+ elif no == EMSGSIZE:
+ raise error.MessageLengthError, "message too long"
+ elif no == ECONNREFUSED:
+ self.protocol.connectionRefused()
+ elif no == EAGAIN:
+ # the send buffer seems to be full - let's wait a little while...
+ # this is not really a good solution but better than the aproach
+ # of twisted which just drops the datagram...
+ sleep(0.01)
+ return self.write(data)
+ else:
+ raise
+
+class SFiveProto(protocol.ConnectedDatagramProtocol):
+
+ def __init__(self, importer):
+ self._importer = importer
+
+ def startProtocol(self):
+ self._importer._socketReady()
+
+ def datagramReceived(self, data):
+ print 'SFive: received datagram: "%s" (will get ignored)' % (data)
+
+ def connectionFailed(self, failure):
+ print 'SFive: connection failed: %s' % (failure.getErrorMessage())
+ self._importer._socketError()
+
+ def sendDatagram(self, data):
+ try:
+ return self.transport.write(data)
+ except socket.error as err:
+ print 'SFive: sending datagram failed: %s' % (err)
+ self._importer._socketError()
+
+
+class ClientList:
+
+ def __init__(self, re):
+ self._re = re
+ self._clients = { }
+
+ def clear(self):
+ self._clients = { }
+
+ def getCnt(self):
+ return len(self._clients)
+
+ def getBytesSent(self):
+ sum = 0
+ for val in self._clients.itervalues():
+ sum += val['bytes-sent']
+
+ return sum
+
+ def values(self):
+ return self._clients.itervalues()
+
+ def update(self, logdata):
+ ## TODO: parse logdata['uri'] !!!
+ if logdata['client'] in self._clients.keys():
+ self._clients[logdata['client']]['bytes-sent'] += logdata['bytes-sent']
+ else:
+ self._clients[logdata['client']] = { 'ip': logdata['client'],
+ 'bytes-sent': logdata['bytes-sent'] }
+
+
+class NGXLuaFetcher():
+ """Class to import live log data produced by s5-nginx.lua into the spreadspace streaming statistic suite"""
+
+ def __init__(self, properties):
+ print 'SFive: nginx-lua fetcher loaded'
+
+ self._socket = properties['socket']
+ self._hostname = properties['hostname']
+ self._content_id = properties['content-id']
+ self._format = properties['format']
+ self._quality = properties['quality']
+ self._duration = properties['duration']
+ self._tags = properties['tags']
+ self._url = properties['url']
+ self._nameformat = properties['nameformat']
+
+ self._proto = None
+ self._conn = None
+ self._clients = None
+ self._reactor = reactor
+
+ def run(self):
+ if self._initFetch():
+ self._reactor.callWhenRunning(self._initSocket)
+ self._reactor.run()
+
+ def _initFetch(self):
+ try:
+ self._clients = ClientList(re.compile(self._nameformat)) # TODO: check for named matches
+
+ print "SFive: trying to fetch from '%s'" % self._url
+ agent = Agent(self._reactor)
+ d = agent.request('GET', self._url, Headers({'User-Agent': ['SFive nginx-lua fetcher']}), None)
+ d.addCallback(self._httpResponse)
+ d.addBoth(self._socketError)
+
+ except re.error as e:
+ print 'SFive: regex error: %s' % (e)
+ return False
+
+ return True
+
+ def _httpResponse(self, response):
+ print 'SFive: got response from %s: %d %s' % (self._url, response.code, response.phrase)
+ finished = Deferred()
+ response.deliverBody(SFiveNGXluaProto(finished, self))
+ return finished
+
+ def _initSocket(self):
+ print 'SFive: trying to connect to %s...' % (self._socket)
+ self._proto = SFiveProto(self)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, self._reactor)
+ self._conn.startListening()
+
+ def _socketError(self):
+ if self._conn:
+ self._conn.stopListening()
+ self._reactor.stop()
+
+ def _socketReady(self):
+ print 'SFive: connection to sfive hub established'
+ self._sendInit()
+ # TODO: start callback every self._duration seconds
+
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
+ "tags": self._tags }
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
+ def updateData(self, data):
+ self._clients.update(data)
+
+ def _sendDataset(self, timestamp, duration, clients):
+ data = { "start-time": timestamp.isoformat('T') + 'Z',
+ "duration-ms": duration * 1000,
+ "data": {
+ "clients": list(clients.values()),
+ "client-count": clients.getCnt(),
+ "bytes-sent": clients.getBytesSent()
+ }
+ }
+ self._proto.sendDatagram('%s\n' % (json.dumps(data)));
+
+
+if __name__ == '__main__':
+ import argparse
+
+ parser = argparse.ArgumentParser(description='SFive nginx-lua log line fetcher')
+ parser.add_argument('--socket', '-s', dest='socket', required=True,
+ help='the path to the data socket of the local SFive hub')
+ parser.add_argument('--hostname', '-n', dest='hostname', required=True,
+ help='the hostname of the machine')
+ parser.add_argument('--duration', '-d', dest='duration', required=False,
+ help='time (in seconds) between updates; defaults to 15')
+ parser.add_argument('--tag', '-t', dest='tags', action='append',
+ help='tag to be added to the statistic data, can be invoked several times')
+ parser.add_argument('--content-id', '-c', dest='content-id', required=False,
+ help='the content-id (i.e. av) - only used if nameformat doesn\'t contain a named match')
+ parser.add_argument('--format', '-f', dest='format', required=False,
+ help='the format (i.e. webm) - only used if nameformat doesn\'t contain a named match')
+ parser.add_argument('--quality', '-q', dest='quality', required=False,
+ help='the quality (i.e. high) - only used if nameformat doesn\'t contain a named match')
+ parser.add_argument('--url', '-u', dest='url', required=False,
+ help='the url of the nginx sfive exporert, default http://localhost/sfive')
+ parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
+ help='a regular expression (containing named matches for content, format, quality)')
+ args = vars(parser.parse_args())
+ if not args['tags']:
+ args['tags'] = []
+ if not args['duration']:
+ args['duration'] = 15
+ if not args['url']:
+ args['url'] = 'http://localhost/sfive'
+ fetcher = NGXLuaFetcher(args)
+ fetcher.run()
diff --git a/src/daq/nginx-lua/s5-nginx.lua b/src/daq/nginx-lua/s5-nginx.lua
index ac46224..2ae7e69 100644
--- a/src/daq/nginx-lua/s5-nginx.lua
+++ b/src/daq/nginx-lua/s5-nginx.lua
@@ -92,7 +92,7 @@ function _SFIVE.log()
json = json .. '"ua": "' .. ngx.var.http_user_agent .. '",'
json = json .. '"uri": "' .. ngx.var.uri .. '",'
json = json .. '"status": ' .. status .. ','
- json = json .. '"bytes_sent": ' .. ngx.var.bytes_sent
+ json = json .. '"bytes-sent": ' .. ngx.var.bytes_sent
json = json .. '}'
local ok, err, force = sfive_data:add(idx, json, config.log_exptime)