1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39# pylint: disable=import-error, wrong-import-position 40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 41from qemu.machine import qtest 42from qemu.qmp import QMPMessage 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77gdb_qemu_env = os.environ.get('GDB_OPTIONS') 78qemu_gdb = [] 79if gdb_qemu_env: 80 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 81 82qemu_print = os.environ.get('PRINT_QEMU', False) 83 84imgfmt = os.environ.get('IMGFMT', 'raw') 85imgproto = os.environ.get('IMGPROTO', 'file') 86output_dir = os.environ.get('OUTPUT_DIR', '.') 87 88try: 89 test_dir = os.environ['TEST_DIR'] 90 sock_dir = os.environ['SOCK_DIR'] 91 cachemode = os.environ['CACHEMODE'] 92 aiomode = os.environ['AIOMODE'] 93 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 94except KeyError: 95 # We are using these variables as proxies to indicate that we're 96 # not being run via "check". There may be other things set up by 97 # "check" that individual test cases rely on. 
98 sys.stderr.write('Please run this test via the "check" script\n') 99 sys.exit(os.EX_USAGE) 100 101qemu_valgrind = [] 102if os.environ.get('VALGRIND_QEMU') == "y" and \ 103 os.environ.get('NO_VALGRIND') != "y": 104 valgrind_logfile = "--log-file=" + test_dir 105 # %p allows to put the valgrind process PID, since 106 # we don't know it a priori (subprocess.Popen is 107 # not yet invoked) 108 valgrind_logfile += "/%p.valgrind" 109 110 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 111 112socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 113 114luks_default_secret_object = 'secret,id=keysec0,data=' + \ 115 os.environ.get('IMGKEYSECRET', '') 116luks_default_key_secret_opt = 'key-secret=keysec0' 117 118sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 119 120 121def unarchive_sample_image(sample, fname): 122 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 123 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 124 shutil.copyfileobj(f_in, f_out) 125 126 127def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 128 connect_stderr: bool = True) -> Tuple[str, int]: 129 """ 130 Run a tool and return both its output and its exit code 131 """ 132 stderr = subprocess.STDOUT if connect_stderr else None 133 with subprocess.Popen(args, stdout=subprocess.PIPE, 134 stderr=stderr, universal_newlines=True) as subp: 135 output = subp.communicate()[0] 136 if subp.returncode < 0: 137 cmd = ' '.join(args) 138 sys.stderr.write(f'{tool} received signal \ 139 {-subp.returncode}: {cmd}\n') 140 return (output, subp.returncode) 141 142def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 143 """ 144 Run qemu-img and return both its output and its exit code 145 """ 146 full_args = qemu_img_args + list(args) 147 return qemu_tool_pipe_and_status('qemu-img', full_args) 148 149def qemu_img(*args: str) -> int: 150 '''Run qemu-img and return the exit code''' 151 return qemu_img_pipe_and_status(*args)[1] 152 153def 
ordered_qmp(qmsg, conv_keys=True): 154 # Dictionaries are not ordered prior to 3.6, therefore: 155 if isinstance(qmsg, list): 156 return [ordered_qmp(atom) for atom in qmsg] 157 if isinstance(qmsg, dict): 158 od = OrderedDict() 159 for k, v in sorted(qmsg.items()): 160 if conv_keys: 161 k = k.replace('_', '-') 162 od[k] = ordered_qmp(v, conv_keys=False) 163 return od 164 return qmsg 165 166def qemu_img_create(*args): 167 args = list(args) 168 169 # default luks support 170 if '-f' in args and args[args.index('-f') + 1] == 'luks': 171 if '-o' in args: 172 i = args.index('-o') 173 if 'key-secret' not in args[i + 1]: 174 args[i + 1].append(luks_default_key_secret_opt) 175 args.insert(i + 2, '--object') 176 args.insert(i + 3, luks_default_secret_object) 177 else: 178 args = ['-o', luks_default_key_secret_opt, 179 '--object', luks_default_secret_object] + args 180 181 args.insert(0, 'create') 182 183 return qemu_img(*args) 184 185def qemu_img_measure(*args): 186 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 187 188def qemu_img_check(*args): 189 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 190 191def qemu_img_verbose(*args): 192 '''Run qemu-img without suppressing its output and return the exit code''' 193 exitcode = subprocess.call(qemu_img_args + list(args)) 194 if exitcode < 0: 195 sys.stderr.write('qemu-img received signal %i: %s\n' 196 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 197 return exitcode 198 199def qemu_img_pipe(*args: str) -> str: 200 '''Run qemu-img and return its output''' 201 return qemu_img_pipe_and_status(*args)[0] 202 203def qemu_img_log(*args): 204 result = qemu_img_pipe(*args) 205 log(result, filters=[filter_testfiles]) 206 return result 207 208def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 209 args = ['info'] 210 if imgopts: 211 args.append('--image-opts') 212 else: 213 args += ['-f', imgfmt] 214 args += extra_args 215 args.append(filename) 216 217 output = 
qemu_img_pipe(*args) 218 if not filter_path: 219 filter_path = filename 220 log(filter_img_info(output, filter_path)) 221 222def qemu_io(*args): 223 '''Run qemu-io and return the stdout data''' 224 args = qemu_io_args + list(args) 225 return qemu_tool_pipe_and_status('qemu-io', args)[0] 226 227def qemu_io_log(*args): 228 result = qemu_io(*args) 229 log(result, filters=[filter_testfiles, filter_qemu_io]) 230 return result 231 232def qemu_io_silent(*args): 233 '''Run qemu-io and return the exit code, suppressing stdout''' 234 if '-f' in args or '--image-opts' in args: 235 default_args = qemu_io_args_no_fmt 236 else: 237 default_args = qemu_io_args 238 239 args = default_args + list(args) 240 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 241 if result.returncode < 0: 242 sys.stderr.write('qemu-io received signal %i: %s\n' % 243 (-result.returncode, ' '.join(args))) 244 return result.returncode 245 246def qemu_io_silent_check(*args): 247 '''Run qemu-io and return the true if subprocess returned 0''' 248 args = qemu_io_args + list(args) 249 result = subprocess.run(args, stdout=subprocess.DEVNULL, 250 stderr=subprocess.STDOUT, check=False) 251 return result.returncode == 0 252 253class QemuIoInteractive: 254 def __init__(self, *args): 255 self.args = qemu_io_args_no_fmt + list(args) 256 # We need to keep the Popen objext around, and not 257 # close it immediately. Therefore, disable the pylint check: 258 # pylint: disable=consider-using-with 259 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 260 stdout=subprocess.PIPE, 261 stderr=subprocess.STDOUT, 262 universal_newlines=True) 263 out = self._p.stdout.read(9) 264 if out != 'qemu-io> ': 265 # Most probably qemu-io just failed to start. 266 # Let's collect the whole output and exit. 
267 out += self._p.stdout.read() 268 self._p.wait(timeout=1) 269 raise ValueError(out) 270 271 def close(self): 272 self._p.communicate('q\n') 273 274 def _read_output(self): 275 pattern = 'qemu-io> ' 276 n = len(pattern) 277 pos = 0 278 s = [] 279 while pos != n: 280 c = self._p.stdout.read(1) 281 # check unexpected EOF 282 assert c != '' 283 s.append(c) 284 if c == pattern[pos]: 285 pos += 1 286 else: 287 pos = 0 288 289 return ''.join(s[:-n]) 290 291 def cmd(self, cmd): 292 # quit command is in close(), '\n' is added automatically 293 assert '\n' not in cmd 294 cmd = cmd.strip() 295 assert cmd not in ('q', 'quit') 296 self._p.stdin.write(cmd + '\n') 297 self._p.stdin.flush() 298 return self._read_output() 299 300 301def qemu_nbd(*args): 302 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 303 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 304 305def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 306 '''Run qemu-nbd in daemon mode and return both the parent's exit code 307 and its output in case of an error''' 308 full_args = qemu_nbd_args + ['--fork'] + list(args) 309 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 310 connect_stderr=False) 311 return returncode, output if returncode else '' 312 313def qemu_nbd_list_log(*args: str) -> str: 314 '''Run qemu-nbd to list remote exports''' 315 full_args = [qemu_nbd_prog, '-L'] + list(args) 316 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 317 log(output, filters=[filter_testfiles, filter_nbd_exports]) 318 return output 319 320@contextmanager 321def qemu_nbd_popen(*args): 322 '''Context manager running qemu-nbd within the context''' 323 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 324 325 assert not os.path.exists(pid_file) 326 327 cmd = list(qemu_nbd_args) 328 cmd.extend(('--persistent', '--pid-file', pid_file)) 329 cmd.extend(args) 330 331 log('Start NBD server') 332 with subprocess.Popen(cmd) as p: 333 try: 334 while not 
os.path.exists(pid_file): 335 if p.poll() is not None: 336 raise RuntimeError( 337 "qemu-nbd terminated with exit code {}: {}" 338 .format(p.returncode, ' '.join(cmd))) 339 340 time.sleep(0.01) 341 yield 342 finally: 343 if os.path.exists(pid_file): 344 os.remove(pid_file) 345 log('Kill NBD server') 346 p.kill() 347 p.wait() 348 349def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 350 '''Return True if two image files are identical''' 351 return qemu_img('compare', '-f', fmt1, 352 '-F', fmt2, img1, img2) == 0 353 354def create_image(name, size): 355 '''Create a fully-allocated raw image with sector markers''' 356 with open(name, 'wb') as file: 357 i = 0 358 while i < size: 359 sector = struct.pack('>l504xl', i // 512, i // 512) 360 file.write(sector) 361 i = i + 512 362 363def image_size(img): 364 '''Return image's virtual size''' 365 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 366 return json.loads(r)['virtual-size'] 367 368def is_str(val): 369 return isinstance(val, str) 370 371test_dir_re = re.compile(r"%s" % test_dir) 372def filter_test_dir(msg): 373 return test_dir_re.sub("TEST_DIR", msg) 374 375win32_re = re.compile(r"\r") 376def filter_win32(msg): 377 return win32_re.sub("", msg) 378 379qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 380 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 381 r"and [0-9\/.inf]* ops\/sec\)") 382def filter_qemu_io(msg): 383 msg = filter_win32(msg) 384 return qemu_io_re.sub("X ops; XX:XX:XX.X " 385 "(XXX YYY/sec and XXX ops/sec)", msg) 386 387chown_re = re.compile(r"chown [0-9]+:[0-9]+") 388def filter_chown(msg): 389 return chown_re.sub("chown UID:GID", msg) 390 391def filter_qmp_event(event): 392 '''Filter a QMP event dict''' 393 event = dict(event) 394 if 'timestamp' in event: 395 event['timestamp']['seconds'] = 'SECS' 396 event['timestamp']['microseconds'] = 'USECS' 397 return event 398 399def filter_qmp(qmsg, filter_fn): 400 '''Given a string filter, filter a QMP object's values. 
401 filter_fn takes a (key, value) pair.''' 402 # Iterate through either lists or dicts; 403 if isinstance(qmsg, list): 404 items = enumerate(qmsg) 405 else: 406 items = qmsg.items() 407 408 for k, v in items: 409 if isinstance(v, (dict, list)): 410 qmsg[k] = filter_qmp(v, filter_fn) 411 else: 412 qmsg[k] = filter_fn(k, v) 413 return qmsg 414 415def filter_testfiles(msg): 416 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 417 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 418 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 419 420def filter_qmp_testfiles(qmsg): 421 def _filter(_key, value): 422 if is_str(value): 423 return filter_testfiles(value) 424 return value 425 return filter_qmp(qmsg, _filter) 426 427def filter_virtio_scsi(output: str) -> str: 428 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 429 430def filter_qmp_virtio_scsi(qmsg): 431 def _filter(_key, value): 432 if is_str(value): 433 return filter_virtio_scsi(value) 434 return value 435 return filter_qmp(qmsg, _filter) 436 437def filter_generated_node_ids(msg): 438 return re.sub("#block[0-9]+", "NODE_NAME", msg) 439 440def filter_img_info(output, filename): 441 lines = [] 442 for line in output.split('\n'): 443 if 'disk size' in line or 'actual-size' in line: 444 continue 445 line = line.replace(filename, 'TEST_IMG') 446 line = filter_testfiles(line) 447 line = line.replace(imgfmt, 'IMGFMT') 448 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 449 line = re.sub('uuid: [-a-f0-9]+', 450 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 451 line) 452 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 453 lines.append(line) 454 return '\n'.join(lines) 455 456def filter_imgfmt(msg): 457 return msg.replace(imgfmt, 'IMGFMT') 458 459def filter_qmp_imgfmt(qmsg): 460 def _filter(_key, value): 461 if is_str(value): 462 return filter_imgfmt(value) 463 return value 464 return filter_qmp(qmsg, _filter) 465 466def filter_nbd_exports(output: str) -> str: 467 return 
re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 468 469 470Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 471 472def log(msg: Msg, 473 filters: Iterable[Callable[[Msg], Msg]] = (), 474 indent: Optional[int] = None) -> None: 475 """ 476 Logs either a string message or a JSON serializable message (like QMP). 477 If indent is provided, JSON serializable messages are pretty-printed. 478 """ 479 for flt in filters: 480 msg = flt(msg) 481 if isinstance(msg, (dict, list)): 482 # Don't sort if it's already sorted 483 do_sort = not isinstance(msg, OrderedDict) 484 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 485 else: 486 test_logger.info(msg) 487 488class Timeout: 489 def __init__(self, seconds, errmsg="Timeout"): 490 self.seconds = seconds 491 self.errmsg = errmsg 492 def __enter__(self): 493 if qemu_gdb or qemu_valgrind: 494 return self 495 signal.signal(signal.SIGALRM, self.timeout) 496 signal.setitimer(signal.ITIMER_REAL, self.seconds) 497 return self 498 def __exit__(self, exc_type, value, traceback): 499 if qemu_gdb or qemu_valgrind: 500 return False 501 signal.setitimer(signal.ITIMER_REAL, 0) 502 return False 503 def timeout(self, signum, frame): 504 raise Exception(self.errmsg) 505 506def file_pattern(name): 507 return "{0}-{1}".format(os.getpid(), name) 508 509class FilePath: 510 """ 511 Context manager generating multiple file names. The generated files are 512 removed when exiting the context. 513 514 Example usage: 515 516 with FilePath('a.img', 'b.img') as (img_a, img_b): 517 # Use img_a and img_b here... 518 519 # a.img and b.img are automatically removed here. 520 521 By default images are created in iotests.test_dir. To create sockets use 522 iotests.sock_dir: 523 524 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 525 526 For convenience, calling with one argument yields a single file instead of 527 a tuple with one item. 
528 529 """ 530 def __init__(self, *names, base_dir=test_dir): 531 self.paths = [os.path.join(base_dir, file_pattern(name)) 532 for name in names] 533 534 def __enter__(self): 535 if len(self.paths) == 1: 536 return self.paths[0] 537 else: 538 return self.paths 539 540 def __exit__(self, exc_type, exc_val, exc_tb): 541 for path in self.paths: 542 try: 543 os.remove(path) 544 except OSError: 545 pass 546 return False 547 548 549def try_remove(img): 550 try: 551 os.remove(img) 552 except OSError: 553 pass 554 555def file_path_remover(): 556 for path in reversed(file_path_remover.paths): 557 try_remove(path) 558 559 560def file_path(*names, base_dir=test_dir): 561 ''' Another way to get auto-generated filename that cleans itself up. 562 563 Use is as simple as: 564 565 img_a, img_b = file_path('a.img', 'b.img') 566 sock = file_path('socket') 567 ''' 568 569 if not hasattr(file_path_remover, 'paths'): 570 file_path_remover.paths = [] 571 atexit.register(file_path_remover) 572 573 paths = [] 574 for name in names: 575 filename = file_pattern(name) 576 path = os.path.join(base_dir, filename) 577 file_path_remover.paths.append(path) 578 paths.append(path) 579 580 return paths[0] if len(paths) == 1 else paths 581 582def remote_filename(path): 583 if imgproto == 'file': 584 return path 585 elif imgproto == 'ssh': 586 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 587 else: 588 raise Exception("Protocol %s not supported" % (imgproto)) 589 590class VM(qtest.QEMUQtestMachine): 591 '''A QEMU VM''' 592 593 def __init__(self, path_suffix=''): 594 name = "qemu%s-%d" % (path_suffix, os.getpid()) 595 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 596 if qemu_gdb and qemu_valgrind: 597 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 598 sys.exit(1) 599 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 600 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 601 name=name, 602 base_temp_dir=test_dir, 603 
socket_scm_helper=socket_scm_helper, 604 sock_dir=sock_dir, qmp_timer=timer) 605 self._num_drives = 0 606 607 def _post_shutdown(self) -> None: 608 super()._post_shutdown() 609 if not qemu_valgrind or not self._popen: 610 return 611 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 612 if self.exitcode() == 99: 613 with open(valgrind_filename) as f: 614 print(f.read()) 615 else: 616 os.remove(valgrind_filename) 617 618 def _pre_launch(self) -> None: 619 super()._pre_launch() 620 if qemu_print: 621 # set QEMU binary output to stdout 622 self._close_qemu_log_file() 623 624 def add_object(self, opts): 625 self._args.append('-object') 626 self._args.append(opts) 627 return self 628 629 def add_device(self, opts): 630 self._args.append('-device') 631 self._args.append(opts) 632 return self 633 634 def add_drive_raw(self, opts): 635 self._args.append('-drive') 636 self._args.append(opts) 637 return self 638 639 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 640 '''Add a virtio-blk drive to the VM''' 641 options = ['if=%s' % interface, 642 'id=drive%d' % self._num_drives] 643 644 if path is not None: 645 options.append('file=%s' % path) 646 options.append('format=%s' % img_format) 647 options.append('cache=%s' % cachemode) 648 options.append('aio=%s' % aiomode) 649 650 if opts: 651 options.append(opts) 652 653 if img_format == 'luks' and 'key-secret' not in opts: 654 # default luks support 655 if luks_default_secret_object not in self._args: 656 self.add_object(luks_default_secret_object) 657 658 options.append(luks_default_key_secret_opt) 659 660 self._args.append('-drive') 661 self._args.append(','.join(options)) 662 self._num_drives += 1 663 return self 664 665 def add_blockdev(self, opts): 666 self._args.append('-blockdev') 667 if isinstance(opts, str): 668 self._args.append(opts) 669 else: 670 self._args.append(','.join(opts)) 671 return self 672 673 def add_incoming(self, addr): 674 self._args.append('-incoming') 675 
self._args.append(addr) 676 return self 677 678 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 679 cmd = 'human-monitor-command' 680 kwargs: Dict[str, Any] = {'command-line': command_line} 681 if use_log: 682 return self.qmp_log(cmd, **kwargs) 683 else: 684 return self.qmp(cmd, **kwargs) 685 686 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 687 """Pause drive r/w operations""" 688 if not event: 689 self.pause_drive(drive, "read_aio") 690 self.pause_drive(drive, "write_aio") 691 return 692 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 693 694 def resume_drive(self, drive: str) -> None: 695 """Resume drive r/w operations""" 696 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 697 698 def hmp_qemu_io(self, drive: str, cmd: str, 699 use_log: bool = False) -> QMPMessage: 700 """Write to a given drive using an HMP command""" 701 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 702 703 def flatten_qmp_object(self, obj, output=None, basestr=''): 704 if output is None: 705 output = dict() 706 if isinstance(obj, list): 707 for i, item in enumerate(obj): 708 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 709 elif isinstance(obj, dict): 710 for key in obj: 711 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 712 else: 713 output[basestr[:-1]] = obj # Strip trailing '.' 
714 return output 715 716 def qmp_to_opts(self, obj): 717 obj = self.flatten_qmp_object(obj) 718 output_list = list() 719 for key in obj: 720 output_list += [key + '=' + obj[key]] 721 return ','.join(output_list) 722 723 def get_qmp_events_filtered(self, wait=60.0): 724 result = [] 725 for ev in self.get_qmp_events(wait=wait): 726 result.append(filter_qmp_event(ev)) 727 return result 728 729 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 730 full_cmd = OrderedDict(( 731 ("execute", cmd), 732 ("arguments", ordered_qmp(kwargs)) 733 )) 734 log(full_cmd, filters, indent=indent) 735 result = self.qmp(cmd, **kwargs) 736 log(result, filters, indent=indent) 737 return result 738 739 # Returns None on success, and an error string on failure 740 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 741 pre_finalize=None, cancel=False, wait=60.0): 742 """ 743 run_job moves a job from creation through to dismissal. 744 745 :param job: String. ID of recently-launched job 746 :param auto_finalize: Bool. True if the job was launched with 747 auto_finalize. Defaults to True. 748 :param auto_dismiss: Bool. True if the job was launched with 749 auto_dismiss=True. Defaults to False. 750 :param pre_finalize: Callback. A callable that takes no arguments to be 751 invoked prior to issuing job-finalize, if any. 752 :param cancel: Bool. When true, cancels the job after the pre_finalize 753 callback. 754 :param wait: Float. Timeout value specifying how long to wait for any 755 event, in seconds. Defaults to 60.0. 
756 """ 757 match_device = {'data': {'device': job}} 758 match_id = {'data': {'id': job}} 759 events = [ 760 ('BLOCK_JOB_COMPLETED', match_device), 761 ('BLOCK_JOB_CANCELLED', match_device), 762 ('BLOCK_JOB_ERROR', match_device), 763 ('BLOCK_JOB_READY', match_device), 764 ('BLOCK_JOB_PENDING', match_id), 765 ('JOB_STATUS_CHANGE', match_id) 766 ] 767 error = None 768 while True: 769 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 770 if ev['event'] != 'JOB_STATUS_CHANGE': 771 log(ev) 772 continue 773 status = ev['data']['status'] 774 if status == 'aborting': 775 result = self.qmp('query-jobs') 776 for j in result['return']: 777 if j['id'] == job: 778 error = j['error'] 779 log('Job failed: %s' % (j['error'])) 780 elif status == 'ready': 781 self.qmp_log('job-complete', id=job) 782 elif status == 'pending' and not auto_finalize: 783 if pre_finalize: 784 pre_finalize() 785 if cancel: 786 self.qmp_log('job-cancel', id=job) 787 else: 788 self.qmp_log('job-finalize', id=job) 789 elif status == 'concluded' and not auto_dismiss: 790 self.qmp_log('job-dismiss', id=job) 791 elif status == 'null': 792 return error 793 794 # Returns None on success, and an error string on failure 795 def blockdev_create(self, options, job_id='job0', filters=None): 796 if filters is None: 797 filters = [filter_qmp_testfiles] 798 result = self.qmp_log('blockdev-create', filters=filters, 799 job_id=job_id, options=options) 800 801 if 'return' in result: 802 assert result['return'] == {} 803 job_result = self.run_job(job_id) 804 else: 805 job_result = result['error'] 806 807 log("") 808 return job_result 809 810 def enable_migration_events(self, name): 811 log('Enabling migration QMP events on %s...' 
% name) 812 log(self.qmp('migrate-set-capabilities', capabilities=[ 813 { 814 'capability': 'events', 815 'state': True 816 } 817 ])) 818 819 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 820 while True: 821 event = self.event_wait('MIGRATION') 822 # We use the default timeout, and with a timeout, event_wait() 823 # never returns None 824 assert event 825 826 log(event, filters=[filter_qmp_event]) 827 if event['data']['status'] in ('completed', 'failed'): 828 break 829 830 if event['data']['status'] == 'completed': 831 # The event may occur in finish-migrate, so wait for the expected 832 # post-migration runstate 833 runstate = None 834 while runstate != expect_runstate: 835 runstate = self.qmp('query-status')['return']['status'] 836 return True 837 else: 838 return False 839 840 def node_info(self, node_name): 841 nodes = self.qmp('query-named-block-nodes') 842 for x in nodes['return']: 843 if x['node-name'] == node_name: 844 return x 845 return None 846 847 def query_bitmaps(self): 848 res = self.qmp("query-named-block-nodes") 849 return {device['node-name']: device['dirty-bitmaps'] 850 for device in res['return'] if 'dirty-bitmaps' in device} 851 852 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 853 """ 854 get a specific bitmap from the object returned by query_bitmaps. 855 :param recording: If specified, filter results by the specified value. 
856 :param bitmaps: If specified, use it instead of call query_bitmaps() 857 """ 858 if bitmaps is None: 859 bitmaps = self.query_bitmaps() 860 861 for bitmap in bitmaps[node_name]: 862 if bitmap.get('name', '') == bitmap_name: 863 if recording is None or bitmap.get('recording') == recording: 864 return bitmap 865 return None 866 867 def check_bitmap_status(self, node_name, bitmap_name, fields): 868 ret = self.get_bitmap(node_name, bitmap_name) 869 870 return fields.items() <= ret.items() 871 872 def assert_block_path(self, root, path, expected_node, graph=None): 873 """ 874 Check whether the node under the given path in the block graph 875 is @expected_node. 876 877 @root is the node name of the node where the @path is rooted. 878 879 @path is a string that consists of child names separated by 880 slashes. It must begin with a slash. 881 882 Examples for @root + @path: 883 - root="qcow2-node", path="/backing/file" 884 - root="quorum-node", path="/children.2/file" 885 886 Hypothetically, @path could be empty, in which case it would 887 point to @root. However, in practice this case is not useful 888 and hence not allowed. 889 890 @expected_node may be None. (All elements of the path but the 891 leaf must still exist.) 892 893 @graph may be None or the result of an x-debug-query-block-graph 894 call that has already been performed. 
895 """ 896 if graph is None: 897 graph = self.qmp('x-debug-query-block-graph')['return'] 898 899 iter_path = iter(path.split('/')) 900 901 # Must start with a / 902 assert next(iter_path) == '' 903 904 node = next((node for node in graph['nodes'] if node['name'] == root), 905 None) 906 907 # An empty @path is not allowed, so the root node must be present 908 assert node is not None, 'Root node %s not found' % root 909 910 for child_name in iter_path: 911 assert node is not None, 'Cannot follow path %s%s' % (root, path) 912 913 try: 914 node_id = next(edge['child'] for edge in graph['edges'] 915 if (edge['parent'] == node['id'] and 916 edge['name'] == child_name)) 917 918 node = next(node for node in graph['nodes'] 919 if node['id'] == node_id) 920 921 except StopIteration: 922 node = None 923 924 if node is None: 925 assert expected_node is None, \ 926 'No node found under %s (but expected %s)' % \ 927 (path, expected_node) 928 else: 929 assert node['name'] == expected_node, \ 930 'Found node %s under %s (but expected %s)' % \ 931 (node['name'], path, expected_node) 932 933index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 934 935class QMPTestCase(unittest.TestCase): 936 '''Abstract base class for QMP test cases''' 937 938 def __init__(self, *args, **kwargs): 939 super().__init__(*args, **kwargs) 940 # Many users of this class set a VM property we rely on heavily 941 # in the methods below. 
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        :param d: The (nested) object to traverse; every level that is
                  looked up by key must be a dict.
        :param path: '/'-separated list of keys.  A component that matches
                     index_re (defined elsewhere in this file) is split
                     into a key and an integer index; the value stored
                     under that key must then be a list, and the indexed
                     element is used.
        :return: The value found at the end of the path.

        Any failed step (not a dict, missing key, not a list, index out
        of range) is reported through self.fail().
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                # Indexed component: separate the key from the list index
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that the given path cannot be traversed in d.

        A failed traversal in dictpath() is expected to surface as an
        AssertionError (raised through self.fail()); that is the success
        case here.  If the traversal succeeds, the test fails and the
        value that was found is reported.
        '''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list.'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            # None on either side acts as a wildcard
            return a is None or b is None or a == b
        # At least one of the two criteria has to be given
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        # Both sides are passed through flatten_qmp_object() before being
        # compared.
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event

        :param drive: Job device name; passed as "device" to
                      block-job-cancel and compared against the events.
        :param force: Passed as "force" to block-job-cancel.
        :param resume: If true, call self.vm.resume_drive(drive) after the
                       cancel request has been acknowledged.
        :param wait: Passed as "wait" to self.vm.get_qmp_events().
        :return: The BLOCK_JOB_COMPLETED or BLOCK_JOB_CANCELLED event.
        '''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            # There is no break below: the rest of the current batch of
            # events is still checked after the final event was seen.
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)


        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        :param drive: Job device name the events must refer to.
        :param check_offset: If true (and no error is expected), assert
                             that the event's offset equals its len.
        :param wait: Passed as "wait" to self.vm.get_qmp_events().
        :param error: None if the event must not carry data/error;
                      otherwise the expected data/error value (checked
                      with assert_qmp(), so a list of alternatives is
                      accepted).
        :return: The BLOCK_JOB_COMPLETED event.
        '''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        # Accept the event for either a mirror or a commit job on this
        # device.
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, then cancel it.

        Asserts that the resulting event is BLOCK_JOB_COMPLETED for a
        mirror job whose offset equals its len.
        '''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy.

        :param job_id: Value to look for in each job's "device" field.
        :return: The job's entry from the query-block-jobs result.

        The polling runs inside Timeout(3, ...); Timeout is defined
        elsewhere in this file (presumably the 3 is a number of seconds).
        '''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        # Job exists but is not there yet; query again
                        break
                # The job must be listed on every query
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for the given job.

        :param job_id: Passed as "device" to block-job-pause.
        :param wait: If true, additionally wait via pause_wait().
        :return: The result of pause_wait() if wait is true, otherwise the
                 QMP response to block-job-pause.
        '''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result

    def case_skip(self, reason):
        '''Skip this test case'''
        # Record the reason in the .casenotrun file first, then skip
        case_notrun(reason)
        self.skipTest(reason)


def notrun(reason):
    '''Skip this test suite

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)

