# Common utilities and Python wrappers for qemu-iotests
#
# Copyright (C) 2012 IBM Corp.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import argparse
import atexit
import bz2
from collections import OrderedDict
import faulthandler
import json
import logging
import os
import re
import shutil
import signal
import struct
import subprocess
import sys
import time
from typing import (Any, Callable, Dict, Iterable, Iterator,
                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
import unittest

from contextlib import contextmanager

from qemu.machine import qtest
from qemu.qmp import QMPMessage

# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()

# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# Command lines for the QEMU tools.  Each list starts with the program name
# (overridable through the environment) followed by any extra options, which
# are split on single spaces.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Optional gdbserver wrapper for the QEMU binary; empty list when unused.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
    # Looked up here as well, so that a missing value produces the same
    # friendly message instead of a bare KeyError traceback.
    sample_img_dir = os.environ['SAMPLE_IMG_DIR']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Optional valgrind wrapper for the QEMU binary; empty list when unused.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Utility function for temporarily changing the log level of a logger.

    This can be used to silence errors that are expected or uninteresting.
    The previous level is restored when the context exits, even on error.
    """
    _logger = logging.getLogger(logger_name)
    current_level = _logger.level
    _logger.setLevel(level)

    try:
        yield
    finally:
        _logger.setLevel(current_level)


def unarchive_sample_image(sample, fname):
    """Decompress '<sample>.bz2' from sample_img_dir into the file fname."""
    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)


def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start args as a subprocess with stdout piped in text mode.

    If connect_stderr is true, stderr is merged into stdout; otherwise it
    is inherited from this process.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    tool is only used to label the diagnostic written to stderr when the
    process was terminated by a signal.  With drop_successful_output, the
    returned output is '' whenever the exit code is 0.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Adjacent literals rather than a backslash continuation inside
            # the string, which would embed the source indentation in the
            # message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)

def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite the arguments of a 'qemu-img create' invocation.

    Anything that is not a 'create' command is returned as an unchanged
    copy.  For 'create', the -f and -o options are moved to the front,
    IMGOPTS is prepended to the -o options when the format equals imgfmt,
    and the default luks secret is added for luks images that do not name
    a key-secret themselves.
    """
    if not args or args[0] != 'create':
        return list(args)
    args = args[1:]

    p = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    p.add_argument('-o', action='append', default=[])
    p.add_argument('-f')
    parsed, remaining = p.parse_known_args(args)

    opts_list = parsed.o

    result = ['create']
    if parsed.f is not None:
        result += ['-f', parsed.f]

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Test may want to create
    # additional images in other formats that doesn't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and parsed.f == imgfmt:
        opts_list.insert(0, imgopts)

    # default luks support
    if (parsed.f == 'luks' and
            all('key-secret' not in opts for opts in opts_list)):
        result += ['--object', luks_default_secret_object]
        opts_list.append(luks_default_key_secret_opt)

    for opts in opts_list:
        result += ['-o', opts]

    result += remaining

    return result
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    For 'create' commands the output is dropped when the command succeeds.
    """
    is_create = bool(args and args[0] == 'create')
    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool_pipe_and_status('qemu-img', full_args,
                                     drop_successful_output=is_create)

def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    return qemu_img_pipe_and_status(*args)[1]

def ordered_qmp(qmsg, conv_keys=True):
    '''Return a copy of qmsg in which every dict is a key-sorted OrderedDict.

    With conv_keys, underscores in the keys of the outermost dict are
    replaced by dashes.  Dicts nested directly inside another dict keep
    their keys; dicts that are list elements are converted again because
    list elements are processed with the default conv_keys.
    Values that are neither list nor dict are returned unchanged.'''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg

def qemu_img_create(*args):
    '''Run "qemu-img create" with args and return the exit code'''
    return qemu_img('create', *args)

def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed result'''
    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))

def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed result'''
    return json.loads(qemu_img_pipe("check", "--output", "json", *args))

def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    return qemu_img_pipe_and_status(*args)[0]

def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles and return
    the unfiltered output'''
    result = qemu_img_pipe(*args)
    log(result, filters=[filter_testfiles])
    return result
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''Run "qemu-img info" on filename and log the filtered output.

    The image is opened with --image-opts if use_image_opts is set, and
    with "-f imgfmt" otherwise.  filter_path is the string replaced by
    TEST_IMG in the output; it defaults to filename.'''
    args = ['info']
    if use_image_opts:
        args.append('--image-opts')
    else:
        args += ['-f', imgfmt]
    args += extra_args
    args.append(filename)

    output = qemu_img_pipe(*args)
    if not filter_path:
        filter_path = filename
    log(filter_img_info(output, filter_path))

def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''Prefix args with the qemu-io command line.

    The variant without format options is used when the caller passes
    -f or --image-opts itself.'''
    if '-f' in args or '--image-opts' in args:
        return qemu_io_args_no_fmt + list(args)
    else:
        return qemu_io_args + list(args)

def qemu_io_popen(*args):
    '''Start qemu-io and return the Popen object'''
    return qemu_tool_popen(qemu_io_wrap_args(args))

def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0]

def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the unfiltered
    output'''
    result = qemu_io(*args)
    log(result, filters=[filter_testfiles, filter_qemu_io])
    return result

def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    args = qemu_io_wrap_args(args)
    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
    if result.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-result.returncode, ' '.join(args)))
    return result.returncode

def qemu_io_silent_check(*args):
    '''Run qemu-io and return the true if subprocess returned 0'''
    args = qemu_io_wrap_args(args)
    result = subprocess.run(args, stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT, check=False)
    return result.returncode == 0
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its stdin/stdout.'''

    def __init__(self, *args):
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to exit.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout up to the next prompt; return the text before it.'''
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        tail = ''
        # Compare the last n characters read against the prompt.  A
        # position counter that resets to zero on mismatch would miss a
        # prompt directly preceded by a 'q' (e.g. "qqemu-io> ").
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            tail = (tail + c)[-n:]

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output.'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()


def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))

def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    full_args = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    # The output is only reported for a non-zero exit code.
    return returncode, (output if returncode else '')

def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    full_args = [qemu_nbd_prog, '-L'] + list(args)
    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # Poll for the pid file; give up if the process exits first.
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()

def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    return qemu_img('compare', '-f', fmt1,
                    '-F', fmt2, img1, img2) == 0

def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers'''
    with open(name, 'wb') as file:
        i = 0
        while i < size:
            # One 512-byte sector: the sector number as a big-endian
            # 32-bit value in the first and last four bytes, zeros between.
            sector = struct.pack('>l504xl', i // 512, i // 512)
            file.write(sector)
            i = i + 512

def image_size(img):
    '''Return image's virtual size'''
    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    return json.loads(r)['virtual-size']

def is_str(val):
    '''Return True if val is a str'''
    return isinstance(val, str)

# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name ('.', '+', ...) match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of test_dir in msg with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)

win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove carriage returns from msg'''
    return win32_re.sub("", msg)
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace qemu-io's throughput statistics with a fixed placeholder'''
    msg = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", msg)

chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown uid:gid" with "chown UID:GID"'''
    return chown_re.sub("chown UID:GID", msg)

def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event whose timestamp fields are replaced by
    placeholders.  The caller's event, including its nested timestamp
    dict, is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp dict
        # must be copied too before it is changed.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event

def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    qmsg is modified in place and also returned; nested dicts and lists
    are filtered recursively.  For list elements the key is the index.'''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        items = enumerate(qmsg)
    else:
        items = qmsg.items()

    for k, v in items:
        if isinstance(v, (dict, list)):
            qmsg[k] = filter_qmp(v, filter_fn)
        else:
            qmsg[k] = filter_fn(k, v)
    return qmsg

def filter_testfiles(msg):
    '''Replace this process's test and socket file prefixes in msg with
    TEST_DIR/PID- and SOCK_DIR/PID-'''
    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')

def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_testfiles(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw/-pci suffix from virtio-scsi device names'''
    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)

def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_virtio_scsi(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names with NODE_NAME'''
    return re.sub("#block[0-9]+", "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible.

    Lines reporting the disk size are dropped; filename, test file
    prefixes, the image format, and the iters/uuid/cid/compression type
    values are replaced by fixed placeholders.'''
    lines = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
                      line)
        lines.append(line)
    return '\n'.join(lines)

def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with IMGFMT'''
    return msg.replace(imgfmt, 'IMGFMT')

def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_imgfmt(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes in NBD export listings'''
    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)


Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to msg in order before it is logged.
    """
    for flt in filters:
        msg = flt(msg)
    if isinstance(msg, (dict, list)):
        # Don't sort if it's already sorted
        do_sort = not isinstance(msg, OrderedDict)
        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
    else:
        test_logger.info(msg)
class Timeout:
    '''Context manager that raises an exception from a SIGALRM handler if
    the body runs for longer than the given number of seconds.

    No timer is armed when QEMU runs under gdb or valgrind.'''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if qemu_gdb or qemu_valgrind:
            return self
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if qemu_gdb or qemu_valgrind:
            return False
        # Disarm the timer; exceptions from the body are not suppressed.
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise Exception with the configured message'''
        raise Exception(self.errmsg)

def file_pattern(name):
    '''Return name prefixed with this process's PID and a dash'''
    return "{0}-{1}".format(os.getpid(), name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        if len(self.paths) == 1:
            return self.paths[0]
        else:
            return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort cleanup: files that are missing or cannot be removed
        # are ignored.
        for path in self.paths:
            try_remove(path)
        return False


def try_remove(img):
    '''Remove the file img, ignoring any OSError'''
    try:
        os.remove(img)
    except OSError:
        pass

def file_path_remover():
    '''Remove all files registered by file_path(), newest first'''
    for path in reversed(file_path_remover.paths):
        try_remove(path)


def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # The cleanup hook is registered once, on first use.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = []
    for name in names:
        filename = file_pattern(name)
        path = os.path.join(base_dir, filename)
        file_path_remover.paths.append(path)
        paths.append(path)

    return paths[0] if len(paths) == 1 else paths

def remote_filename(path):
    '''Return the filename to use for path under the configured imgproto.

    Raises an Exception for protocols other than file and ssh.'''
    if imgproto == 'file':
        return path
    elif imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    else:
        raise Exception("Protocol %s not supported" % (imgproto))
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout when running under gdb or valgrind
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # 99 is the --error-exitcode passed to valgrind: print its log in
        # that case, otherwise delete the log file.
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Append "-object opts" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device opts" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive opts" to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev opts" to the command line; returns self.

        opts is either a string or an iterable of strings that are joined
        with commas.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming addr" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command.

        With use_log, the command and its result are logged via qmp_log.'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict with dotted keys.

        List indices and dict keys are joined with '.', for example
        {'a': [{'b': 1}]} becomes {'a.0.b': 1}.'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,..." option string.

        The flattened values are concatenated as-is, so they must be
        strings.'''
        obj = self.flatten_qmp_object(obj)
        output_list = []
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result.

        The same filters are applied to the command and to the result.'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Block job events are only logged; the state machine below is
            # driven by JOB_STATUS_CHANGE alone.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability, logging the result'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for migration to complete or fail.

        Returns True once a completed migration has reached
        expect_runstate, and False if the migration failed.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty bitmap lists.

        Nodes without a 'dirty-bitmaps' entry are omitted.'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if every item of fields is present in the bitmap'''
        ret = self.get_bitmap(node_name, bitmap_name)

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)

index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
991 self.vm = None 992 993 def dictpath(self, d, path): 994 '''Traverse a path in a nested dict''' 995 for component in path.split('/'): 996 m = index_re.match(component) 997 if m: 998 component, idx = m.groups() 999 idx = int(idx) 1000 1001 if not isinstance(d, dict) or component not in d: 1002 self.fail(f'failed path traversal for "{path}" in "{d}"') 1003 d = d[component] 1004 1005 if m: 1006 if not isinstance(d, list): 1007 self.fail(f'path component "{component}" in "{path}" ' 1008 f'is not a list in "{d}"') 1009 try: 1010 d = d[idx] 1011 except IndexError: 1012 self.fail(f'invalid index "{idx}" in path "{path}" ' 1013 f'in "{d}"') 1014 return d 1015 1016 def assert_qmp_absent(self, d, path): 1017 try: 1018 result = self.dictpath(d, path) 1019 except AssertionError: 1020 return 1021 self.fail('path "%s" has value "%s"' % (path, str(result))) 1022 1023 def assert_qmp(self, d, path, value): 1024 '''Assert that the value for a specific path in a QMP dict 1025 matches. When given a list of values, assert that any of 1026 them matches.''' 1027 1028 result = self.dictpath(d, path) 1029 1030 # [] makes no sense as a list of valid values, so treat it as 1031 # an actual single value. 
1032 if isinstance(value, list) and value != []: 1033 for v in value: 1034 if result == v: 1035 return 1036 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1037 else: 1038 self.assertEqual(result, value, 1039 '"%s" is "%s", expected "%s"' 1040 % (path, str(result), str(value))) 1041 1042 def assert_no_active_block_jobs(self): 1043 result = self.vm.qmp('query-block-jobs') 1044 self.assert_qmp(result, 'return', []) 1045 1046 def assert_has_block_node(self, node_name=None, file_name=None): 1047 """Issue a query-named-block-nodes and assert node_name and/or 1048 file_name is present in the result""" 1049 def check_equal_or_none(a, b): 1050 return a is None or b is None or a == b 1051 assert node_name or file_name 1052 result = self.vm.qmp('query-named-block-nodes') 1053 for x in result["return"]: 1054 if check_equal_or_none(x.get("node-name"), node_name) and \ 1055 check_equal_or_none(x.get("file"), file_name): 1056 return 1057 self.fail("Cannot find %s %s in result:\n%s" % 1058 (node_name, file_name, result)) 1059 1060 def assert_json_filename_equal(self, json_filename, reference): 1061 '''Asserts that the given filename is a json: filename and that its 1062 content is equal to the given reference object''' 1063 self.assertEqual(json_filename[:5], 'json:') 1064 self.assertEqual( 1065 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1066 self.vm.flatten_qmp_object(reference) 1067 ) 1068 1069 def cancel_and_wait(self, drive='drive0', force=False, 1070 resume=False, wait=60.0): 1071 '''Cancel a block job and wait for it to finish, returning the event''' 1072 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1073 self.assert_qmp(result, 'return', {}) 1074 1075 if resume: 1076 self.vm.resume_drive(drive) 1077 1078 cancelled = False 1079 result = None 1080 while not cancelled: 1081 for event in self.vm.get_qmp_events(wait=wait): 1082 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1083 event['event'] == 'BLOCK_JOB_CANCELLED': 
1084 self.assert_qmp(event, 'data/device', drive) 1085 result = event 1086 cancelled = True 1087 elif event['event'] == 'JOB_STATUS_CHANGE': 1088 self.assert_qmp(event, 'data/id', drive) 1089 1090 1091 self.assert_no_active_block_jobs() 1092 return result 1093 1094 def wait_until_completed(self, drive='drive0', check_offset=True, 1095 wait=60.0, error=None): 1096 '''Wait for a block job to finish, returning the event''' 1097 while True: 1098 for event in self.vm.get_qmp_events(wait=wait): 1099 if event['event'] == 'BLOCK_JOB_COMPLETED': 1100 self.assert_qmp(event, 'data/device', drive) 1101 if error is None: 1102 self.assert_qmp_absent(event, 'data/error') 1103 if check_offset: 1104 self.assert_qmp(event, 'data/offset', 1105 event['data']['len']) 1106 else: 1107 self.assert_qmp(event, 'data/error', error) 1108 self.assert_no_active_block_jobs() 1109 return event 1110 if event['event'] == 'JOB_STATUS_CHANGE': 1111 self.assert_qmp(event, 'data/id', drive) 1112 1113 def wait_ready(self, drive='drive0'): 1114 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1115 return self.vm.events_wait([ 1116 ('BLOCK_JOB_READY', 1117 {'data': {'type': 'mirror', 'device': drive}}), 1118 ('BLOCK_JOB_READY', 1119 {'data': {'type': 'commit', 'device': drive}}) 1120 ]) 1121 1122 def wait_ready_and_cancel(self, drive='drive0'): 1123 self.wait_ready(drive=drive) 1124 event = self.cancel_and_wait(drive=drive) 1125 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1126 self.assert_qmp(event, 'data/type', 'mirror') 1127 self.assert_qmp(event, 'data/offset', event['data']['len']) 1128 1129 def complete_and_wait(self, drive='drive0', wait_ready=True, 1130 completion_error=None): 1131 '''Complete a block job and wait for it to finish''' 1132 if wait_ready: 1133 self.wait_ready(drive=drive) 1134 1135 result = self.vm.qmp('block-job-complete', device=drive) 1136 self.assert_qmp(result, 'return', {}) 1137 1138 event = self.wait_until_completed(drive=drive, 
error=completion_error) 1139 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1140 1141 def pause_wait(self, job_id='job0'): 1142 with Timeout(3, "Timeout waiting for job to pause"): 1143 while True: 1144 result = self.vm.qmp('query-block-jobs') 1145 found = False 1146 for job in result['return']: 1147 if job['device'] == job_id: 1148 found = True 1149 if job['paused'] and not job['busy']: 1150 return job 1151 break 1152 assert found 1153 1154 def pause_job(self, job_id='job0', wait=True): 1155 result = self.vm.qmp('block-job-pause', device=job_id) 1156 self.assert_qmp(result, 'return', {}) 1157 if wait: 1158 return self.pause_wait(job_id) 1159 return result 1160 1161 def case_skip(self, reason): 1162 '''Skip this test case''' 1163 case_notrun(reason) 1164 self.skipTest(reason) 1165 1166 1167def notrun(reason): 1168 '''Skip this test suite''' 1169 # Each test in qemu-iotests has a number ("seq") 1170 seq = os.path.basename(sys.argv[0]) 1171 1172 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1173 as outfile: 1174 outfile.write(reason + '\n') 1175 logger.warning("%s not run: %s", seq, reason) 1176 sys.exit(0) 1177 1178def case_notrun(reason): 1179 '''Mark this test case as not having been run (without actually 1180 skipping it, that is left to the caller). 
See 1181 QMPTestCase.case_skip() for a variant that actually skips the 1182 current test case.''' 1183 1184 # Each test in qemu-iotests has a number ("seq") 1185 seq = os.path.basename(sys.argv[0]) 1186 1187 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1188 as outfile: 1189 outfile.write(' [case not run] ' + reason + '\n') 1190 1191def _verify_image_format(supported_fmts: Sequence[str] = (), 1192 unsupported_fmts: Sequence[str] = ()) -> None: 1193 if 'generic' in supported_fmts and \ 1194 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1195 # similar to 1196 # _supported_fmt generic 1197 # for bash tests 1198 supported_fmts = () 1199 1200 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1201 if not_sup or (imgfmt in unsupported_fmts): 1202 notrun('not suitable for this image format: %s' % imgfmt) 1203 1204 if imgfmt == 'luks': 1205 verify_working_luks() 1206 1207def _verify_protocol(supported: Sequence[str] = (), 1208 unsupported: Sequence[str] = ()) -> None: 1209 assert not (supported and unsupported) 1210 1211 if 'generic' in supported: 1212 return 1213 1214 not_sup = supported and (imgproto not in supported) 1215 if not_sup or (imgproto in unsupported): 1216 notrun('not suitable for this protocol: %s' % imgproto) 1217 1218def _verify_platform(supported: Sequence[str] = (), 1219 unsupported: Sequence[str] = ()) -> None: 1220 if any((sys.platform.startswith(x) for x in unsupported)): 1221 notrun('not suitable for this OS: %s' % sys.platform) 1222 1223 if supported: 1224 if not any((sys.platform.startswith(x) for x in supported)): 1225 notrun('not suitable for this OS: %s' % sys.platform) 1226 1227def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1228 if supported_cache_modes and (cachemode not in supported_cache_modes): 1229 notrun('not suitable for this cache mode: %s' % cachemode) 1230 1231def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1232 if supported_aio_modes and 
(aiomode not in supported_aio_modes): 1233 notrun('not suitable for this aio mode: %s' % aiomode) 1234 1235def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1236 usf_list = list(set(required_formats) - set(supported_formats())) 1237 if usf_list: 1238 notrun(f'formats {usf_list} are not whitelisted') 1239 1240 1241def _verify_virtio_blk() -> None: 1242 out = qemu_pipe('-M', 'none', '-device', 'help') 1243 if 'virtio-blk' not in out: 1244 notrun('Missing virtio-blk in QEMU binary') 1245 1246def _verify_virtio_scsi_pci_or_ccw() -> None: 1247 out = qemu_pipe('-M', 'none', '-device', 'help') 1248 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1249 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1250 1251 1252def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1253 imgopts = os.environ.get('IMGOPTS') 1254 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1255 # but it supported only for bash tests. We don't have a concept of global 1256 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1257 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1258 unsup = list(unsupported) + ['$'] 1259 if imgopts and any(x in imgopts for x in unsup): 1260 notrun(f'not suitable for this imgopts: {imgopts}') 1261 1262 1263def supports_quorum(): 1264 return 'quorum' in qemu_img_pipe('--help') 1265 1266def verify_quorum(): 1267 '''Skip test suite if quorum support is not available''' 1268 if not supports_quorum(): 1269 notrun('quorum support missing') 1270 1271def has_working_luks() -> Tuple[bool, str]: 1272 """ 1273 Check whether our LUKS driver can actually create images 1274 (this extends to LUKS encryption for qcow2). 1275 1276 If not, return the reason why. 
1277 """ 1278 1279 img_file = f'{test_dir}/luks-test.luks' 1280 (output, status) = \ 1281 qemu_img_pipe_and_status('create', '-f', 'luks', 1282 '--object', luks_default_secret_object, 1283 '-o', luks_default_key_secret_opt, 1284 '-o', 'iter-time=10', 1285 img_file, '1G') 1286 try: 1287 os.remove(img_file) 1288 except OSError: 1289 pass 1290 1291 if status != 0: 1292 reason = output 1293 for line in output.splitlines(): 1294 if img_file + ':' in line: 1295 reason = line.split(img_file + ':', 1)[1].strip() 1296 break 1297 1298 return (False, reason) 1299 else: 1300 return (True, '') 1301 1302def verify_working_luks(): 1303 """ 1304 Skip test suite if LUKS does not work 1305 """ 1306 (working, reason) = has_working_luks() 1307 if not working: 1308 notrun(reason) 1309 1310def qemu_pipe(*args: str) -> str: 1311 """ 1312 Run qemu with an option to print something and exit (e.g. a help option). 1313 1314 :return: QEMU's stdout output. 1315 """ 1316 full_args = [qemu_prog] + qemu_opts + list(args) 1317 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1318 return output 1319 1320def supported_formats(read_only=False): 1321 '''Set 'read_only' to True to check ro-whitelist 1322 Otherwise, rw-whitelist is checked''' 1323 1324 if not hasattr(supported_formats, "formats"): 1325 supported_formats.formats = {} 1326 1327 if read_only not in supported_formats.formats: 1328 format_message = qemu_pipe("-drive", "format=help") 1329 line = 1 if read_only else 0 1330 supported_formats.formats[read_only] = \ 1331 format_message.splitlines()[line].split(":")[1].split() 1332 1333 return supported_formats.formats[read_only] 1334 1335def skip_if_unsupported(required_formats=(), read_only=False): 1336 '''Skip Test Decorator 1337 Runs the test if all the required formats are whitelisted''' 1338 def skip_test_decorator(func): 1339 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1340 **kwargs: Dict[str, Any]) -> None: 1341 if callable(required_formats): 1342 fmts = 
required_formats(test_case) 1343 else: 1344 fmts = required_formats 1345 1346 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1347 if usf_list: 1348 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1349 test_case.case_skip(msg) 1350 else: 1351 func(test_case, *args, **kwargs) 1352 return func_wrapper 1353 return skip_test_decorator 1354 1355def skip_for_formats(formats: Sequence[str] = ()) \ 1356 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1357 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1358 '''Skip Test Decorator 1359 Skips the test for the given formats''' 1360 def skip_test_decorator(func): 1361 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1362 **kwargs: Dict[str, Any]) -> None: 1363 if imgfmt in formats: 1364 msg = f'{test_case}: Skipped for format {imgfmt}' 1365 test_case.case_skip(msg) 1366 else: 1367 func(test_case, *args, **kwargs) 1368 return func_wrapper 1369 return skip_test_decorator 1370 1371def skip_if_user_is_root(func): 1372 '''Skip Test Decorator 1373 Runs the test only without root permissions''' 1374 def func_wrapper(*args, **kwargs): 1375 if os.getuid() == 0: 1376 case_notrun('{}: cannot be run as root'.format(args[0])) 1377 return None 1378 else: 1379 return func(*args, **kwargs) 1380 return func_wrapper 1381 1382# We need to filter out the time taken from the output so that 1383# qemu-iotest can reliably diff the results against master output, 1384# and hide skipped tests from the reference output. 

class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult variant that reports a skipped test with a dot."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write(".")
            self.stream.flush()

class ReproducibleStreamWrapper:
    """Stream proxy that rewrites the unittest summary on write().

    The run time is dropped from "Ran N tests in X.XXXs" and any
    " (skipped=N)" annotation is removed.  Every other attribute is
    forwarded to the wrapped stream.
    """

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  'stream' is refused
        # because resolving it through self.stream would recurse if the
        # attribute has not been set yet; '__getstate__' is likewise not
        # forwarded to the wrapped stream.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write @arg to the wrapped stream after filtering it."""
        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
        self.stream.write(arg)

class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult unless another result class is
    given."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no (or a falsy) stream is passed
        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)

def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=2 if debug else 1,
                  warnings=None if sys.warnoptions else 'ignore')

def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' entry in sys.argv requests debug mode and is removed from
    sys.argv.  Logging is configured at DEBUG level in debug mode and
    at WARN level otherwise, then the _verify_*() checks are run with
    the given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    debug = '-d' in sys.argv
    if debug:
        sys.argv.remove('-d')
    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug

def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are forwarded to execute_setup_common().  If
    @test_function is given it is called afterwards; otherwise the
    unittest runner is started on sys.argv.
    """

    debug = execute_setup_common(*args, **kwargs)
    if not test_function:
        execute_unittest(sys.argv, debug)
    else:
        test_function()

def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    handler = logging.StreamHandler(stream=sys.stdout)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    test_logger.addHandler(handler)
    test_logger.setLevel(logging.INFO)
    # Keep these records away from the handlers of ancestor loggers
    test_logger.propagate = False

# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    activate_logging()
    execute_setup_common(*args, **kwargs)

# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)

# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    execute_test(*args, **kwargs)