1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#!/usr/bin/env python3
import sys
import dbus
import re
import datetime
import subprocess
# import pprint
class AutoSuspender(object):
def __init__(self):
self.__bus = dbus.SystemBus()
self.__timer_re = re.compile('^syncoid-pull-.*\.timer$')
self.__zpool_scrub_in_progress_re = re.compile('^\s*scan:.*scrub in progress.*$')
# self.__pp = pprint.PrettyPrinter(indent=2)
def _get_interface(self, dest, object, interface):
try:
obj = self.__bus.get_object(dest, object)
return dbus.Interface(obj, interface)
except dbus.exceptions.DBusException as error:
print("ERR: %s" % error)
sys.exit(1)
def _get_logind_manager_interface(self):
return self._get_interface("org.freedesktop.login1", "/org/freedesktop/login1", "org.freedesktop.login1.Manager")
def _get_systemd_manager_interface(self):
return self._get_interface("org.freedesktop.systemd1", "/org/freedesktop/systemd1", "org.freedesktop.systemd1.Manager")
def _get_systemd_timer_properties(self, path):
return self._get_interface("org.freedesktop.systemd1", path, "org.freedesktop.DBus.Properties").GetAll("org.freedesktop.systemd1.Timer")
def _get_users_logged_in(self):
manager = self._get_logind_manager_interface()
users = []
for user in manager.ListUsers():
uid = int(user[0])
name = str(user[1])
users.append((name, uid))
return users
def _get_syncoid_timers(self):
manager = self._get_systemd_manager_interface()
timers = []
for unit in manager.ListUnits():
name = str(unit[0])
path = str(unit[6])
if not self.__timer_re.match(name):
continue
timers.append((name, path))
return timers
def _get_timer_next_elapse(self, path):
props = self._get_systemd_timer_properties(path)
try:
return datetime.datetime.fromtimestamp(int(props['NextElapseUSecRealtime']) / 1000000)
except ValueError:
return None
def _get_zpools(self):
pools = subprocess.run(["zpool", "list", "-H", "-o", "name"], check=True, stdout=subprocess.PIPE).stdout.splitlines()
return [pool.decode('utf-8') for pool in pools]
def _is_zpool_scrub_in_progress(self, pool):
lines = subprocess.run(["zpool", "status", pool], check=True, stdout=subprocess.PIPE).stdout.splitlines()
for line in lines:
if self.__zpool_scrub_in_progress_re.match(line.decode('utf-8')):
return True
return False
def check(self):
result = True
users = self._get_users_logged_in()
if(len(users) > 0):
print(" [NO] %d users logged in: %s" % (len(users), ", ".join([user[0] for user in users])))
result = False
else:
print(" [ok] no users logged in.")
timers = self._get_syncoid_timers()
for timer in timers:
next = self._get_timer_next_elapse(timer[1])
if next == None:
print(" [NO] the next elapse time of '%s' is invalid, assuming the unit is currently active" % (timer[0]))
result = False
continue
until = next - datetime.datetime.now()
if(until < datetime.timedelta(minutes=10)):
print(" [NO] timer '%s' elapses in less than 10 minutes -> %s (%s)" % (timer[0], next, until))
result = False
else:
print(" [ok] timer '%s' elapses in %s (%s)" % (timer[0], until, next))
zpools = self._get_zpools()
for pool in zpools:
if self._is_zpool_scrub_in_progress(pool):
print(" [NO] zpool scrubbing is in progress on pool '%s'" % (pool))
result = False
else:
print(" [ok] zpool '%s' is not scrubbing" % (pool))
return result
def suspend(self):
manager = self._get_logind_manager_interface()
if not manager.CanSuspend():
print("ERR: system can not be suspended")
return
try:
manager.Suspend(False)
except dbus.exceptions.DBusException as error:
print("ERR: %s" % error)
if __name__ == "__main__":
s = AutoSuspender()
print("checking if the system can be suspended:")
if s.check():
print("trying to suspend system")
s.suspend()
else:
print("not suspending system because at least one check failed.")
|