xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 4481234e985aacbbebb321310d2c6993cb4983b1)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import shutil
18from subprocess import run
19import sys
20import tempfile
21import warnings
22import unittest
23import uuid
24
25import pycotap
26
27from qemu.machine import QEMUMachine
28from qemu.utils import hvf_available, kvm_available, tcg_available
29
30from .archive import archive_extract
31from .asset import Asset
32from .config import BUILD_DIR, dso_suffix
33from .uncompress import uncompress
34
35
class QemuBaseTest(unittest.TestCase):
    '''
    Common base class for the functional tests.

    setUp() derives the target architecture from the QEMU binary named
    by $QEMU_TEST_QEMU_BINARY, creates a per-test output directory with
    a scratch sub-directory beneath it, and routes the 'qemu-test',
    'qemu.machine' and 'qemu.qmp' loggers into a "base.log" file in the
    output directory.
    '''

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped, and placed in the scratch directory.
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @archive is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        within the scratch directory, otherwise returns None
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_dir = self.scratch_file()
        else:
            dest_dir = self.scratch_file(sub_dir)
        archive_extract(archive, dest_dir, format, member)

        if member is not None:
            # NOTE: resolved against the scratch root, not @sub_dir,
            # exactly as callers have always received it.
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same instance is handed out on subsequent calls; it
        is cleaned up by tearDown().

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        # The root is the directory above the one holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        '''
        @params plugin_name: name of the plugin, without any suffix

        Return the relative path 'tests/tcg/plugins/<name>.<suffix>' of
        the plugin, taking into account any host OS specific suffix as
        reported by dso_suffix().
        '''
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        '''
        Check every Asset declared as an "ASSET_*" attribute directly
        on the test's own class (inherited attributes are not looked at).

        Returns: False as soon as one asset reports it is not available,
        True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def _remove_log_handlers(self):
        '''
        Detach the per-test log file handler from every logger that
        setUp() attached it to, then close the underlying file.
        '''
        self.qmplog.removeHandler(self._log_fh)
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        self._log_fh.close()

    def setUp(self):
        '''
        Prepare the per-test directories and logging.

        Fails the test if $QEMU_TEST_QEMU_BINARY is unset and skips it
        if any of the assets declared by the test class is unavailable.
        '''
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary path, e.g. "x86_64" for ".../qemu-system-x86_64".
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        if os.path.exists(self.workdir):
            # Purge as safety net in case of unclean termination of
            # previous test, or use of QEMU_TEST_KEEP_SCRATCH
            shutil.rmtree(self.workdir)
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(name)s.%(funcName)s %(message)s')
        self._log_fh.setFormatter(file_formatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)
        self.qmplog = logging.getLogger('qemu.qmp')
        self.qmplog.setLevel(logging.DEBUG)
        self.qmplog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not invoke tearDown() when setUp() raises,
            # and that includes skipping. Release the log handlers here
            # so they do not stay attached to the shared loggers and
            # capture the output of whichever test runs next.
            self._remove_log_handlers()
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        '''
        Remove the scratch directory (unless $QEMU_TEST_KEEP_SCRATCH is
        set), clean up the socket directory if one was created, and
        detach the log handlers installed by setUp().
        '''
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always release the handlers, even if the directory
            # cleanup above failed, so later tests are unaffected.
            self._remove_log_handlers()

    @staticmethod
    def main():
        '''
        Entry point for running a test script standalone.

        If $QEMU_TEST_PRECACHE is set, only Asset.precache_suites() is
        invoked and the function returns. Otherwise the tests are run
        with TAP output, the log file locations of failed tests are
        printed to stderr, and the process exits with a non-zero status
        unless the run was successful.
        '''
        warnings.simplefilter("default")
        os.environ["PYTHONWARNINGS"] = "default"

        # Drop the last three characters of the script name; this
        # assumes the script file name ends in ".py".
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
                                   test_output_log = pycotap.LogMode.LogToError)
        res = unittest.main(module = None, testRunner = tr, exit = False,
                            argv=[sys.argv[0], path] + sys.argv[1:])

        # A test may show up in both lists, report each one only once.
        reported = set()
        for (test, _message) in res.result.errors + res.result.failures:
            if not hasattr(test, "log_filename"):
                continue
            test_id = test.id()
            if test_id in reported:
                continue
            print('More information on ' + test_id + ' could be found here:'
                  '\n %s' % test.log_filename, file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(' %s' % test.console_log_name, file=sys.stderr)
            reported.add(test_id)
        sys.exit(not res.result.wasSuccessful())
272
273
class QemuUserTest(QemuBaseTest):
    '''Facilitates tests that run a guest program under a QEMU binary.'''

    def setUp(self):
        super().setUp()
        # Directories handed to QEMU via "-L" by run_cmd()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        '''
        @params ldpath: directory to pass to QEMU with the "-L" option

        The path is made absolute before being recorded.
        '''
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        '''
        @params bin_path: path of the guest program to execute
        @params args: optional list of arguments for the guest program

        Runs the QEMU binary on @bin_path, preceded by one "-L <dir>"
        option pair for every directory registered with add_ldpath().

        Returns: the subprocess.CompletedProcess of the run, with
        stdout and stderr captured as text
        '''
        if args is None:
            args = []

        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <dir>" string is not recognised as "-L".
            cmd.extend(["-L", ldpath])
        cmd.append(bin_path)
        cmd.extend(args)

        return run(cmd, text=True, capture_output=True)
291
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value passed with "-cpu" to every VM created by get_vm(), if set
    cpu = None
    # Machine type applied to every VM created by get_vm(), if set
    machine = None
    # Output of "<qemu> -M help", fetched on first use by set_machine()
    _machinehelp = None

    def setUp(self):
        # VMs created through get_vm(), keyed by name
        self._vms = {}

        super().setUp()

        # Record everything sent to the 'console' logger in its own
        # file inside the test's log directory.
        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(file_formatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type used for VMs created afterwards.

        The test is skipped if the name does not occur in the output
        of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        # NOTE(review): this is a plain substring search, so a name that
        # merely occurs inside another machine's name or description is
        # also treated as supported.
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                  }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists the given network
        backend on a line of its own in the "-netdev help" output.

        :param netdevname: name of the network backend, such as "user"
        :type netdevname: str
        """
        helptxt = run([self.qemu_bin, '-M', 'none', '-netdev', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        if helptxt.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the given name occurs somewhere in the
        "-device help" output of the QEMU binary.

        :param devicename: name of the device model
        :type devicename: str
        """
        helptxt = run([self.qemu_bin, '-M', 'none', '-device', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        # NOTE(review): substring search, see set_machine()
        if helptxt.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary under test.

        The machine keeps its temporary files in the scratch directory
        and its logs in the test's log directory. If the environment
        variable QEMU_TEST_QMP_BACKDOOR is set, a server socket chardev
        at that path is added together with a monitor in control mode
        attached to it.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for QEMU
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        On creation the class level 'cpu' and 'machine' settings are
        applied when they are not None. @args are only used when the
        VM is created, they are ignored for an already existing VM.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if empty,
                     which always results in a new VM
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        vm = self._vms.get(name)
        if vm is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return vm

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # Only the first occurrence of the argument is updated
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # The argument was the last entry and had no value yet
            vm_args.append(value)

    def tearDown(self):
        # Shutting down is best effort: a VM that fails to stop must
        # not prevent the remaining VMs and logs from being cleaned up.
        for vm in self._vms.values():
            try:
                vm.shutdown()
            except Exception as ex:
                self.log.error("Failed to teardown VM: %s", ex)
        logging.getLogger('console').removeHandler(self._console_log_fh)
        self._console_log_fh.close()
        super().tearDown()
423