# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import shutil
from subprocess import run
import sys
import tempfile
import warnings
import unittest
import uuid

import pycotap

from qemu.machine import QEMUMachine
from qemu.utils import hvf_available, kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR, dso_suffix
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    '''
    Common base class for the functional tests.

    setUp() reads the QEMU binary from the QEMU_TEST_QEMU_BINARY
    environment variable, creates per-test output and scratch
    directories below the build directory and attaches a file handler
    to the 'qemu-test', 'qemu.machine' and 'qemu.qmp' loggers.
    '''

    def uncompress(self, compressed, format=None):
        '''
        @params compressed: filename, Asset, or file-like object to uncompress
        @params format: optional compression format (gzip, lzma)

        Uncompresses @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @compressed is a file-like object.

        Returns the fully qualified path to the uncompressed file
        '''
        self.log.debug(f"Uncompress {compressed} format={format}")
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output name is the input name with its last extension removed
        (name, _ext) = os.path.splitext(str(compressed))
        uncompressed = self.scratch_file(os.path.basename(name))

        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        '''
        @params archive: filename, Asset, or file-like object to extract
        @params format: optional archive format (tar, zip, deb, cpio)
        @params sub_dir: optional sub-directory to extract into
        @params member: optional member file to limit extraction to

        Extracts @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the
        format from the filename or Asset URL. @format must be non-None
        if @archive is a file-like object.

        If @member is non-None, returns the fully qualified path to @member
        '''
        self.log.debug(f"Extract {archive} format={format} " +
                       f"sub_dir={sub_dir} member={member}")
        if isinstance(archive, Asset):
            archive.fetch()
        if sub_dir is None:
            archive_extract(archive, self.scratch_file(), format, member)
        else:
            archive_extract(archive, self.scratch_file(sub_dir),
                            format, member)

        if member is not None:
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        '''
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same instance
        is returned by subsequent calls within the same test.

        Returns: a tempfile.TemporaryDirectory instance
        '''
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        '''
        @params args list of zero or more subdirectories or file

        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        Returns: string representing a file path
        '''
        return str(Path(self.outputdir, *args))

    def plugin_file(self, plugin_name):
        '''
        @params plugin name

        Return the relative path 'tests/tcg/plugins/<plugin_name>.<suffix>'
        for the plugin, taking into account any host OS specific suffixes.
        '''
        sfx = dso_suffix()
        return os.path.join('tests', 'tcg', 'plugins', f'{plugin_name}.{sfx}')

    def assets_available(self):
        '''
        Check every Asset declared directly on the test class in an
        attribute whose name starts with "ASSET_".

        Returns: False as soon as one such asset reports that it is
                 not available, True otherwise
        '''
        for name, asset in vars(self.__class__).items():
            if name.startswith("ASSET_") and isinstance(asset, Asset):
                if not asset.available():
                    self.log.debug(f"Asset {asset.url} not available")
                    return False
        return True

    def setUp(self):
        '''
        Prepare the per-test environment: locate the QEMU binary,
        create the output and scratch directories and set up logging.

        The test is skipped if one of its assets is not available.
        '''
        self.qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The target architecture is the last '-' separated part of the
        # binary name
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        if os.path.exists(self.workdir):
            # Purge as safety net in case of unclean termination of
            # previous test, or use of QEMU_TEST_KEEP_SCRATCH
            shutil.rmtree(self.workdir)
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(name)s.%(funcName)s %(message)s')
        self._log_fh.setFormatter(file_formatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)
        self.qmplog = logging.getLogger('qemu.qmp')
        self.qmplog.setLevel(logging.DEBUG)
        self.qmplog.addHandler(self._log_fh)

        if not self.assets_available():
            # unittest does not run tearDown() when setUp() raises (and
            # skipTest() raises), so release the log handler and the
            # scratch directory here. The base class implementation is
            # called explicitly since subclass state is not set up yet.
            QemuBaseTest.tearDown(self)
            self.skipTest('One or more assets is not available')

    def tearDown(self):
        '''
        Remove the scratch directory (unless QEMU_TEST_KEEP_SCRATCH is
        set in the environment), clean up the socket directory if one
        was created, and detach and close the log file handler.
        '''
        if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
            shutil.rmtree(self.workdir)
        if self.socketdir is not None:
            self.socketdir.cleanup()
            self.socketdir = None
        self.qmplog.removeHandler(self._log_fh)
        self.machinelog.removeHandler(self._log_fh)
        self.log.removeHandler(self._log_fh)
        self._log_fh.close()

    @staticmethod
    def main():
        '''
        Entry point for running a test file as a script.

        If QEMU_TEST_PRECACHE is set in the environment, only the
        assets are pre-cached and the function returns. Otherwise the
        tests are run with a TAP test runner, the location of the log
        files of failed tests is printed to stderr, and the process
        exits with a non-zero status if the run was not successful.
        '''
        warnings.simplefilter("default")
        os.environ["PYTHONWARNINGS"] = "default"

        # Strip the three character suffix (".py") from the script name
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=[sys.argv[0], path] + sys.argv[1:])
        # Report each failed test only once
        reported = set()
        for (test, _message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename") and test.id() not in reported:
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(f' {test.console_log_name}', file=sys.stderr)
                reported.add(test.id())
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    '''Base class for tests that run a program with the QEMU binary.'''

    def setUp(self):
        '''Run the base class set up and start with an empty -L path list.'''
        super().setUp()
        self._ldpath = []

    def add_ldpath(self, ldpath):
        '''
        @params ldpath: directory to pass to QEMU with the -L option

        The path is stored in its absolute form.
        '''
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        '''
        @params bin_path: path of the program to run
        @params args: optional list of arguments for the program

        Run @bin_path with the QEMU binary, passing one "-L <path>"
        option pair for every path registered with add_ldpath().

        Returns: the subprocess.CompletedProcess instance, with stdout
                 and stderr captured as text
        '''
        if args is None:
            args = []
        # The option and its value must be separate argv elements
        ld_opts = [opt for ldpath in self._ldpath for opt in ("-L", ldpath)]
        return run([self.qemu_bin]
                   + ld_opts
                   + [bin_path]
                   + list(args),
                   text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Value for the -cpu option of newly created VMs (None: not passed)
    cpu = None
    # Machine type set on newly created VMs (None: not set)
    machine = None
    # Cached output of "<qemu_bin> -M help"
    _machinehelp = None

    def setUp(self):
        """
        Run the base class set up and attach a file handler writing
        to 'console.log' in the log directory to the 'console' logger.
        """
        self._vms = {}

        super().setUp()

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(file_formatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for VMs created afterwards.

        The test is skipped if the name does not occur in the output
        of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped. If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available,
                   'hvf': hvf_available,
                  }.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the name appears on a line of its own in
        the output of "-M none -netdev help" of the QEMU binary.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        helptxt = run([self.qemu_bin, '-M', 'none', '-netdev', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        if helptxt.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the name occurs in the output of
        "-M none -device help" of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        helptxt = run([self.qemu_bin, '-M', 'none', '-device', 'help'],
                      capture_output=True, check=True, encoding='utf8').stdout
        if helptxt.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine using the scratch directory as base
        temporary directory and the log directory for its logs.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor on a UNIX socket at that path is added.

        :param name: name of the VM
        :param args: extra arguments for the QEMU command line
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if needed.

        A new VM gets @args, plus the -cpu option and the machine
        type if the cpu and machine attributes are set. @args is
        ignored when the VM already exists.

        :param args: extra arguments for the QEMU command line
        :param name: name of the VM; a random UUID is used if empty
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done if @arg or @value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
        else:
            idx = vm_args.index(arg) + 1
            if idx < len(vm_args):
                vm_args[idx] = value
            else:
                # The argument was the last element and had no value yet
                vm_args.append(value)

    def tearDown(self):
        """
        Shut down all VMs, detach and close the console log handler
        and run the base class tear down.
        """
        for vm in self._vms.values():
            try:
                vm.shutdown()
            except Exception as ex:
                # Best effort: a failing shutdown must not prevent the
                # remaining VMs and log handlers from being cleaned up
                self.log.error("Failed to teardown VM: %s", ex)
        logging.getLogger('console').removeHandler(self._console_log_fh)
        self._console_log_fh.close()
        super().tearDown()