# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import warnings
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import hvf_available, kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR, dso_suffix
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    setUp() derives the target architecture from the QEMU binary named
    by the QEMU_TEST_QEMU_BINARY environment variable, creates per-test
    output and scratch directories below the build directory, and
    attaches a file handler to the 'qemu-test' and 'qemu.machine'
    loggers.  The remaining methods are helpers for locating files and
    for unpacking test assets.
    """

    def uncompress(self, compressed, format=None):
        """
        @params compressed: filename, Asset, or file-like object to
                            uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        """
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        # Inside a method body this name resolves to the module-level
        # uncompress() helper, not to this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None,
                        member=None):
        """
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory
        beneath named by @sub_dir. All files are extracted unless
        @member specifies a limit.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @archive is a file-like object.

        If @member is non-None, returns the fully qualified path to
        @member, otherwise returns None
        """
        self.log.debug(f"Extract {archive} format={format} "
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # As in uncompress(), this is the module-level helper.
        archive_extract(archive, dest, format, member)

        if member is not None:
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        returned by later calls within the same test; tearDown()
        removes it.

        Returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        # The root is the parent of the directory holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test, unless the
        QEMU_TEST_KEEP_SCRATCH environment variable is set.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        """
        @params plugin_name: name of the plugin, without any suffix

        Return the path to the plugin below 'tests/tcg/plugins', taking
        into account any host OS specific suffixes. The path returned
        is relative, not anchored at the build directory.
        """
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins',
                            f'{plugin_name}.{sfx}')

    def assets_available(self):
        """
        Check the Asset objects declared on the test class.

        Only attributes defined directly on the test's own class whose
        name starts with "ASSET_" are considered.

        Returns: False as soon as one asset reports that it is not
                 available, True otherwise
        """
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        """
        Prepare the per-test directories and logging.

        Fails the test if QEMU_TEST_QEMU_BINARY is not set, and skips
        it if one of the class's assets is not available.
        """
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin,
                             'QEMU_TEST_QEMU_BINARY must be set')
        # The architecture is whatever follows the last '-' in the
        # binary path.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

        if not self.assets_available():
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        """
        Remove the scratch and socket directories and detach the log
        file handler installed by setUp().

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() also disarms the TemporaryDirectory
                # finalizer, which would otherwise emit a
                # ResourceWarning when the object is collected.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always detach the handler, so that a failure above does
            # not make later tests log into this test's file.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test script standalone.

        If QEMU_TEST_PRECACHE is set, only pre-caches the assets and
        returns. Otherwise runs the tests with TAP output, prints the
        log file locations of failed tests to stderr and exits with a
        non-zero status if the run was not successful.
        """
        warnings.simplefilter("default")
        os.environ["PYTHONWARNINGS"] = "default"

        # Name of the running script with its last three characters
        # (the ".py" suffix) removed.
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(
            message_log=pycotap.LogMode.LogToError,
            test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=[sys.argv[0], path] + sys.argv[1:])
        for (test, _message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id()
                      + ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name,
                          file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Helpers for tests that run a program under the QEMU binary."""

    def setUp(self):
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register @ldpath (made absolute) to be passed to QEMU with the
        -L option by run_cmd().
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the program to run
        :param args: optional list of arguments for the program
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # Option and value must be separate argv elements; a single
            # "-L <path>" string would not be recognised as -L.
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args is not None:
            cmd.extend(args)
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Values applied to every VM created through get_vm() when set.
    cpu = None
    machine = None
    # Cached output of "<qemu> -M help", filled in by set_machine().
    _machinehelp = None

    def setUp(self):
        """
        Extend the base setup with a 'console.log' file attached to
        the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select @machinename for the VMs of this test, skipping the test
        if the name does not occur in the output of "-M help".
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        # NOTE: plain substring match on the whole help text.
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                  }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless @netdevname appears on a line of its own
        in the output of "-netdev help".
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename occurs in the output of
        "-device help".
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine named @name that uses the scratch dir for
        temporary files and the test's log dir for logs, with @args as
        additional command line arguments.
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        # Optionally add an extra QMP monitor on a UNIX socket.
        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},"
                        "server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM called @name, creating it if it does not exist.

        :param args: extra command line arguments, only used when the
                     VM is created by this call
        :param name: VM name; a random UUID is used when not given, so
                     such a call always creates a new VM
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done when @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # Replace the element following the first occurrence of @arg,
        # or append the value if @arg is the last element.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """
        Shut down every VM created by the test, then release the
        console log handler and run the base class cleanup.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Release the handler even when a shutdown fails, so it
            # does not stay attached for later tests.
            logging.getLogger('console').removeHandler(
                self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()