xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 20b28f58b428a78ebd3f4b57b668dc0214918639)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import warnings
23import unittest
24import uuid
25
26from qemu.machine import QEMUMachine
27from qemu.utils import hvf_available, kvm_available, tcg_available
28
29from .archive import archive_extract
30from .asset import Asset
31from .config import BUILD_DIR, dso_suffix
32from .uncompress import uncompress
33
34
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() reads the binary under test from the QEMU_TEST_QEMU_BINARY
    environment variable, creates a per-test output directory with a
    'scratch' sub-directory beneath it, and attaches a file handler
    writing 'base.log' to the 'qemu-test' and 'qemu.machine' loggers.
    The remaining methods are helpers for locating files and for
    unpacking assets into the scratch directory.
    """

    def uncompress(self, compressed, format=None):
        """
        Uncompress @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        :param compressed: filename, Asset, or file-like object to uncompress
        :param format: optional compression format (gzip, lzma)
        :returns: the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output is named after the input with its last extension
        # stripped, and always lives directly in the scratch directory.
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        Extract @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        :param archive: filename, Asset, or file-like object to extract
        :param format: optional archive format (tar, zip, deb, cpio)
        :param sub_dir: optional sub-directory to extract into
        :param member: optional member file to limit extraction to
        :returns: the fully qualified path to @member inside the extraction
                  directory if @member is non-None, otherwise None
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest_dir = self.scratch_file()
        else:
            dest_dir = self.scratch_file(sub_dir)
        archive_extract(archive, dest_dir, format, member)

        if member is None:
            return None
        # The member was unpacked below dest_dir, so the returned path
        # has to include @sub_dir when one was requested.
        return str(Path(dest_dir, member))

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        handed back on later calls; tearDown() removes it.

        :returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests (the parent of the directory holding
        this file).

        @args may be empty to reference the root dir itself, may
        be a single element to reference a file in the root
        directory, or may be multiple elements to reference a file
        nested below. The path components will be joined using the
        platform appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root (BUILD_DIR).

        @args may be empty to reference the build dir itself, may
        be a single element to reference a file in the build
        directory, or may be multiple elements to reference a file
        nested below. The path components will be joined using the
        platform appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents are purged
        by tearDown() unless QEMU_TEST_KEEP_SCRATCH is set in the
        environment.

        @args may be empty to reference the scratch dir itself, may
        be a single element to reference a file in the scratch
        directory, or may be multiple elements to reference a file
        nested below. The path components will be joined using the
        platform appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to the output directory dedicated to
        this test case. Unlike the scratch directory, tearDown()
        does not remove this directory or its log files.

        @args may be empty to reference the log dir itself, may
        be a single element to reference a file in the log
        directory, or may be multiple elements to reference a file
        nested below. The path components will be joined using the
        platform appropriate path separator.

        :param args: zero or more subdirectories or file names
        :returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        Return the path to a TCG plugin, taking into account any host OS
        specific suffix.

        The result is a relative path below 'tests/tcg/plugins'
        (presumably resolved against the build directory by the caller's
        working directory -- confirm before relying on it elsewhere).

        :param plugin_name: plugin file name without its suffix
        :returns: string representing a relative file path
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check whether every Asset declared on the test class is available.

        Only attributes named ASSET_* that are defined directly on the
        concrete test class are considered; attributes inherited from
        parent classes are not visited.

        :returns: False as soon as one asset reports itself unavailable,
                  True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def _release_log_handler(self):
        """Detach the per-test log file handler from its loggers and close it."""
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        self._log_fh.close()

    def setUp(self):
        """
        Prepare directories and logging for a single test.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set, and skips it
        if any asset declared on the test class is unavailable.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The target architecture is whatever follows the last '-' in the
        # binary name.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not call tearDown() when setUp() raises, and
            # skipTest() raises SkipTest, so the handler attached to the
            # shared loggers has to be released here or it would keep
            # receiving the output of every later test.
            self._release_log_handler()
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch and socket directories and stop file logging.

        The scratch directory is preserved when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # object's finalizer, which a bare rmtree() would leave
                # pending.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always detach the handler, even if directory removal failed.
            self._release_log_handler()

    @staticmethod
    def main():
        """
        Entry point for a test script.

        The script's base name with its last three characters (the '.py'
        suffix) removed is used as the name of the tests to load. If
        QEMU_TEST_PRECACHE is set, assets are pre-cached and no tests
        are run. Otherwise the tests are run with TAP output, pointers
        to the log files of failed tests are printed on stderr, and the
        process exits with a non-zero status if the run was not
        successful.
        """
        warnings.simplefilter("default")
        os.environ["PYTHONWARNINGS"] = "default"

        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=[sys.argv[0], path] + sys.argv[1:])
        for (test, _message) in res.result.errors + res.result.failures:
            # Entries that are not test instances prepared by setUp()
            # have no log file to point at.
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
261
262
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests that run a guest binary."""

    def setUp(self):
        """Run the base set-up and start with an empty '-L' path list."""
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to the QEMU binary with '-L'.

        :param ldpath: directory path; stored as an absolute path
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the guest program to run
        :param args: optional list of arguments for the guest program
        :returns: the subprocess.CompletedProcess of the run, with stdout
                  and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries; a
            # single "-L <path>" element is not recognised as the -L
            # option.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return run(cmd, text=True, capture_output=True)
278
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Optional value for '-cpu', applied to every VM created by get_vm()
    cpu = None
    # Machine type applied to every VM created by get_vm(); see set_machine()
    machine = None
    # Output of '<qemu_bin> -M help', fetched lazily by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the base set-up and start logging the 'console' logger to
        'console.log' in the test's log directory.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type used for VMs created afterwards.

        The test is skipped if @machinename does not appear in the output
        of '-M help' for the QEMU binary under test.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        # NOTE: this is a substring search over the whole help text, so a
        # name that is a prefix of another machine name also matches.
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                  }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists @netdevname as a line
        of its own in the output of '-netdev help'.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename appears in the output of
        '-device help' for the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the binary under test.

        The machine uses the scratch directory as its base temporary
        directory and the test's log directory for its logs. If
        QEMU_TEST_QMP_BACKDOOR is set in the environment, an additional
        QMP monitor is exposed on a server socket at that path.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the QEMU binary
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets @args as extra command line arguments,
        plus '-cpu' and the machine type when the 'cpu' and 'machine'
        attributes are set. @args is ignored when the VM already exists.

        :param args: extra command line arguments for a new VM
        :param name: VM name; a random UUID is used when not given
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done when @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # Only the first occurrence of the argument is updated.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # The argument was the last element and had no value yet.
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down every VM created by the test, stop console logging and
        run the base tear-down.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the log handler and the base class resources even
            # if shutting down a VM raised.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()
409