1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import shutil
18import subprocess
19import sys
20import unittest
21import uuid
22
23from qemu.machine import QEMUMachine
24from qemu.utils import kvm_available, tcg_available
25
26from .asset import Asset
27from .cmd import run_cmd
28from .config import BUILD_DIR
29
30
class QemuBaseTest(unittest.TestCase):
    """Common base class for the QEMU functional tests.

    Each test gets its own output directory below BUILD_DIR containing a
    'scratch' working directory and a 'base.log' file that receives
    everything sent to the 'qemu-test' logger while the test runs.
    """

    # Path of the QEMU binary under test; read once when the class is defined.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of qemu_bin, filled in by setUp().
    arch = None

    # All of the following are filled in by setUp().
    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the per-test output/scratch directories and the log file.

        :param bin_prefix: prefix of the QEMU binary name passed in by the
                           subclasses; not used by this implementation
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        # mode='w' truncates the log file left behind by a previous run
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory and detach/close the log file handler.

        The scratch directory is kept if QEMU_TEST_KEEP_SCRATCH is set in
        the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
        finally:
            # The 'qemu-test' logger is shared between tests, so the handler
            # has to be detached even if the cleanup above failed.  Closing
            # it releases the underlying file descriptor.
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        If QEMU_TEST_PRECACHE is set, only the assets of the test suite
        are pre-cached.  Otherwise the tests are run with TAP output and
        the process exits with status 0 on success and 1 on failure.
        """
        # The test module name is the script name without its extension
        path = os.path.splitext(os.path.basename(sys.argv[0]))[0]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, _message) in res.result.errors + res.result.failures:
            # log_filename is only set once setUp() got far enough, and it
            # is missing completely on class/module level error holders.
            if not hasattr(test, 'log_filename'):
                continue
            print('More information on ' + test.id() + ' could be found here:'
                  '\n %s' % test.log_filename, file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
83
84
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the common setup and start with an empty list of -L paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path that run_cmd() passes to QEMU with the -L option.

        :param ldpath: the path to add; it is stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary with the QEMU binary under test.

        :param bin_path: path of the guest binary to execute
        :type bin_path: str
        :param args: additional arguments for the guest binary
        :type args: list of str
        :returns: the subprocess.CompletedProcess with stdout and stderr
                  captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries since
            # there is no shell involved that would split "-L <path>".
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
100
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the -cpu option of newly created VMs (None: do not set it)
    cpu = None
    # Machine type for newly created VMs (None: do not set it)
    machine = None
    # Cached output of "-M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the common setup and additionally create the console log."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created later on.

        The test is skipped if the name does not show up in the "-M help"
        output of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        # NOTE: plain substring search, so a name that is a prefix of
        # another machine name is accepted as well.
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test if the netdev backend is not listed by "-netdev help".

        :param netdevname: name of the netdev backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        # The name has to match a complete line of the help output
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test if the device is not mentioned by "-device help".

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the scratch directory of the test.

        :param name: name of the VM, only used for the log messages
        :param args: extra command line arguments to add to the VM
        :returns: the new QEMUMachine object
        """
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        A newly created VM gets the class level cpu and machine settings
        applied if they are set.

        :param args: extra command line arguments, only used when the VM
                     gets created
        :param name: name of the VM; a random UUID is used if it is not
                     given, so that a new VM is created
        :type name: str
        :returns: the QEMUMachine object
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # arg was the very last entry, i.e. it had no value yet
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs, release the console log and clean up."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # The 'console' logger is shared between tests, so detach the
            # handler even if a shutdown failed, and close it to release
            # the file descriptor.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
        super().tearDown()
215