1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16import pycotap 17import shutil 18import subprocess 19import sys 20import unittest 21import uuid 22 23from qemu.machine import QEMUMachine 24from qemu.utils import kvm_available, tcg_available 25 26from .asset import Asset 27from .cmd import run_cmd 28from .config import BUILD_DIR 29 30 31class QemuBaseTest(unittest.TestCase): 32 33 qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 34 arch = None 35 36 workdir = None 37 log = None 38 logdir = None 39 40 def setUp(self, bin_prefix): 41 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 42 self.arch = self.qemu_bin.split('-')[-1] 43 44 self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional', 45 self.arch, self.id()) 46 self.workdir = os.path.join(self.outputdir, 'scratch') 47 os.makedirs(self.workdir, exist_ok=True) 48 49 self.logdir = self.outputdir 50 self.log_filename = os.path.join(self.logdir, 'base.log') 51 self.log = logging.getLogger('qemu-test') 52 self.log.setLevel(logging.DEBUG) 53 self._log_fh = logging.FileHandler(self.log_filename, mode='w') 54 self._log_fh.setLevel(logging.DEBUG) 55 fileFormatter = logging.Formatter( 56 '%(asctime)s - %(levelname)s: %(message)s') 57 self._log_fh.setFormatter(fileFormatter) 58 self.log.addHandler(self._log_fh) 59 60 # Capture QEMUMachine logging 61 self.machinelog = logging.getLogger('qemu.machine') 62 self.machinelog.setLevel(logging.DEBUG) 63 self.machinelog.addHandler(self._log_fh) 64 65 def tearDown(self): 66 if "QEMU_TEST_KEEP_SCRATCH" not in os.environ: 67 shutil.rmtree(self.workdir) 68 self.machinelog.removeHandler(self._log_fh) 69 self.log.removeHandler(self._log_fh) 70 71 def main(): 72 path = os.path.basename(sys.argv[0])[:-3] 73 74 cache = os.environ.get("QEMU_TEST_PRECACHE", None) 75 if cache is not None: 76 Asset.precache_suites(path, cache) 77 return 78 79 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 80 test_output_log = pycotap.LogMode.LogToError) 81 res = unittest.main(module = None, testRunner = tr, exit = False, 82 argv=["__dummy__", path]) 83 for (test, message) in res.result.errors + res.result.failures: 84 print('More information on ' + test.id() + ' could be found here:' 85 '\n %s' % test.log_filename, file=sys.stderr) 86 if hasattr(test, 'console_log_name'): 87 print(' %s' % test.console_log_name, file=sys.stderr) 88 sys.exit(not res.result.wasSuccessful()) 89 90 91class QemuUserTest(QemuBaseTest): 92 93 def setUp(self): 94 super().setUp('qemu-') 95 self._ldpath = [] 96 97 def add_ldpath(self, ldpath): 98 self._ldpath.append(os.path.abspath(ldpath)) 99 100 def run_cmd(self, bin_path, args=[]): 101 return subprocess.run([self.qemu_bin] 102 + ["-L %s" % ldpath for ldpath in self._ldpath] 103 + [bin_path] 104 + args, 105 text=True, capture_output=True) 106 107class QemuSystemTest(QemuBaseTest): 108 """Facilitates system emulation tests.""" 109 110 cpu = None 111 machine = None 112 _machinehelp = None 113 114 def setUp(self): 115 self._vms = {} 116 117 super().setUp('qemu-system-') 118 119 console_log = logging.getLogger('console') 120 console_log.setLevel(logging.DEBUG) 121 self.console_log_name = os.path.join(self.logdir, 'console.log') 122 self._console_log_fh = logging.FileHandler(self.console_log_name, 123 mode='w') 124 self._console_log_fh.setLevel(logging.DEBUG) 125 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 126 self._console_log_fh.setFormatter(fileFormatter) 127 console_log.addHandler(self._console_log_fh) 128 129 def set_machine(self, machinename): 130 # TODO: We should use QMP to get the list of available machines 131 if not self._machinehelp: 132 self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; 133 if self._machinehelp.find(machinename) < 0: 134 self.skipTest('no support for machine ' + machinename) 135 self.machine = machinename 136 137 def require_accelerator(self, accelerator): 138 """ 139 Requires an accelerator to be available for the test to continue 140 141 It takes into account the currently set qemu binary. 142 143 If the check fails, the test is canceled. If the check itself 144 for the given accelerator is not available, the test is also 145 canceled. 146 147 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 148 :type accelerator: str 149 """ 150 checker = {'tcg': tcg_available, 151 'kvm': kvm_available}.get(accelerator) 152 if checker is None: 153 self.skipTest("Don't know how to check for the presence " 154 "of accelerator %s" % accelerator) 155 if not checker(qemu_bin=self.qemu_bin): 156 self.skipTest("%s accelerator does not seem to be " 157 "available" % accelerator) 158 159 def require_netdev(self, netdevname): 160 netdevhelp = run_cmd([self.qemu_bin, 161 '-M', 'none', '-netdev', 'help'])[0]; 162 if netdevhelp.find('\n' + netdevname + '\n') < 0: 163 self.skipTest('no support for " + netdevname + " networking') 164 165 def require_device(self, devicename): 166 devhelp = run_cmd([self.qemu_bin, 167 '-M', 'none', '-device', 'help'])[0]; 168 if devhelp.find(devicename) < 0: 169 self.skipTest('no support for device ' + devicename) 170 171 def _new_vm(self, name, *args): 172 vm = QEMUMachine(self.qemu_bin, 173 name=name, 174 base_temp_dir=self.workdir, 175 log_dir=self.logdir) 176 self.log.debug('QEMUMachine "%s" created', name) 177 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 178 179 sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None) 180 if sockpath is not None: 181 vm.add_args("-chardev", 182 f"socket,id=backdoor,path={sockpath},server=on,wait=off", 183 "-mon", "chardev=backdoor,mode=control") 184 185 if args: 186 vm.add_args(*args) 187 return vm 188 189 @property 190 def vm(self): 191 return self.get_vm(name='default') 192 193 def get_vm(self, *args, name=None): 194 if not name: 195 name = str(uuid.uuid4()) 196 if self._vms.get(name) is None: 197 self._vms[name] = self._new_vm(name, *args) 198 if self.cpu is not None: 199 self._vms[name].add_args('-cpu', self.cpu) 200 if self.machine is not None: 201 self._vms[name].set_machine(self.machine) 202 return self._vms[name] 203 204 def set_vm_arg(self, arg, value): 205 """ 206 Set an argument to list of extra arguments to be given to the QEMU 207 binary. If the argument already exists then its value is replaced. 208 209 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 210 :type arg: str 211 :param value: the argument value, such as "host" in "-cpu host" 212 :type value: str 213 """ 214 if not arg or not value: 215 return 216 if arg not in self.vm.args: 217 self.vm.args.extend([arg, value]) 218 else: 219 idx = self.vm.args.index(arg) + 1 220 if idx < len(self.vm.args): 221 self.vm.args[idx] = value 222 else: 223 self.vm.args.append(value) 224 225 def tearDown(self): 226 for vm in self._vms.values(): 227 vm.shutdown() 228 logging.getLogger('console').removeHandler(self._console_log_fh) 229 super().tearDown() 230