summaryrefslogtreecommitdiff
path: root/src/daq
diff options
context:
space:
mode:
Diffstat (limited to 'src/daq')
-rwxr-xr-xsrc/daq/accesslog/sfive-accesslog.py52
1 file changed, 40 insertions, 12 deletions
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py
index da77f98..a3140e0 100755
--- a/src/daq/accesslog/sfive-accesslog.py
+++ b/src/daq/accesslog/sfive-accesslog.py
@@ -105,6 +105,7 @@ class AccessLog():
self._content_id = properties['content-id']
self._format = properties['format']
self._quality = properties['quality']
+ self._duration = properties['duration']
self._tags = properties['tags']
self._logfile = properties['logfile']
self._nameformat = properties['nameformat']
@@ -130,7 +131,7 @@ class AccessLog():
'format': self._format,
'quality': self._quality }
self._file_re = re.compile(regex)
- print "will be looking for files like '%s'" % regex
+ print "SFive: will be looking for files like '%s'" % regex
except IOError as e:
print 'SFive: error opening logfile: %s' % (e.strerror)
return False
@@ -155,11 +156,13 @@ class AccessLog():
print 'SFive: connection to sfive hub established'
self._sendInit()
if hasattr(self, '_fd'):
- cnt = self._sendLogData(self._fd)
- print 'SFive: sent %d datasets' % (cnt)
+ cnt = self._sendLogData(self._fd, None)
+ print 'SFive: imported %d datasets' % (cnt)
self._fd.close()
else:
- print 'SFive: live mode is not implemented yet!'
+ print 'SFive: live mode is not yet implemented!'
+ # TODO: wait self._duration seconds and call _sendData with
+ # all lines received since the last update
reactor.stop()
@@ -188,7 +191,9 @@ class AccessLog():
def _parseDatetime(self, datetimestr):
try:
- return dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
+ ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True)
+ ts = (ts - ts.utcoffset()).replace(tzinfo=None)
+ return ts
except ValueError as e:
return None
@@ -205,19 +210,38 @@ class AccessLog():
linedata['ts'] = self._parseDatetime(linedata['ts'])
return linedata
- def _sendLogData(self, data):
+ def _sendLogData(self, data, lastts):
cnt = 0
+ nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration)
+ bytes_sent = 0
+ client_cnt = 0
try:
regex = self._prepareLineRegex()
for line in data:
linedata = self._parseLine(regex, line)
+ if not lastts:
+ lastts = linedata['ts']
+ nextts = lastts + datetime.timedelta(seconds=self._duration)
+ client_cnt += 1
+ bytes_sent += linedata['size']
+ else:
+ while linedata['ts'] > nextts:
+ self._sendDataset(nextts, self._duration, client_cnt, bytes_sent)
+ cnt += 1
+ lastts = nextts
+ nextts = lastts + datetime.timedelta(seconds=self._duration)
+ client_cnt = 0
+ bytes_sent = 0
+
+ # TODO: update client_cnt and bytes_sent only if data matches
+ # also the data needs to be aggregated for clients
+ client_cnt += 1
+ bytes_sent += linedata['size']
+
+ # send remaining data
+ if nextts:
+ self._sendDataset(nextts, self._duration, client_cnt, bytes_sent)
cnt += 1
- print linedata
- print
-
- if cnt >= 10:
- break
- # TODO: create datasets using parser and call _sendDataset()
except re.error as e:
print 'SFive: regex error: %s' % (e)
@@ -249,6 +273,8 @@ if __name__ == '__main__':
help='the format (i.e. webm)')
parser.add_argument('--quality', '-q', dest='quality', required=True,
help='the quality (i.e. high)')
+ parser.add_argument('--duration', '-d', dest='duration', required=False,
+ help='time (in seconds) between updates; defaults to 5')
parser.add_argument('--tag', '-t', dest='tags', action='append',
help='tag to be added to the statistic data, can be invoked several times')
parser.add_argument('--logfile', '-l', dest='logfile', required=False,
@@ -258,5 +284,7 @@ if __name__ == '__main__':
args = vars(parser.parse_args())
if not args['tags']:
args['tags'] = []
+ if not args['duration']:
+ args['duration'] = 5
importer = AccessLog(args)
importer.run()