1#
2# SPDX-License-Identifier: MIT
3#
4
5import re
6import time
7
8from oeqa.runtime.case import OERuntimeTestCase
9from oeqa.core.decorator.depends import OETestDepends
10from oeqa.core.decorator.data import skipIfDataVar, skipIfNotDataVar
11from oeqa.runtime.decorator.package import OEHasPackage
12from oeqa.core.decorator.data import skipIfNotFeature
13
class SystemdTest(OERuntimeTestCase):
    """
    Common helpers for the systemd runtime tests.

    Wraps the systemctl and journalctl command lines, which are executed on
    the target through self.target.run().
    """

    def systemctl(self, action='', target='', expected=0, verbose=False):
        """
        Run 'systemctl <action> <target>' on the target and check its exit code

        Arguments:
        -action, the systemctl verb (plus any options) to run
        -target, the unit or extra argument placed after the action
        -expected, the exit code the command is required to return
        -verbose, when True and the exit code is not the expected one, the
        output of 'systemctl status --full <target>' is appended to the
        failure message
        Returns:
        -string output of the systemctl command
        Raises:
        -AssertionError, when the exit code differs from expected
        """
        command = 'SYSTEMD_BUS_TIMEOUT=240s systemctl %s %s' % (action, target)
        status, output = self.target.run(command)
        message = '\n'.join([command, output])
        if status != expected and verbose:
            cmd = 'SYSTEMD_BUS_TIMEOUT=240s systemctl status --full %s' % target
            # Keep the extra status report on its own line instead of gluing
            # it onto the last line of the command output.
            message += '\n' + self.target.run(cmd)[1]
        self.assertEqual(status, expected, message)
        return output

    #TODO: use pyjournalctl instead
    def journalctl(self, args='',l_match_units=None):
        """
        Request for the journalctl output to the current target system

        Arguments:
        -args, an optional argument pass through argument
        -l_match_units, an optional list of units to filter the output
        Returns:
        -string output of the journalctl command
        Raises:
        -AssertionError, on remote commands that fail
        -ValueError, on a journalctl call with filtering by l_match_units that
        returned no entries
        """

        query_units = ''
        if l_match_units:
            # Each unit becomes a '_SYSTEMD_UNIT=<unit>' match on the
            # journalctl command line.
            query_units = ' '.join('_SYSTEMD_UNIT=' + unit
                                   for unit in l_match_units)
        command = 'journalctl %s %s' % (args, query_units)
        status, output = self.target.run(command)
        if status:
            raise AssertionError("Command '%s' returned non-zero exit "
                    'code %d:\n%s' % (command, status, output))
        # output is one string, so the "single line" check has to count lines;
        # len(output) counts characters and could never equal 1 while also
        # containing the marker, which left this error unreachable.
        lines = output.splitlines()
        if (l_match_units and len(lines) == 1
                and "-- No entries --" in lines[0]):
            raise ValueError('List of units to match: %s, returned no entries'
                    % l_match_units)
        return output
55
class SystemdBasicTests(SystemdTest):
    """
    Basic checks: systemctl runs, can list unit files, and reports no failed
    units once activation has settled.
    """

    def settle(self):
        """
        Wait for systemd to finish activating units, giving up after two
        minutes.

        The list of units in the 'activating' state is polled every ten
        seconds.

        Returns a tuple, either (True, '') if no unit is still activating,
        or (False, message string) holding the last listing if units are
        still activating when the time runs out (generally, failing units
        that restart).
        """
        list_activating = 'SYSTEMD_BUS_TIMEOUT=240s systemctl --state=activating'
        deadline = time.time() + (2 * 60)
        while True:
            _, listing = self.target.run(list_activating)
            idle = "0 loaded units listed" in listing
            # The clock is only consulted when units are still activating.
            if idle or time.time() >= deadline:
                return (True, '') if idle else (False, listing)
            time.sleep(10)

    @skipIfNotFeature('systemd',
                      'Test requires systemd to be in DISTRO_FEATURES')
    @skipIfNotDataVar('VIRTUAL-RUNTIME_init_manager', 'systemd',
                      'systemd is not the init manager for this image')
    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_systemd_basic(self):
        """
        Check that 'systemctl --version' exits with 0
        """
        self.systemctl(action='--version')

    @OETestDepends(['systemd.SystemdBasicTests.test_systemd_basic'])
    def test_systemd_list(self):
        """
        Check that 'systemctl list-unit-files' exits with 0
        """
        self.systemctl(action='list-unit-files')

    @OETestDepends(['systemd.SystemdBasicTests.test_systemd_basic'])
    def test_systemd_failed(self):
        """
        Check that no unit is listed as failed after systemd has settled
        """
        settled, pending = self.settle()
        self.assertTrue(
            settled,
            msg="Timed out waiting for systemd to settle:\n%s" % pending)

        report = self.systemctl(action='list-units', target='--failed')
        none_failed = re.search('0 loaded units listed', report)
        if not none_failed:
            # Add the full status of the failed units to the failure message.
            report += self.systemctl(action='status --full --failed')
        self.assertTrue(none_failed,
                        msg='Some systemd units failed:\n%s' % report)
