1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import shutil
18import subprocess
19import sys
20import unittest
21import uuid
22
23from qemu.machine import QEMUMachine
24from qemu.utils import kvm_available, tcg_available
25
26from .asset import Asset
27from .cmd import run_cmd
28from .config import BUILD_DIR
29
30
31class QemuBaseTest(unittest.TestCase):
32
33    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
34    arch = None
35
36    workdir = None
37    log = None
38    logdir = None
39
40    def setUp(self, bin_prefix):
41        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
42        self.arch = self.qemu_bin.split('-')[-1]
43
44        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
45                                      self.arch, self.id())
46        self.workdir = os.path.join(self.outputdir, 'scratch')
47        os.makedirs(self.workdir, exist_ok=True)
48
49        self.logdir = self.outputdir
50        self.log_filename = os.path.join(self.logdir, 'base.log')
51        self.log = logging.getLogger('qemu-test')
52        self.log.setLevel(logging.DEBUG)
53        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
54        self._log_fh.setLevel(logging.DEBUG)
55        fileFormatter = logging.Formatter(
56            '%(asctime)s - %(levelname)s: %(message)s')
57        self._log_fh.setFormatter(fileFormatter)
58        self.log.addHandler(self._log_fh)
59
60        # Capture QEMUMachine logging
61        self.machinelog = logging.getLogger('qemu.machine')
62        self.machinelog.setLevel(logging.DEBUG)
63        self.machinelog.addHandler(self._log_fh)
64
65    def tearDown(self):
66        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
67            shutil.rmtree(self.workdir)
68        self.machinelog.removeHandler(self._log_fh)
69        self.log.removeHandler(self._log_fh)
70
71    def main():
72        path = os.path.basename(sys.argv[0])[:-3]
73
74        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
75        if cache is not None:
76            Asset.precache_suites(path, cache)
77            return
78
79        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
80                                   test_output_log = pycotap.LogMode.LogToError)
81        res = unittest.main(module = None, testRunner = tr, exit = False,
82                            argv=["__dummy__", path])
83        for (test, message) in res.result.errors + res.result.failures:
84
85            if hasattr(test, "log_filename"):
86                print('More information on ' + test.id() + ' could be found here:'
87                      '\n %s' % test.log_filename, file=sys.stderr)
88                if hasattr(test, 'console_log_name'):
89                    print(' %s' % test.console_log_name, file=sys.stderr)
90        sys.exit(not res.result.wasSuccessful())
91
92
93class QemuUserTest(QemuBaseTest):
94
95    def setUp(self):
96        super().setUp('qemu-')
97        self._ldpath = []
98
99    def add_ldpath(self, ldpath):
100        self._ldpath.append(os.path.abspath(ldpath))
101
102    def run_cmd(self, bin_path, args=[]):
103        return subprocess.run([self.qemu_bin]
104                              + ["-L %s" % ldpath for ldpath in self._ldpath]
105                              + [bin_path]
106                              + args,
107                              text=True, capture_output=True)
108
109class QemuSystemTest(QemuBaseTest):
110    """Facilitates system emulation tests."""
111
112    cpu = None
113    machine = None
114    _machinehelp = None
115
116    def setUp(self):
117        self._vms = {}
118
119        super().setUp('qemu-system-')
120
121        console_log = logging.getLogger('console')
122        console_log.setLevel(logging.DEBUG)
123        self.console_log_name = os.path.join(self.logdir, 'console.log')
124        self._console_log_fh = logging.FileHandler(self.console_log_name,
125                                                   mode='w')
126        self._console_log_fh.setLevel(logging.DEBUG)
127        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
128        self._console_log_fh.setFormatter(fileFormatter)
129        console_log.addHandler(self._console_log_fh)
130
131    def set_machine(self, machinename):
132        # TODO: We should use QMP to get the list of available machines
133        if not self._machinehelp:
134            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
135        if self._machinehelp.find(machinename) < 0:
136            self.skipTest('no support for machine ' + machinename)
137        self.machine = machinename
138
139    def require_accelerator(self, accelerator):
140        """
141        Requires an accelerator to be available for the test to continue
142
143        It takes into account the currently set qemu binary.
144
145        If the check fails, the test is canceled.  If the check itself
146        for the given accelerator is not available, the test is also
147        canceled.
148
149        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
150        :type accelerator: str
151        """
152        checker = {'tcg': tcg_available,
153                   'kvm': kvm_available}.get(accelerator)
154        if checker is None:
155            self.skipTest("Don't know how to check for the presence "
156                          "of accelerator %s" % accelerator)
157        if not checker(qemu_bin=self.qemu_bin):
158            self.skipTest("%s accelerator does not seem to be "
159                          "available" % accelerator)
160
161    def require_netdev(self, netdevname):
162        netdevhelp = run_cmd([self.qemu_bin,
163                             '-M', 'none', '-netdev', 'help'])[0];
164        if netdevhelp.find('\n' + netdevname + '\n') < 0:
165            self.skipTest('no support for " + netdevname + " networking')
166
167    def require_device(self, devicename):
168        devhelp = run_cmd([self.qemu_bin,
169                           '-M', 'none', '-device', 'help'])[0];
170        if devhelp.find(devicename) < 0:
171            self.skipTest('no support for device ' + devicename)
172
173    def _new_vm(self, name, *args):
174        vm = QEMUMachine(self.qemu_bin,
175                         name=name,
176                         base_temp_dir=self.workdir,
177                         log_dir=self.logdir)
178        self.log.debug('QEMUMachine "%s" created', name)
179        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
180
181        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
182        if sockpath is not None:
183            vm.add_args("-chardev",
184                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
185                        "-mon", "chardev=backdoor,mode=control")
186
187        if args:
188            vm.add_args(*args)
189        return vm
190
191    @property
192    def vm(self):
193        return self.get_vm(name='default')
194
195    def get_vm(self, *args, name=None):
196        if not name:
197            name = str(uuid.uuid4())
198        if self._vms.get(name) is None:
199            self._vms[name] = self._new_vm(name, *args)
200            if self.cpu is not None:
201                self._vms[name].add_args('-cpu', self.cpu)
202            if self.machine is not None:
203                self._vms[name].set_machine(self.machine)
204        return self._vms[name]
205
206    def set_vm_arg(self, arg, value):
207        """
208        Set an argument to list of extra arguments to be given to the QEMU
209        binary. If the argument already exists then its value is replaced.
210
211        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
212        :type arg: str
213        :param value: the argument value, such as "host" in "-cpu host"
214        :type value: str
215        """
216        if not arg or not value:
217            return
218        if arg not in self.vm.args:
219            self.vm.args.extend([arg, value])
220        else:
221            idx = self.vm.args.index(arg) + 1
222            if idx < len(self.vm.args):
223                self.vm.args[idx] = value
224            else:
225                self.vm.args.append(value)
226
227    def tearDown(self):
228        for vm in self._vms.values():
229            vm.shutdown()
230        logging.getLogger('console').removeHandler(self._console_log_fh)
231        super().tearDown()
232