# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
import pycotap
import shutil
import subprocess
import sys
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    Creates a per-test output directory with a scratch work directory and
    a log file, and provides the main() entry point for test scripts.
    """

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of the binary name, set in setUp()
    arch = None

    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the output/scratch directories and set up logging.

        :param bin_prefix: expected prefix of the QEMU binary name; it is
                           accepted for the subclasses' sake but not used
                           by this method
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory and release the log file handler.

        The scratch directory is kept if QEMU_TEST_KEEP_SCRATCH is set
        in the environment.
        """
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing a handler from its loggers does not close the
        # underlying file, so do that explicitly.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for a test script: run its tests with TAP output.

        The test module name is derived from sys.argv[0] with its last
        three characters (presumably the ".py" suffix) stripped. If
        QEMU_TEST_PRECACHE is set, only the assets are pre-cached and no
        test is run. Otherwise the process exits with a non-zero status
        when the test run was not successful.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        # Point the user to the log files of each failed test
        for (test, message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory that is passed to QEMU with "-L".

        :param ldpath: the directory; it is stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary with the QEMU binary under test.

        :param bin_path: path of the guest binary to run
        :type bin_path: str
        :param args: optional arguments for the guest binary
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with the
                  output captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <path>" string is not recognized as "-L".
            cmd.extend(['-L', ldpath])
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    # Cached output of "-M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base setup and attach a file handler for console logs."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs of this test.

        The test is skipped if the name does not occur in the "-M help"
        output of the QEMU binary.

        :param machinename: name of the machine, as used with "-M"
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped. If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary supports the given netdev.

        The name has to appear on a line of its own in the output of
        "-netdev help".

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the QEMU binary supports the given device.

        The name only has to occur somewhere in the "-device help" output.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the work and log dirs of this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        control-mode monitor is added on a socket at that path.

        :param name: name of the new VM
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine object
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default'; it is created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        A newly created VM gets the cpu and machine settings of the test
        if these are set. Without a name, a random UUID is used as name,
        so a new VM is created.

        :param args: extra command line arguments, only used when the VM
                     is created
        :param name: name of the VM
        :type name: str
        :returns: the QEMUMachine object
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
        else:
            idx = vm_args.index(arg) + 1
            if idx < len(vm_args):
                vm_args[idx] = value
            else:
                # The argument was the last entry and had no value yet
                vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs of the test and release the console log."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # The 'console' logger is global: detach and close the handler
            # even if a shutdown failed, so that it does not leak into
            # the tests that run afterwards.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()