xref: /openbmc/qemu/tests/functional/qemu_test/testcase.py (revision 84e4a27feda50323361af775ffe21fd26db3a051)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import sys
18import unittest
19import uuid
20
21from qemu.machine import QEMUMachine
22from qemu.utils import kvm_available, tcg_available
23
24from .cmd import run_cmd
25from .config import BUILD_DIR
26
27
28class QemuBaseTest(unittest.TestCase):
29
30    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
31    arch = None
32
33    workdir = None
34    log = None
35    logdir = None
36
37    def setUp(self, bin_prefix):
38        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
39        self.arch = self.qemu_bin.split('-')[-1]
40
41        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
42                                    self.id())
43        os.makedirs(self.workdir, exist_ok=True)
44
45        self.logdir = self.workdir
46        self.log = logging.getLogger('qemu-test')
47        self.log.setLevel(logging.DEBUG)
48        self._log_fh = logging.FileHandler(os.path.join(self.logdir,
49                                                        'base.log'), mode='w')
50        self._log_fh.setLevel(logging.DEBUG)
51        fileFormatter = logging.Formatter(
52            '%(asctime)s - %(levelname)s: %(message)s')
53        self._log_fh.setFormatter(fileFormatter)
54        self.log.addHandler(self._log_fh)
55
56    def tearDown(self):
57        self.log.removeHandler(self._log_fh)
58
59    def main():
60        path = os.path.basename(sys.argv[0])[:-3]
61        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
62                                   test_output_log = pycotap.LogMode.LogToError)
63        unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
64
65
66class QemuSystemTest(QemuBaseTest):
67    """Facilitates system emulation tests."""
68
69    cpu = None
70    machine = None
71    _machinehelp = None
72
73    def setUp(self):
74        self._vms = {}
75
76        super().setUp('qemu-system-')
77
78        console_log = logging.getLogger('console')
79        console_log.setLevel(logging.DEBUG)
80        self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
81                                                   'console.log'), mode='w')
82        self._console_log_fh.setLevel(logging.DEBUG)
83        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
84        self._console_log_fh.setFormatter(fileFormatter)
85        console_log.addHandler(self._console_log_fh)
86
87    def set_machine(self, machinename):
88        # TODO: We should use QMP to get the list of available machines
89        if not self._machinehelp:
90            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
91        if self._machinehelp.find(machinename) < 0:
92            self.skipTest('no support for machine ' + machinename)
93        self.machine = machinename
94
95    def require_accelerator(self, accelerator):
96        """
97        Requires an accelerator to be available for the test to continue
98
99        It takes into account the currently set qemu binary.
100
101        If the check fails, the test is canceled.  If the check itself
102        for the given accelerator is not available, the test is also
103        canceled.
104
105        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
106        :type accelerator: str
107        """
108        checker = {'tcg': tcg_available,
109                   'kvm': kvm_available}.get(accelerator)
110        if checker is None:
111            self.skipTest("Don't know how to check for the presence "
112                          "of accelerator %s" % accelerator)
113        if not checker(qemu_bin=self.qemu_bin):
114            self.skipTest("%s accelerator does not seem to be "
115                          "available" % accelerator)
116
117    def require_netdev(self, netdevname):
118        netdevhelp = run_cmd([self.qemu_bin,
119                             '-M', 'none', '-netdev', 'help'])[0];
120        if netdevhelp.find('\n' + netdevname + '\n') < 0:
121            self.skipTest('no support for " + netdevname + " networking')
122
123    def require_device(self, devicename):
124        devhelp = run_cmd([self.qemu_bin,
125                           '-M', 'none', '-device', 'help'])[0];
126        if devhelp.find(devicename) < 0:
127            self.skipTest('no support for device ' + devicename)
128
129    def _new_vm(self, name, *args):
130        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
131        self.log.debug('QEMUMachine "%s" created', name)
132        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
133        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
134        if args:
135            vm.add_args(*args)
136        return vm
137
138    @property
139    def vm(self):
140        return self.get_vm(name='default')
141
142    def get_vm(self, *args, name=None):
143        if not name:
144            name = str(uuid.uuid4())
145        if self._vms.get(name) is None:
146            self._vms[name] = self._new_vm(name, *args)
147            if self.cpu is not None:
148                self._vms[name].add_args('-cpu', self.cpu)
149            if self.machine is not None:
150                self._vms[name].set_machine(self.machine)
151        return self._vms[name]
152
153    def set_vm_arg(self, arg, value):
154        """
155        Set an argument to list of extra arguments to be given to the QEMU
156        binary. If the argument already exists then its value is replaced.
157
158        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
159        :type arg: str
160        :param value: the argument value, such as "host" in "-cpu host"
161        :type value: str
162        """
163        if not arg or not value:
164            return
165        if arg not in self.vm.args:
166            self.vm.args.extend([arg, value])
167        else:
168            idx = self.vm.args.index(arg) + 1
169            if idx < len(self.vm.args):
170                self.vm.args[idx] = value
171            else:
172                self.vm.args.append(value)
173
174    def tearDown(self):
175        for vm in self._vms.values():
176            vm.shutdown()
177        logging.getLogger('console').removeHandler(self._console_log_fh)
178        super().tearDown()
179