diff options
-rwxr-xr-x | src/daq/accesslog/sfive-accesslog.py | 52 |
1 files changed, 40 insertions, 12 deletions
diff --git a/src/daq/accesslog/sfive-accesslog.py b/src/daq/accesslog/sfive-accesslog.py index da77f98..a3140e0 100755 --- a/src/daq/accesslog/sfive-accesslog.py +++ b/src/daq/accesslog/sfive-accesslog.py @@ -105,6 +105,7 @@ class AccessLog(): self._content_id = properties['content-id'] self._format = properties['format'] self._quality = properties['quality'] + self._duration = properties['duration'] self._tags = properties['tags'] self._logfile = properties['logfile'] self._nameformat = properties['nameformat'] @@ -130,7 +131,7 @@ class AccessLog(): 'format': self._format, 'quality': self._quality } self._file_re = re.compile(regex) - print "will be looking for files like '%s'" % regex + print "SFive: will be looking for files like '%s'" % regex except IOError as e: print 'SFive: error opening logfile: %s' % (e.strerror) return False @@ -155,11 +156,13 @@ class AccessLog(): print 'SFive: connection to sfive hub established' self._sendInit() if hasattr(self, '_fd'): - cnt = self._sendLogData(self._fd) - print 'SFive: sent %d datasets' % (cnt) + cnt = self._sendLogData(self._fd, None) + print 'SFive: imported %d datasets' % (cnt) self._fd.close() else: - print 'SFive: live mode is not implemented yet!' + print 'SFive: live mode is not yet implemented!' + # TODO: wait self._duration seconds and call _sendData with + # all lines received lines since last update reactor.stop() @@ -188,7 +191,9 @@ class AccessLog(): def _parseDatetime(self, datetimestr): try: - return dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True) + ts = dateutil.parser.parse(datetimestr[:11] + " " + datetimestr[12:], dayfirst=True) + ts = (ts - ts.utcoffset()).replace(tzinfo=None) + return ts except ValueError as e: return None @@ -205,19 +210,38 @@ class AccessLog(): linedata['ts'] = self._parseDatetime(linedata['ts']) return linedata - def _sendLogData(self, data): + def _sendLogData(self, data, lastts): cnt = 0 + nextts = None if not lastts else lastts + datetime.timedelta(seconds=self._duration) + bytes_sent = 0 + client_cnt = 0 try: regex = self._prepareLineRegex() for line in data: linedata = self._parseLine(regex, line) + if not lastts: + lastts = linedata['ts'] + nextts = lastts + datetime.timedelta(seconds=self._duration) + client_cnt += 1 + bytes_sent += linedata['size'] + else: + while linedata['ts'] > nextts: + self._sendDataset(nextts, self._duration, client_cnt, bytes_sent) + cnt += 1 + lastts = nextts + nextts = lastts + datetime.timedelta(seconds=self._duration) + client_cnt = 0 + bytes_sent = 0 + + # TODO: update client_cnt and bytes_sent only if data matches + # also the data needs to be aggregated for clients + client_cnt += 1 + bytes_sent += linedata['size'] + + # send remaining data + if nextts: + self._sendDataset(nextts, self._duration, client_cnt, bytes_sent) cnt += 1 - print linedata - print - - if cnt >= 10: - break - # TODO: create datasets using parser and call _sendDataset() except re.error as e: print 'SFive: regex error: %s' % (e) @@ -249,6 +273,8 @@ if __name__ == '__main__': help='the format (i.e. webm)') parser.add_argument('--quality', '-q', dest='quality', required=True, help='the quality (i.e. high)') + parser.add_argument('--duration', '-d', dest='duration', required=False, + help='time (in seconds) between updates; defaults to 5') parser.add_argument('--tag', '-t', dest='tags', action='append', help='tag to be added to the statistic data, can be invoked several times') parser.add_argument('--logfile', '-l', dest='logfile', required=False, @@ -258,5 +284,7 @@ if __name__ == '__main__': args = vars(parser.parse_args()) if not args['tags']: args['tags'] = [] + if not args['duration']: + args['duration'] = 5 importer = AccessLog(args) importer.run() |