1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39# pylint: disable=import-error, wrong-import-position 40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 41from qemu.machine import qtest 42from qemu.qmp import QMPMessage 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77gdb_qemu_env = os.environ.get('GDB_OPTIONS') 78qemu_gdb = [] 79if gdb_qemu_env: 80 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 81 82qemu_print = os.environ.get('PRINT_QEMU', False) 83 84imgfmt = os.environ.get('IMGFMT', 'raw') 85imgproto = os.environ.get('IMGPROTO', 'file') 86output_dir = os.environ.get('OUTPUT_DIR', '.') 87 88try: 89 test_dir = os.environ['TEST_DIR'] 90 sock_dir = os.environ['SOCK_DIR'] 91 cachemode = os.environ['CACHEMODE'] 92 aiomode = os.environ['AIOMODE'] 93 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 94except KeyError: 95 # We are using these variables as proxies to indicate that we're 96 # not being run via "check". There may be other things set up by 97 # "check" that individual test cases rely on. 
98 sys.stderr.write('Please run this test via the "check" script\n') 99 sys.exit(os.EX_USAGE) 100 101qemu_valgrind = [] 102if os.environ.get('VALGRIND_QEMU') == "y" and \ 103 os.environ.get('NO_VALGRIND') != "y": 104 valgrind_logfile = "--log-file=" + test_dir 105 # %p allows to put the valgrind process PID, since 106 # we don't know it a priori (subprocess.Popen is 107 # not yet invoked) 108 valgrind_logfile += "/%p.valgrind" 109 110 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 111 112socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 113 114luks_default_secret_object = 'secret,id=keysec0,data=' + \ 115 os.environ.get('IMGKEYSECRET', '') 116luks_default_key_secret_opt = 'key-secret=keysec0' 117 118sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 119 120 121def unarchive_sample_image(sample, fname): 122 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 123 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 124 shutil.copyfileobj(f_in, f_out) 125 126 127def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 128 connect_stderr: bool = True) -> Tuple[str, int]: 129 """ 130 Run a tool and return both its output and its exit code 131 """ 132 stderr = subprocess.STDOUT if connect_stderr else None 133 with subprocess.Popen(args, stdout=subprocess.PIPE, 134 stderr=stderr, universal_newlines=True) as subp: 135 output = subp.communicate()[0] 136 if subp.returncode < 0: 137 cmd = ' '.join(args) 138 sys.stderr.write(f'{tool} received signal \ 139 {-subp.returncode}: {cmd}\n') 140 return (output, subp.returncode) 141 142def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 143 """ 144 Run qemu-img and return both its output and its exit code 145 """ 146 full_args = qemu_img_args + list(args) 147 return qemu_tool_pipe_and_status('qemu-img', full_args) 148 149def qemu_img(*args: str) -> int: 150 '''Run qemu-img and return the exit code''' 151 return qemu_img_pipe_and_status(*args)[1] 152 153def 
ordered_qmp(qmsg, conv_keys=True): 154 # Dictionaries are not ordered prior to 3.6, therefore: 155 if isinstance(qmsg, list): 156 return [ordered_qmp(atom) for atom in qmsg] 157 if isinstance(qmsg, dict): 158 od = OrderedDict() 159 for k, v in sorted(qmsg.items()): 160 if conv_keys: 161 k = k.replace('_', '-') 162 od[k] = ordered_qmp(v, conv_keys=False) 163 return od 164 return qmsg 165 166def qemu_img_create(*args): 167 args = list(args) 168 169 # default luks support 170 if '-f' in args and args[args.index('-f') + 1] == 'luks': 171 if '-o' in args: 172 i = args.index('-o') 173 if 'key-secret' not in args[i + 1]: 174 args[i + 1].append(luks_default_key_secret_opt) 175 args.insert(i + 2, '--object') 176 args.insert(i + 3, luks_default_secret_object) 177 else: 178 args = ['-o', luks_default_key_secret_opt, 179 '--object', luks_default_secret_object] + args 180 181 args.insert(0, 'create') 182 183 return qemu_img(*args) 184 185def qemu_img_measure(*args): 186 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 187 188def qemu_img_check(*args): 189 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 190 191def qemu_img_verbose(*args): 192 '''Run qemu-img without suppressing its output and return the exit code''' 193 exitcode = subprocess.call(qemu_img_args + list(args)) 194 if exitcode < 0: 195 sys.stderr.write('qemu-img received signal %i: %s\n' 196 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 197 return exitcode 198 199def qemu_img_pipe(*args: str) -> str: 200 '''Run qemu-img and return its output''' 201 return qemu_img_pipe_and_status(*args)[0] 202 203def qemu_img_log(*args): 204 result = qemu_img_pipe(*args) 205 log(result, filters=[filter_testfiles]) 206 return result 207 208def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 209 args = ['info'] 210 if imgopts: 211 args.append('--image-opts') 212 else: 213 args += ['-f', imgfmt] 214 args += extra_args 215 args.append(filename) 216 217 output = 
qemu_img_pipe(*args) 218 if not filter_path: 219 filter_path = filename 220 log(filter_img_info(output, filter_path)) 221 222def qemu_io(*args): 223 '''Run qemu-io and return the stdout data''' 224 args = qemu_io_args + list(args) 225 return qemu_tool_pipe_and_status('qemu-io', args)[0] 226 227def qemu_io_log(*args): 228 result = qemu_io(*args) 229 log(result, filters=[filter_testfiles, filter_qemu_io]) 230 return result 231 232def qemu_io_silent(*args): 233 '''Run qemu-io and return the exit code, suppressing stdout''' 234 if '-f' in args or '--image-opts' in args: 235 default_args = qemu_io_args_no_fmt 236 else: 237 default_args = qemu_io_args 238 239 args = default_args + list(args) 240 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 241 if result.returncode < 0: 242 sys.stderr.write('qemu-io received signal %i: %s\n' % 243 (-result.returncode, ' '.join(args))) 244 return result.returncode 245 246def qemu_io_silent_check(*args): 247 '''Run qemu-io and return the true if subprocess returned 0''' 248 args = qemu_io_args + list(args) 249 result = subprocess.run(args, stdout=subprocess.DEVNULL, 250 stderr=subprocess.STDOUT, check=False) 251 return result.returncode == 0 252 253class QemuIoInteractive: 254 def __init__(self, *args): 255 self.args = qemu_io_args_no_fmt + list(args) 256 # We need to keep the Popen objext around, and not 257 # close it immediately. Therefore, disable the pylint check: 258 # pylint: disable=consider-using-with 259 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 260 stdout=subprocess.PIPE, 261 stderr=subprocess.STDOUT, 262 universal_newlines=True) 263 out = self._p.stdout.read(9) 264 if out != 'qemu-io> ': 265 # Most probably qemu-io just failed to start. 266 # Let's collect the whole output and exit. 
267 out += self._p.stdout.read() 268 self._p.wait(timeout=1) 269 raise ValueError(out) 270 271 def close(self): 272 self._p.communicate('q\n') 273 274 def _read_output(self): 275 pattern = 'qemu-io> ' 276 n = len(pattern) 277 pos = 0 278 s = [] 279 while pos != n: 280 c = self._p.stdout.read(1) 281 # check unexpected EOF 282 assert c != '' 283 s.append(c) 284 if c == pattern[pos]: 285 pos += 1 286 else: 287 pos = 0 288 289 return ''.join(s[:-n]) 290 291 def cmd(self, cmd): 292 # quit command is in close(), '\n' is added automatically 293 assert '\n' not in cmd 294 cmd = cmd.strip() 295 assert cmd not in ('q', 'quit') 296 self._p.stdin.write(cmd + '\n') 297 self._p.stdin.flush() 298 return self._read_output() 299 300 301def qemu_nbd(*args): 302 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 303 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 304 305def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 306 '''Run qemu-nbd in daemon mode and return both the parent's exit code 307 and its output in case of an error''' 308 full_args = qemu_nbd_args + ['--fork'] + list(args) 309 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 310 connect_stderr=False) 311 return returncode, output if returncode else '' 312 313def qemu_nbd_list_log(*args: str) -> str: 314 '''Run qemu-nbd to list remote exports''' 315 full_args = [qemu_nbd_prog, '-L'] + list(args) 316 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 317 log(output, filters=[filter_testfiles, filter_nbd_exports]) 318 return output 319 320@contextmanager 321def qemu_nbd_popen(*args): 322 '''Context manager running qemu-nbd within the context''' 323 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 324 325 assert not os.path.exists(pid_file) 326 327 cmd = list(qemu_nbd_args) 328 cmd.extend(('--persistent', '--pid-file', pid_file)) 329 cmd.extend(args) 330 331 log('Start NBD server') 332 with subprocess.Popen(cmd) as p: 333 try: 334 while not 
os.path.exists(pid_file): 335 if p.poll() is not None: 336 raise RuntimeError( 337 "qemu-nbd terminated with exit code {}: {}" 338 .format(p.returncode, ' '.join(cmd))) 339 340 time.sleep(0.01) 341 yield 342 finally: 343 if os.path.exists(pid_file): 344 os.remove(pid_file) 345 log('Kill NBD server') 346 p.kill() 347 p.wait() 348 349def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 350 '''Return True if two image files are identical''' 351 return qemu_img('compare', '-f', fmt1, 352 '-F', fmt2, img1, img2) == 0 353 354def create_image(name, size): 355 '''Create a fully-allocated raw image with sector markers''' 356 with open(name, 'wb') as file: 357 i = 0 358 while i < size: 359 sector = struct.pack('>l504xl', i // 512, i // 512) 360 file.write(sector) 361 i = i + 512 362 363def image_size(img): 364 '''Return image's virtual size''' 365 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 366 return json.loads(r)['virtual-size'] 367 368def is_str(val): 369 return isinstance(val, str) 370 371test_dir_re = re.compile(r"%s" % test_dir) 372def filter_test_dir(msg): 373 return test_dir_re.sub("TEST_DIR", msg) 374 375win32_re = re.compile(r"\r") 376def filter_win32(msg): 377 return win32_re.sub("", msg) 378 379qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 380 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 381 r"and [0-9\/.inf]* ops\/sec\)") 382def filter_qemu_io(msg): 383 msg = filter_win32(msg) 384 return qemu_io_re.sub("X ops; XX:XX:XX.X " 385 "(XXX YYY/sec and XXX ops/sec)", msg) 386 387chown_re = re.compile(r"chown [0-9]+:[0-9]+") 388def filter_chown(msg): 389 return chown_re.sub("chown UID:GID", msg) 390 391def filter_qmp_event(event): 392 '''Filter a QMP event dict''' 393 event = dict(event) 394 if 'timestamp' in event: 395 event['timestamp']['seconds'] = 'SECS' 396 event['timestamp']['microseconds'] = 'USECS' 397 return event 398 399def filter_qmp(qmsg, filter_fn): 400 '''Given a string filter, filter a QMP object's values. 
401 filter_fn takes a (key, value) pair.''' 402 # Iterate through either lists or dicts; 403 if isinstance(qmsg, list): 404 items = enumerate(qmsg) 405 else: 406 items = qmsg.items() 407 408 for k, v in items: 409 if isinstance(v, (dict, list)): 410 qmsg[k] = filter_qmp(v, filter_fn) 411 else: 412 qmsg[k] = filter_fn(k, v) 413 return qmsg 414 415def filter_testfiles(msg): 416 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 417 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 418 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 419 420def filter_qmp_testfiles(qmsg): 421 def _filter(_key, value): 422 if is_str(value): 423 return filter_testfiles(value) 424 return value 425 return filter_qmp(qmsg, _filter) 426 427def filter_virtio_scsi(output: str) -> str: 428 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 429 430def filter_qmp_virtio_scsi(qmsg): 431 def _filter(_key, value): 432 if is_str(value): 433 return filter_virtio_scsi(value) 434 return value 435 return filter_qmp(qmsg, _filter) 436 437def filter_generated_node_ids(msg): 438 return re.sub("#block[0-9]+", "NODE_NAME", msg) 439 440def filter_img_info(output, filename): 441 lines = [] 442 for line in output.split('\n'): 443 if 'disk size' in line or 'actual-size' in line: 444 continue 445 line = line.replace(filename, 'TEST_IMG') 446 line = filter_testfiles(line) 447 line = line.replace(imgfmt, 'IMGFMT') 448 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 449 line = re.sub('uuid: [-a-f0-9]+', 450 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 451 line) 452 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 453 lines.append(line) 454 return '\n'.join(lines) 455 456def filter_imgfmt(msg): 457 return msg.replace(imgfmt, 'IMGFMT') 458 459def filter_qmp_imgfmt(qmsg): 460 def _filter(_key, value): 461 if is_str(value): 462 return filter_imgfmt(value) 463 return value 464 return filter_qmp(qmsg, _filter) 465 466def filter_nbd_exports(output: str) -> str: 467 return 
re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 468 469 470Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 471 472def log(msg: Msg, 473 filters: Iterable[Callable[[Msg], Msg]] = (), 474 indent: Optional[int] = None) -> None: 475 """ 476 Logs either a string message or a JSON serializable message (like QMP). 477 If indent is provided, JSON serializable messages are pretty-printed. 478 """ 479 for flt in filters: 480 msg = flt(msg) 481 if isinstance(msg, (dict, list)): 482 # Don't sort if it's already sorted 483 do_sort = not isinstance(msg, OrderedDict) 484 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 485 else: 486 test_logger.info(msg) 487 488class Timeout: 489 def __init__(self, seconds, errmsg="Timeout"): 490 self.seconds = seconds 491 self.errmsg = errmsg 492 def __enter__(self): 493 if qemu_gdb or qemu_valgrind: 494 return self 495 signal.signal(signal.SIGALRM, self.timeout) 496 signal.setitimer(signal.ITIMER_REAL, self.seconds) 497 return self 498 def __exit__(self, exc_type, value, traceback): 499 if qemu_gdb or qemu_valgrind: 500 return False 501 signal.setitimer(signal.ITIMER_REAL, 0) 502 return False 503 def timeout(self, signum, frame): 504 raise Exception(self.errmsg) 505 506def file_pattern(name): 507 return "{0}-{1}".format(os.getpid(), name) 508 509class FilePath: 510 """ 511 Context manager generating multiple file names. The generated files are 512 removed when exiting the context. 513 514 Example usage: 515 516 with FilePath('a.img', 'b.img') as (img_a, img_b): 517 # Use img_a and img_b here... 518 519 # a.img and b.img are automatically removed here. 520 521 By default images are created in iotests.test_dir. To create sockets use 522 iotests.sock_dir: 523 524 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 525 526 For convenience, calling with one argument yields a single file instead of 527 a tuple with one item. 
528 529 """ 530 def __init__(self, *names, base_dir=test_dir): 531 self.paths = [os.path.join(base_dir, file_pattern(name)) 532 for name in names] 533 534 def __enter__(self): 535 if len(self.paths) == 1: 536 return self.paths[0] 537 else: 538 return self.paths 539 540 def __exit__(self, exc_type, exc_val, exc_tb): 541 for path in self.paths: 542 try: 543 os.remove(path) 544 except OSError: 545 pass 546 return False 547 548 549def try_remove(img): 550 try: 551 os.remove(img) 552 except OSError: 553 pass 554 555def file_path_remover(): 556 for path in reversed(file_path_remover.paths): 557 try_remove(path) 558 559 560def file_path(*names, base_dir=test_dir): 561 ''' Another way to get auto-generated filename that cleans itself up. 562 563 Use is as simple as: 564 565 img_a, img_b = file_path('a.img', 'b.img') 566 sock = file_path('socket') 567 ''' 568 569 if not hasattr(file_path_remover, 'paths'): 570 file_path_remover.paths = [] 571 atexit.register(file_path_remover) 572 573 paths = [] 574 for name in names: 575 filename = file_pattern(name) 576 path = os.path.join(base_dir, filename) 577 file_path_remover.paths.append(path) 578 paths.append(path) 579 580 return paths[0] if len(paths) == 1 else paths 581 582def remote_filename(path): 583 if imgproto == 'file': 584 return path 585 elif imgproto == 'ssh': 586 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 587 else: 588 raise Exception("Protocol %s not supported" % (imgproto)) 589 590class VM(qtest.QEMUQtestMachine): 591 '''A QEMU VM''' 592 593 def __init__(self, path_suffix=''): 594 name = "qemu%s-%d" % (path_suffix, os.getpid()) 595 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 596 if qemu_gdb and qemu_valgrind: 597 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 598 sys.exit(1) 599 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 600 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 601 name=name, 602 base_temp_dir=test_dir, 603 
socket_scm_helper=socket_scm_helper, 604 sock_dir=sock_dir, qmp_timer=timer) 605 self._num_drives = 0 606 607 def _post_shutdown(self) -> None: 608 super()._post_shutdown() 609 if not qemu_valgrind or not self._popen: 610 return 611 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 612 if self.exitcode() == 99: 613 with open(valgrind_filename, encoding='utf-8') as f: 614 print(f.read()) 615 else: 616 os.remove(valgrind_filename) 617 618 def _pre_launch(self) -> None: 619 super()._pre_launch() 620 if qemu_print: 621 # set QEMU binary output to stdout 622 self._close_qemu_log_file() 623 624 def add_object(self, opts): 625 self._args.append('-object') 626 self._args.append(opts) 627 return self 628 629 def add_device(self, opts): 630 self._args.append('-device') 631 self._args.append(opts) 632 return self 633 634 def add_drive_raw(self, opts): 635 self._args.append('-drive') 636 self._args.append(opts) 637 return self 638 639 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 640 '''Add a virtio-blk drive to the VM''' 641 options = ['if=%s' % interface, 642 'id=drive%d' % self._num_drives] 643 644 if path is not None: 645 options.append('file=%s' % path) 646 options.append('format=%s' % img_format) 647 options.append('cache=%s' % cachemode) 648 options.append('aio=%s' % aiomode) 649 650 if opts: 651 options.append(opts) 652 653 if img_format == 'luks' and 'key-secret' not in opts: 654 # default luks support 655 if luks_default_secret_object not in self._args: 656 self.add_object(luks_default_secret_object) 657 658 options.append(luks_default_key_secret_opt) 659 660 self._args.append('-drive') 661 self._args.append(','.join(options)) 662 self._num_drives += 1 663 return self 664 665 def add_blockdev(self, opts): 666 self._args.append('-blockdev') 667 if isinstance(opts, str): 668 self._args.append(opts) 669 else: 670 self._args.append(','.join(opts)) 671 return self 672 673 def add_incoming(self, addr): 674 
self._args.append('-incoming') 675 self._args.append(addr) 676 return self 677 678 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 679 cmd = 'human-monitor-command' 680 kwargs: Dict[str, Any] = {'command-line': command_line} 681 if use_log: 682 return self.qmp_log(cmd, **kwargs) 683 else: 684 return self.qmp(cmd, **kwargs) 685 686 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 687 """Pause drive r/w operations""" 688 if not event: 689 self.pause_drive(drive, "read_aio") 690 self.pause_drive(drive, "write_aio") 691 return 692 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 693 694 def resume_drive(self, drive: str) -> None: 695 """Resume drive r/w operations""" 696 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 697 698 def hmp_qemu_io(self, drive: str, cmd: str, 699 use_log: bool = False, qdev: bool = False) -> QMPMessage: 700 """Write to a given drive using an HMP command""" 701 d = '-d ' if qdev else '' 702 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 703 704 def flatten_qmp_object(self, obj, output=None, basestr=''): 705 if output is None: 706 output = {} 707 if isinstance(obj, list): 708 for i, item in enumerate(obj): 709 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 710 elif isinstance(obj, dict): 711 for key in obj: 712 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 713 else: 714 output[basestr[:-1]] = obj # Strip trailing '.' 
715 return output 716 717 def qmp_to_opts(self, obj): 718 obj = self.flatten_qmp_object(obj) 719 output_list = [] 720 for key in obj: 721 output_list += [key + '=' + obj[key]] 722 return ','.join(output_list) 723 724 def get_qmp_events_filtered(self, wait=60.0): 725 result = [] 726 for ev in self.get_qmp_events(wait=wait): 727 result.append(filter_qmp_event(ev)) 728 return result 729 730 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 731 full_cmd = OrderedDict(( 732 ("execute", cmd), 733 ("arguments", ordered_qmp(kwargs)) 734 )) 735 log(full_cmd, filters, indent=indent) 736 result = self.qmp(cmd, **kwargs) 737 log(result, filters, indent=indent) 738 return result 739 740 # Returns None on success, and an error string on failure 741 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 742 pre_finalize=None, cancel=False, wait=60.0): 743 """ 744 run_job moves a job from creation through to dismissal. 745 746 :param job: String. ID of recently-launched job 747 :param auto_finalize: Bool. True if the job was launched with 748 auto_finalize. Defaults to True. 749 :param auto_dismiss: Bool. True if the job was launched with 750 auto_dismiss=True. Defaults to False. 751 :param pre_finalize: Callback. A callable that takes no arguments to be 752 invoked prior to issuing job-finalize, if any. 753 :param cancel: Bool. When true, cancels the job after the pre_finalize 754 callback. 755 :param wait: Float. Timeout value specifying how long to wait for any 756 event, in seconds. Defaults to 60.0. 
757 """ 758 match_device = {'data': {'device': job}} 759 match_id = {'data': {'id': job}} 760 events = [ 761 ('BLOCK_JOB_COMPLETED', match_device), 762 ('BLOCK_JOB_CANCELLED', match_device), 763 ('BLOCK_JOB_ERROR', match_device), 764 ('BLOCK_JOB_READY', match_device), 765 ('BLOCK_JOB_PENDING', match_id), 766 ('JOB_STATUS_CHANGE', match_id) 767 ] 768 error = None 769 while True: 770 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 771 if ev['event'] != 'JOB_STATUS_CHANGE': 772 log(ev) 773 continue 774 status = ev['data']['status'] 775 if status == 'aborting': 776 result = self.qmp('query-jobs') 777 for j in result['return']: 778 if j['id'] == job: 779 error = j['error'] 780 log('Job failed: %s' % (j['error'])) 781 elif status == 'ready': 782 self.qmp_log('job-complete', id=job) 783 elif status == 'pending' and not auto_finalize: 784 if pre_finalize: 785 pre_finalize() 786 if cancel: 787 self.qmp_log('job-cancel', id=job) 788 else: 789 self.qmp_log('job-finalize', id=job) 790 elif status == 'concluded' and not auto_dismiss: 791 self.qmp_log('job-dismiss', id=job) 792 elif status == 'null': 793 return error 794 795 # Returns None on success, and an error string on failure 796 def blockdev_create(self, options, job_id='job0', filters=None): 797 if filters is None: 798 filters = [filter_qmp_testfiles] 799 result = self.qmp_log('blockdev-create', filters=filters, 800 job_id=job_id, options=options) 801 802 if 'return' in result: 803 assert result['return'] == {} 804 job_result = self.run_job(job_id) 805 else: 806 job_result = result['error'] 807 808 log("") 809 return job_result 810 811 def enable_migration_events(self, name): 812 log('Enabling migration QMP events on %s...' 
% name) 813 log(self.qmp('migrate-set-capabilities', capabilities=[ 814 { 815 'capability': 'events', 816 'state': True 817 } 818 ])) 819 820 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 821 while True: 822 event = self.event_wait('MIGRATION') 823 # We use the default timeout, and with a timeout, event_wait() 824 # never returns None 825 assert event 826 827 log(event, filters=[filter_qmp_event]) 828 if event['data']['status'] in ('completed', 'failed'): 829 break 830 831 if event['data']['status'] == 'completed': 832 # The event may occur in finish-migrate, so wait for the expected 833 # post-migration runstate 834 runstate = None 835 while runstate != expect_runstate: 836 runstate = self.qmp('query-status')['return']['status'] 837 return True 838 else: 839 return False 840 841 def node_info(self, node_name): 842 nodes = self.qmp('query-named-block-nodes') 843 for x in nodes['return']: 844 if x['node-name'] == node_name: 845 return x 846 return None 847 848 def query_bitmaps(self): 849 res = self.qmp("query-named-block-nodes") 850 return {device['node-name']: device['dirty-bitmaps'] 851 for device in res['return'] if 'dirty-bitmaps' in device} 852 853 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 854 """ 855 get a specific bitmap from the object returned by query_bitmaps. 856 :param recording: If specified, filter results by the specified value. 
857 :param bitmaps: If specified, use it instead of call query_bitmaps() 858 """ 859 if bitmaps is None: 860 bitmaps = self.query_bitmaps() 861 862 for bitmap in bitmaps[node_name]: 863 if bitmap.get('name', '') == bitmap_name: 864 if recording is None or bitmap.get('recording') == recording: 865 return bitmap 866 return None 867 868 def check_bitmap_status(self, node_name, bitmap_name, fields): 869 ret = self.get_bitmap(node_name, bitmap_name) 870 871 return fields.items() <= ret.items() 872 873 def assert_block_path(self, root, path, expected_node, graph=None): 874 """ 875 Check whether the node under the given path in the block graph 876 is @expected_node. 877 878 @root is the node name of the node where the @path is rooted. 879 880 @path is a string that consists of child names separated by 881 slashes. It must begin with a slash. 882 883 Examples for @root + @path: 884 - root="qcow2-node", path="/backing/file" 885 - root="quorum-node", path="/children.2/file" 886 887 Hypothetically, @path could be empty, in which case it would 888 point to @root. However, in practice this case is not useful 889 and hence not allowed. 890 891 @expected_node may be None. (All elements of the path but the 892 leaf must still exist.) 893 894 @graph may be None or the result of an x-debug-query-block-graph 895 call that has already been performed. 
896 """ 897 if graph is None: 898 graph = self.qmp('x-debug-query-block-graph')['return'] 899 900 iter_path = iter(path.split('/')) 901 902 # Must start with a / 903 assert next(iter_path) == '' 904 905 node = next((node for node in graph['nodes'] if node['name'] == root), 906 None) 907 908 # An empty @path is not allowed, so the root node must be present 909 assert node is not None, 'Root node %s not found' % root 910 911 for child_name in iter_path: 912 assert node is not None, 'Cannot follow path %s%s' % (root, path) 913 914 try: 915 node_id = next(edge['child'] for edge in graph['edges'] 916 if (edge['parent'] == node['id'] and 917 edge['name'] == child_name)) 918 919 node = next(node for node in graph['nodes'] 920 if node['id'] == node_id) 921 922 except StopIteration: 923 node = None 924 925 if node is None: 926 assert expected_node is None, \ 927 'No node found under %s (but expected %s)' % \ 928 (path, expected_node) 929 else: 930 assert node['name'] == expected_node, \ 931 'Found node %s under %s (but expected %s)' % \ 932 (node['name'], path, expected_node) 933 934index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 935 936class QMPTestCase(unittest.TestCase): 937 '''Abstract base class for QMP test cases''' 938 939 def __init__(self, *args, **kwargs): 940 super().__init__(*args, **kwargs) 941 # Many users of this class set a VM property we rely on heavily 942 # in the methods below. 
943 self.vm = None 944 945 def dictpath(self, d, path): 946 '''Traverse a path in a nested dict''' 947 for component in path.split('/'): 948 m = index_re.match(component) 949 if m: 950 component, idx = m.groups() 951 idx = int(idx) 952 953 if not isinstance(d, dict) or component not in d: 954 self.fail(f'failed path traversal for "{path}" in "{d}"') 955 d = d[component] 956 957 if m: 958 if not isinstance(d, list): 959 self.fail(f'path component "{component}" in "{path}" ' 960 f'is not a list in "{d}"') 961 try: 962 d = d[idx] 963 except IndexError: 964 self.fail(f'invalid index "{idx}" in path "{path}" ' 965 f'in "{d}"') 966 return d 967 968 def assert_qmp_absent(self, d, path): 969 try: 970 result = self.dictpath(d, path) 971 except AssertionError: 972 return 973 self.fail('path "%s" has value "%s"' % (path, str(result))) 974 975 def assert_qmp(self, d, path, value): 976 '''Assert that the value for a specific path in a QMP dict 977 matches. When given a list of values, assert that any of 978 them matches.''' 979 980 result = self.dictpath(d, path) 981 982 # [] makes no sense as a list of valid values, so treat it as 983 # an actual single value. 
984 if isinstance(value, list) and value != []: 985 for v in value: 986 if result == v: 987 return 988 self.fail('no match for "%s" in %s' % (str(result), str(value))) 989 else: 990 self.assertEqual(result, value, 991 '"%s" is "%s", expected "%s"' 992 % (path, str(result), str(value))) 993 994 def assert_no_active_block_jobs(self): 995 result = self.vm.qmp('query-block-jobs') 996 self.assert_qmp(result, 'return', []) 997 998 def assert_has_block_node(self, node_name=None, file_name=None): 999 """Issue a query-named-block-nodes and assert node_name and/or 1000 file_name is present in the result""" 1001 def check_equal_or_none(a, b): 1002 return a is None or b is None or a == b 1003 assert node_name or file_name 1004 result = self.vm.qmp('query-named-block-nodes') 1005 for x in result["return"]: 1006 if check_equal_or_none(x.get("node-name"), node_name) and \ 1007 check_equal_or_none(x.get("file"), file_name): 1008 return 1009 self.fail("Cannot find %s %s in result:\n%s" % 1010 (node_name, file_name, result)) 1011 1012 def assert_json_filename_equal(self, json_filename, reference): 1013 '''Asserts that the given filename is a json: filename and that its 1014 content is equal to the given reference object''' 1015 self.assertEqual(json_filename[:5], 'json:') 1016 self.assertEqual( 1017 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1018 self.vm.flatten_qmp_object(reference) 1019 ) 1020 1021 def cancel_and_wait(self, drive='drive0', force=False, 1022 resume=False, wait=60.0): 1023 '''Cancel a block job and wait for it to finish, returning the event''' 1024 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1025 self.assert_qmp(result, 'return', {}) 1026 1027 if resume: 1028 self.vm.resume_drive(drive) 1029 1030 cancelled = False 1031 result = None 1032 while not cancelled: 1033 for event in self.vm.get_qmp_events(wait=wait): 1034 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1035 event['event'] == 'BLOCK_JOB_CANCELLED': 1036 
self.assert_qmp(event, 'data/device', drive) 1037 result = event 1038 cancelled = True 1039 elif event['event'] == 'JOB_STATUS_CHANGE': 1040 self.assert_qmp(event, 'data/id', drive) 1041 1042 1043 self.assert_no_active_block_jobs() 1044 return result 1045 1046 def wait_until_completed(self, drive='drive0', check_offset=True, 1047 wait=60.0, error=None): 1048 '''Wait for a block job to finish, returning the event''' 1049 while True: 1050 for event in self.vm.get_qmp_events(wait=wait): 1051 if event['event'] == 'BLOCK_JOB_COMPLETED': 1052 self.assert_qmp(event, 'data/device', drive) 1053 if error is None: 1054 self.assert_qmp_absent(event, 'data/error') 1055 if check_offset: 1056 self.assert_qmp(event, 'data/offset', 1057 event['data']['len']) 1058 else: 1059 self.assert_qmp(event, 'data/error', error) 1060 self.assert_no_active_block_jobs() 1061 return event 1062 if event['event'] == 'JOB_STATUS_CHANGE': 1063 self.assert_qmp(event, 'data/id', drive) 1064 1065 def wait_ready(self, drive='drive0'): 1066 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1067 return self.vm.events_wait([ 1068 ('BLOCK_JOB_READY', 1069 {'data': {'type': 'mirror', 'device': drive}}), 1070 ('BLOCK_JOB_READY', 1071 {'data': {'type': 'commit', 'device': drive}}) 1072 ]) 1073 1074 def wait_ready_and_cancel(self, drive='drive0'): 1075 self.wait_ready(drive=drive) 1076 event = self.cancel_and_wait(drive=drive) 1077 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1078 self.assert_qmp(event, 'data/type', 'mirror') 1079 self.assert_qmp(event, 'data/offset', event['data']['len']) 1080 1081 def complete_and_wait(self, drive='drive0', wait_ready=True, 1082 completion_error=None): 1083 '''Complete a block job and wait for it to finish''' 1084 if wait_ready: 1085 self.wait_ready(drive=drive) 1086 1087 result = self.vm.qmp('block-job-complete', device=drive) 1088 self.assert_qmp(result, 'return', {}) 1089 1090 event = self.wait_until_completed(drive=drive, error=completion_error) 
1091 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1092 1093 def pause_wait(self, job_id='job0'): 1094 with Timeout(3, "Timeout waiting for job to pause"): 1095 while True: 1096 result = self.vm.qmp('query-block-jobs') 1097 found = False 1098 for job in result['return']: 1099 if job['device'] == job_id: 1100 found = True 1101 if job['paused'] and not job['busy']: 1102 return job 1103 break 1104 assert found 1105 1106 def pause_job(self, job_id='job0', wait=True): 1107 result = self.vm.qmp('block-job-pause', device=job_id) 1108 self.assert_qmp(result, 'return', {}) 1109 if wait: 1110 return self.pause_wait(job_id) 1111 return result 1112 1113 def case_skip(self, reason): 1114 '''Skip this test case''' 1115 case_notrun(reason) 1116 self.skipTest(reason) 1117 1118 1119def notrun(reason): 1120 '''Skip this test suite''' 1121 # Each test in qemu-iotests has a number ("seq") 1122 seq = os.path.basename(sys.argv[0]) 1123 1124 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1125 as outfile: 1126 outfile.write(reason + '\n') 1127 logger.warning("%s not run: %s", seq, reason) 1128 sys.exit(0) 1129 1130def case_notrun(reason): 1131 '''Mark this test case as not having been run (without actually 1132 skipping it, that is left to the caller). 
See 1133 QMPTestCase.case_skip() for a variant that actually skips the 1134 current test case.''' 1135 1136 # Each test in qemu-iotests has a number ("seq") 1137 seq = os.path.basename(sys.argv[0]) 1138 1139 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1140 as outfile: 1141 outfile.write(' [case not run] ' + reason + '\n') 1142 1143def _verify_image_format(supported_fmts: Sequence[str] = (), 1144 unsupported_fmts: Sequence[str] = ()) -> None: 1145 if 'generic' in supported_fmts and \ 1146 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1147 # similar to 1148 # _supported_fmt generic 1149 # for bash tests 1150 supported_fmts = () 1151 1152 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1153 if not_sup or (imgfmt in unsupported_fmts): 1154 notrun('not suitable for this image format: %s' % imgfmt) 1155 1156 if imgfmt == 'luks': 1157 verify_working_luks() 1158 1159def _verify_protocol(supported: Sequence[str] = (), 1160 unsupported: Sequence[str] = ()) -> None: 1161 assert not (supported and unsupported) 1162 1163 if 'generic' in supported: 1164 return 1165 1166 not_sup = supported and (imgproto not in supported) 1167 if not_sup or (imgproto in unsupported): 1168 notrun('not suitable for this protocol: %s' % imgproto) 1169 1170def _verify_platform(supported: Sequence[str] = (), 1171 unsupported: Sequence[str] = ()) -> None: 1172 if any((sys.platform.startswith(x) for x in unsupported)): 1173 notrun('not suitable for this OS: %s' % sys.platform) 1174 1175 if supported: 1176 if not any((sys.platform.startswith(x) for x in supported)): 1177 notrun('not suitable for this OS: %s' % sys.platform) 1178 1179def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1180 if supported_cache_modes and (cachemode not in supported_cache_modes): 1181 notrun('not suitable for this cache mode: %s' % cachemode) 1182 1183def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1184 if supported_aio_modes and 
(aiomode not in supported_aio_modes): 1185 notrun('not suitable for this aio mode: %s' % aiomode) 1186 1187def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1188 usf_list = list(set(required_formats) - set(supported_formats())) 1189 if usf_list: 1190 notrun(f'formats {usf_list} are not whitelisted') 1191 1192 1193def _verify_virtio_blk() -> None: 1194 out = qemu_pipe('-M', 'none', '-device', 'help') 1195 if 'virtio-blk' not in out: 1196 notrun('Missing virtio-blk in QEMU binary') 1197 1198def _verify_virtio_scsi_pci_or_ccw() -> None: 1199 out = qemu_pipe('-M', 'none', '-device', 'help') 1200 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1201 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1202 1203 1204def supports_quorum(): 1205 return 'quorum' in qemu_img_pipe('--help') 1206 1207def verify_quorum(): 1208 '''Skip test suite if quorum support is not available''' 1209 if not supports_quorum(): 1210 notrun('quorum support missing') 1211 1212def has_working_luks() -> Tuple[bool, str]: 1213 """ 1214 Check whether our LUKS driver can actually create images 1215 (this extends to LUKS encryption for qcow2). 1216 1217 If not, return the reason why. 
1218 """ 1219 1220 img_file = f'{test_dir}/luks-test.luks' 1221 (output, status) = \ 1222 qemu_img_pipe_and_status('create', '-f', 'luks', 1223 '--object', luks_default_secret_object, 1224 '-o', luks_default_key_secret_opt, 1225 '-o', 'iter-time=10', 1226 img_file, '1G') 1227 try: 1228 os.remove(img_file) 1229 except OSError: 1230 pass 1231 1232 if status != 0: 1233 reason = output 1234 for line in output.splitlines(): 1235 if img_file + ':' in line: 1236 reason = line.split(img_file + ':', 1)[1].strip() 1237 break 1238 1239 return (False, reason) 1240 else: 1241 return (True, '') 1242 1243def verify_working_luks(): 1244 """ 1245 Skip test suite if LUKS does not work 1246 """ 1247 (working, reason) = has_working_luks() 1248 if not working: 1249 notrun(reason) 1250 1251def qemu_pipe(*args: str) -> str: 1252 """ 1253 Run qemu with an option to print something and exit (e.g. a help option). 1254 1255 :return: QEMU's stdout output. 1256 """ 1257 full_args = [qemu_prog] + qemu_opts + list(args) 1258 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1259 return output 1260 1261def supported_formats(read_only=False): 1262 '''Set 'read_only' to True to check ro-whitelist 1263 Otherwise, rw-whitelist is checked''' 1264 1265 if not hasattr(supported_formats, "formats"): 1266 supported_formats.formats = {} 1267 1268 if read_only not in supported_formats.formats: 1269 format_message = qemu_pipe("-drive", "format=help") 1270 line = 1 if read_only else 0 1271 supported_formats.formats[read_only] = \ 1272 format_message.splitlines()[line].split(":")[1].split() 1273 1274 return supported_formats.formats[read_only] 1275 1276def skip_if_unsupported(required_formats=(), read_only=False): 1277 '''Skip Test Decorator 1278 Runs the test if all the required formats are whitelisted''' 1279 def skip_test_decorator(func): 1280 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1281 **kwargs: Dict[str, Any]) -> None: 1282 if callable(required_formats): 1283 fmts = 
required_formats(test_case) 1284 else: 1285 fmts = required_formats 1286 1287 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1288 if usf_list: 1289 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1290 test_case.case_skip(msg) 1291 else: 1292 func(test_case, *args, **kwargs) 1293 return func_wrapper 1294 return skip_test_decorator 1295 1296def skip_for_formats(formats: Sequence[str] = ()) \ 1297 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1298 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1299 '''Skip Test Decorator 1300 Skips the test for the given formats''' 1301 def skip_test_decorator(func): 1302 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1303 **kwargs: Dict[str, Any]) -> None: 1304 if imgfmt in formats: 1305 msg = f'{test_case}: Skipped for format {imgfmt}' 1306 test_case.case_skip(msg) 1307 else: 1308 func(test_case, *args, **kwargs) 1309 return func_wrapper 1310 return skip_test_decorator 1311 1312def skip_if_user_is_root(func): 1313 '''Skip Test Decorator 1314 Runs the test only without root permissions''' 1315 def func_wrapper(*args, **kwargs): 1316 if os.getuid() == 0: 1317 case_notrun('{}: cannot be run as root'.format(args[0])) 1318 return None 1319 else: 1320 return func(*args, **kwargs) 1321 return func_wrapper 1322 1323# We need to filter out the time taken from the output so that 1324# qemu-iotest can reliably diff the results against master output, 1325# and hide skipped tests from the reference output. 

class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult variant that reports a skipped test as a dot."""

    def addSkip(self, test, reason):
        """Record the skip; in dots mode print "." instead of "s"."""
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write(".")
            self.stream.flush()

class ReproducibleStreamWrapper:
    """Stream proxy that filters run time and skip count out of
    whatever is written through it."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        """Forward every attribute not found on the wrapper to the
        wrapped stream."""
        # 'stream' and '__getstate__' are never forwarded: looking up
        # 'stream' here would go through __getattr__ again.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write arg to the wrapped stream with the elapsed time of the
        "Ran N tests in X.XXXs" line and any " (skipped=N)" removed.

        :return: Whatever the wrapped stream's write() returns.
        """
        # NOTE: the None default is kept for interface compatibility
        # only; re.sub() raises TypeError when given None.
        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
        # Hand the result of the underlying write() back to the caller,
        # as a text stream's write() is expected to do.
        return self.stream.write(arg)

class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Falls back to sys.stdout when no stream is given.
        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)

def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Argument list handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=2 if debug else 1,
                  warnings=None if sys.warnoptions else 'ignore')

def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Strips a '-d' flag from sys.argv, configures logging accordingly
    and then runs the _verify_*() checks with the given constraints.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    debug = '-d' in sys.argv
    if debug:
        sys.argv.remove('-d')
    # logging.WARNING is the documented name of the level that the
    # logging.WARN alias refers to.
    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARNING))

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug

def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are passed on to execute_setup_common().  If
    test_function is given it is called after the setup; otherwise the
    unittests of the calling module are executed.
    """

    debug = execute_setup_common(*args, **kwargs)
    if not test_function:
        execute_unittest(sys.argv, debug)
    else:
        test_function()

def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    handler = logging.StreamHandler(stream=sys.stdout)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    test_logger.addHandler(handler)
    test_logger.setLevel(logging.INFO)
    # Do not hand the records on to ancestor loggers as well.
    test_logger.propagate = False

# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    activate_logging()
    execute_setup_common(*args, **kwargs)

# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)

# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    execute_test(*args, **kwargs)