# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
import subprocess
import pycotap
import sys
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """Common base for the functional tests.

    Provides a per-test working directory below BUILD_DIR and a 'qemu-test'
    logger that writes to a 'base.log' file inside that directory.
    """

    # Path of the QEMU binary under test, read once from the environment
    # when the class is defined.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Target architecture, derived in setUp() from the binary name.
    arch = None

    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """Create the working directory and attach the file logger.

        :param bin_prefix: binary name prefix supplied by the subclasses
                           (e.g. 'qemu-system-'); not used by this method.
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the binary
        # path, e.g. 'x86_64' for 'qemu-system-x86_64'.
        self.arch = self.qemu_bin.split('-')[-1]

        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
                                    self.id())
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.workdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

    def tearDown(self):
        """Detach the file logger and release its file descriptor."""
        self.log.removeHandler(self._log_fh)
        # removeHandler() does not close the underlying file; without an
        # explicit close() every test would leak one open log file.
        self._log_fh.close()

    @staticmethod
    def main():
        """Entry point for running a test file as a standalone script.

        The test module name is taken from sys.argv[0] with the directory
        part and the last three characters (presumably the '.py' suffix)
        removed. If QEMU_TEST_PRECACHE is set in the environment, only
        Asset.precache_suites() is invoked and no tests are run. Otherwise
        the tests are run through the pycotap TAP runner, the log file
        locations of failed tests are printed to stderr and the process
        exits with a non-zero status if any test was unsuccessful.
        """
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, message) in res.result.errors + res.result.failures:
            # log_filename is only set once setUp() got past its initial
            # assertion, so a test that failed earlier has no log to show.
            log_filename = getattr(test, 'log_filename', None)
            if log_filename is None:
                continue
            print('More information on ' + test.id() + ' could be found here:'
                  '\n %s' % log_filename, file=sys.stderr)
            if hasattr(test, 'console_log_name'):
                print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base setup and start with an empty '-L' path list."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """Register a directory to be passed to QEMU via '-L'.

        :param ldpath: directory path; stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """Run a guest binary under the QEMU user-mode emulator.

        :param bin_path: path of the guest binary to execute
        :type bin_path: str
        :param args: extra arguments appended after the guest binary
        :type args: list of str, or None for no extra arguments
        :returns: the subprocess.CompletedProcess of the run, with stdout
                  and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries; a
            # single "-L <path>" element is not recognized as the -L option.
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    # Cached result of "-M help", filled on the first set_machine() call.
    _machinehelp = None

    def setUp(self):
        """Run the base setup and attach a file handler to 'console'.

        The 'console' logger output is written to 'console.log' in the
        test's working directory.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.workdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """Select the machine type for the VMs, or skip the test.

        The test is skipped if the name does not occur anywhere in the
        first element returned by run_cmd() for "-M help" (presumably the
        command's stdout -- confirm against the cmd module).

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped. If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """Skip the test unless the binary lists the given netdev backend.

        The name has to appear on a line of its own in the "-netdev help"
        output.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """Skip the test unless "-device help" mentions the given device.

        :param devicename: name of the device; matched as a substring
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """Create a QEMUMachine that uses the test's working directory.

        :param name: label used in the debug log messages
        :param args: extra QEMU arguments added to the new machine
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM registered under the name 'default', created on demand."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """Return the VM with the given name, creating it if necessary.

        :param args: extra QEMU arguments; only used when the VM is created
        :param name: name of the VM; a random UUID is used if not given
        :type name: str
        :returns: the QEMUMachine registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either the argument or the value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # The argument was the last list entry and had no value yet.
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs and release the logging handlers."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the handlers even if a VM fails to shut down, so the
            # log files do not stay open and attached for later tests.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()