From 6bcacc3cb604483d1277573eedb521543488b13e Mon Sep 17 00:00:00 2001 From: Christian Pointner Date: Sun, 19 Oct 2014 20:52:55 +0200 Subject: daq: flumotion-plug align datasets to time --- src/daq/flumotion-plug/s5.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'src/daq') diff --git a/src/daq/flumotion-plug/s5.py b/src/daq/flumotion-plug/s5.py index e703026..643d0f6 100644 --- a/src/daq/flumotion-plug/s5.py +++ b/src/daq/flumotion-plug/s5.py @@ -44,6 +44,7 @@ from flumotion.common.poller import Poller from twisted.internet import protocol, reactor from socket import error as socket_error import datetime +import time from flumotion.common.i18n import N_ T_ = i18n.gettexter() @@ -155,10 +156,18 @@ class ComponentSFivePlug(base.ComponentPlug): self.info('SFive: connection to sfive hub established') self._sendInit() if self._sfivepoller: - self._old_bytes_received = self._component.getBytesReceived() - self._old_bytes_sent = self._component.getBytesSent() - self._start_time = datetime.datetime.utcnow().replace(microsecond=0) - self._sfivepoller.start() + # try to be aligned with current time + # this will eventually get out of sync but for now this is good enough + offset = self._duration - (time.time() % self._duration) + self.info('SFive: %sZ -> will wait %0.2f seconds before starting poller (alignment)' % (datetime.datetime.utcnow().isoformat('T'), offset)) + reactor.callLater(offset, self._startPoller) + + def _startPoller(self): + self._old_bytes_received = self._component.getBytesReceived() + self._old_bytes_sent = self._component.getBytesSent() + self._start_time = datetime.datetime.utcnow().replace(microsecond=0) + self._sfivepoller.start() + self.info('SFive: poller started at %sZ' % (self._start_time.isoformat('T'))) def _sendInit(self): initdata = { "version": 1, "hostname": self._hostname, -- cgit v1.2.3