# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
import subprocess
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .cmd import run_cmd
from .config import BUILD_DIR


class QemuBaseTest(unittest.TestCase):
    """Common base class for the functional tests.

    Provides per-test output/scratch directories, a file based log and
    helpers for locating data, build, scratch and log files.
    """

    # Path of the QEMU binary under test, taken from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Set in setUp() to the text after the last '-' in the binary name
    arch = None

    workdir = None
    log = None
    logdir = None

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        (below @sub_dir when one was given), otherwise returns None.
        '''
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        # Only pass sub_dir on when it is set: scratch_file() cannot take
        # None as a path component.
        subdirs = () if sub_dir is None else (sub_dir,)
        archive_extract(archive, self.scratch_file(*subdirs), format, member)

        if member is None:
            return None
        # The member was extracted beneath the destination directory, so
        # the returned path has to include sub_dir as well.
        return self.scratch_file(*subdirs, member)

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        the same instance is returned on subsequent calls.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        '''
        @params bin_prefix: binary name prefix supplied by the subclass
                            (currently not used by this method)

        Prepare the per-test output and scratch directories and attach
        a file handler writing 'base.log' to both the 'qemu-test' and
        the 'qemu.machine' loggers.
        '''
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler installed by setUp().
        '''
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            # Let the TemporaryDirectory remove itself, so that its
            # finalizer does not run again on an already deleted path.
            self.socketdir.cleanup()
            self.socketdir = None
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        # Removing a handler does not close it; release the log file.
        self._log_fh.close()

    @staticmethod
    def main():
        '''
        Entry point for a test script.

        The name of the test module is derived from sys.argv[0] by
        taking its basename and dropping the last three characters
        (the script is expected to end in ".py").

        If QEMU_TEST_PRECACHE is set in the environment, only
        Asset.precache_suites() is invoked and the function returns.
        Otherwise the tests are run with a TAP test runner, the log
        file locations of failed tests are printed to stderr, and the
        process exits with a status reflecting the overall result.
        '''
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, message) in res.result.errors + res.result.failures:

            if hasattr(test, "log_filename"):
                print('More information on ' + test.id()
                      + ' could be found here:\n %s' % test.log_filename,
                      file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to the QEMU binary with "-L".

        :param ldpath: directory path; stored as an absolute path
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary with the QEMU binary under test.

        :param bin_path: path of the program to execute
        :type bin_path: str
        :param args: optional list of arguments for the program
        :type args: list
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv entries;
            # a single "-L <path>" string is not recognized as "-L".
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return subprocess.run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created by this test.

        The test is skipped if @machinename does not occur in the
        output of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own
        in the output of "-netdev help" of the QEMU binary.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        "-device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine using the scratch and log directories of
        this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        control-mode monitor on a UNIX socket at that path is added.

        :param name: name given to the QEMUMachine
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under @name, creating it if needed.

        When a VM is created, the class level cpu and machine settings
        are applied to it if they are set.  Without a @name a random
        UUID is used as the name.

        :param args: extra command line arguments, used only when the
                     VM is created
        :param name: optional name of the VM
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """
        Shut down every VM created by the test, then detach and close
        the console log handler before running the base class teardown.
        """
        for vm in self._vms.values():
            vm.shutdown()
        logging.getLogger('console').removeHandler(self._console_log_fh)
        # Removing a handler does not close it; release the log file.
        self._console_log_fh.close()
        super().tearDown()