1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, Iterator, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39from qemu.machine import qtest 40from qemu.qmp import QMPMessage 41 42# Use this logger for logging messages directly from the iotests module 43logger = logging.getLogger('qemu.iotests') 44logger.addHandler(logging.NullHandler()) 45 46# Use this logger for messages that ought to be used for diff output. 47test_logger = logging.getLogger('qemu.iotests.diff_io') 48 49 50faulthandler.enable() 51 52# This will not work if arguments contain spaces but is necessary if we 53# want to support the override options that ./check supports. 54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 55if os.environ.get('QEMU_IMG_OPTIONS'): 56 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 57 58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 59if os.environ.get('QEMU_IO_OPTIONS'): 60 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 61 62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 64 qemu_io_args_no_fmt += \ 65 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 66 67qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 68qemu_nbd_args = [qemu_nbd_prog] 69if os.environ.get('QEMU_NBD_OPTIONS'): 70 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 71 72qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 73qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 74 75gdb_qemu_env = os.environ.get('GDB_OPTIONS') 76qemu_gdb = [] 77if gdb_qemu_env: 78 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 79 80qemu_print = os.environ.get('PRINT_QEMU', False) 81 82imgfmt = os.environ.get('IMGFMT', 'raw') 83imgproto = os.environ.get('IMGPROTO', 'file') 84output_dir = os.environ.get('OUTPUT_DIR', '.') 85 86try: 87 test_dir = os.environ['TEST_DIR'] 88 sock_dir = os.environ['SOCK_DIR'] 89 cachemode = os.environ['CACHEMODE'] 90 aiomode = os.environ['AIOMODE'] 91 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 92except KeyError: 93 # We are using these variables as proxies to indicate that we're 94 # not being run via "check". There may be other things set up by 95 # "check" that individual test cases rely on. 96 sys.stderr.write('Please run this test via the "check" script\n') 97 sys.exit(os.EX_USAGE) 98 99qemu_valgrind = [] 100if os.environ.get('VALGRIND_QEMU') == "y" and \ 101 os.environ.get('NO_VALGRIND') != "y": 102 valgrind_logfile = "--log-file=" + test_dir 103 # %p allows to put the valgrind process PID, since 104 # we don't know it a priori (subprocess.Popen is 105 # not yet invoked) 106 valgrind_logfile += "/%p.valgrind" 107 108 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 109 110luks_default_secret_object = 'secret,id=keysec0,data=' + \ 111 os.environ.get('IMGKEYSECRET', '') 112luks_default_key_secret_opt = 'key-secret=keysec0' 113 114sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 115 116 117@contextmanager 118def change_log_level( 119 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 120 """ 121 Utility function for temporarily changing the log level of a logger. 122 123 This can be used to silence errors that are expected or uninteresting. 124 """ 125 _logger = logging.getLogger(logger_name) 126 current_level = _logger.level 127 _logger.setLevel(level) 128 129 try: 130 yield 131 finally: 132 _logger.setLevel(current_level) 133 134 135def unarchive_sample_image(sample, fname): 136 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 137 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 138 shutil.copyfileobj(f_in, f_out) 139 140 141def qemu_tool_popen(args: Sequence[str], 142 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 143 stderr = subprocess.STDOUT if connect_stderr else None 144 # pylint: disable=consider-using-with 145 return subprocess.Popen(args, 146 stdout=subprocess.PIPE, 147 stderr=stderr, 148 universal_newlines=True) 149 150 151def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 152 connect_stderr: bool = True) -> Tuple[str, int]: 153 """ 154 Run a tool and return both its output and its exit code 155 """ 156 with qemu_tool_popen(args, connect_stderr) as subp: 157 output = subp.communicate()[0] 158 if subp.returncode < 0: 159 cmd = ' '.join(args) 160 sys.stderr.write(f'{tool} received signal \ 161 {-subp.returncode}: {cmd}\n') 162 return (output, subp.returncode) 163 164def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 165 """ 166 Run qemu-img and return both its output and its exit code 167 """ 168 full_args = qemu_img_args + list(args) 169 return qemu_tool_pipe_and_status('qemu-img', full_args) 170 171def qemu_img(*args: str) -> int: 172 '''Run qemu-img and return the exit code''' 173 return qemu_img_pipe_and_status(*args)[1] 174 175def ordered_qmp(qmsg, conv_keys=True): 176 # Dictionaries are not ordered prior to 3.6, therefore: 177 if isinstance(qmsg, list): 178 return [ordered_qmp(atom) for atom in qmsg] 179 if isinstance(qmsg, dict): 180 od = OrderedDict() 181 for k, v in sorted(qmsg.items()): 182 if conv_keys: 183 k = k.replace('_', '-') 184 od[k] = ordered_qmp(v, conv_keys=False) 185 return od 186 return qmsg 187 188def qemu_img_create(*args): 189 args = list(args) 190 191 # default luks support 192 if '-f' in args and args[args.index('-f') + 1] == 'luks': 193 if '-o' in args: 194 i = args.index('-o') 195 if 'key-secret' not in args[i + 1]: 196 args[i + 1].append(luks_default_key_secret_opt) 197 args.insert(i + 2, '--object') 198 args.insert(i + 3, luks_default_secret_object) 199 else: 200 args = ['-o', luks_default_key_secret_opt, 201 '--object', luks_default_secret_object] + args 202 203 args.insert(0, 'create') 204 205 return qemu_img(*args) 206 207def qemu_img_measure(*args): 208 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 209 210def qemu_img_check(*args): 211 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 212 213def qemu_img_verbose(*args): 214 '''Run qemu-img without suppressing its output and return the exit code''' 215 exitcode = subprocess.call(qemu_img_args + list(args)) 216 if exitcode < 0: 217 sys.stderr.write('qemu-img received signal %i: %s\n' 218 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 219 return exitcode 220 221def qemu_img_pipe(*args: str) -> str: 222 '''Run qemu-img and return its output''' 223 return qemu_img_pipe_and_status(*args)[0] 224 225def qemu_img_log(*args): 226 result = qemu_img_pipe(*args) 227 log(result, filters=[filter_testfiles]) 228 return result 229 230def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 231 args = ['info'] 232 if imgopts: 233 args.append('--image-opts') 234 else: 235 args += ['-f', imgfmt] 236 args += extra_args 237 args.append(filename) 238 239 output = qemu_img_pipe(*args) 240 if not filter_path: 241 filter_path = filename 242 log(filter_img_info(output, filter_path)) 243 244def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 245 if '-f' in args or '--image-opts' in args: 246 return qemu_io_args_no_fmt + list(args) 247 else: 248 return qemu_io_args + list(args) 249 250def qemu_io_popen(*args): 251 return qemu_tool_popen(qemu_io_wrap_args(args)) 252 253def qemu_io(*args): 254 '''Run qemu-io and return the stdout data''' 255 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0] 256 257def qemu_io_log(*args): 258 result = qemu_io(*args) 259 log(result, filters=[filter_testfiles, filter_qemu_io]) 260 return result 261 262def qemu_io_silent(*args): 263 '''Run qemu-io and return the exit code, suppressing stdout''' 264 args = qemu_io_wrap_args(args) 265 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 266 if result.returncode < 0: 267 sys.stderr.write('qemu-io received signal %i: %s\n' % 268 (-result.returncode, ' '.join(args))) 269 return result.returncode 270 271def qemu_io_silent_check(*args): 272 '''Run qemu-io and return the true if subprocess returned 0''' 273 args = qemu_io_wrap_args(args) 274 result = subprocess.run(args, stdout=subprocess.DEVNULL, 275 stderr=subprocess.STDOUT, check=False) 276 return result.returncode == 0 277 278class QemuIoInteractive: 279 def __init__(self, *args): 280 self.args = qemu_io_wrap_args(args) 281 # We need to keep the Popen objext around, and not 282 # close it immediately. Therefore, disable the pylint check: 283 # pylint: disable=consider-using-with 284 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 285 stdout=subprocess.PIPE, 286 stderr=subprocess.STDOUT, 287 universal_newlines=True) 288 out = self._p.stdout.read(9) 289 if out != 'qemu-io> ': 290 # Most probably qemu-io just failed to start. 291 # Let's collect the whole output and exit. 292 out += self._p.stdout.read() 293 self._p.wait(timeout=1) 294 raise ValueError(out) 295 296 def close(self): 297 self._p.communicate('q\n') 298 299 def _read_output(self): 300 pattern = 'qemu-io> ' 301 n = len(pattern) 302 pos = 0 303 s = [] 304 while pos != n: 305 c = self._p.stdout.read(1) 306 # check unexpected EOF 307 assert c != '' 308 s.append(c) 309 if c == pattern[pos]: 310 pos += 1 311 else: 312 pos = 0 313 314 return ''.join(s[:-n]) 315 316 def cmd(self, cmd): 317 # quit command is in close(), '\n' is added automatically 318 assert '\n' not in cmd 319 cmd = cmd.strip() 320 assert cmd not in ('q', 'quit') 321 self._p.stdin.write(cmd + '\n') 322 self._p.stdin.flush() 323 return self._read_output() 324 325 326def qemu_nbd(*args): 327 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 328 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 329 330def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 331 '''Run qemu-nbd in daemon mode and return both the parent's exit code 332 and its output in case of an error''' 333 full_args = qemu_nbd_args + ['--fork'] + list(args) 334 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 335 connect_stderr=False) 336 return returncode, output if returncode else '' 337 338def qemu_nbd_list_log(*args: str) -> str: 339 '''Run qemu-nbd to list remote exports''' 340 full_args = [qemu_nbd_prog, '-L'] + list(args) 341 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 342 log(output, filters=[filter_testfiles, filter_nbd_exports]) 343 return output 344 345@contextmanager 346def qemu_nbd_popen(*args): 347 '''Context manager running qemu-nbd within the context''' 348 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 349 350 assert not os.path.exists(pid_file) 351 352 cmd = list(qemu_nbd_args) 353 cmd.extend(('--persistent', '--pid-file', pid_file)) 354 cmd.extend(args) 355 356 log('Start NBD server') 357 with subprocess.Popen(cmd) as p: 358 try: 359 while not os.path.exists(pid_file): 360 if p.poll() is not None: 361 raise RuntimeError( 362 "qemu-nbd terminated with exit code {}: {}" 363 .format(p.returncode, ' '.join(cmd))) 364 365 time.sleep(0.01) 366 yield 367 finally: 368 if os.path.exists(pid_file): 369 os.remove(pid_file) 370 log('Kill NBD server') 371 p.kill() 372 p.wait() 373 374def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 375 '''Return True if two image files are identical''' 376 return qemu_img('compare', '-f', fmt1, 377 '-F', fmt2, img1, img2) == 0 378 379def create_image(name, size): 380 '''Create a fully-allocated raw image with sector markers''' 381 with open(name, 'wb') as file: 382 i = 0 383 while i < size: 384 sector = struct.pack('>l504xl', i // 512, i // 512) 385 file.write(sector) 386 i = i + 512 387 388def image_size(img): 389 '''Return image's virtual size''' 390 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 391 return json.loads(r)['virtual-size'] 392 393def is_str(val): 394 return isinstance(val, str) 395 396test_dir_re = re.compile(r"%s" % test_dir) 397def filter_test_dir(msg): 398 return test_dir_re.sub("TEST_DIR", msg) 399 400win32_re = re.compile(r"\r") 401def filter_win32(msg): 402 return win32_re.sub("", msg) 403 404qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 405 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 406 r"and [0-9\/.inf]* ops\/sec\)") 407def filter_qemu_io(msg): 408 msg = filter_win32(msg) 409 return qemu_io_re.sub("X ops; XX:XX:XX.X " 410 "(XXX YYY/sec and XXX ops/sec)", msg) 411 412chown_re = re.compile(r"chown [0-9]+:[0-9]+") 413def filter_chown(msg): 414 return chown_re.sub("chown UID:GID", msg) 415 416def filter_qmp_event(event): 417 '''Filter a QMP event dict''' 418 event = dict(event) 419 if 'timestamp' in event: 420 event['timestamp']['seconds'] = 'SECS' 421 event['timestamp']['microseconds'] = 'USECS' 422 return event 423 424def filter_qmp(qmsg, filter_fn): 425 '''Given a string filter, filter a QMP object's values. 426 filter_fn takes a (key, value) pair.''' 427 # Iterate through either lists or dicts; 428 if isinstance(qmsg, list): 429 items = enumerate(qmsg) 430 else: 431 items = qmsg.items() 432 433 for k, v in items: 434 if isinstance(v, (dict, list)): 435 qmsg[k] = filter_qmp(v, filter_fn) 436 else: 437 qmsg[k] = filter_fn(k, v) 438 return qmsg 439 440def filter_testfiles(msg): 441 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 442 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 443 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 444 445def filter_qmp_testfiles(qmsg): 446 def _filter(_key, value): 447 if is_str(value): 448 return filter_testfiles(value) 449 return value 450 return filter_qmp(qmsg, _filter) 451 452def filter_virtio_scsi(output: str) -> str: 453 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 454 455def filter_qmp_virtio_scsi(qmsg): 456 def _filter(_key, value): 457 if is_str(value): 458 return filter_virtio_scsi(value) 459 return value 460 return filter_qmp(qmsg, _filter) 461 462def filter_generated_node_ids(msg): 463 return re.sub("#block[0-9]+", "NODE_NAME", msg) 464 465def filter_img_info(output, filename): 466 lines = [] 467 for line in output.split('\n'): 468 if 'disk size' in line or 'actual-size' in line: 469 continue 470 line = line.replace(filename, 'TEST_IMG') 471 line = filter_testfiles(line) 472 line = line.replace(imgfmt, 'IMGFMT') 473 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 474 line = re.sub('uuid: [-a-f0-9]+', 475 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 476 line) 477 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 478 lines.append(line) 479 return '\n'.join(lines) 480 481def filter_imgfmt(msg): 482 return msg.replace(imgfmt, 'IMGFMT') 483 484def filter_qmp_imgfmt(qmsg): 485 def _filter(_key, value): 486 if is_str(value): 487 return filter_imgfmt(value) 488 return value 489 return filter_qmp(qmsg, _filter) 490 491def filter_nbd_exports(output: str) -> str: 492 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 493 494 495Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 496 497def log(msg: Msg, 498 filters: Iterable[Callable[[Msg], Msg]] = (), 499 indent: Optional[int] = None) -> None: 500 """ 501 Logs either a string message or a JSON serializable message (like QMP). 502 If indent is provided, JSON serializable messages are pretty-printed. 503 """ 504 for flt in filters: 505 msg = flt(msg) 506 if isinstance(msg, (dict, list)): 507 # Don't sort if it's already sorted 508 do_sort = not isinstance(msg, OrderedDict) 509 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 510 else: 511 test_logger.info(msg) 512 513class Timeout: 514 def __init__(self, seconds, errmsg="Timeout"): 515 self.seconds = seconds 516 self.errmsg = errmsg 517 def __enter__(self): 518 if qemu_gdb or qemu_valgrind: 519 return self 520 signal.signal(signal.SIGALRM, self.timeout) 521 signal.setitimer(signal.ITIMER_REAL, self.seconds) 522 return self 523 def __exit__(self, exc_type, value, traceback): 524 if qemu_gdb or qemu_valgrind: 525 return False 526 signal.setitimer(signal.ITIMER_REAL, 0) 527 return False 528 def timeout(self, signum, frame): 529 raise Exception(self.errmsg) 530 531def file_pattern(name): 532 return "{0}-{1}".format(os.getpid(), name) 533 534class FilePath: 535 """ 536 Context manager generating multiple file names. The generated files are 537 removed when exiting the context. 538 539 Example usage: 540 541 with FilePath('a.img', 'b.img') as (img_a, img_b): 542 # Use img_a and img_b here... 543 544 # a.img and b.img are automatically removed here. 545 546 By default images are created in iotests.test_dir. To create sockets use 547 iotests.sock_dir: 548 549 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 550 551 For convenience, calling with one argument yields a single file instead of 552 a tuple with one item. 553 554 """ 555 def __init__(self, *names, base_dir=test_dir): 556 self.paths = [os.path.join(base_dir, file_pattern(name)) 557 for name in names] 558 559 def __enter__(self): 560 if len(self.paths) == 1: 561 return self.paths[0] 562 else: 563 return self.paths 564 565 def __exit__(self, exc_type, exc_val, exc_tb): 566 for path in self.paths: 567 try: 568 os.remove(path) 569 except OSError: 570 pass 571 return False 572 573 574def try_remove(img): 575 try: 576 os.remove(img) 577 except OSError: 578 pass 579 580def file_path_remover(): 581 for path in reversed(file_path_remover.paths): 582 try_remove(path) 583 584 585def file_path(*names, base_dir=test_dir): 586 ''' Another way to get auto-generated filename that cleans itself up. 587 588 Use is as simple as: 589 590 img_a, img_b = file_path('a.img', 'b.img') 591 sock = file_path('socket') 592 ''' 593 594 if not hasattr(file_path_remover, 'paths'): 595 file_path_remover.paths = [] 596 atexit.register(file_path_remover) 597 598 paths = [] 599 for name in names: 600 filename = file_pattern(name) 601 path = os.path.join(base_dir, filename) 602 file_path_remover.paths.append(path) 603 paths.append(path) 604 605 return paths[0] if len(paths) == 1 else paths 606 607def remote_filename(path): 608 if imgproto == 'file': 609 return path 610 elif imgproto == 'ssh': 611 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 612 else: 613 raise Exception("Protocol %s not supported" % (imgproto)) 614 615class VM(qtest.QEMUQtestMachine): 616 '''A QEMU VM''' 617 618 def __init__(self, path_suffix=''): 619 name = "qemu%s-%d" % (path_suffix, os.getpid()) 620 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 621 if qemu_gdb and qemu_valgrind: 622 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 623 sys.exit(1) 624 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 625 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 626 name=name, 627 base_temp_dir=test_dir, 628 sock_dir=sock_dir, qmp_timer=timer) 629 self._num_drives = 0 630 631 def _post_shutdown(self) -> None: 632 super()._post_shutdown() 633 if not qemu_valgrind or not self._popen: 634 return 635 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 636 if self.exitcode() == 99: 637 with open(valgrind_filename, encoding='utf-8') as f: 638 print(f.read()) 639 else: 640 os.remove(valgrind_filename) 641 642 def _pre_launch(self) -> None: 643 super()._pre_launch() 644 if qemu_print: 645 # set QEMU binary output to stdout 646 self._close_qemu_log_file() 647 648 def add_object(self, opts): 649 self._args.append('-object') 650 self._args.append(opts) 651 return self 652 653 def add_device(self, opts): 654 self._args.append('-device') 655 self._args.append(opts) 656 return self 657 658 def add_drive_raw(self, opts): 659 self._args.append('-drive') 660 self._args.append(opts) 661 return self 662 663 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 664 '''Add a virtio-blk drive to the VM''' 665 options = ['if=%s' % interface, 666 'id=drive%d' % self._num_drives] 667 668 if path is not None: 669 options.append('file=%s' % path) 670 options.append('format=%s' % img_format) 671 options.append('cache=%s' % cachemode) 672 options.append('aio=%s' % aiomode) 673 674 if opts: 675 options.append(opts) 676 677 if img_format == 'luks' and 'key-secret' not in opts: 678 # default luks support 679 if luks_default_secret_object not in self._args: 680 self.add_object(luks_default_secret_object) 681 682 options.append(luks_default_key_secret_opt) 683 684 self._args.append('-drive') 685 self._args.append(','.join(options)) 686 self._num_drives += 1 687 return self 688 689 def add_blockdev(self, opts): 690 self._args.append('-blockdev') 691 if isinstance(opts, str): 692 self._args.append(opts) 693 else: 694 self._args.append(','.join(opts)) 695 return self 696 697 def add_incoming(self, addr): 698 self._args.append('-incoming') 699 self._args.append(addr) 700 return self 701 702 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 703 cmd = 'human-monitor-command' 704 kwargs: Dict[str, Any] = {'command-line': command_line} 705 if use_log: 706 return self.qmp_log(cmd, **kwargs) 707 else: 708 return self.qmp(cmd, **kwargs) 709 710 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 711 """Pause drive r/w operations""" 712 if not event: 713 self.pause_drive(drive, "read_aio") 714 self.pause_drive(drive, "write_aio") 715 return 716 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 717 718 def resume_drive(self, drive: str) -> None: 719 """Resume drive r/w operations""" 720 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 721 722 def hmp_qemu_io(self, drive: str, cmd: str, 723 use_log: bool = False, qdev: bool = False) -> QMPMessage: 724 """Write to a given drive using an HMP command""" 725 d = '-d ' if qdev else '' 726 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 727 728 def flatten_qmp_object(self, obj, output=None, basestr=''): 729 if output is None: 730 output = {} 731 if isinstance(obj, list): 732 for i, item in enumerate(obj): 733 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 734 elif isinstance(obj, dict): 735 for key in obj: 736 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 737 else: 738 output[basestr[:-1]] = obj # Strip trailing '.' 739 return output 740 741 def qmp_to_opts(self, obj): 742 obj = self.flatten_qmp_object(obj) 743 output_list = [] 744 for key in obj: 745 output_list += [key + '=' + obj[key]] 746 return ','.join(output_list) 747 748 def get_qmp_events_filtered(self, wait=60.0): 749 result = [] 750 for ev in self.get_qmp_events(wait=wait): 751 result.append(filter_qmp_event(ev)) 752 return result 753 754 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 755 full_cmd = OrderedDict(( 756 ("execute", cmd), 757 ("arguments", ordered_qmp(kwargs)) 758 )) 759 log(full_cmd, filters, indent=indent) 760 result = self.qmp(cmd, **kwargs) 761 log(result, filters, indent=indent) 762 return result 763 764 # Returns None on success, and an error string on failure 765 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 766 pre_finalize=None, cancel=False, wait=60.0): 767 """ 768 run_job moves a job from creation through to dismissal. 769 770 :param job: String. ID of recently-launched job 771 :param auto_finalize: Bool. True if the job was launched with 772 auto_finalize. Defaults to True. 773 :param auto_dismiss: Bool. True if the job was launched with 774 auto_dismiss=True. Defaults to False. 775 :param pre_finalize: Callback. A callable that takes no arguments to be 776 invoked prior to issuing job-finalize, if any. 777 :param cancel: Bool. When true, cancels the job after the pre_finalize 778 callback. 779 :param wait: Float. Timeout value specifying how long to wait for any 780 event, in seconds. Defaults to 60.0. 781 """ 782 match_device = {'data': {'device': job}} 783 match_id = {'data': {'id': job}} 784 events = [ 785 ('BLOCK_JOB_COMPLETED', match_device), 786 ('BLOCK_JOB_CANCELLED', match_device), 787 ('BLOCK_JOB_ERROR', match_device), 788 ('BLOCK_JOB_READY', match_device), 789 ('BLOCK_JOB_PENDING', match_id), 790 ('JOB_STATUS_CHANGE', match_id) 791 ] 792 error = None 793 while True: 794 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 795 if ev['event'] != 'JOB_STATUS_CHANGE': 796 log(ev) 797 continue 798 status = ev['data']['status'] 799 if status == 'aborting': 800 result = self.qmp('query-jobs') 801 for j in result['return']: 802 if j['id'] == job: 803 error = j['error'] 804 log('Job failed: %s' % (j['error'])) 805 elif status == 'ready': 806 self.qmp_log('job-complete', id=job) 807 elif status == 'pending' and not auto_finalize: 808 if pre_finalize: 809 pre_finalize() 810 if cancel: 811 self.qmp_log('job-cancel', id=job) 812 else: 813 self.qmp_log('job-finalize', id=job) 814 elif status == 'concluded' and not auto_dismiss: 815 self.qmp_log('job-dismiss', id=job) 816 elif status == 'null': 817 return error 818 819 # Returns None on success, and an error string on failure 820 def blockdev_create(self, options, job_id='job0', filters=None): 821 if filters is None: 822 filters = [filter_qmp_testfiles] 823 result = self.qmp_log('blockdev-create', filters=filters, 824 job_id=job_id, options=options) 825 826 if 'return' in result: 827 assert result['return'] == {} 828 job_result = self.run_job(job_id) 829 else: 830 job_result = result['error'] 831 832 log("") 833 return job_result 834 835 def enable_migration_events(self, name): 836 log('Enabling migration QMP events on %s...' % name) 837 log(self.qmp('migrate-set-capabilities', capabilities=[ 838 { 839 'capability': 'events', 840 'state': True 841 } 842 ])) 843 844 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 845 while True: 846 event = self.event_wait('MIGRATION') 847 # We use the default timeout, and with a timeout, event_wait() 848 # never returns None 849 assert event 850 851 log(event, filters=[filter_qmp_event]) 852 if event['data']['status'] in ('completed', 'failed'): 853 break 854 855 if event['data']['status'] == 'completed': 856 # The event may occur in finish-migrate, so wait for the expected 857 # post-migration runstate 858 runstate = None 859 while runstate != expect_runstate: 860 runstate = self.qmp('query-status')['return']['status'] 861 return True 862 else: 863 return False 864 865 def node_info(self, node_name): 866 nodes = self.qmp('query-named-block-nodes') 867 for x in nodes['return']: 868 if x['node-name'] == node_name: 869 return x 870 return None 871 872 def query_bitmaps(self): 873 res = self.qmp("query-named-block-nodes") 874 return {device['node-name']: device['dirty-bitmaps'] 875 for device in res['return'] if 'dirty-bitmaps' in device} 876 877 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 878 """ 879 get a specific bitmap from the object returned by query_bitmaps. 880 :param recording: If specified, filter results by the specified value. 881 :param bitmaps: If specified, use it instead of call query_bitmaps() 882 """ 883 if bitmaps is None: 884 bitmaps = self.query_bitmaps() 885 886 for bitmap in bitmaps[node_name]: 887 if bitmap.get('name', '') == bitmap_name: 888 if recording is None or bitmap.get('recording') == recording: 889 return bitmap 890 return None 891 892 def check_bitmap_status(self, node_name, bitmap_name, fields): 893 ret = self.get_bitmap(node_name, bitmap_name) 894 895 return fields.items() <= ret.items() 896 897 def assert_block_path(self, root, path, expected_node, graph=None): 898 """ 899 Check whether the node under the given path in the block graph 900 is @expected_node. 901 902 @root is the node name of the node where the @path is rooted. 903 904 @path is a string that consists of child names separated by 905 slashes. It must begin with a slash. 906 907 Examples for @root + @path: 908 - root="qcow2-node", path="/backing/file" 909 - root="quorum-node", path="/children.2/file" 910 911 Hypothetically, @path could be empty, in which case it would 912 point to @root. However, in practice this case is not useful 913 and hence not allowed. 914 915 @expected_node may be None. (All elements of the path but the 916 leaf must still exist.) 917 918 @graph may be None or the result of an x-debug-query-block-graph 919 call that has already been performed. 920 """ 921 if graph is None: 922 graph = self.qmp('x-debug-query-block-graph')['return'] 923 924 iter_path = iter(path.split('/')) 925 926 # Must start with a / 927 assert next(iter_path) == '' 928 929 node = next((node for node in graph['nodes'] if node['name'] == root), 930 None) 931 932 # An empty @path is not allowed, so the root node must be present 933 assert node is not None, 'Root node %s not found' % root 934 935 for child_name in iter_path: 936 assert node is not None, 'Cannot follow path %s%s' % (root, path) 937 938 try: 939 node_id = next(edge['child'] for edge in graph['edges'] 940 if (edge['parent'] == node['id'] and 941 edge['name'] == child_name)) 942 943 node = next(node for node in graph['nodes'] 944 if node['id'] == node_id) 945 946 except StopIteration: 947 node = None 948 949 if node is None: 950 assert expected_node is None, \ 951 'No node found under %s (but expected %s)' % \ 952 (path, expected_node) 953 else: 954 assert node['name'] == expected_node, \ 955 'Found node %s under %s (but expected %s)' % \ 956 (node['name'], path, expected_node) 957 958index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 959 960class QMPTestCase(unittest.TestCase): 961 '''Abstract base class for QMP test cases''' 962 963 def __init__(self, *args, **kwargs): 964 super().__init__(*args, **kwargs) 965 # Many users of this class set a VM property we rely on heavily 966 # in the methods below. 967 self.vm = None 968 969 def dictpath(self, d, path): 970 '''Traverse a path in a nested dict''' 971 for component in path.split('/'): 972 m = index_re.match(component) 973 if m: 974 component, idx = m.groups() 975 idx = int(idx) 976 977 if not isinstance(d, dict) or component not in d: 978 self.fail(f'failed path traversal for "{path}" in "{d}"') 979 d = d[component] 980 981 if m: 982 if not isinstance(d, list): 983 self.fail(f'path component "{component}" in "{path}" ' 984 f'is not a list in "{d}"') 985 try: 986 d = d[idx] 987 except IndexError: 988 self.fail(f'invalid index "{idx}" in path "{path}" ' 989 f'in "{d}"') 990 return d 991 992 def assert_qmp_absent(self, d, path): 993 try: 994 result = self.dictpath(d, path) 995 except AssertionError: 996 return 997 self.fail('path "%s" has value "%s"' % (path, str(result))) 998 999 def assert_qmp(self, d, path, value): 1000 '''Assert that the value for a specific path in a QMP dict 1001 matches. When given a list of values, assert that any of 1002 them matches.''' 1003 1004 result = self.dictpath(d, path) 1005 1006 # [] makes no sense as a list of valid values, so treat it as 1007 # an actual single value. 1008 if isinstance(value, list) and value != []: 1009 for v in value: 1010 if result == v: 1011 return 1012 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1013 else: 1014 self.assertEqual(result, value, 1015 '"%s" is "%s", expected "%s"' 1016 % (path, str(result), str(value))) 1017 1018 def assert_no_active_block_jobs(self): 1019 result = self.vm.qmp('query-block-jobs') 1020 self.assert_qmp(result, 'return', []) 1021 1022 def assert_has_block_node(self, node_name=None, file_name=None): 1023 """Issue a query-named-block-nodes and assert node_name and/or 1024 file_name is present in the result""" 1025 def check_equal_or_none(a, b): 1026 return a is None or b is None or a == b 1027 assert node_name or file_name 1028 result = self.vm.qmp('query-named-block-nodes') 1029 for x in result["return"]: 1030 if check_equal_or_none(x.get("node-name"), node_name) and \ 1031 check_equal_or_none(x.get("file"), file_name): 1032 return 1033 self.fail("Cannot find %s %s in result:\n%s" % 1034 (node_name, file_name, result)) 1035 1036 def assert_json_filename_equal(self, json_filename, reference): 1037 '''Asserts that the given filename is a json: filename and that its 1038 content is equal to the given reference object''' 1039 self.assertEqual(json_filename[:5], 'json:') 1040 self.assertEqual( 1041 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1042 self.vm.flatten_qmp_object(reference) 1043 ) 1044 1045 def cancel_and_wait(self, drive='drive0', force=False, 1046 resume=False, wait=60.0): 1047 '''Cancel a block job and wait for it to finish, returning the event''' 1048 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1049 self.assert_qmp(result, 'return', {}) 1050 1051 if resume: 1052 self.vm.resume_drive(drive) 1053 1054 cancelled = False 1055 result = None 1056 while not cancelled: 1057 for event in self.vm.get_qmp_events(wait=wait): 1058 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1059 event['event'] == 'BLOCK_JOB_CANCELLED': 1060 self.assert_qmp(event, 'data/device', drive) 1061 result = event 1062 cancelled = True 1063 elif event['event'] == 'JOB_STATUS_CHANGE': 1064 self.assert_qmp(event, 'data/id', drive) 1065 1066 1067 self.assert_no_active_block_jobs() 1068 return result 1069 1070 def wait_until_completed(self, drive='drive0', check_offset=True, 1071 wait=60.0, error=None): 1072 '''Wait for a block job to finish, returning the event''' 1073 while True: 1074 for event in self.vm.get_qmp_events(wait=wait): 1075 if event['event'] == 'BLOCK_JOB_COMPLETED': 1076 self.assert_qmp(event, 'data/device', drive) 1077 if error is None: 1078 self.assert_qmp_absent(event, 'data/error') 1079 if check_offset: 1080 self.assert_qmp(event, 'data/offset', 1081 event['data']['len']) 1082 else: 1083 self.assert_qmp(event, 'data/error', error) 1084 self.assert_no_active_block_jobs() 1085 return event 1086 if event['event'] == 'JOB_STATUS_CHANGE': 1087 self.assert_qmp(event, 'data/id', drive) 1088 1089 def wait_ready(self, drive='drive0'): 1090 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1091 return self.vm.events_wait([ 1092 ('BLOCK_JOB_READY', 1093 {'data': {'type': 'mirror', 'device': drive}}), 1094 ('BLOCK_JOB_READY', 1095 {'data': {'type': 'commit', 'device': drive}}) 1096 ]) 1097 1098 def wait_ready_and_cancel(self, drive='drive0'): 1099 self.wait_ready(drive=drive) 1100 event = self.cancel_and_wait(drive=drive) 1101 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1102 self.assert_qmp(event, 'data/type', 'mirror') 1103 self.assert_qmp(event, 'data/offset', event['data']['len']) 1104 1105 def complete_and_wait(self, drive='drive0', wait_ready=True, 1106 completion_error=None): 1107 '''Complete a block job and wait for it to finish''' 1108 if wait_ready: 1109 self.wait_ready(drive=drive) 1110 1111 result = self.vm.qmp('block-job-complete', device=drive) 1112 self.assert_qmp(result, 'return', {}) 1113 1114 event = self.wait_until_completed(drive=drive, error=completion_error) 1115 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1116 1117 def pause_wait(self, job_id='job0'): 1118 with Timeout(3, "Timeout waiting for job to pause"): 1119 while True: 1120 result = self.vm.qmp('query-block-jobs') 1121 found = False 1122 for job in result['return']: 1123 if job['device'] == job_id: 1124 found = True 1125 if job['paused'] and not job['busy']: 1126 return job 1127 break 1128 assert found 1129 1130 def pause_job(self, job_id='job0', wait=True): 1131 result = self.vm.qmp('block-job-pause', device=job_id) 1132 self.assert_qmp(result, 'return', {}) 1133 if wait: 1134 return self.pause_wait(job_id) 1135 return result 1136 1137 def case_skip(self, reason): 1138 '''Skip this test case''' 1139 case_notrun(reason) 1140 self.skipTest(reason) 1141 1142 1143def notrun(reason): 1144 '''Skip this test suite''' 1145 # Each test in qemu-iotests has a number ("seq") 1146 seq = os.path.basename(sys.argv[0]) 1147 1148 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1149 as outfile: 1150 outfile.write(reason + '\n') 1151 logger.warning("%s not run: %s", seq, reason) 1152 sys.exit(0) 1153 1154def case_notrun(reason): 1155 '''Mark this test case as not having been run (without actually 1156 skipping it, that is left to the caller). See 1157 QMPTestCase.case_skip() for a variant that actually skips the 1158 current test case.''' 1159 1160 # Each test in qemu-iotests has a number ("seq") 1161 seq = os.path.basename(sys.argv[0]) 1162 1163 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1164 as outfile: 1165 outfile.write(' [case not run] ' + reason + '\n') 1166 1167def _verify_image_format(supported_fmts: Sequence[str] = (), 1168 unsupported_fmts: Sequence[str] = ()) -> None: 1169 if 'generic' in supported_fmts and \ 1170 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1171 # similar to 1172 # _supported_fmt generic 1173 # for bash tests 1174 supported_fmts = () 1175 1176 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1177 if not_sup or (imgfmt in unsupported_fmts): 1178 notrun('not suitable for this image format: %s' % imgfmt) 1179 1180 if imgfmt == 'luks': 1181 verify_working_luks() 1182 1183def _verify_protocol(supported: Sequence[str] = (), 1184 unsupported: Sequence[str] = ()) -> None: 1185 assert not (supported and unsupported) 1186 1187 if 'generic' in supported: 1188 return 1189 1190 not_sup = supported and (imgproto not in supported) 1191 if not_sup or (imgproto in unsupported): 1192 notrun('not suitable for this protocol: %s' % imgproto) 1193 1194def _verify_platform(supported: Sequence[str] = (), 1195 unsupported: Sequence[str] = ()) -> None: 1196 if any((sys.platform.startswith(x) for x in unsupported)): 1197 notrun('not suitable for this OS: %s' % sys.platform) 1198 1199 if supported: 1200 if not any((sys.platform.startswith(x) for x in supported)): 1201 notrun('not suitable for this OS: %s' % sys.platform) 1202 1203def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1204 if supported_cache_modes and (cachemode not in supported_cache_modes): 1205 notrun('not suitable for this cache mode: %s' % cachemode) 1206 1207def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1208 if supported_aio_modes and (aiomode not in supported_aio_modes): 1209 notrun('not suitable for this aio mode: %s' % aiomode) 1210 1211def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1212 usf_list = list(set(required_formats) - set(supported_formats())) 1213 if usf_list: 1214 notrun(f'formats {usf_list} are not whitelisted') 1215 1216 1217def _verify_virtio_blk() -> None: 1218 out = qemu_pipe('-M', 'none', '-device', 'help') 1219 if 'virtio-blk' not in out: 1220 notrun('Missing virtio-blk in QEMU binary') 1221 1222def _verify_virtio_scsi_pci_or_ccw() -> None: 1223 out = qemu_pipe('-M', 'none', '-device', 'help') 1224 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1225 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1226 1227 1228def supports_quorum(): 1229 return 'quorum' in qemu_img_pipe('--help') 1230 1231def verify_quorum(): 1232 '''Skip test suite if quorum support is not available''' 1233 if not supports_quorum(): 1234 notrun('quorum support missing') 1235 1236def has_working_luks() -> Tuple[bool, str]: 1237 """ 1238 Check whether our LUKS driver can actually create images 1239 (this extends to LUKS encryption for qcow2). 1240 1241 If not, return the reason why. 1242 """ 1243 1244 img_file = f'{test_dir}/luks-test.luks' 1245 (output, status) = \ 1246 qemu_img_pipe_and_status('create', '-f', 'luks', 1247 '--object', luks_default_secret_object, 1248 '-o', luks_default_key_secret_opt, 1249 '-o', 'iter-time=10', 1250 img_file, '1G') 1251 try: 1252 os.remove(img_file) 1253 except OSError: 1254 pass 1255 1256 if status != 0: 1257 reason = output 1258 for line in output.splitlines(): 1259 if img_file + ':' in line: 1260 reason = line.split(img_file + ':', 1)[1].strip() 1261 break 1262 1263 return (False, reason) 1264 else: 1265 return (True, '') 1266 1267def verify_working_luks(): 1268 """ 1269 Skip test suite if LUKS does not work 1270 """ 1271 (working, reason) = has_working_luks() 1272 if not working: 1273 notrun(reason) 1274 1275def qemu_pipe(*args: str) -> str: 1276 """ 1277 Run qemu with an option to print something and exit (e.g. a help option). 1278 1279 :return: QEMU's stdout output. 1280 """ 1281 full_args = [qemu_prog] + qemu_opts + list(args) 1282 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1283 return output 1284 1285def supported_formats(read_only=False): 1286 '''Set 'read_only' to True to check ro-whitelist 1287 Otherwise, rw-whitelist is checked''' 1288 1289 if not hasattr(supported_formats, "formats"): 1290 supported_formats.formats = {} 1291 1292 if read_only not in supported_formats.formats: 1293 format_message = qemu_pipe("-drive", "format=help") 1294 line = 1 if read_only else 0 1295 supported_formats.formats[read_only] = \ 1296 format_message.splitlines()[line].split(":")[1].split() 1297 1298 return supported_formats.formats[read_only] 1299 1300def skip_if_unsupported(required_formats=(), read_only=False): 1301 '''Skip Test Decorator 1302 Runs the test if all the required formats are whitelisted''' 1303 def skip_test_decorator(func): 1304 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1305 **kwargs: Dict[str, Any]) -> None: 1306 if callable(required_formats): 1307 fmts = required_formats(test_case) 1308 else: 1309 fmts = required_formats 1310 1311 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1312 if usf_list: 1313 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1314 test_case.case_skip(msg) 1315 else: 1316 func(test_case, *args, **kwargs) 1317 return func_wrapper 1318 return skip_test_decorator 1319 1320def skip_for_formats(formats: Sequence[str] = ()) \ 1321 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1322 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1323 '''Skip Test Decorator 1324 Skips the test for the given formats''' 1325 def skip_test_decorator(func): 1326 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1327 **kwargs: Dict[str, Any]) -> None: 1328 if imgfmt in formats: 1329 msg = f'{test_case}: Skipped for format {imgfmt}' 1330 test_case.case_skip(msg) 1331 else: 1332 func(test_case, *args, **kwargs) 1333 return func_wrapper 1334 return skip_test_decorator 1335 1336def skip_if_user_is_root(func): 1337 '''Skip Test Decorator 1338 Runs the test only without root permissions''' 1339 def func_wrapper(*args, **kwargs): 1340 if os.getuid() == 0: 1341 case_notrun('{}: cannot be run as root'.format(args[0])) 1342 return None 1343 else: 1344 return func(*args, **kwargs) 1345 return func_wrapper 1346 1347# We need to filter out the time taken from the output so that 1348# qemu-iotest can reliably diff the results against master output, 1349# and hide skipped tests from the reference output. 1350 1351class ReproducibleTestResult(unittest.TextTestResult): 1352 def addSkip(self, test, reason): 1353 # Same as TextTestResult, but print dot instead of "s" 1354 unittest.TestResult.addSkip(self, test, reason) 1355 if self.showAll: 1356 self.stream.writeln("skipped {0!r}".format(reason)) 1357 elif self.dots: 1358 self.stream.write(".") 1359 self.stream.flush() 1360 1361class ReproducibleStreamWrapper: 1362 def __init__(self, stream: TextIO): 1363 self.stream = stream 1364 1365 def __getattr__(self, attr): 1366 if attr in ('stream', '__getstate__'): 1367 raise AttributeError(attr) 1368 return getattr(self.stream, attr) 1369 1370 def write(self, arg=None): 1371 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1372 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1373 self.stream.write(arg) 1374 1375class ReproducibleTestRunner(unittest.TextTestRunner): 1376 def __init__(self, stream: Optional[TextIO] = None, 1377 resultclass: Type[unittest.TestResult] = 1378 ReproducibleTestResult, 1379 **kwargs: Any) -> None: 1380 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1381 super().__init__(stream=rstream, # type: ignore 1382 descriptions=True, 1383 resultclass=resultclass, 1384 **kwargs) 1385 1386def execute_unittest(argv: List[str], debug: bool = False) -> None: 1387 """Executes unittests within the calling module.""" 1388 1389 # Some tests have warnings, especially ResourceWarnings for unclosed 1390 # files and sockets. Ignore them for now to ensure reproducibility of 1391 # the test output. 1392 unittest.main(argv=argv, 1393 testRunner=ReproducibleTestRunner, 1394 verbosity=2 if debug else 1, 1395 warnings=None if sys.warnoptions else 'ignore') 1396 1397def execute_setup_common(supported_fmts: Sequence[str] = (), 1398 supported_platforms: Sequence[str] = (), 1399 supported_cache_modes: Sequence[str] = (), 1400 supported_aio_modes: Sequence[str] = (), 1401 unsupported_fmts: Sequence[str] = (), 1402 supported_protocols: Sequence[str] = (), 1403 unsupported_protocols: Sequence[str] = (), 1404 required_fmts: Sequence[str] = ()) -> bool: 1405 """ 1406 Perform necessary setup for either script-style or unittest-style tests. 1407 1408 :return: Bool; Whether or not debug mode has been requested via the CLI. 1409 """ 1410 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1411 1412 debug = '-d' in sys.argv 1413 if debug: 1414 sys.argv.remove('-d') 1415 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1416 1417 _verify_image_format(supported_fmts, unsupported_fmts) 1418 _verify_protocol(supported_protocols, unsupported_protocols) 1419 _verify_platform(supported=supported_platforms) 1420 _verify_cache_mode(supported_cache_modes) 1421 _verify_aio_mode(supported_aio_modes) 1422 _verify_formats(required_fmts) 1423 _verify_virtio_blk() 1424 1425 return debug 1426 1427def execute_test(*args, test_function=None, **kwargs): 1428 """Run either unittest or script-style tests.""" 1429 1430 debug = execute_setup_common(*args, **kwargs) 1431 if not test_function: 1432 execute_unittest(sys.argv, debug) 1433 else: 1434 test_function() 1435 1436def activate_logging(): 1437 """Activate iotests.log() output to stdout for script-style tests.""" 1438 handler = logging.StreamHandler(stream=sys.stdout) 1439 formatter = logging.Formatter('%(message)s') 1440 handler.setFormatter(formatter) 1441 test_logger.addHandler(handler) 1442 test_logger.setLevel(logging.INFO) 1443 test_logger.propagate = False 1444 1445# This is called from script-style iotests without a single point of entry 1446def script_initialize(*args, **kwargs): 1447 """Initialize script-style tests without running any tests.""" 1448 activate_logging() 1449 execute_setup_common(*args, **kwargs) 1450 1451# This is called from script-style iotests with a single point of entry 1452def script_main(test_function, *args, **kwargs): 1453 """Run script-style tests outside of the unittest framework""" 1454 activate_logging() 1455 execute_test(*args, test_function=test_function, **kwargs) 1456 1457# This is called from unittest style iotests 1458def main(*args, **kwargs): 1459 """Run tests using the unittest framework""" 1460 execute_test(*args, **kwargs) 1461