xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 069a2ce8a75c9b59a4d08d6d2da3b36bfc5af3f4)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import hvf_available, kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR, dso_suffix
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() derives the output, scratch and log locations for the test
    from the QEMU_TEST_QEMU_BINARY environment variable and the test id,
    and attaches a debug log file. The remaining methods are helpers for
    building file paths and for unpacking test assets.
    """

    def uncompress(self, compressed, format=None):
        """
        Uncompress @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @compressed is a file-like object.

        :param compressed: filename, Asset, or file-like object to
                           uncompress
        :param format: optional compression format (gzip, lzma)
        :returns: the fully qualified path to the uncompressed file
        """
        self.log.debug("Uncompress %s format=%s", compressed, format)
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output keeps the input's base name minus its last suffix.
        name, _ext = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Module-level uncompress() helper, not this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None,
                        member=None):
        """
        Extract @archive into the scratch directory, or into a directory
        beneath it named by @sub_dir. All files are extracted unless
        @member specifies a limit.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @archive is a file-like object.

        :param archive: filename, Asset, or file-like object to extract
        :param format: optional archive format (tar, zip, deb, cpio)
        :param sub_dir: optional sub-directory to extract into
        :param member: optional member file to limit extraction to
        :returns: the fully qualified path to @member if @member is
                  non-None, otherwise None
        """
        self.log.debug("Extract %s format=%s sub_dir=%s member=%s",
                       archive, format, sub_dir, member)
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_parts = ()
        else:
            dest_parts = (sub_dir,)

        # Module-level archive_extract() helper, not this method.
        archive_extract(archive, self.scratch_file(*dest_parts),
                        format, member)

        if member is None:
            return None
        # The member lives below the directory it was extracted into,
        # so @sub_dir has to be part of the returned path as well.
        return self.scratch_file(*dest_parts, member)

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and the
        same object is handed out on subsequent calls.

        :returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be empty to reference the root dir itself, may be
        a single element to reference a file in the root directory,
        or may be multiple elements to reference a file nested below.
        The path components will be joined using the platform
        appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        # The root is the parent of the directory holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be empty to reference the build dir itself, may be
        a single element to reference a file in the build directory,
        or may be multiple elements to reference a file nested below.
        The path components will be joined using the platform
        appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be empty to reference the scratch dir itself, may
        be a single element to reference a file in the scratch
        directory, or may be multiple elements to reference a file
        nested below. The path components will be joined using the
        platform appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to the output directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be empty to reference the log dir itself, may be
        a single element to reference a file in the log directory,
        or may be multiple elements to reference a file nested below.
        The path components will be joined using the platform
        appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        Return the path to the plugin below tests/tcg/plugins, with the
        suffix reported by dso_suffix() appended to @plugin_name.

        :param plugin_name: name of the plugin without suffix
        :returns: string representing a (relative) file path
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins',
                            f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check the ASSET_* attributes defined directly on the test class.

        :returns: False as soon as one Asset reports that it is not
                  available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if not name.startswith("ASSET_"):
                continue
            if not isinstance(asset, Asset):
                continue
            if not asset.available():
                self.log.debug(f"Asset {asset.url} not available")
                return False
        return True

    def setUp(self):
        """
        Prepare the per-test directories and logging.

        Requires QEMU_TEST_QEMU_BINARY to be set in the environment.
        The test is skipped if one of its assets is not available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin,
                             'QEMU_TEST_QEMU_BINARY must be set')
        # The target architecture is the text after the last '-' in the
        # binary name.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set) and the socket directory, then detach and close the log
        file handler installed by setUp().
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # cleanup() removes the directory and also disarms the
            # object's finalizer, so nothing is left to complain about
            # an implicit cleanup later on.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script.

        If QEMU_TEST_PRECACHE is set, only the assets of the test suite
        are pre-cached. Otherwise the tests of the calling script are
        run with a TAP runner, the log file locations of failed tests
        are printed to stderr, and the process exits with a non-zero
        status if the run was not successful.
        """
        # Module name of the calling script: drop the last three
        # characters (the ".py" suffix) from its file name.
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(
            message_log=pycotap.LogMode.LogToError,
            test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            if not hasattr(test, "log_filename"):
                continue
            print(f'More information on {test.id()} could be found here:'
                  f'\n {test.log_filename}', file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
257
258
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base setup and start with no -L prefixes."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Remember @ldpath (made absolute) so that run_cmd() passes it
        to QEMU with the -L option.

        :param ldpath: directory to pass to QEMU via -L
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path to the guest binary to execute
        :param args: optional list of arguments for the guest binary
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout/stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <path>" string is not recognised as -L.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)
274
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the "-cpu" option of newly created VMs, if not None
    cpu = None
    # Machine type applied to newly created VMs, if not None
    machine = None
    # Cached output of "<qemu> -M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the base setup and additionally route the 'console' logger
        into a console.log file in the test's log directory.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if @machinename does not occur in the
        output of "<qemu> -M help".

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checkers = {
            'tcg': tcg_available,
            'kvm': kvm_available,
            'hvf': hvf_available,
        }
        checker = checkers.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname is listed on a line of its own
        in the output of "<qemu> -M none -netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if ('\n' + netdevname + '\n') not in netdev_help:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        "<qemu> -M none -device help".

        :param devicename: name of the device
        :type devicename: str
        """
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor on a UNIX socket at that path is added to the
        command line.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args(
                "-chardev",
                f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM called @name, creating it on first use.

        A newly created VM gets the class/instance level cpu and
        machine settings applied. Without a @name, a random UUID is
        used, so every such call creates a new VM.

        :param args: extra command line arguments, only used when the
                     VM is created
        :param name: optional name identifying the VM
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            new_vm = self._new_vm(name, *args)
            self._vms[name] = new_vm
            if self.cpu is not None:
                new_vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                new_vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the (first) occurrence of
        # the argument; append it if the argument is the last entry.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created through get_vm(), detach the console
        log handler and run the base class cleanup.
        """
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        self._console_log_fh.close()
        super().tearDown()
405