summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
authorChristian Pointner <equinox@spreadspace.org>2014-10-20 06:05:56 +0200
committerChristian Pointner <equinox@spreadspace.org>2014-10-20 06:05:56 +0200
commit26fc9a24efb968da3399439db43b360b674004a9 (patch)
treed5e62b9f28374ac9bc197628c587f1af66f2a441 /src/daq
parentdaq: nginx-lua fetcher improved init (diff)
dag: accesslog importer can now import multiple streamerids in one pass
major refactoring done
Diffstat (limited to 'src/daq')
-rwxr-xr-xsrc/daq/accesslog/s5-accesslog.py286
1 files changed, 184 insertions, 102 deletions
diff --git a/src/daq/accesslog/s5-accesslog.py b/src/daq/accesslog/s5-accesslog.py
index e559041..44df905 100755
--- a/src/daq/accesslog/s5-accesslog.py
+++ b/src/daq/accesslog/s5-accesslog.py
@@ -32,17 +32,16 @@
#
from errno import EINTR, EMSGSIZE, EAGAIN, EWOULDBLOCK, ECONNREFUSED, ENOBUFS
-from twisted.internet import protocol, reactor, unix
+from twisted.internet import protocol, unix, reactor, task
import socket
from time import sleep
-import sys
import re
import simplejson as json
import datetime
import dateutil.parser
-_MAX_PACKET_SIZE = 8192 # in bytes
+_MAX_PACKET_SIZE = 65536 # in bytes
__version__ = "$Rev$"
@@ -77,28 +76,27 @@ class SFiveProto(protocol.ConnectedDatagramProtocol):
self._importer = importer
def startProtocol(self):
- self._importer._socketReady()
+ self._importer.socketReady()
def datagramReceived(self, data):
print 'SFive: received datagram: "%s" (will get ignored)' % (data)
def connectionFailed(self, failure):
print 'SFive: connection failed: %s' % (failure.getErrorMessage())
- self._importer._socketError()
+ self._importer.socketError()
def sendDatagram(self, data):
try:
return self.transport.write(data)
except socket.error as err:
print 'SFive: sending datagram failed: %s' % (err)
- self._importer._socketError()
+ self._importer.socketError()
class ClientList:
- def __init__(self, file_re):
+ def __init__(self):
self._clients = { }
- self._file_re = file_re
def clear(self):
self._clients = { }
@@ -116,21 +114,14 @@ class ClientList:
def values(self):
return self._clients.itervalues()
- def update(self, linedata):
- if linedata['status'] != 200 and linedata['status'] != 206:
- return
- if linedata['req']['method'] != 'GET':
- return
-
- try:
- if re.match(self._file_re, linedata['req']['url']):
- if linedata['client'] in self._clients.keys():
- self._clients[linedata['client']]['bytes-sent'] += linedata['size']
- else:
- self._clients[linedata['client']] = { 'ip': linedata['client'],
- 'bytes-sent': linedata['size'] }
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
+ def update(self, logdata):
+ key = '%(client)s/%(ua)s' % logdata
+ if key in self._clients.keys():
+ self._clients[key]['bytes-sent'] += logdata['bytes-sent']
+ else:
+ self._clients[key] = { 'ip': logdata['client'],
+ 'user-agent': logdata['ua'],
+ 'bytes-sent': logdata['bytes-sent'] }
class AccessLog():
@@ -141,16 +132,60 @@ class AccessLog():
self._socket = properties['socket']
self._hostname = properties['hostname']
- self._content_id = properties['content-id']
- self._format = properties['format']
- self._quality = properties['quality']
+ self._content_id = None
+ self._format = None
+ self._quality = None
self._duration = properties['duration']
self._tags = properties['tags']
self._logfile = properties['logfile']
- self._nameformat = properties['nameformat']
+ self._initStreamerIds(properties['streamer-ids'])
self._proto = None
self._conn = None
+ self._connected = False
+ self._url_re = re.compile(properties['nameformat'])
+
+
+ def _initStreamerIds(self, streamer):
+ print 'SFive: will look for the following streamer ids:'
+ self._streamer = {}
+ cs = {}
+ fs = {}
+ qs = {}
+ for s in streamer:
+ parts = s.split('/')
+ if len(parts) != 3:
+ raise ValueError('invalid streamer descriptipn "%s": must consist of 3 parts seperated by a /' % s)
+ scs = parts[0].split(',')
+ sfs = parts[1].split(',')
+ sqs = parts[2].split(',')
+ for c in scs:
+ cs[c] = 1
+ if c not in self._streamer:
+ self._streamer[c] = {}
+ for f in sfs:
+ fs[f] = 1
+ if f not in self._streamer[c]:
+ self._streamer[c][f] = {}
+ for q in sqs:
+ qs[q] = 1
+ if q not in self._streamer[c][f]:
+ self._streamer[c][f][q] = ClientList()
+ print ' %s / %s / %s' % (c, f, q)
+
+ if len(cs.keys()) == 1:
+ self._content_id = cs.keys()[0]
+ print 'SFive: only one content-id detected "%s" - will include it in init messages only' % self._content_id
+
+ if len(fs.keys()) == 1:
+ self._format = fs.keys()[0]
+ print 'SFive: only one format detected "%s" - will include it in init messages only' % self._format
+
+ if len(qs.keys()) == 1:
+ self._quality = qs.keys()[0]
+ print 'SFive: only one quality detected "%s" - will include it in init messages only' % self._quality
+
+
def run(self):
if self._initLog():
@@ -159,57 +194,15 @@ class AccessLog():
def _initLog(self):
try:
- if self._logfile:
- print 'SFive: will batch import form %s' % (self._logfile if self._logfile != '-' else 'standard input')
- self._fd = open(self._logfile, 'r') if self._logfile != '-' else sys.stdin
- else:
- print 'SFive: live mode enabled'
-
- regex = self._nameformat % { 'hostname': self._hostname,
- 'content-id': self._content_id,
- 'format': self._format,
- 'quality': self._quality }
- self._file_re = re.compile(regex)
- print "SFive: will be looking for files like '%s'" % regex
+ print 'SFive: will batch import form %s' % (self._logfile)
+ self._fd = open(self._logfile, 'r')
+
except IOError as e:
print 'SFive: error opening logfile: %s' % (e.strerror)
return False
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
- return False
return True
- def _initSocket(self):
- print 'SFive: trying to connect to %s...' % (self._socket)
- self._proto = SFiveProto(self)
- self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
- self._conn.startListening()
-
- def _socketError(self):
- if self._conn:
- self._conn.stopListening()
- reactor.stop()
-
- def _socketReady(self):
- print 'SFive: connection to sfive hub established'
- self._sendInit()
- if hasattr(self, '_fd'):
- cnt = self._sendLogData(self._fd, None)
- print 'SFive: imported %d datasets' % (cnt)
- self._fd.close()
- else:
- print 'SFive: live mode is not yet implemented!'
- # TODO: wait self._duration seconds and call _sendData with
- # all lines received lines since last update
-
- reactor.stop()
-
- def _sendInit(self):
- initdata = { "version": 1, "hostname": self._hostname,
- "streamer-id": { "content-id": self._content_id, "format": self._format, "quality": self._quality },
- "tags": self._tags }
- self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
def _prepareLineRegex(self):
parts = [ r'(?P<client>\S+)', r'\S+', r'\S+', r'\[(?P<ts>.+)\]', r'"(?P<req>.+)"',
@@ -239,58 +232,144 @@ class AccessLog():
def _parseLine(self, regex, line):
linedata = regex.match(line).groupdict()
- for part in ("client", "ref", "ua"):
- if linedata[part] == "-":
+ for part in ('client', 'ref', 'ua'):
+ if linedata[part] == '-':
linedata[part] = None
- linedata["status"] = int(linedata["status"])
- linedata["size"] = 0 if linedata["size"] == "-" else int(linedata["size"])
+ linedata['status'] = int(linedata['status'])
+ linedata['bytes-sent'] = 0 if linedata['size'] == '-' else int(linedata['size'])
linedata['req'] = self._parseRequest(linedata['req'])
linedata['ts'] = self._parseDatetime(linedata['ts'])
return linedata
- def _sendLogData(self, data, lastts):
- cnt = 0
- nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
- clients = ClientList(self._file_re)
+ def _sendLogData(self):
+ linecnt = 0
+ updatecnt = 0
+ lastts = None
+ nextts = None
try:
regex = self._prepareLineRegex()
- for line in data:
+ for line in self._fd:
+ linecnt += 1
linedata = self._parseLine(regex, line)
if not lastts:
lastts = linedata['ts']
+ # TODO: make this aligned to time!
nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients.clear()
- clients.update(linedata)
+ self._clearClients()
+ self._updateClients(linedata)
else:
while linedata['ts'] >= nextts:
- self._sendDataset(lastts, self._duration, clients)
- cnt += 1
+ updatecnt += self._sendUpdates(lastts)
lastts = nextts
nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients.clear()
-
- clients.update(linedata)
+ self._clearClients()
+ self._updateClients(linedata)
# send remaining data
if nextts:
- self._sendDataset(lastts, self._duration, clients)
- cnt += 1
+ updatecnt += self._sendUpdates(lastts)
except re.error as e:
print 'SFive: regex error: %s' % (e)
+ return updatecnt, linecnt
+
+ def _updateClients(self, linedata):
+ if linedata['status'] != 200 and linedata['status'] != 206:
+ return
+ if linedata['req']['method'] != 'GET':
+ return
+
+ try:
+ m = re.match(self._url_re, linedata['req']['url'])
+ if not m:
+ return
+ streamerid = m.groupdict()
+ except re.error as e:
+ print 'SFive: regex error: %s' % (e)
+
+ try:
+ self._streamer[streamerid['content']][streamerid['format']][streamerid['quality']].update(linedata)
+ except KeyError as e:
+ pass
+
+ def _sendUpdates(self, ts):
+ cnt = 0
+ for c in self._streamer.keys():
+ for f in self._streamer[c].keys():
+ for q in self._streamer[c][f].keys():
+ self._sendDataset(ts, self._duration, c, f, q)
+ cnt +=1
return cnt
- def _sendDataset(self, timestamp, duration, clients):
+ def _clearClients(self):
+ for c in self._streamer.keys():
+ for f in self._streamer[c].keys():
+ for q in self._streamer[c][f].keys():
+ self._streamer[c][f][q].clear()
+
+
+ def _initSocket(self):
+ print 'SFive: trying to connect to %s...' % (self._socket)
+ self._connected = False
+ self._proto = SFiveProto(self)
+ self._conn = SFivePort(self._socket, self._proto, _MAX_PACKET_SIZE, 0o666, None, reactor)
+ self._conn.startListening()
+
+ def socketReady(self):
+ print 'SFive: connection to sfive hub established'
+ self._connected = True
+ self._sendInit()
+ updatecnt, linecnt = self._sendLogData()
+ print 'SFive: imported %d datasets from %d lines' % (updatecnt, linecnt)
+ self._fd.close()
+ if reactor.running:
+ reactor.stop()
+
+ def socketError(self):
+ print 'SFive: connection to sfive hub lost'
+ if self._conn and self._connected:
+ self._conn.stopListening()
+ self._connected = False
+ if reactor.running:
+ reactor.stop()
+
+ def _sendInit(self):
+ initdata = { "version": 1, "hostname": self._hostname,
+ "streamer-id" : { },
+ "tags": self._tags }
+ if self._content_id:
+ initdata["streamer-id"]["content-id"] = self._content_id
+ if self._format:
+ initdata["streamer-id"]["format"] = self._format
+ if self._quality:
+ initdata["streamer-id"]["quality"] = self._quality
+ if len(initdata["streamer-id"].keys()) == 0:
+ del initdata["streamer-id"]
+
+ self._proto.sendDatagram('%s\n' % (json.dumps(initdata)));
+
+ def _sendDataset(self, timestamp, duration, content_id, format, quality):
+ clients = self._streamer[content_id][format][quality]
data = { "start-time": timestamp.isoformat('T') + 'Z',
"duration-ms": duration * 1000,
+ "streamer-id": { },
"data": {
"clients": list(clients.values()),
"client-count": clients.getCnt(),
"bytes-sent": clients.getBytesSent()
}
}
+ if not self._content_id:
+ data["streamer-id"]["content-id"] = content_id
+ if not self._format:
+ data["streamer-id"]["format"] = format
+ if not self._quality:
+ data["streamer-id"]["quality"] = quality
+ if len(data["streamer-id"].keys()) == 0:
+ del data["streamer-id"]
+
self._proto.sendDatagram('%s\n' % (json.dumps(data)));
@@ -302,24 +381,27 @@ if __name__ == '__main__':
help='the path to the data socket of the local SFive hub')
parser.add_argument('--hostname', '-n', dest='hostname', required=True,
help='the hostname of the machine')
- parser.add_argument('--content-id', '-c', dest='content-id', required=True,
- help='the content-id (i.e. av)')
- parser.add_argument('--format', '-f', dest='format', required=True,
- help='the format (i.e. webm)')
- parser.add_argument('--quality', '-q', dest='quality', required=True,
- help='the quality (i.e. high)')
parser.add_argument('--duration', '-d', dest='duration', required=False,
- help='time (in seconds) between updates; defaults to 5')
+ help='time (in seconds) between updates; defaults to 15')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
- parser.add_argument('--logfile', '-l', dest='logfile', required=False,
+ parser.add_argument('--streamer-id', '-S', dest='streamer-ids', action='append',
+ help='a streamer description like <content-id1>[,<content-id2>]/<format1>[,<format2>]/<quality>[,<quality>[,<quality]], can be invoked several times')
+ parser.add_argument('--logfile', '-l', dest='logfile', required=True,
help='path to the logfile or \'-\' for standard input')
- parser.add_argument('--nameformat', '-F', dest='nameformat', required=True,
- help='the format for filenames which are part of this stream, this may include python string expressions')
+ parser.add_argument('--nameformat', '-F', dest='nameformat', required=False,
+ help='a regular expression (must contain named matches for content, format, quality)')
args = vars(parser.parse_args())
if not args['tags']:
args['tags'] = []
if not args['duration']:
- args['duration'] = 5
+ args['duration'] = 15
+ else:
+ args['duration'] = int(args['duration'])
+ if not args['nameformat']:
+ args['nameformat'] = '/[^/]+/(?P<format>hls|dash)/(?P<content>.+)-(?P<quality>[^-]+)/.*'
+ if not args['streamer-ids']:
+ print 'SFive: you have to specify at least one streamer-id!'
+ exit(-1)
importer = AccessLog(args)
importer.run()