xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 1a8755a51eef11360af92adf71fed6a20a1260b2)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import shutil
18import subprocess
19import sys
20import unittest
21import uuid
22
23from qemu.machine import QEMUMachine
24from qemu.utils import kvm_available, tcg_available
25
26from .asset import Asset
27from .cmd import run_cmd
28from .config import BUILD_DIR
29
30
class QemuBaseTest(unittest.TestCase):
    """Common base class for the QEMU functional tests.

    For every test it creates an output directory below BUILD_DIR, a
    'scratch' work directory inside it, and a file-backed debug log that
    also captures the 'qemu.machine' logger.
    """

    # Path of the QEMU binary under test; read from the environment once,
    # when the class is defined.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of qemu_bin, filled in by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the output/scratch directories and set up file logging.

        :param bin_prefix: supplied by the subclasses (e.g. 'qemu-');
                           not used by this method itself
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        # mode='w': each run of a test starts with a fresh log file
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory and detach the log file handler.

        The scratch directory is kept if QEMU_TEST_KEEP_SCRATCH is set in
        the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
        finally:
            # Always detach the handler, even if removing the scratch
            # directory failed, so it does not pile up on the shared
            # loggers across tests.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            # Removing a handler does not close it; close it explicitly
            # so that the log file descriptor is not leaked.
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script: run its tests with TAP output.

        The test module name is derived from the basename of sys.argv[0]
        without its extension.  If QEMU_TEST_PRECACHE is set, only
        Asset.precache_suites() is called and no test is run.  Otherwise
        the process exits with status 0 if all tests were successful and
        with a non-zero status if not.
        """
        path = os.path.splitext(os.path.basename(sys.argv[0]))[0]

        cache = os.environ.get("QEMU_TEST_PRECACHE")
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # Entries that never went through setUp() (for example
            # class- or module-level failures) have no log file to
            # point at.
            if not hasattr(test, 'log_filename'):
                continue
            print('More information on ' + test.id() + ' could be found here:'
                  '\n %s' % test.log_filename, file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
89
90
class QemuUserTest(QemuBaseTest):
    """Facilitates tests that run a program with the QEMU binary."""

    def setUp(self):
        """Run the base class setup and start with no -L paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path to be passed to QEMU with '-L' by run_cmd().

        :param ldpath: the path; it is stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program with the QEMU binary and capture its output.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: extra arguments appended after the program path
        :type args: list of str, or None for no extra arguments
        :returns: the subprocess.CompletedProcess, with stdout and stderr
                  captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value have to be separate argv entries;
            # a single "-L <path>" string is not a valid '-L' option.
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return subprocess.run(cmd, text=True, capture_output=True)
106
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Optional '-cpu' value and machine name applied to every VM created
    # through get_vm()
    cpu = None
    machine = None
    # Cached output of "<qemu_bin> -M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base class setup and attach a file to the console log."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine for the VMs, or skip the test if the name does
        not show up in the "-M help" output of the QEMU binary.

        :param machinename: name of the machine
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the netdev name is on a line of its own in
        the "-netdev help" output of the QEMU binary.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if ('\n' + netdevname + '\n') not in netdevhelp:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the device name shows up in the
        "-device help" output of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devicename not in devhelp:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the scratch and log directories of
        this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, a socket
        chardev on that path and a control-mode monitor attached to it
        are added to the command line.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR")
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        The class-level cpu and machine settings are applied to newly
        created VMs only.

        :param args: extra command line arguments, used only when the VM
                     is created by this call
        :param name: name of the VM; a random UUID is used if omitted
        :type name: str
        :returns: the QEMUMachine registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
            self._vms[name] = vm
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either the argument or the value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # The argument is the last entry and has no value yet
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs and release the console log file."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Clean up even if a shutdown failed: otherwise the handler
            # would stay attached to the shared 'console' logger and the
            # base class cleanup would be skipped.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            # Removing a handler does not close it; close it explicitly
            # so that the log file descriptor is not leaked.
            self._console_log_fh.close()
            super().tearDown()
230