# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import pycotap
import sys
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """Common base for the functional tests.

    Sets up a per-test working directory below BUILD_DIR and a
    'qemu-test' logger that writes to 'base.log' in that directory.
    """

    # Path of the QEMU binary under test, taken from the environment.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Target architecture; derived from the binary name in setUp().
    arch = None

    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """Prepare the working directory and the file logger.

        :param bin_prefix: prefix of the binary name, e.g. 'qemu-system-';
                           currently not evaluated by this method
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin,
                             'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary name.
        self.arch = self.qemu_bin.split('-')[-1]

        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
                                    self.id())
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.workdir
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(os.path.join(self.logdir,
                                                        'base.log'), mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(file_formatter)
        self.log.addHandler(self._log_fh)

    def tearDown(self):
        """Detach and close the per-test log file handler."""
        self.log.removeHandler(self._log_fh)
        # Removing the handler does not close the underlying file, so
        # close it explicitly to avoid leaking one descriptor per test.
        self._log_fh.close()

    @staticmethod
    def main():
        """Entry point for running a test file as a standalone script.

        If QEMU_TEST_PRECACHE is set in the environment, only the assets
        of the test module are pre-cached and no test is run. Otherwise
        the tests of the calling script are run with a TAP test runner.
        """
        # Module name of the calling script: strip the 3-character
        # file extension (".py") from the script's base name.
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        unittest.main(module=None, testRunner=tr, argv=["__dummy__", path])


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    # Cached output of "<qemu_bin> -M help", filled in by set_machine().
    _machinehelp = None

    def setUp(self):
        """Set up the base test and a 'console' logger writing to
        'console.log' in the working directory."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self._console_log_fh = logging.FileHandler(
            os.path.join(self.workdir, 'console.log'), mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(file_formatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """Select the machine type for the VMs of this test.

        The test is skipped if the name does not show up in the output
        of "<qemu_bin> -M help".

        :param machinename: name of the machine, as given to "-M"
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """Skip the test if the binary lacks the given netdev backend.

        :param netdevname: name of the backend, as listed on a line of
                           its own by "-netdev help"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        # Match a complete line so that a name which is merely a
        # substring of another backend's name is not accepted.
        if '\n' + netdevname + '\n' not in netdevhelp:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """Skip the test if the name does not occur in the output of
        "<qemu_bin> -M none -device help".

        :param devicename: name of the device to look for
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devicename not in devhelp:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """Create a QEMUMachine for the binary under test.

        :param name: name of the VM; only used in the log messages here
        :param args: extra command line arguments added to the VM
        :returns: the new QEMUMachine object
        """
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM registered under the name 'default' (created on first
        access)."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """Return the VM registered under ``name``, creating it if needed.

        Without a name, a random UUID is used, so each such call creates
        a new VM.  ``args`` as well as the ``cpu`` and ``machine``
        attributes are only applied when the VM is created.

        :param args: extra command line arguments for a newly created VM
        :param name: name under which the VM is registered
        :type name: str
        :returns: the QEMUMachine object
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if either ``arg`` or ``value`` is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the (first) occurrence of
        # the argument; append it if the argument is the last element.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs and release the console log handler."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Always detach and close the handler, even if a shutdown
            # failed: the 'console' logger is global, so a handler left
            # behind would keep receiving output from later tests.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()