def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Opened in append mode: one line is added per call
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')

def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Skip the test suite if imgfmt is not suitable.

    An empty supported_fmts places no restriction.  If it contains
    'generic' and the IMGFMT_GENERIC environment variable is 'true'
    (the default when unset), it is treated as empty.  For imgfmt
    'luks', verify_working_luks() is called as well.
    '''
    if 'generic' in supported_fmts and \
            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_sup = supported_fmts and (imgfmt not in supported_fmts)
    if not_sup or (imgfmt in unsupported_fmts):
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()

def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Skip the test suite if imgproto is not suitable.

    Only one of the two lists may be non-empty.  'generic' in supported
    accepts every protocol.
    '''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    not_sup = supported and (imgproto not in supported)
    if not_sup or (imgproto in unsupported):
        notrun('not suitable for this protocol: %s' % imgproto)

def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    '''Skip the test suite if sys.platform is not suitable.

    Entries are matched as prefixes of sys.platform.  An empty supported
    list places no restriction.
    '''
    if any((sys.platform.startswith(x) for x in unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported:
        if not any((sys.platform.startswith(x) for x in supported)):
            notrun('not suitable for this OS: %s' % sys.platform)

def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    '''Skip the test suite if cachemode is not in the (non-empty) list.'''
    if supported_cache_modes and (cachemode not in supported_cache_modes):
        notrun('not suitable for this cache mode: %s' % cachemode)

def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    '''Skip the test suite if aiomode is not in the (non-empty) list.'''
    if supported_aio_modes and (aiomode not in supported_aio_modes):
        notrun('not suitable for this aio mode: %s' % aiomode)

def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    '''Skip the test suite unless supported_formats() lists every
    required format.'''
    usf_list = list(set(required_formats) - set(supported_formats()))
    if usf_list:
        notrun(f'formats {usf_list} are not whitelisted')


def _verify_virtio_blk() -> None:
    '''Skip the test suite if "-device help" does not mention virtio-blk.'''
    out = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' not in out:
        notrun('Missing virtio-blk in QEMU binary')

def _verify_virtio_scsi_pci_or_ccw() -> None:
    '''Skip the test suite if "-device help" mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw.'''
    out = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')


def supports_quorum():
    '''Return whether the output of qemu_img_pipe('--help') mentions
    quorum.'''
    return 'quorum' in qemu_img_pipe('--help')

def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if not supports_quorum():
        notrun('quorum support missing')

def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    If not, return the reason why.

    :return: (True, '') on success, (False, reason) otherwise.
    """

    img_file = f'{test_dir}/luks-test.luks'
    (output, status) = \
        qemu_img_pipe_and_status('create', '-f', 'luks',
                                 '--object', luks_default_secret_object,
                                 '-o', luks_default_key_secret_opt,
                                 '-o', 'iter-time=10',
                                 img_file, '1G')
    # Best-effort cleanup: the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status != 0:
        # Default to the whole output; prefer the text that follows
        # "<img_file>:" on the first line that contains it.
        reason = output
        for line in output.splitlines():
            if img_file + ':' in line:
                reason = line.split(img_file + ':', 1)[1].strip()
                break

        return (False, reason)
    else:
        return (True, '')

def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    (working, reason) = has_working_luks()
    if not working:
        notrun(reason)

def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    :return: QEMU's stdout output.
    """
    full_args = [qemu_prog] + qemu_opts + list(args)
    # The exit status is deliberately discarded
    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
    return output

def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked'''

    # The result is cached on the function object, keyed by read_only, so
    # that QEMU is only queried once per whitelist.
    if not hasattr(supported_formats, "formats"):
        supported_formats.formats = {}

    if read_only not in supported_formats.formats:
        format_message = qemu_pipe("-drive", "format=help")
        # Output line 0 is used for the rw list, line 1 for the ro list;
        # the format names are the whitespace-separated words after the
        # first ':' on that line.
        line = 1 if read_only else 0
        supported_formats.formats[read_only] = \
            format_message.splitlines()[line].split(":")[1].split()

    return supported_formats.formats[read_only]

def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either a collection of format names or a
       callable that takes the test case and returns one.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator

def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator

def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            # No skipTest() call here: the case is recorded through
            # case_notrun() and the wrapped function is simply not run.
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper

# We need to filter out the time taken from the output so that
# qemu-iotest can reliably diff the results against master output,
# and hide skipped tests from the reference output.
class ReproducibleTestResult(unittest.TextTestResult):
    """Text test result that reports a skipped test like a passed one."""

    def addSkip(self, test, reason):
        # Record the skip through the base TestResult directly, so that
        # TextTestResult's own "s" marker is never printed; a dot is
        # printed instead.
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()

class ReproducibleStreamWrapper:
    """Proxy for a text stream that scrubs run-dependent unittest output."""

    # Elapsed time of the run, e.g. "Ran 3 tests in 0.042s"
    _elapsed_re = re.compile(r'Ran (\d+) tests? in [\d.]+s')
    # Skip count appended to the final status, e.g. " (skipped=2)"
    _skipped_re = re.compile(r' \(skipped=\d+\)')

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when the normal lookup fails.  These two names are
        # never forwarded to the wrapped stream; everything else is.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write arg to the wrapped stream with the timing and the
        skip count removed."""
        without_time = self._elapsed_re.sub(r'Ran \1 tests', arg)
        self.stream.write(self._skipped_re.sub(r'', without_time))

class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult unless told otherwise."""

    def __init__(
            self,
            stream: Optional[TextIO] = None,
            resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
            **kwargs: Any) -> None:
        # Fall back to stdout when no (truthy) stream was given
        target = stream if stream else sys.stdout
        wrapped = ReproducibleStreamWrapper(target)
        super().__init__(stream=wrapped,  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)

def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Run the unittests of the calling module through unittest.main().

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Tests may emit warnings (notably ResourceWarnings for files and
    # sockets left open).  Unless the user asked for warnings on the
    # command line, silence them so the test output stays reproducible.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)

def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Common setup shared by script-style and unittest-style tests:
    handle the '-d' switch, configure logging and run the environment
    checks.

    :return: True if debug mode was requested with '-d' on the command
             line, False otherwise.
    """
    # Note: 'Sequence' is used because Python 3.6 and pylint do not like
    # 'Collection'.

    debug = False
    if '-d' in sys.argv:
        # Consume the switch so that later argument parsing never sees it
        sys.argv.remove('-d')
        debug = True

    log_level = logging.DEBUG if debug else logging.WARNING
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug

def execute_test(*args, test_function=None, **kwargs):
    """Do the common setup, then run test_function if one is given, or
    the module's unittests otherwise."""

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)

def activate_logging():
    """Send iotests.log() output to stdout (for script-style tests)."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))
    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False

# Entry point for script-style iotests that have no single test function
def script_initialize(*args, **kwargs):
    """Set up a script-style test; no test is run."""
    activate_logging()
    execute_setup_common(*args, **kwargs)

# Entry point for script-style iotests that have a single test function
def script_main(test_function, *args, **kwargs):
    """Set up and run a script-style test outside of unittest."""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)

# Entry point for unittest-style iotests
def main(*args, **kwargs):
    """Set up and run the tests with the unittest framework."""
    execute_test(*args, **kwargs)