1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39# pylint: disable=import-error, wrong-import-position 40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 41from qemu.machine import qtest 42from qemu.qmp import QMPMessage 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Optional gdbserver wrapper, enabled by setting GDB_OPTIONS
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
   os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p is expanded by valgrind to the PID of the process it runs; we
    # cannot know that PID yet because subprocess.Popen has not been
    # invoked at this point.
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']


def unarchive_sample_image(sample, fname):
    """
    Decompress the bzip2 sample image <sample_img_dir>/<sample>.bz2
    into the file fname.
    """
    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name, only used in the diagnostic
                 printed when the process is killed by a signal.
    :param args: Full command line (program followed by its arguments).
    :param connect_stderr: When true, stderr is merged into the returned
                           output; otherwise it is inherited unchanged.
    :return: (captured stdout text, return code)
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        # A negative return code means the child was terminated by a signal
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical line: a backslash
            # continuation inside the literal would embed the source
            # indentation in the output.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)

def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code
    """
    full_args = qemu_img_args + list(args)
    return qemu_tool_pipe_and_status('qemu-img', full_args)

def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    return qemu_img_pipe_and_status(*args)[1]
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return a copy of a QMP message in which every dict is replaced by an
    OrderedDict with sorted keys.

    When conv_keys is true, underscores in the keys of the dict passed in
    are replaced by dashes. Dicts nested directly inside another dict keep
    their keys unchanged; items of a list are processed with the default
    conv_keys=True.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg

def qemu_img_create(*args):
    '''
    Run "qemu-img create" with the given arguments and return its exit code.

    For "-f luks", a default key secret is supplied unless the -o option
    string already mentions key-secret.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a comma-separated option string, so the
                # default has to be joined onto it (a str has no append())
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)

def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))

def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    return json.loads(qemu_img_pipe("check", "--output", "json", *args))

def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    exitcode = subprocess.call(qemu_img_args + list(args))
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
    return exitcode

def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    return qemu_img_pipe_and_status(*args)[0]

def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, return it'''
    result = qemu_img_pipe(*args)
    log(result, filters=[filter_testfiles])
    return result

def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for filename.

    :param filter_path: Text replaced by TEST_IMG in the output; defaults
                        to filename.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before the filename.
    '''
    args = ['info']
    if imgopts:
        args.append('--image-opts')
    else:
        args += ['-f', imgfmt]
    args += extra_args
    args.append(filename)

    output = qemu_img_pipe(*args)
    if not filter_path:
        filter_path = filename
    log(filter_img_info(output, filter_path))
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    args = qemu_io_args + list(args)
    return qemu_tool_pipe_and_status('qemu-io', args)[0]

def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    result = qemu_io(*args)
    log(result, filters=[filter_testfiles, filter_qemu_io])
    return result

def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode

def qemu_io_silent_check(*args):
    '''Run qemu-io and return the true if subprocess returned 0'''
    args = qemu_io_args + list(args)
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0

class QemuIoInteractive:
    '''Drive a long-running qemu-io process through its stdin/stdout'''

    def __init__(self, *args):
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # 9 == len('qemu-io> '): the first thing printed must be the prompt
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to terminate'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout up to the next prompt; return the text before it'''
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        while True:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            # Compare the whole tail against the prompt. A plain
            # "reset on mismatch" counter would miss a prompt that directly
            # follows a 'q' in the command output.
            if c == pattern[-1] and ''.join(s[-n:]) == pattern:
                break

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)

def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    command = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    # The output is only reported on failure
    if returncode:
        return returncode, output
    return returncode, ''

def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    command = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', command)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing

@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # The server is considered started once its PID file exists
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    return qemu_img('compare', '-f', fmt1,
                    '-F', fmt2, img1, img2) == 0

def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers'''
    with open(name, 'wb') as file:
        i = 0
        while i < size:
            # 512-byte sector: the sector number as a big-endian 32-bit
            # integer at both ends, 504 zero bytes in between
            sector = struct.pack('>l504xl', i // 512, i // 512)
            file.write(sector)
            i = i + 512

def image_size(img):
    '''Return image's virtual size'''
    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    return json.loads(r)['virtual-size']

def is_str(val):
    '''Return True if val is a str'''
    return isinstance(val, str)

# The directory is matched literally: escape it so that characters such
# as '.' or '+' in the path are not interpreted as regex syntax.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace the test directory path by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)

win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove carriage returns'''
    return win32_re.sub("", msg)

qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace the qemu-io timing/throughput summary by placeholders'''
    msg = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", msg)

chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown uid:gid" by "chown UID:GID"'''
    return chown_re.sub("chown UID:GID", msg)

def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy whose timestamp fields are replaced by placeholders;
    the event passed in is left untouched.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested dict as
        # well, otherwise the caller's event would be modified
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event

def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    The object is modified in place and also returned.'''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        items = enumerate(qmsg)
    else:
        items = qmsg.items()

    for k, v in items:
        if isinstance(v, (dict, list)):
            qmsg[k] = filter_qmp(v, filter_fn)
        else:
            qmsg[k] = filter_fn(k, v)
    return qmsg
def filter_testfiles(msg):
    '''Replace the per-process file prefixes in test_dir and sock_dir'''
    pid_prefix = "%s-" % (os.getpid())
    replacements = (
        (os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-'),
    )
    for prefix, placeholder in replacements:
        msg = msg.replace(prefix, placeholder)
    return msg

def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)

def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw/-pci suffix from virtio-scsi device names'''
    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)

def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)

def filter_generated_node_ids(msg):
    '''Replace auto-generated "#block<N>" node names by NODE_NAME'''
    return re.sub("#block[0-9]+", "NODE_NAME", msg)

def filter_img_info(output, filename):
    '''Filter "qemu-img info" output: drop the size-on-disk lines and
    replace file names, the image format and volatile values'''
    regex_subs = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for raw in output.split('\n'):
        if any(marker in raw for marker in ('disk size', 'actual-size')):
            continue
        text = raw.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in regex_subs:
            text = re.sub(pattern, placeholder, text)
        kept.append(text)
    return '\n'.join(kept)

def filter_imgfmt(msg):
    '''Replace the image format name by IMGFMT'''
    return msg.replace(imgfmt, 'IMGFMT')

def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)

def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes of an NBD listing by XXX'''
    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Run msg through each filter in turn and emit it on the diff logger.

    Dicts and lists are serialized as JSON (pretty-printed when indent is
    given); anything else is logged as is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=not already_ordered,
                                indent=indent))

class Timeout:
    """
    Context manager that arms a SIGALRM-based timer for its body; the
    timer is not armed when QEMU runs under gdb or valgrind.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # A zero interval disarms the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        raise Exception(self.errmsg)