99
100
class SystemdServiceTests(SystemdTest):
    """
    Service management checks, exercised against avahi-daemon.service.
    """

    @OEHasPackage(['avahi-daemon'])
    @OETestDepends(['systemd.SystemdBasicTests.test_systemd_basic'])
    def test_systemd_status(self):
        """
        Check that 'systemctl status --full' on the service exits with 0
        """
        unit = 'avahi-daemon.service'
        self.systemctl(action='status --full', target=unit)

    @OETestDepends(['systemd.SystemdServiceTests.test_systemd_status'])
    def test_systemd_stop_start(self):
        """
        Stop then start the service, checking 'is-active' after each step

        'is-active' is expected to exit with 3 after the stop and with 0
        after the start.
        """
        unit = 'avahi-daemon.service'
        # (action, extra keyword arguments), run in this exact order.
        steps = (
            ('stop', {}),
            ('is-active', {'expected': 3, 'verbose': True}),
            ('start', {}),
            ('is-active', {'verbose': True}),
        )
        for action, extra in steps:
            self.systemctl(action, unit, **extra)

    @OETestDepends(['systemd.SystemdServiceTests.test_systemd_status'])
    def test_systemd_disable_enable(self):
        """
        Disable then enable the service, checking 'is-enabled' after each step

        'is-enabled' is expected to exit with 1 after the disable and with 0
        after the enable.
        """
        unit = 'avahi-daemon.service'
        # (action, expected exit code), run in this exact order.
        steps = (
            ('disable', 0),
            ('is-enabled', 1),
            ('enable', 0),
            ('is-enabled', 0),
        )
        for action, exit_code in steps:
            self.systemctl(action, unit, expected=exit_code)
122
class SystemdJournalTests(SystemdTest):
    """
    Runtime tests that read the systemd journal on the target.
    """

    @OETestDepends(['systemd.SystemdBasicTests.test_systemd_basic'])
    def test_systemd_journal(self):
        """
        Check that a plain 'journalctl' call exits with 0
        """
        status, output = self.target.run('journalctl')
        self.assertEqual(status, 0, output)

    @OETestDepends(['systemd.SystemdBasicTests.test_systemd_basic'])
    def test_systemd_boot_time(self, systemd_TimeoutStartSec=90):
        """
        Get the target boot time from journalctl and log it

        The test fails if journalctl cannot be run or returns nothing, and is
        skipped if the boot time message is missing or cannot be parsed. A
        boot time above the timeout is only logged, it does not fail the test.

        Arguments:
        -systemd_TimeoutStartSec, an optional argument containing systemd's
        unit start timeout to compare against
        """

        # The expression chain that uniquely identifies the time boot message.
        # The last item is a raw string: '\.' is not a valid escape sequence
        # in a regular string literal.
        expr_items = ['Startup finished', 'kernel', 'userspace', r'\.$']
        boot_msg_re = re.compile('.*'.join(expr_items))
        try:
            output = self.journalctl(args='-o cat --reverse')
        except AssertionError:
            self.fail('Error occurred while calling journalctl')
        if not output:
            self.fail('Error, unable to get startup time from systemd journal')

        # Check for the regular expression items that match the startup time.
        # The journal is read with --reverse and the first matching line wins.
        check_match = ''
        for line in output.split('\n'):
            found = boot_msg_re.search(line)
            if found:
                check_match = found.group(0)
                break
        # Put the startup time in the test log
        if check_match:
            self.tc.logger.info('%s' % check_match)
        else:
            self.skipTest('Error at obtaining the boot time from journalctl')
        boot_time_sec = 0

        # Get the numeric values from the string and convert them to seconds
        # same data will be placed in list and string for manipulation.
        # Only the last two space separated words are used; presumably they
        # look like '1min 5.9s.' or '= 5.9s.' - confirm against the journal.
        l_boot_time = check_match.split(' ')[-2:]
        s_boot_time = ' '.join(l_boot_time)
        try:
            # Obtain the minutes it took to boot.
            if l_boot_time[0].endswith('min') and l_boot_time[0][0].isdigit():
                boot_time_min = s_boot_time.split('min')[0]
                # Convert to seconds and accumulate it.
                boot_time_sec += int(boot_time_min) * 60
            # Obtain the seconds it took to boot and accumulate.
            boot_time_sec += float(l_boot_time[1].split('s')[0])
        except ValueError:
            self.skipTest('Error when parsing time from boot string')

        # Compare the target boot time against systemd's unit start timeout;
        # exceeding it is only reported in the log.
        if boot_time_sec > systemd_TimeoutStartSec:
            msg = ("Target boot time %s exceeds systemd's TimeoutStartSec %s"
                    % (boot_time_sec, systemd_TimeoutStartSec))
            self.tc.logger.info(msg)
181