# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
from pathlib import Path
import pycotap
import shutil
from subprocess import run
import sys
import tempfile
import unittest
import uuid

from qemu.machine import QEMUMachine
from qemu.utils import kvm_available, tcg_available

from .archive import archive_extract
from .asset import Asset
from .config import BUILD_DIR
from .uncompress import uncompress


class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the functional tests.

    Provides per-test scratch and log directories below the build
    directory, a file-backed logger, and helpers for locating data
    files and for unpacking test assets.
    """

    # Path of the QEMU binary under test; read from the environment once,
    # when the class body is evaluated.
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    arch = None

    workdir = None
    log = None
    logdir = None
    # Lazily created by socket_dir(); the class-level default keeps
    # socket_dir() usable even before setUp() has run.
    socketdir = None

    def uncompress(self, compressed, format=None):
        """
        Uncompress @compressed into the scratch directory.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if
        @compressed is a file-like object.

        :param compressed: filename, Asset, or file-like object to uncompress
        :param format: optional compression format (gzip, lzma)
        :returns: the fully qualified path to the uncompressed file
        """
        self.log.debug("Uncompress %s format=%s", compressed, format)
        if isinstance(compressed, Asset):
            compressed.fetch()

        # The output file is named after the input with its last
        # extension stripped.
        stem = os.path.splitext(str(compressed))[0]
        uncompressed = self.scratch_file(os.path.basename(stem))

        # Inside a method body this name resolves to the module-level
        # uncompress() helper, not to this method.
        uncompress(compressed, uncompressed, format)

        return uncompressed

    def archive_extract(self, archive, format=None, sub_dir=None, member=None):
        """
        Extract @archive into the scratch directory, or a directory beneath
        named by @sub_dir. All files are extracted unless @member specifies
        a limit.

        If @format is None, heuristics will be applied to guess the format
        from the filename or Asset URL. @format must be non-None if @archive
        is a file-like object.

        :param archive: filename, Asset, or file-like object to extract
        :param format: optional archive format (tar, zip, deb, cpio)
        :param sub_dir: optional sub-directory to extract into
        :param member: optional member file to limit extraction to
        :returns: the fully qualified path to @member if @member is
                  non-None, otherwise None
        """
        self.log.debug("Extract %s format=%s sub_dir=%s member=%s",
                       archive, format, sub_dir, member)
        if isinstance(archive, Asset):
            archive.fetch()

        if sub_dir is None:
            dest = self.scratch_file()
        else:
            dest = self.scratch_file(sub_dir)
        # Resolves to the module-level archive_extract() helper.
        archive_extract(archive, dest, format, member)

        if member is not None:
            return self.scratch_file(member)
        return None

    def socket_dir(self):
        """
        Create a temporary directory suitable for storing UNIX
        socket paths.

        The directory is created on first use and the same object is
        returned by subsequent calls until tearDown() disposes of it.

        :returns: a tempfile.TemporaryDirectory instance
        """
        if self.socketdir is None:
            self.socketdir = tempfile.TemporaryDirectory(
                prefix="qemu_func_test_sock_")
        return self.socketdir

    def data_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the source directory that is the root for
        functional tests.

        @args may be an empty list to reference the root dir
        itself, may be a single element to reference a file in
        the root directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: list of zero or more subdirectories or file
        :returns: string representing a file path
        """
        # The root is the parent of the directory holding this module.
        return str(Path(Path(__file__).parent.parent, *args))

    def build_file(self, *args):
        """
        Construct a path for accessing a data file located
        relative to the build directory root.

        @args may be an empty list to reference the build dir
        itself, may be a single element to reference a file in
        the build directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: list of zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(BUILD_DIR, *args))

    def scratch_file(self, *args):
        """
        Construct a path for accessing/creating a scratch file
        located relative to a temporary directory dedicated to
        this test case. The directory and its contents will be
        purged upon completion of the test.

        @args may be an empty list to reference the scratch dir
        itself, may be a single element to reference a file in
        the scratch directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: list of zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.workdir, *args))

    def log_file(self, *args):
        """
        Construct a path for accessing/creating a log file
        located relative to a temporary directory dedicated to
        this test case. The directory and its log files will be
        preserved upon completion of the test.

        @args may be an empty list to reference the log dir
        itself, may be a single element to reference a file in
        the log directory, or may be multiple elements to
        reference a file nested below. The path components
        will be joined using the platform appropriate path
        separator.

        :param args: list of zero or more subdirectories or file
        :returns: string representing a file path
        """
        return str(Path(self.outputdir, *args))

    def setUp(self, bin_prefix):
        """
        Prepare the output/scratch directories and the file loggers.

        :param bin_prefix: binary name prefix supplied by the subclasses;
                           not used by this implementation
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        # The target architecture is whatever follows the last '-' in
        # the binary path.
        self.arch = self.qemu_bin.split('-')[-1]
        self.socketdir = None

        self.outputdir = self.build_file('tests', 'functional',
                                         self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.log_filename = self.log_file('base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch data and detach the per-test log handler.

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
            if self.socketdir is not None:
                # cleanup() removes the directory and also disarms the
                # TemporaryDirectory finalizer.
                self.socketdir.cleanup()
                self.socketdir = None
        finally:
            # Always detach and close the handler, otherwise every test
            # would leave an open log file behind on the shared loggers.
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test file as a standalone script.

        The test module name is derived from sys.argv[0]. If
        QEMU_TEST_PRECACHE is set in the environment, only the assets
        are pre-cached and no test is run. Otherwise the tests are run
        with a TAP runner and the process exits with a non-zero status
        if any test did not succeed.
        """
        # Strip the three-character ".py" suffix to get the module name.
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        # Point the user at the log files of every test that went wrong.
        for (test, _message) in res.result.errors + res.result.failures:
            if hasattr(test, "log_filename"):
                print('More information on ' + test.id() +
                      ' could be found here:'
                      '\n %s' % test.log_filename, file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register an additional path to be passed to QEMU via "-L".

        :param ldpath: directory path; stored as an absolute path
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run @bin_path under the QEMU binary and capture its output.

        :param bin_path: path of the guest program to execute
        :param args: optional list of arguments for the guest program
        :returns: the subprocess.CompletedProcess of the run, with
                  stdout/stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value must be separate argv elements;
            # a single "-L <path>" string is not recognised as "-L".
            cmd += ["-L", ldpath]
        cmd.append(bin_path)
        if args:
            cmd += list(args)
        return run(cmd, text=True, capture_output=True)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    cpu = None
    machine = None
    _machinehelp = None

    def setUp(self):
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = self.log_file('console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type to use, skipping the test if the QEMU
        binary does not list it in its "-M help" output.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            # Cache the help text so the binary is only queried once
            # per test instance.
            self._machinehelp = run(
                [self.qemu_bin, '-M', 'help'],
                capture_output=True, check=True, encoding='utf8').stdout
        if machinename not in self._machinehelp:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the QEMU binary lists @netdevname in its
        "-netdev help" output.

        :param netdevname: name of the network backend
        :type netdevname: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-netdev', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        # Match a whole line so that one backend name cannot match as a
        # substring of another.
        if '\n' + netdevname + '\n' not in help_text:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless @devicename appears in the "-device help"
        output of the QEMU binary.

        :param devicename: name of the device
        :type devicename: str
        """
        help_text = run([self.qemu_bin,
                         '-M', 'none', '-device', 'help'],
                        capture_output=True, check=True,
                        encoding='utf8').stdout
        if devicename not in help_text:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the binary under test.

        :param name: name given to the machine
        :param args: extra command line arguments for QEMU
        :returns: the new QEMUMachine instance
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.log_file())
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        # Optionally expose an extra QMP monitor on a UNIX socket whose
        # path is taken from the environment.
        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The machine named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the machine called @name, creating it if necessary.

        :param args: extra QEMU arguments, only used when the machine
                     is created by this call
        :param name: name of the machine; a random UUID is used if unset
        :returns: the QEMUMachine instance
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            # Apply the class/test wide CPU and machine selection once,
            # at creation time.
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the first occurrence of @arg.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down all machines and release the console log handler."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Run the remaining cleanup even if a shutdown failed, and
            # close the handler so the console log file is not leaked.
            logging.getLogger('console').removeHandler(self._console_log_fh)
            self._console_log_fh.close()
            super().tearDown()