1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import subprocess
17import pycotap
18import sys
19import unittest
20import uuid
21
22from qemu.machine import QEMUMachine
23from qemu.utils import kvm_available, tcg_available
24
25from .asset import Asset
26from .cmd import run_cmd
27from .config import BUILD_DIR
28
29
30class QemuBaseTest(unittest.TestCase):
31
32    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
33    arch = None
34
35    workdir = None
36    log = None
37    logdir = None
38
39    def setUp(self, bin_prefix):
40        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
41        self.arch = self.qemu_bin.split('-')[-1]
42
43        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
44                                    self.id())
45        os.makedirs(self.workdir, exist_ok=True)
46
47        self.logdir = self.workdir
48        self.log = logging.getLogger('qemu-test')
49        self.log.setLevel(logging.DEBUG)
50        self._log_fh = logging.FileHandler(os.path.join(self.logdir,
51                                                        'base.log'), mode='w')
52        self._log_fh.setLevel(logging.DEBUG)
53        fileFormatter = logging.Formatter(
54            '%(asctime)s - %(levelname)s: %(message)s')
55        self._log_fh.setFormatter(fileFormatter)
56        self.log.addHandler(self._log_fh)
57
58    def tearDown(self):
59        self.log.removeHandler(self._log_fh)
60
61    def main():
62        path = os.path.basename(sys.argv[0])[:-3]
63
64        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
65        if cache is not None:
66            Asset.precache_suites(path, cache)
67            return
68
69        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
70                                   test_output_log = pycotap.LogMode.LogToError)
71        unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
72
73
74class QemuUserTest(QemuBaseTest):
75
76    def setUp(self):
77        super().setUp('qemu-')
78        self._ldpath = []
79
80    def add_ldpath(self, ldpath):
81        self._ldpath.append(os.path.abspath(ldpath))
82
83    def run_cmd(self, bin_path, args=[]):
84        return subprocess.run([self.qemu_bin]
85                              + ["-L %s" % ldpath for ldpath in self._ldpath]
86                              + [bin_path]
87                              + args,
88                              text=True, capture_output=True)
89
90class QemuSystemTest(QemuBaseTest):
91    """Facilitates system emulation tests."""
92
93    cpu = None
94    machine = None
95    _machinehelp = None
96
97    def setUp(self):
98        self._vms = {}
99
100        super().setUp('qemu-system-')
101
102        console_log = logging.getLogger('console')
103        console_log.setLevel(logging.DEBUG)
104        self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
105                                                   'console.log'), mode='w')
106        self._console_log_fh.setLevel(logging.DEBUG)
107        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
108        self._console_log_fh.setFormatter(fileFormatter)
109        console_log.addHandler(self._console_log_fh)
110
111    def set_machine(self, machinename):
112        # TODO: We should use QMP to get the list of available machines
113        if not self._machinehelp:
114            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
115        if self._machinehelp.find(machinename) < 0:
116            self.skipTest('no support for machine ' + machinename)
117        self.machine = machinename
118
119    def require_accelerator(self, accelerator):
120        """
121        Requires an accelerator to be available for the test to continue
122
123        It takes into account the currently set qemu binary.
124
125        If the check fails, the test is canceled.  If the check itself
126        for the given accelerator is not available, the test is also
127        canceled.
128
129        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
130        :type accelerator: str
131        """
132        checker = {'tcg': tcg_available,
133                   'kvm': kvm_available}.get(accelerator)
134        if checker is None:
135            self.skipTest("Don't know how to check for the presence "
136                          "of accelerator %s" % accelerator)
137        if not checker(qemu_bin=self.qemu_bin):
138            self.skipTest("%s accelerator does not seem to be "
139                          "available" % accelerator)
140
141    def require_netdev(self, netdevname):
142        netdevhelp = run_cmd([self.qemu_bin,
143                             '-M', 'none', '-netdev', 'help'])[0];
144        if netdevhelp.find('\n' + netdevname + '\n') < 0:
145            self.skipTest('no support for " + netdevname + " networking')
146
147    def require_device(self, devicename):
148        devhelp = run_cmd([self.qemu_bin,
149                           '-M', 'none', '-device', 'help'])[0];
150        if devhelp.find(devicename) < 0:
151            self.skipTest('no support for device ' + devicename)
152
153    def _new_vm(self, name, *args):
154        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
155        self.log.debug('QEMUMachine "%s" created', name)
156        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
157        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
158        if args:
159            vm.add_args(*args)
160        return vm
161
162    @property
163    def vm(self):
164        return self.get_vm(name='default')
165
166    def get_vm(self, *args, name=None):
167        if not name:
168            name = str(uuid.uuid4())
169        if self._vms.get(name) is None:
170            self._vms[name] = self._new_vm(name, *args)
171            if self.cpu is not None:
172                self._vms[name].add_args('-cpu', self.cpu)
173            if self.machine is not None:
174                self._vms[name].set_machine(self.machine)
175        return self._vms[name]
176
177    def set_vm_arg(self, arg, value):
178        """
179        Set an argument to list of extra arguments to be given to the QEMU
180        binary. If the argument already exists then its value is replaced.
181
182        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
183        :type arg: str
184        :param value: the argument value, such as "host" in "-cpu host"
185        :type value: str
186        """
187        if not arg or not value:
188            return
189        if arg not in self.vm.args:
190            self.vm.args.extend([arg, value])
191        else:
192            idx = self.vm.args.index(arg) + 1
193            if idx < len(self.vm.args):
194                self.vm.args[idx] = value
195            else:
196                self.vm.args.append(value)
197
198    def tearDown(self):
199        for vm in self._vms.values():
200            vm.shutdown()
201        logging.getLogger('console').removeHandler(self._console_log_fh)
202        super().tearDown()
203