summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/protocol.md2
-rwxr-xr-xsrc/daq/accesslog/sfive-accesslog.py81
2 files changed, 54 insertions, 29 deletions
diff --git a/doc/protocol.md b/doc/protocol.md
index d33c941..957fe11 100644
--- a/doc/protocol.md
+++ b/doc/protocol.md
@@ -19,7 +19,7 @@ data-update
"duration-ms": 5000,
"data": {
"clients": [
- { "ip": "127.0.0.1:2345", "bytes-transferred": 12094, "user-agent": "Mozilla Version 28", .... },
+ { "ip": "127.0.0.1", "bytes-sent": 12094, "user-agent": "Mozilla Version 28", .... },
.....
],
"client-count": 12,
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py
index 3d45d61..7e74090 100755
--- a/src/daq/accesslog/sfive-accesslog.py
+++ b/src/daq/accesslog/sfive-accesslog.py
@@ -94,6 +94,45 @@ class SFiveProto(protocol.ConnectedDatagramProtocol):
self._importer._socketError()
+class ClientList:
+
+ def __init__(self, file_re):
+ self._clients = { }
+ self._file_re = file_re
+
+ def clear(self):
+ self._clients = { }
+
+ def getCnt(self):
+ return len(self._clients)
+
+ def getBytesSent(self):
+ sum = 0
+ for val in self._clients.itervalues():
+ sum += val['bytes-sent']
+
+ return sum
+
+ def values(self):
+ return self._clients.itervalues()
+
+ def update(self, linedata):
+ if linedata['status'] != 200 and linedata['status'] != 206:
+ return
+ if linedata['req']['method'] != 'GET':
+ return
+
+ try:
+ if re.match(self._file_re, linedata['req']['url']):
+ if linedata['client'] in self._clients.keys():
+ self._clients[linedata['client']]['bytes-sent'] += linedata['size']
+ else:
+ self._clients[linedata['client']] = { 'ip': linedata['client'],
+ 'bytes-sent': linedata['size'] }
+ except re.error as e:
+ print 'SFive: regex error: %s' % (e)
+
+
class AccessLog():
"""Class to batch import nginx/apache access logs into the spreadspace streaming statistic suite"""
@@ -178,10 +217,10 @@ class AccessLog():
return re.compile(r'\s+'.join(parts)+r'\s*\Z')
def _parseRequest(self, reqstr):
- req = { 'cmd': None, 'url': None, 'proto': None }
+ req = { 'method': None, 'url': None, 'proto': None }
try:
parts = reqstr.split()
- req['cmd'] = parts[0] if parts[0] != '-' else None
+ req['method'] = parts[0] if parts[0] != '-' else None
req['url'] = parts[1]
req['proto'] = parts[2]
except IndexError:
@@ -210,25 +249,10 @@ class AccessLog():
linedata['ts'] = self._parseDatetime(linedata['ts'])
return linedata
- def _updateClients(self, clients, linedata):
- if linedata['status'] != 200 and linedata['status'] != 206:
- return
- if linedata['req']['cmd'] != 'GET':
- return
-
- try:
- if re.match(self._file_re, linedata['req']['url']):
- if linedata['client'] in clients.keys():
- clients[linedata['client']] += linedata['size']
- else:
- clients[linedata['client']] = linedata['size']
- except re.error as e:
- print 'SFive: regex error: %s' % (e)
-
def _sendLogData(self, data, lastts):
cnt = 0
nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
- clients = { }
+ clients = ClientList(self._file_re)
try:
regex = self._prepareLineRegex()
for line in data:
@@ -236,21 +260,21 @@ class AccessLog():
if not lastts:
lastts = linedata['ts']
nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients = { }
- self._updateClients(clients, linedata)
+ clients.clear()
+ clients.update(linedata)
else:
- while linedata['ts'] > nextts:
- self._sendDataset(nextts, self._duration, len(clients), sum(clients.itervalues()))
+ while linedata['ts'] >= nextts:
+ self._sendDataset(lastts, self._duration, clients)
cnt += 1
lastts = nextts
nextts = lastts + datetime.timedelta(seconds=self._duration)
- clients = { }
+ clients.clear()
- self._updateClients(clients, linedata)
+ clients.update(linedata)
# send remaining data
if nextts:
- self._sendDataset(nextts, self._duration, len(clients), sum(clients.itervalues()))
+ self._sendDataset(lastts, self._duration, clients)
cnt += 1
except re.error as e:
@@ -258,12 +282,13 @@ class AccessLog():
return cnt
- def _sendDataset(self, timestamp, duration, client_count, bytes_sent):
+ def _sendDataset(self, timestamp, duration, clients):
data = { "start-time": timestamp.isoformat('T'),
"duration-ms": duration * 1000,
"data": {
- "client-count": client_count,
- "bytes-sent": bytes_sent
+ "clients": list(clients.values()),
+ "client-count": clients.getCnt(),
+ "bytes-sent": clients.getBytesSent()
}
}
self._proto.sendDatagram('%s\n' % (json.dumps(data)));