# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import subprocess
import pycotap
import sys
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    Reads the QEMU binary to test from the QEMU_TEST_QEMU_BINARY
    environment variable and, for each test, creates a work directory
    below BUILD_DIR together with a 'base.log' log file inside it.
    """

    # Path of the QEMU binary under test (None if the variable is unset)
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of qemu_bin, filled in by setUp()
    arch = None

    # Per-test directory, logger and log directory, filled in by setUp()
    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the per-test work directory and attach the file logger.

        :param bin_prefix: prefix of the QEMU binary name as passed by the
                           subclasses (e.g. 'qemu-'); not used here
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
                                    self.id())
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.workdir
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(os.path.join(self.logdir,
                                                        'base.log'), mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

    def tearDown(self):
        """Detach the per-test log handler and close its file."""
        self.log.removeHandler(self._log_fh)
        # removeHandler() does not close the underlying file; without this
        # every test would leave an open 'base.log' file handle behind.
        self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        If QEMU_TEST_PRECACHE is set in the environment, only the assets
        are pre-cached via Asset.precache_suites(); otherwise the tests of
        the calling script are run with a TAP test runner.
        """
        # Drop the last three characters of the script name
        # (presumably the '.py' suffix) to get the module name.
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        unittest.main(module=None, testRunner=tr, argv=["__dummy__", path])
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base set-up and start with an empty '-L' path list."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path that run_cmd() passes to QEMU with '-L'.

        :param ldpath: the path; it is stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a program under the QEMU binary and capture its output.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: optional extra arguments appended after bin_path
        :type args: list
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # Option and value must be separate argv elements: there is
            # no shell involved that would split "-L <path>" in two.
            cmd.extend(['-L', ldpath])
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Values applied to every VM created through get_vm() when not None
    cpu = None
    machine = None
    # Cached output of "<qemu_bin> -M help", filled in by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base set-up and attach a file handler for console logs."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self._console_log_fh = logging.FileHandler(
            os.path.join(self.workdir, 'console.log'), mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs, skipping the test if the
        name does not appear in the "-M help" output of the QEMU binary.

        :param machinename: name of the machine
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the "-netdev help" output of the QEMU binary
        contains a line consisting of the given name.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the given name appears in the "-device help"
        output of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devicename not in devhelp:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the QEMU binary below the work directory.

        :param name: name of the VM, only used in the log messages here
        :param args: optional arguments added to the new VM
        :returns: the new QEMUMachine
        """
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        :param args: arguments for the VM; only used when it is created
        :param name: name of the VM; a random UUID is used if not given
        :type name: str
        :returns: the QEMUMachine registered under the name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either arg or value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # arg is the last element, so it has no value to replace yet
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down all VMs and release the console log handler."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the log files even if a shutdown raised, so that a
            # failing VM does not leave handlers attached for later tests.
            console_log = logging.getLogger('console')
            console_log.removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()