xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 37e9b19c34d9500164e33ccf377e1830e956bca0)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16from pathlib import Path
17import pycotap
18import shutil
19from subprocess import run
20import sys
21import tempfile
22import unittest
23import uuid
24
25from qemu.machine import QEMUMachine
26from qemu.utils import kvm_available, tcg_available
27
28from .archive import archive_extract
29from .asset import Asset
30from .config import BUILD_DIR
31from .uncompress import uncompress
32
33
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    Provides a per-test scratch directory and log directory, file based
    logging for the 'qemu-test' and 'qemu.machine' loggers, and helpers
    for locating files and unpacking test assets.
    """

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-' separated component of qemu_bin, filled in by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None
    # Filled in by setUp(); defaults allow the helpers to be used safely
    # on an instance that has not been set up yet
    outputdir = None
    socketdir = None

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug("Uncompress %s format=%s", compressed, format)
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None
        """
        self.log.debug("Extract %s format=%s sub_dir=%s member=%s",
                       archive, format, sub_dir, member)
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        archive_extract(archive, dest, format, member)

        if member is None:
            return None
        # The member is reported relative to the directory that was
        # extracted into, so that @sub_dir is honoured in the result
        return str(Path(dest, member))

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same instance is returned by subsequent calls until
        tearDown() removes it.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        # The root is the directory above the one holding this module
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        @params bin_prefix: binary name prefix passed in by the
                            subclasses; it is not used by this method

        Prepare the output and scratch directories for the test and
        attach a 'base.log' file handler to the 'qemu-test' and
        'qemu.machine' loggers.
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set) and the socket directory, then detach and close the log
        file handler installed by setUp().
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # finalizer of the TemporaryDirectory object
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always release the log file, even if a removal failed,
            # so that handlers do not pile up on the shared loggers
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        If QEMU_TEST_PRECACHE is set in the environment, only the
        assets of the test suite are pre-cached. Otherwise the tests
        are run with a TAP test runner, the log files of failed tests
        are reported on stderr, and the process exits with a non-zero
        status if any test was unsuccessful.
        """
        # Module name of the script: the file name minus its 3 character
        # suffix (presumably ".py")
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for test, _ in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
241
242
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the common set up and start with no extra '-L' paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path to be passed to the QEMU binary with '-L'
        by run_cmd(). The path is stored in absolute form.

        :param ldpath: path to pass with the '-L' option
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and wait for it to finish.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional arguments to pass to the program
        :type args: list
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is not recognised as "-L"
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)
258
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    # Cached output of "<qemu_bin> -M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """
        Run the common set up and attach a 'console.log' file handler
        to the 'console' logger.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type used for VMs created afterwards.

        The test is skipped if the name does not appear in the output
        of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists the given network
        backend in the output of "-netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdev_help = run([self.qemu_bin,
                           '-M', 'none', '-netdev', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        # Match a whole line so that a name is not found as a mere
        # substring of another one
        if '\n' + netdevname + '\n' not in netdev_help:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the given name appears in the output of
        "-device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        device_help = run([self.qemu_bin,
                           '-M', 'none', '-device', 'help'],
                          capture_output=True, check=True,
                          encoding='utf8').stdout
        if devicename not in device_help:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary under test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor listening on that socket path is added to the VM.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        A newly created VM gets the '-cpu' argument and the machine
        type of the test, when those are set. @args are only used
        when the VM is created.

        :param args: extra command line arguments for a new VM
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            # The argument was the last element and had no value yet
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down every VM created by the test, detach and close the
        console log handler, then run the common tear down.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the console log and run the common clean up even
            # if a VM failed to shut down
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()
386