#!/usr/bin/env python3 import sys import dbus import re import datetime # import pprint class AutoSuspender(object): def __init__(self): self.__bus = dbus.SystemBus() self.__timer_re = re.compile('^syncoid-pull-.*\.timer$') # self.__pp = pprint.PrettyPrinter(indent=2) def _get_interface(self, dest, object, interface): try: obj = self.__bus.get_object(dest, object) return dbus.Interface(obj, interface) except dbus.exceptions.DBusException as error: print("ERR: %s" % error) sys.exit(1) def _get_logind_manager_interface(self): return self._get_interface("org.freedesktop.login1", "/org/freedesktop/login1", "org.freedesktop.login1.Manager") def _get_systemd_manager_interface(self): return self._get_interface("org.freedesktop.systemd1", "/org/freedesktop/systemd1", "org.freedesktop.systemd1.Manager") def _get_systemd_timer_properties(self, path): return self._get_interface("org.freedesktop.systemd1", path, "org.freedesktop.DBus.Properties").GetAll("org.freedesktop.systemd1.Timer") def _get_users_logged_in(self): manager = self._get_logind_manager_interface() users = [] for user in manager.ListUsers(): uid = int(user[0]) name = str(user[1]) users.append((name, uid)) return users def _get_syncoid_timers(self): manager = self._get_systemd_manager_interface() timers = [] for unit in manager.ListUnits(): name = str(unit[0]) path = str(unit[6]) if not self.__timer_re.match(name): continue timers.append((name, path)) return timers def _get_timer_next_elapse(self, path): props = self._get_systemd_timer_properties(path) try: return datetime.datetime.fromtimestamp(int(props['NextElapseUSecRealtime']) / 1000000) except ValueError: return None def check(self): result = True users = self._get_users_logged_in() if(len(users) > 0): print(" [NO] %d users logged in: %s" % (len(users), ", ".join([user[0] for user in users]))) result = False else: print(" [ok] no users logged in.") timers = self._get_syncoid_timers() for timer in timers: next = self._get_timer_next_elapse(timer[1]) if next == None: print(" [NO] the next elapse time of '%s' is invalid, assuming the unit is currently active" % (timer[0])) result = False continue until = next - datetime.datetime.now() if(until < datetime.timedelta(minutes=10)): print(" [NO] timer '%s' elapses in less than 10 minutes -> %s (%s)" % (timer[0], next, until)) result = False else: print(" [ok] timer '%s' elapses in %s (%s)" % (timer[0], until, next)) return result def suspend(self): manager = self._get_logind_manager_interface() if not manager.CanSuspend(): print("ERR: system can not be suspended") return try: manager.Suspend(False) except dbus.exceptions.DBusException as error: print("ERR: %s" % error) if __name__ == "__main__": s = AutoSuspender() print("checking if the system can be suspended:") if s.check(): print("trying to suspend system") s.suspend() else: print("not suspending system because at least one check failed.")