1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import subprocess
17import pycotap
18import sys
19import unittest
20import uuid
21
22from qemu.machine import QEMUMachine
23from qemu.utils import kvm_available, tcg_available
24
25from .asset import Asset
26from .cmd import run_cmd
27from .config import BUILD_DIR
28
29
30class QemuBaseTest(unittest.TestCase):
31
32    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
33    arch = None
34
35    workdir = None
36    log = None
37    logdir = None
38
39    def setUp(self, bin_prefix):
40        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
41        self.arch = self.qemu_bin.split('-')[-1]
42
43        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
44                                    self.id())
45        os.makedirs(self.workdir, exist_ok=True)
46
47        self.logdir = self.workdir
48        self.log_filename = os.path.join(self.logdir, 'base.log')
49        self.log = logging.getLogger('qemu-test')
50        self.log.setLevel(logging.DEBUG)
51        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
52        self._log_fh.setLevel(logging.DEBUG)
53        fileFormatter = logging.Formatter(
54            '%(asctime)s - %(levelname)s: %(message)s')
55        self._log_fh.setFormatter(fileFormatter)
56        self.log.addHandler(self._log_fh)
57
58    def tearDown(self):
59        self.log.removeHandler(self._log_fh)
60
61    def main():
62        path = os.path.basename(sys.argv[0])[:-3]
63
64        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
65        if cache is not None:
66            Asset.precache_suites(path, cache)
67            return
68
69        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
70                                   test_output_log = pycotap.LogMode.LogToError)
71        res = unittest.main(module = None, testRunner = tr, exit = False,
72                            argv=["__dummy__", path])
73        for (test, message) in res.result.errors + res.result.failures:
74            print('More information on ' + test.id() + ' could be found here:'
75                  '\n %s' % test.log_filename, file=sys.stderr)
76            if hasattr(test, 'console_log_name'):
77                print(' %s' % test.console_log_name, file=sys.stderr)
78        sys.exit(not res.result.wasSuccessful())
79
80
81class QemuUserTest(QemuBaseTest):
82
83    def setUp(self):
84        super().setUp('qemu-')
85        self._ldpath = []
86
87    def add_ldpath(self, ldpath):
88        self._ldpath.append(os.path.abspath(ldpath))
89
90    def run_cmd(self, bin_path, args=[]):
91        return subprocess.run([self.qemu_bin]
92                              + ["-L %s" % ldpath for ldpath in self._ldpath]
93                              + [bin_path]
94                              + args,
95                              text=True, capture_output=True)
96
97class QemuSystemTest(QemuBaseTest):
98    """Facilitates system emulation tests."""
99
100    cpu = None
101    machine = None
102    _machinehelp = None
103
104    def setUp(self):
105        self._vms = {}
106
107        super().setUp('qemu-system-')
108
109        console_log = logging.getLogger('console')
110        console_log.setLevel(logging.DEBUG)
111        self.console_log_name = os.path.join(self.workdir, 'console.log')
112        self._console_log_fh = logging.FileHandler(self.console_log_name,
113                                                   mode='w')
114        self._console_log_fh.setLevel(logging.DEBUG)
115        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
116        self._console_log_fh.setFormatter(fileFormatter)
117        console_log.addHandler(self._console_log_fh)
118
119    def set_machine(self, machinename):
120        # TODO: We should use QMP to get the list of available machines
121        if not self._machinehelp:
122            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
123        if self._machinehelp.find(machinename) < 0:
124            self.skipTest('no support for machine ' + machinename)
125        self.machine = machinename
126
127    def require_accelerator(self, accelerator):
128        """
129        Requires an accelerator to be available for the test to continue
130
131        It takes into account the currently set qemu binary.
132
133        If the check fails, the test is canceled.  If the check itself
134        for the given accelerator is not available, the test is also
135        canceled.
136
137        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
138        :type accelerator: str
139        """
140        checker = {'tcg': tcg_available,
141                   'kvm': kvm_available}.get(accelerator)
142        if checker is None:
143            self.skipTest("Don't know how to check for the presence "
144                          "of accelerator %s" % accelerator)
145        if not checker(qemu_bin=self.qemu_bin):
146            self.skipTest("%s accelerator does not seem to be "
147                          "available" % accelerator)
148
149    def require_netdev(self, netdevname):
150        netdevhelp = run_cmd([self.qemu_bin,
151                             '-M', 'none', '-netdev', 'help'])[0];
152        if netdevhelp.find('\n' + netdevname + '\n') < 0:
153            self.skipTest('no support for " + netdevname + " networking')
154
155    def require_device(self, devicename):
156        devhelp = run_cmd([self.qemu_bin,
157                           '-M', 'none', '-device', 'help'])[0];
158        if devhelp.find(devicename) < 0:
159            self.skipTest('no support for device ' + devicename)
160
161    def _new_vm(self, name, *args):
162        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
163        self.log.debug('QEMUMachine "%s" created', name)
164        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
165        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
166        if args:
167            vm.add_args(*args)
168        return vm
169
170    @property
171    def vm(self):
172        return self.get_vm(name='default')
173
174    def get_vm(self, *args, name=None):
175        if not name:
176            name = str(uuid.uuid4())
177        if self._vms.get(name) is None:
178            self._vms[name] = self._new_vm(name, *args)
179            if self.cpu is not None:
180                self._vms[name].add_args('-cpu', self.cpu)
181            if self.machine is not None:
182                self._vms[name].set_machine(self.machine)
183        return self._vms[name]
184
185    def set_vm_arg(self, arg, value):
186        """
187        Set an argument to list of extra arguments to be given to the QEMU
188        binary. If the argument already exists then its value is replaced.
189
190        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
191        :type arg: str
192        :param value: the argument value, such as "host" in "-cpu host"
193        :type value: str
194        """
195        if not arg or not value:
196            return
197        if arg not in self.vm.args:
198            self.vm.args.extend([arg, value])
199        else:
200            idx = self.vm.args.index(arg) + 1
201            if idx < len(self.vm.args):
202                self.vm.args[idx] = value
203            else:
204                self.vm.args.append(value)
205
206    def tearDown(self):
207        for vm in self._vms.values():
208            vm.shutdown()
209        logging.getLogger('console').removeHandler(self._console_log_fh)
210        super().tearDown()
211