def file_pattern(name):
    """Return name prefixed with the PID of the current process"""
    return f"{os.getpid()}-{name}"

class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            # Best effort: the file may never have been created
            try:
                os.remove(generated)
            except OSError:
                continue
        return False
def try_remove(img):
    '''Remove a file, ignoring any OSError (e.g. the file does not exist)'''
    try:
        os.remove(img)
    except OSError:
        pass

def file_path_remover():
    '''atexit hook: remove the files handed out by file_path(), newest first'''
    for path in reversed(file_path_remover.paths):
        try_remove(path)


def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # The path list lives on the remover function itself; the atexit hook
    # is registered on first use only.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = []
    for name in names:
        filename = file_pattern(name)
        path = os.path.join(base_dir, filename)
        file_path_remover.paths.append(path)
        paths.append(path)

    return paths[0] if len(paths) == 1 else paths

def remote_filename(path):
    '''Return the file name to use for path with the current imgproto'''
    if imgproto == 'file':
        return path
    elif imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    else:
        raise Exception("Protocol %s not supported" % (imgproto))

class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout when running under gdb or valgrind
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # 99 is the --error-exitcode passed to valgrind: print its log in
        # that case, otherwise discard the log file
        if self.exitcode() == 99:
            with open(valgrind_filename) as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" verbatim to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev"; opts is a string or an iterable of strings
        that is joined with commas. Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command, optionally
        logging the command and its response'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict whose keys are the
        dot-separated paths of the leaf values'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Render a QMP object as a comma-separated "key=value" string'''
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            # NOTE(review): the concatenation only works for str leaf values
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events with their timestamps filtered'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Log a QMP command, execute it, log and return the response'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; all
            # other events are just logged
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the response'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event with status completed or failed.

        Returns True on completion (after the VM has reached
        expect_runstate) and False on failure.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for all nodes that have any'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and contains all the key/value
        pairs given in fields'''
        ret = self.get_bitmap(node_name, bitmap_name)
        # get_bitmap() returns None when the node has no such bitmap
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
# Matches a path component with a list index, e.g. "return[0]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        Components are separated by '/'; a component of the form
        "name[N]" additionally selects item N of the list stored under
        name. The test fails if the path cannot be followed.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that path cannot be traversed in the QMP dict d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a missing path through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it and check that it
        completed as a mirror job at its full length'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy, and
        return its entry; gives up after a 3 second Timeout'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause; when wait is true, return the paused job
        entry from pause_wait(), otherwise the QMP response'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)


def notrun(reason):
    '''Skip this test suite'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly so the reason is on disk before we exit
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
See 1130 QMPTestCase.case_skip() for a variant that actually skips the 1131 current test case.''' 1132 1133 # Each test in qemu-iotests has a number ("seq") 1134 seq = os.path.basename(sys.argv[0]) 1135 1136 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1137 ' [case not run] ' + reason + '\n') 1138 1139def _verify_image_format(supported_fmts: Sequence[str] = (), 1140 unsupported_fmts: Sequence[str] = ()) -> None: 1141 if 'generic' in supported_fmts and \ 1142 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1143 # similar to 1144 # _supported_fmt generic 1145 # for bash tests 1146 supported_fmts = () 1147 1148 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1149 if not_sup or (imgfmt in unsupported_fmts): 1150 notrun('not suitable for this image format: %s' % imgfmt) 1151 1152 if imgfmt == 'luks': 1153 verify_working_luks() 1154 1155def _verify_protocol(supported: Sequence[str] = (), 1156 unsupported: Sequence[str] = ()) -> None: 1157 assert not (supported and unsupported) 1158 1159 if 'generic' in supported: 1160 return 1161 1162 not_sup = supported and (imgproto not in supported) 1163 if not_sup or (imgproto in unsupported): 1164 notrun('not suitable for this protocol: %s' % imgproto) 1165 1166def _verify_platform(supported: Sequence[str] = (), 1167 unsupported: Sequence[str] = ()) -> None: 1168 if any((sys.platform.startswith(x) for x in unsupported)): 1169 notrun('not suitable for this OS: %s' % sys.platform) 1170 1171 if supported: 1172 if not any((sys.platform.startswith(x) for x in supported)): 1173 notrun('not suitable for this OS: %s' % sys.platform) 1174 1175def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1176 if supported_cache_modes and (cachemode not in supported_cache_modes): 1177 notrun('not suitable for this cache mode: %s' % cachemode) 1178 1179def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1180 if supported_aio_modes and (aiomode not in supported_aio_modes): 1181 
notrun('not suitable for this aio mode: %s' % aiomode) 1182 1183def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1184 usf_list = list(set(required_formats) - set(supported_formats())) 1185 if usf_list: 1186 notrun(f'formats {usf_list} are not whitelisted') 1187 1188 1189def _verify_virtio_blk() -> None: 1190 out = qemu_pipe('-M', 'none', '-device', 'help') 1191 if 'virtio-blk' not in out: 1192 notrun('Missing virtio-blk in QEMU binary') 1193 1194def _verify_virtio_scsi_pci_or_ccw() -> None: 1195 out = qemu_pipe('-M', 'none', '-device', 'help') 1196 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1197 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1198 1199 1200def supports_quorum(): 1201 return 'quorum' in qemu_img_pipe('--help') 1202 1203def verify_quorum(): 1204 '''Skip test suite if quorum support is not available''' 1205 if not supports_quorum(): 1206 notrun('quorum support missing') 1207 1208def has_working_luks() -> Tuple[bool, str]: 1209 """ 1210 Check whether our LUKS driver can actually create images 1211 (this extends to LUKS encryption for qcow2). 1212 1213 If not, return the reason why. 
1214 """ 1215 1216 img_file = f'{test_dir}/luks-test.luks' 1217 (output, status) = \ 1218 qemu_img_pipe_and_status('create', '-f', 'luks', 1219 '--object', luks_default_secret_object, 1220 '-o', luks_default_key_secret_opt, 1221 '-o', 'iter-time=10', 1222 img_file, '1G') 1223 try: 1224 os.remove(img_file) 1225 except OSError: 1226 pass 1227 1228 if status != 0: 1229 reason = output 1230 for line in output.splitlines(): 1231 if img_file + ':' in line: 1232 reason = line.split(img_file + ':', 1)[1].strip() 1233 break 1234 1235 return (False, reason) 1236 else: 1237 return (True, '') 1238 1239def verify_working_luks(): 1240 """ 1241 Skip test suite if LUKS does not work 1242 """ 1243 (working, reason) = has_working_luks() 1244 if not working: 1245 notrun(reason) 1246 1247def qemu_pipe(*args: str) -> str: 1248 """ 1249 Run qemu with an option to print something and exit (e.g. a help option). 1250 1251 :return: QEMU's stdout output. 1252 """ 1253 full_args = [qemu_prog] + qemu_opts + list(args) 1254 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1255 return output 1256 1257def supported_formats(read_only=False): 1258 '''Set 'read_only' to True to check ro-whitelist 1259 Otherwise, rw-whitelist is checked''' 1260 1261 if not hasattr(supported_formats, "formats"): 1262 supported_formats.formats = {} 1263 1264 if read_only not in supported_formats.formats: 1265 format_message = qemu_pipe("-drive", "format=help") 1266 line = 1 if read_only else 0 1267 supported_formats.formats[read_only] = \ 1268 format_message.splitlines()[line].split(":")[1].split() 1269 1270 return supported_formats.formats[read_only] 1271 1272def skip_if_unsupported(required_formats=(), read_only=False): 1273 '''Skip Test Decorator 1274 Runs the test if all the required formats are whitelisted''' 1275 def skip_test_decorator(func): 1276 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1277 **kwargs: Dict[str, Any]) -> None: 1278 if callable(required_formats): 1279 fmts = 
required_formats(test_case) 1280 else: 1281 fmts = required_formats 1282 1283 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1284 if usf_list: 1285 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1286 test_case.case_skip(msg) 1287 else: 1288 func(test_case, *args, **kwargs) 1289 return func_wrapper 1290 return skip_test_decorator 1291 1292def skip_for_formats(formats: Sequence[str] = ()) \ 1293 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1294 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1295 '''Skip Test Decorator 1296 Skips the test for the given formats''' 1297 def skip_test_decorator(func): 1298 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1299 **kwargs: Dict[str, Any]) -> None: 1300 if imgfmt in formats: 1301 msg = f'{test_case}: Skipped for format {imgfmt}' 1302 test_case.case_skip(msg) 1303 else: 1304 func(test_case, *args, **kwargs) 1305 return func_wrapper 1306 return skip_test_decorator 1307 1308def skip_if_user_is_root(func): 1309 '''Skip Test Decorator 1310 Runs the test only without root permissions''' 1311 def func_wrapper(*args, **kwargs): 1312 if os.getuid() == 0: 1313 case_notrun('{}: cannot be run as root'.format(args[0])) 1314 return None 1315 else: 1316 return func(*args, **kwargs) 1317 return func_wrapper 1318 1319# We need to filter out the time taken from the output so that 1320# qemu-iotest can reliably diff the results against master output, 1321# and hide skipped tests from the reference output. 
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports a skipped test like a passed one."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s".
        # Call TestResult.addSkip directly: going through super() would
        # run TextTestResult.addSkip and print its own marker as well.
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write(".")
            self.stream.flush()


class ReproducibleStreamWrapper:
    """Text stream proxy that strips run-dependent parts of unittest output.

    Everything except write() is forwarded to the wrapped stream.
    """

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  Refuse 'stream' and
        # '__getstate__' instead of forwarding them, so that a missing
        # self.stream cannot send this method into endless recursion.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write arg without the elapsed time and the skipped-test count.

        :return: whatever the wrapped stream's write() returns (for text
                 streams, the number of characters actually written).
        """
        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
        return self.stream.write(arg)


class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner whose output goes through ReproducibleStreamWrapper."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Falls back to stdout when no stream is given.
        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)


def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: argument vector handed to unittest.main().
    :param debug: selects verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=2 if debug else 1,
                  warnings=None if sys.warnoptions else 'ignore')


def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes a '-d' flag from sys.argv if present, configures logging
    accordingly and runs the _verify_* checks with the given
    restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    debug = '-d' in sys.argv
    if debug:
        sys.argv.remove('-d')
    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug


def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    Positional and remaining keyword arguments go to
    execute_setup_common().  Without a test_function the unittest
    framework is run on sys.argv; otherwise test_function() is called.
    """

    debug = execute_setup_common(*args, **kwargs)
    if not test_function:
        execute_unittest(sys.argv, debug)
    else:
        test_function()


def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    handler = logging.StreamHandler(stream=sys.stdout)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    test_logger.addHandler(handler)
    test_logger.setLevel(logging.INFO)
    # Keep these messages out of the handlers of parent loggers.
    test_logger.propagate = False


# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    activate_logging()
    execute_setup_common(*args, **kwargs)


# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)


# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    execute_test(*args, **kwargs)