xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 239fd29d6f82724c95a7a7f840c9f9244bcbe6d4)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19import subprocess
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .cmd import run_cmd
31from .config import BUILD_DIR
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    Sets up a per-test output directory with a scratch area and a log
    file, and offers helpers to build paths, extract archives and create
    a directory for UNIX sockets.
    """

    # Path of the QEMU binary under test, read from the environment when
    # the class is defined; setUp() fails the test if it is unset.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of qemu_bin, filled in by setUp()
    arch = None

    # Scratch directory of the running test, filled in by setUp()
    workdir = None
    # 'qemu-test' logger, filled in by setUp()
    log = None
    # Not referenced by the code in this class
    logdir = None

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        Extract @archive into the scratch directory, or into a directory
        beneath it named by @sub_dir. All files are extracted unless
        @member specifies a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @archive is a file-like object.

        :param archive: filename, Asset, or file-like object to extract
        :param format: optional archive format (tar, zip, deb, cpio)
        :param sub_dir: optional sub-directory to extract into
        :param member: optional member file to limit extraction to
        :returns: the fully qualified path to @member inside the
                  extraction directory if @member is non-None, else None
        """
        self.log.debug("Extract %s format=%s sub_dir=%s member=%s",
                       archive, format, sub_dir, member)
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # Resolves to the module-level archive_extract() helper, not to
        # this method: a bare name inside a method skips the class scope.
        archive_extract(archive, dest, format, member)

        if member is None:
            return None
        # Build the result from the directory actually extracted into, so
        # that the path is also right when @sub_dir was given.
        return str(Path(dest, member))

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and the
        same object is handed out on subsequent calls.

        :returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located relative
        to the source directory that is the root for functional tests
        (computed as the parent of the directory holding this module).

        @args may be empty to reference the root dir itself, a single
        element to reference a file in the root directory, or multiple
        elements to reference a file nested below. The path components
        are joined using the platform appropriate path separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located relative
        to the build directory root (BUILD_DIR).

        @args may be empty to reference the build dir itself, a single
        element to reference a file in the build directory, or multiple
        elements to reference a file nested below. The path components
        are joined using the platform appropriate path separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file located
        relative to a temporary directory dedicated to this test case.
        tearDown() removes the directory and its contents unless
        QEMU_TEST_KEEP_SCRATCH is set in the environment.

        @args may be empty to reference the scratch dir itself, a single
        element to reference a file in the scratch directory, or multiple
        elements to reference a file nested below. The path components
        are joined using the platform appropriate path separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file located
        relative to the output directory dedicated to this test case.
        Unlike the scratch directory, tearDown() does not remove it.

        @args may be empty to reference the log dir itself, a single
        element to reference a file in the log directory, or multiple
        elements to reference a file nested below. The path components
        are joined using the platform appropriate path separator.

        :param args: zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        Prepare the output/scratch directories and the log file.

        :param bin_prefix: binary name prefix supplied by the subclasses;
                           not used by this implementation
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch and socket directories and detach the log file.

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is set
        in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # finalizer of the TemporaryDirectory object.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # The loggers are process-wide: always detach and close the
            # handler so that neither the handler nor its file descriptor
            # outlives the test, even if the removal above failed.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script: run (or precache the assets of)
        the test module named after the running script.

        If QEMU_TEST_PRECACHE is set, only Asset.precache_suites() is
        called. Otherwise the tests are run with a TAP runner, the log
        file locations of failed tests are printed to stderr, and the
        process exits with status 0 on success and 1 otherwise.
        """
        # Module name == script file name without its extension
        path = Path(sys.argv[0]).stem

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for test, _message in res.result.errors + res.result.failures:
            # Entries that are not instances of our test classes
            # carry no log file attributes.
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
217
218
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base class setup and start with no '-L' paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path to be passed to the QEMU binary with '-L'.

        :param ldpath: the path; it is stored in absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and wait for it to finish.

        The command line is the QEMU binary, one '-L <path>' pair for
        every path registered with add_ldpath(), @bin_path, then @args.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional sequence of arguments for the program
        :returns: a subprocess.CompletedProcess whose stdout and stderr
                  are captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # "-L <path>" as one string reaches the binary as a single
            # argument with an embedded space.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
234
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for '-cpu', applied to every VM created by get_vm() if set
    cpu = None
    # Machine type, applied to every VM created by get_vm() if set
    machine = None
    # Cached output of '<qemu_bin> -M help', filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base class setup and attach the console log file."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not occur in the output
        of '<qemu_bin> -M help' (plain substring match).

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own in
        the output of '<qemu_bin> -M none -netdev help'.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        '<qemu_bin> -M none -device help' (plain substring match).

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that keeps its temporary files in the
        scratch directory and its logs in the log directory.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, its value is
        used as the path of an extra socket chardev wired to a monitor in
        control mode.

        :param name: name given to the QEMUMachine
        :param args: extra arguments for the QEMU command line
        :returns: the new QEMUMachine (not launched here)
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if necessary.

        A newly created VM gets @args plus the class-level cpu and
        machine settings when those are set. @args is ignored if the VM
        already exists.

        :param args: extra arguments for the QEMU command line
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # Only the first occurrence of @arg is updated
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # @arg was the last element and had no value yet
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs, detach the console log and clean up."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Run the remaining cleanup even if a shutdown raised, so the
            # process-wide 'console' logger does not keep a stale handler
            # and its open file.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()
358