diff options
-rw-r--r-- | doc/protocol.md | 2 | ||||
-rwxr-xr-x | src/daq/accesslog/sfive-accesslog.py | 81 |
2 files changed, 54 insertions, 29 deletions
diff --git a/doc/protocol.md b/doc/protocol.md index d33c941..957fe11 100644 --- a/doc/protocol.md +++ b/doc/protocol.md @@ -19,7 +19,7 @@ data-update "duration-ms": 5000, "data": { "clients": [ - { "ip": "127.0.0.1:2345", "bytes-transferred": 12094, "user-agent": "Mozilla Version 28", .... }, + { "ip": "127.0.0.1", "bytes-sent": 12094, "user-agent": "Mozilla Version 28", .... }, ..... ], "client-count": 12, diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py index 3d45d61..7e74090 100755 --- a/src/daq/accesslog/sfive-accesslog.py +++ b/src/daq/accesslog/sfive-accesslog.py @@ -94,6 +94,45 @@ class SFiveProto(protocol.ConnectedDatagramProtocol): self._importer._socketError() +class ClientList: + + def __init__(self, file_re): + self._clients = { } + self._file_re = file_re + + def clear(self): + self._clients = { } + + def getCnt(self): + return len(self._clients) + + def getBytesSent(self): + sum = 0 + for val in self._clients.itervalues(): + sum += val['bytes-sent'] + + return sum + + def values(self): + return self._clients.itervalues() + + def update(self, linedata): + if linedata['status'] != 200 and linedata['status'] != 206: + return + if linedata['req']['method'] != 'GET': + return + + try: + if re.match(self._file_re, linedata['req']['url']): + if linedata['client'] in self._clients.keys(): + self._clients[linedata['client']]['bytes-sent'] += linedata['size'] + else: + self._clients[linedata['client']] = { 'ip': linedata['client'], + 'bytes-sent': linedata['size'] } + except re.error as e: + print 'SFive: regex error: %s' % (e) + + class AccessLog(): """Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite""" @@ -178,10 +217,10 @@ class AccessLog(): return re.compile(r'\s+'.join(parts)+r'\s*\Z') def _parseRequest(self, reqstr): - req = { 'cmd': None, 'url': None, 'proto': None } + req = { 'method': None, 'url': None, 'proto': None } try: parts = reqstr.split() - req['cmd'] = parts[0] if parts[0] != '-' else None + req['method'] = parts[0] if parts[0] != '-' else None req['url'] = parts[1] req['proto'] = parts[2] except IndexError: @@ -210,25 +249,10 @@ class AccessLog(): linedata['ts'] = self._parseDatetime(linedata['ts']) return linedata - def _updateClients(self, clients, linedata): - if linedata['status'] != 200 and linedata['status'] != 206: - return - if linedata['req']['cmd'] != 'GET': - return - - try: - if re.match(self._file_re, linedata['req']['url']): - if linedata['client'] in clients.keys(): - clients[linedata['client']] += linedata['size'] - else: - clients[linedata['client']] = linedata['size'] - except re.error as e: - print 'SFive: regex error: %s' % (e) - def _sendLogData(self, data, lastts): cnt = 0 nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration) - clients = { } + clients = ClientList(self._file_re) try: regex = self._prepareLineRegex() for line in data: @@ -236,21 +260,21 @@ class AccessLog(): if not lastts: lastts = linedata['ts'] nextts = lastts + datetime.timedelta(seconds=self._duration) - clients = { } - self._updateClients(clients, linedata) + clients.clear() + clients.update(linedata) else: - while linedata['ts'] > nextts: - self._sendDataset(nextts, self._duration, len(clients), sum(clients.itervalues())) + while linedata['ts'] >= nextts: + self._sendDataset(lastts, self._duration, clients) cnt += 1 lastts = nextts nextts = lastts + datetime.timedelta(seconds=self._duration) - clients = { } + clients.clear() - self._updateClients(clients, linedata) + clients.update(linedata) # send remaining data if nextts: - self._sendDataset(nextts, self._duration, len(clients), sum(clients.itervalues())) + self._sendDataset(lastts, self._duration, clients) cnt += 1 except re.error as e: @@ -258,12 +282,13 @@ class AccessLog(): return cnt - def _sendDataset(self, timestamp, duration, client_count, bytes_sent): + def _sendDataset(self, timestamp, duration, clients): data = { "start-time": timestamp.isoformat('T'), "duration-ms": duration * 1000, "data": { - "client-count": client_count, - "bytes-sent": bytes_sent + "clients": list(clients.values()), + "client-count": clients.getCnt(), + "bytes-sent": clients.getBytesSent() } } self._proto.sendDatagram('%s\n' % (json.dumps(data))); |