1# Test class and utilities for functional tests 2# 3# Copyright 2018, 2024 Red Hat, Inc. 4# 5# Original Author (Avocado-based tests): 6# Cleber Rosa <crosa@redhat.com> 7# 8# Adaption for standalone version: 9# Thomas Huth <thuth@redhat.com> 10# 11# This work is licensed under the terms of the GNU GPL, version 2 or 12# later. See the COPYING file in the top-level directory. 13 14import logging 15import os 16import pycotap 17import sys 18import unittest 19import uuid 20 21from qemu.machine import QEMUMachine 22from qemu.utils import kvm_available, tcg_available 23 24from .cmd import run_cmd 25from .config import BUILD_DIR 26 27 28class QemuBaseTest(unittest.TestCase): 29 30 qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY') 31 arch = None 32 33 workdir = None 34 log = None 35 logdir = None 36 37 def setUp(self, bin_prefix): 38 self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set') 39 self.arch = self.qemu_bin.split('-')[-1] 40 41 self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch, 42 self.id()) 43 os.makedirs(self.workdir, exist_ok=True) 44 45 self.logdir = self.workdir 46 self.log = logging.getLogger('qemu-test') 47 self.log.setLevel(logging.DEBUG) 48 self._log_fh = logging.FileHandler(os.path.join(self.logdir, 49 'base.log'), mode='w') 50 self._log_fh.setLevel(logging.DEBUG) 51 fileFormatter = logging.Formatter( 52 '%(asctime)s - %(levelname)s: %(message)s') 53 self._log_fh.setFormatter(fileFormatter) 54 self.log.addHandler(self._log_fh) 55 56 def tearDown(self): 57 self.log.removeHandler(self._log_fh) 58 59 def main(): 60 path = os.path.basename(sys.argv[0])[:-3] 61 tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError, 62 test_output_log = pycotap.LogMode.LogToError) 63 unittest.main(module = None, testRunner = tr, argv=["__dummy__", path]) 64 65 66class QemuSystemTest(QemuBaseTest): 67 """Facilitates system emulation tests.""" 68 69 cpu = None 70 machine = None 71 _machinehelp = None 72 73 def setUp(self): 74 self._vms = {} 75 76 super().setUp('qemu-system-') 77 78 console_log = logging.getLogger('console') 79 console_log.setLevel(logging.DEBUG) 80 self._console_log_fh = logging.FileHandler(os.path.join(self.workdir, 81 'console.log'), mode='w') 82 self._console_log_fh.setLevel(logging.DEBUG) 83 fileFormatter = logging.Formatter('%(asctime)s: %(message)s') 84 self._console_log_fh.setFormatter(fileFormatter) 85 console_log.addHandler(self._console_log_fh) 86 87 def set_machine(self, machinename): 88 # TODO: We should use QMP to get the list of available machines 89 if not self._machinehelp: 90 self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]; 91 if self._machinehelp.find(machinename) < 0: 92 self.skipTest('no support for machine ' + machinename) 93 self.machine = machinename 94 95 def require_accelerator(self, accelerator): 96 """ 97 Requires an accelerator to be available for the test to continue 98 99 It takes into account the currently set qemu binary. 100 101 If the check fails, the test is canceled. If the check itself 102 for the given accelerator is not available, the test is also 103 canceled. 104 105 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 106 :type accelerator: str 107 """ 108 checker = {'tcg': tcg_available, 109 'kvm': kvm_available}.get(accelerator) 110 if checker is None: 111 self.skipTest("Don't know how to check for the presence " 112 "of accelerator %s" % accelerator) 113 if not checker(qemu_bin=self.qemu_bin): 114 self.skipTest("%s accelerator does not seem to be " 115 "available" % accelerator) 116 117 def require_netdev(self, netdevname): 118 netdevhelp = run_cmd([self.qemu_bin, 119 '-M', 'none', '-netdev', 'help'])[0]; 120 if netdevhelp.find('\n' + netdevname + '\n') < 0: 121 self.skipTest('no support for " + netdevname + " networking') 122 123 def require_device(self, devicename): 124 devhelp = run_cmd([self.qemu_bin, 125 '-M', 'none', '-device', 'help'])[0]; 126 if devhelp.find(devicename) < 0: 127 self.skipTest('no support for device ' + devicename) 128 129 def _new_vm(self, name, *args): 130 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir) 131 self.log.debug('QEMUMachine "%s" created', name) 132 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 133 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) 134 if args: 135 vm.add_args(*args) 136 return vm 137 138 @property 139 def vm(self): 140 return self.get_vm(name='default') 141 142 def get_vm(self, *args, name=None): 143 if not name: 144 name = str(uuid.uuid4()) 145 if self._vms.get(name) is None: 146 self._vms[name] = self._new_vm(name, *args) 147 if self.cpu is not None: 148 self._vms[name].add_args('-cpu', self.cpu) 149 if self.machine is not None: 150 self._vms[name].set_machine(self.machine) 151 return self._vms[name] 152 153 def set_vm_arg(self, arg, value): 154 """ 155 Set an argument to list of extra arguments to be given to the QEMU 156 binary. If the argument already exists then its value is replaced. 157 158 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 159 :type arg: str 160 :param value: the argument value, such as "host" in "-cpu host" 161 :type value: str 162 """ 163 if not arg or not value: 164 return 165 if arg not in self.vm.args: 166 self.vm.args.extend([arg, value]) 167 else: 168 idx = self.vm.args.index(arg) + 1 169 if idx < len(self.vm.args): 170 self.vm.args[idx] = value 171 else: 172 self.vm.args.append(value) 173 174 def tearDown(self): 175 for vm in self._vms.values(): 176 vm.shutdown() 177 logging.getLogger('console').removeHandler(self._console_log_fh) 178 super().tearDown() 179