1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16import pycotap 17import shutil 18import subprocess 19import sys 20import unittest 21import uuid 22 23from qemu.machine import QEMUMachine 24from qemu.utils import kvm_available, tcg_available 25 26from .asset import Asset 27from .cmd import run_cmd 28from .config import BUILD_DIR 29 30 31class QemuBaseTest(unittest.TestCase): 32 33 qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 34 arch = None 35 36 workdir = None 37 log = None 38 logdir = None 39 40 def setUp(self, bin_prefix): 41 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 42 self.arch = self.qemu_bin.split('-')[-1] 43 44 self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional', 45 self.arch, self.id()) 46 self.workdir = os.path.join(self.outputdir, 'scratch') 47 os.makedirs(self.workdir, exist_ok=True) 48 49 self.logdir = self.outputdir 50 self.log_filename = os.path.join(self.logdir, 'base.log') 51 self.log = logging.getLogger('qemu-test') 52 self.log.setLevel(logging.DEBUG) 53 self._log_fh = logging.FileHandler(self.log_filename, mode='w') 54 self._log_fh.setLevel(logging.DEBUG) 55 fileFormatter = logging.Formatter( 56 '%(asctime)s - %(levelname)s: %(message)s') 57 self._log_fh.setFormatter(fileFormatter) 58 self.log.addHandler(self._log_fh) 59 60 def tearDown(self): 61 if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: 62 shutil.rmtree(self.workdir) 63 self.log.removeHandler(self._log_fh) 64 65 def main(): 66 path = os.path.basename(sys.argv[0])[:-3] 67 68 cache = os.environ.get("QEMU_TEST_PRECACHE", None) 69 if cache is not None: 70 Asset.precache_suites(path, cache) 71 return 72 73 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 74 test_output_log = pycotap.LogMode.LogToError) 75 res = unittest.main(module = None, testRunner = tr, exit = False, 76 argv=["__dummy__", path]) 77 for (test, message) in res.result.errors + res.result.failures: 78 print('More information on ' + test.id() + ' could be found here:' 79 '\n %s' % test.log_filename, file=sys.stderr) 80 if hasattr(test, 'console_log_name'): 81 print(' %s' % test.console_log_name, file=sys.stderr) 82 sys.exit(not res.result.wasSuccessful()) 83 84 85class QemuUserTest(QemuBaseTest): 86 87 def setUp(self): 88 super().setUp('qemu-') 89 self._ldpath = [] 90 91 def add_ldpath(self, ldpath): 92 self._ldpath.append(os.path.abspath(ldpath)) 93 94 def run_cmd(self, bin_path, args=[]): 95 return subprocess.run([self.qemu_bin] 96 + ["-L %s" % ldpath for ldpath in self._ldpath] 97 + [bin_path] 98 + args, 99 text=True, capture_output=True) 100 101class QemuSystemTest(QemuBaseTest): 102 """Facilitates system emulation tests.""" 103 104 cpu = None 105 machine = None 106 _machinehelp = None 107 108 def setUp(self): 109 self._vms = {} 110 111 super().setUp('qemu-system-') 112 113 console_log = logging.getLogger('console') 114 console_log.setLevel(logging.DEBUG) 115 self.console_log_name = os.path.join(self.logdir, 'console.log') 116 self._console_log_fh = logging.FileHandler(self.console_log_name, 117 mode='w') 118 self._console_log_fh.setLevel(logging.DEBUG) 119 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 120 self._console_log_fh.setFormatter(fileFormatter) 121 console_log.addHandler(self._console_log_fh) 122 123 def set_machine(self, machinename): 124 # TODO: We should use QMP to get the list of available machines 125 if not self._machinehelp: 126 self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; 127 if self._machinehelp.find(machinename) < 0: 128 self.skipTest('no support for machine ' + machinename) 129 self.machine = machinename 130 131 def require_accelerator(self, accelerator): 132 """ 133 Requires an accelerator to be available for the test to continue 134 135 It takes into account the currently set qemu binary. 136 137 If the check fails, the test is canceled. If the check itself 138 for the given accelerator is not available, the test is also 139 canceled. 140 141 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 142 :type accelerator: str 143 """ 144 checker = {'tcg': tcg_available, 145 'kvm': kvm_available}.get(accelerator) 146 if checker is None: 147 self.skipTest("Don't know how to check for the presence " 148 "of accelerator %s" % accelerator) 149 if not checker(qemu_bin=self.qemu_bin): 150 self.skipTest("%s accelerator does not seem to be " 151 "available" % accelerator) 152 153 def require_netdev(self, netdevname): 154 netdevhelp = run_cmd([self.qemu_bin, 155 '-M', 'none', '-netdev', 'help'])[0]; 156 if netdevhelp.find('\n' + netdevname + '\n') < 0: 157 self.skipTest('no support for " + netdevname + " networking') 158 159 def require_device(self, devicename): 160 devhelp = run_cmd([self.qemu_bin, 161 '-M', 'none', '-device', 'help'])[0]; 162 if devhelp.find(devicename) < 0: 163 self.skipTest('no support for device ' + devicename) 164 165 def _new_vm(self, name, *args): 166 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir) 167 self.log.debug('QEMUMachine "%s" created', name) 168 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 169 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) 170 if args: 171 vm.add_args(*args) 172 return vm 173 174 @property 175 def vm(self): 176 return self.get_vm(name='default') 177 178 def get_vm(self, *args, name=None): 179 if not name: 180 name = str(uuid.uuid4()) 181 if self._vms.get(name) is None: 182 self._vms[name] = self._new_vm(name, *args) 183 if self.cpu is not None: 184 self._vms[name].add_args('-cpu', self.cpu) 185 if self.machine is not None: 186 self._vms[name].set_machine(self.machine) 187 return self._vms[name] 188 189 def set_vm_arg(self, arg, value): 190 """ 191 Set an argument to list of extra arguments to be given to the QEMU 192 binary. If the argument already exists then its value is replaced. 193 194 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 195 :type arg: str 196 :param value: the argument value, such as "host" in "-cpu host" 197 :type value: str 198 """ 199 if not arg or not value: 200 return 201 if arg not in self.vm.args: 202 self.vm.args.extend([arg, value]) 203 else: 204 idx = self.vm.args.index(arg) + 1 205 if idx < len(self.vm.args): 206 self.vm.args[idx] = value 207 else: 208 self.vm.args.append(value) 209 210 def tearDown(self): 211 for vm in self._vms.values(): 212 vm.shutdown() 213 logging.getLogger('console').removeHandler(self._console_log_fh) 214 super().tearDown() 215