# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
# Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
# Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import hvf_available, kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR, dso_suffix
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    setUp() prepares a per-test output directory below the build
    directory, a scratch directory beneath it and a 'base.log' file that
    receives the 'qemu-test' and 'qemu.machine' loggers.
    """

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        name, _ = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # This resolves to the module-level uncompress() helper, not to
        # this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        If @member is non-None, returns the fully qualified path to @member,
        otherwise returns None.
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # This resolves to the module-level archive_extract() helper.
        archive_extract(archive, dest, format, member)

        if member is not None:
            # The returned path is relative to the scratch directory
            # root, regardless of @sub_dir.
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths. The directory is created on first use and
        reused by later calls within the same test.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        # The root is the parent of the directory holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        @params plugin name

        Return the path to the plugin taking into account any host OS
        specific suffixes. The result is the relative path
        'tests/tcg/plugins/<plugin_name>.<suffix>'.
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check every Asset stored in an ASSET_* attribute defined directly
        on the test class.

        Returns: False as soon as one asset reports that it is not
        available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        """
        Prepare the per-test directories and logging.

        Requires the QEMU_TEST_QEMU_BINARY environment variable. The test
        is skipped if one of its assets is not available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary path.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment) and the socket directory, then detach
        and close the log file handler.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # TemporaryDirectory finalizer.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always detach the handler so that it does not leak into
            # later tests, even if removing the directories failed.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a script.

        The test module name is taken from sys.argv[0]. If the
        QEMU_TEST_PRECACHE environment variable is set, only
        Asset.precache_suites() is invoked; otherwise the tests are run
        with a TAP test runner, the log file locations of failed tests
        are printed to stderr, and the process exits with a non-zero
        status if the run was not successful.
        """
        # Strip the trailing ".py" from the script name
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for test, _ in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Base class for tests that run a program via the QEMU binary."""

    def setUp(self):
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register @ldpath (converted to an absolute path) to be passed
        with the -L option by run_cmd().
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path through the QEMU binary.

        @params bin_path: path to the program to run
        @params args: optional list of arguments for the program

        Returns: the subprocess.CompletedProcess instance, with stdout
        and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements:
            # a single "-L <path>" string is not expected to be
            # recognised as the -L option by QEMU.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd += args
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        """
        In addition to the base class setup, create 'console.log' in the
        log directory and attach it to the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type used for VMs created afterwards.

        The test is skipped if @machinename does not occur in the output
        of "-M help" of the QEMU binary.
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                   }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own in
        the output of "-netdev help" of the QEMU binary.
        """
        helptxt = run([self.qemu_bin,
                       '-M', 'none', '-netdev', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in helptxt:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        "-device help" of the QEMU binary.
        """
        helptxt = run([self.qemu_bin,
                       '-M', 'none', '-device', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        if devicename not in helptxt:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine called @name that uses the scratch directory
        as base temp dir and the log directory for its logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor on a UNIX socket at that path is added. Any @args
        are appended to the VM's command line arguments.
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM called @name, creating it on first use.

        @params args: extra command line arguments, only used when the
                      VM is created
        @params name: name of the VM; a random UUID is used if empty

        A newly created VM receives the class-level cpu and machine
        settings, if they are set.
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            # The value is the element following the first occurrence
            # of @arg.
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """
        Shut down all VMs created by the test, detach and close the
        console log handler, then run the base class teardown.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Run the remaining cleanup even if a VM failed to shut
            # down, so handlers and directories are not leaked.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()