1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39# pylint: disable=import-error, wrong-import-position 40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 41from qemu.machine import qtest 42from qemu.qmp import QMPMessage 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79output_dir = os.environ.get('OUTPUT_DIR', '.') 80 81try: 82 test_dir = os.environ['TEST_DIR'] 83 sock_dir = os.environ['SOCK_DIR'] 84 cachemode = os.environ['CACHEMODE'] 85 aiomode = os.environ['AIOMODE'] 86 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 87except KeyError: 88 # We are using these variables as proxies to indicate that we're 89 # not being run via "check". There may be other things set up by 90 # "check" that individual test cases rely on. 91 sys.stderr.write('Please run this test via the "check" script\n') 92 sys.exit(os.EX_USAGE) 93 94socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 95 96luks_default_secret_object = 'secret,id=keysec0,data=' + \ 97 os.environ.get('IMGKEYSECRET', '') 98luks_default_key_secret_opt = 'key-secret=keysec0' 99 100sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 101 102 103def unarchive_sample_image(sample, fname): 104 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 105 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 106 shutil.copyfileobj(f_in, f_out) 107 108 109def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 110 connect_stderr: bool = True) -> Tuple[str, int]: 111 """ 112 Run a tool and return both its output and its exit code 113 """ 114 stderr = subprocess.STDOUT if connect_stderr else None 115 with subprocess.Popen(args, stdout=subprocess.PIPE, 116 stderr=stderr, universal_newlines=True) as subp: 117 output = subp.communicate()[0] 118 if subp.returncode < 0: 119 cmd = ' '.join(args) 120 sys.stderr.write(f'{tool} received signal \ 121 {-subp.returncode}: {cmd}\n') 122 return (output, subp.returncode) 123 124def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 125 """ 126 Run qemu-img and return both its output and its exit code 127 """ 128 full_args = qemu_img_args + list(args) 129 return qemu_tool_pipe_and_status('qemu-img', full_args) 130 131def qemu_img(*args: str) -> int: 132 '''Run qemu-img and return the exit code''' 133 return qemu_img_pipe_and_status(*args)[1] 134 135def ordered_qmp(qmsg, conv_keys=True): 136 # Dictionaries are not ordered prior to 3.6, therefore: 137 if isinstance(qmsg, list): 138 return [ordered_qmp(atom) for atom in qmsg] 139 if isinstance(qmsg, dict): 140 od = OrderedDict() 141 for k, v in sorted(qmsg.items()): 142 if conv_keys: 143 k = k.replace('_', '-') 144 od[k] = ordered_qmp(v, conv_keys=False) 145 return od 146 return qmsg 147 148def qemu_img_create(*args): 149 args = list(args) 150 151 # default luks support 152 if '-f' in args and args[args.index('-f') + 1] == 'luks': 153 if '-o' in args: 154 i = args.index('-o') 155 if 'key-secret' not in args[i + 1]: 156 args[i + 1].append(luks_default_key_secret_opt) 157 args.insert(i + 2, '--object') 158 args.insert(i + 3, luks_default_secret_object) 159 else: 160 args = ['-o', luks_default_key_secret_opt, 161 '--object', luks_default_secret_object] + args 162 163 args.insert(0, 'create') 164 165 return qemu_img(*args) 166 167def qemu_img_measure(*args): 168 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 169 170def qemu_img_check(*args): 171 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 172 173def qemu_img_verbose(*args): 174 '''Run qemu-img without suppressing its output and return the exit code''' 175 exitcode = subprocess.call(qemu_img_args + list(args)) 176 if exitcode < 0: 177 sys.stderr.write('qemu-img received signal %i: %s\n' 178 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 179 return exitcode 180 181def qemu_img_pipe(*args: str) -> str: 182 '''Run qemu-img and return its output''' 183 return qemu_img_pipe_and_status(*args)[0] 184 185def qemu_img_log(*args): 186 result = qemu_img_pipe(*args) 187 log(result, filters=[filter_testfiles]) 188 return result 189 190def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 191 args = ['info'] 192 if imgopts: 193 args.append('--image-opts') 194 else: 195 args += ['-f', imgfmt] 196 args += extra_args 197 args.append(filename) 198 199 output = qemu_img_pipe(*args) 200 if not filter_path: 201 filter_path = filename 202 log(filter_img_info(output, filter_path)) 203 204def qemu_io(*args): 205 '''Run qemu-io and return the stdout data''' 206 args = qemu_io_args + list(args) 207 return qemu_tool_pipe_and_status('qemu-io', args)[0] 208 209def qemu_io_log(*args): 210 result = qemu_io(*args) 211 log(result, filters=[filter_testfiles, filter_qemu_io]) 212 return result 213 214def qemu_io_silent(*args): 215 '''Run qemu-io and return the exit code, suppressing stdout''' 216 if '-f' in args or '--image-opts' in args: 217 default_args = qemu_io_args_no_fmt 218 else: 219 default_args = qemu_io_args 220 221 args = default_args + list(args) 222 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 223 if exitcode < 0: 224 sys.stderr.write('qemu-io received signal %i: %s\n' % 225 (-exitcode, ' '.join(args))) 226 return exitcode 227 228def qemu_io_silent_check(*args): 229 '''Run qemu-io and return the true if subprocess returned 0''' 230 args = qemu_io_args + list(args) 231 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 232 stderr=subprocess.STDOUT) 233 return exitcode == 0 234 235class QemuIoInteractive: 236 def __init__(self, *args): 237 self.args = qemu_io_args_no_fmt + list(args) 238 # We need to keep the Popen objext around, and not 239 # close it immediately. Therefore, disable the pylint check: 240 # pylint: disable=consider-using-with 241 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 242 stdout=subprocess.PIPE, 243 stderr=subprocess.STDOUT, 244 universal_newlines=True) 245 out = self._p.stdout.read(9) 246 if out != 'qemu-io> ': 247 # Most probably qemu-io just failed to start. 248 # Let's collect the whole output and exit. 249 out += self._p.stdout.read() 250 self._p.wait(timeout=1) 251 raise ValueError(out) 252 253 def close(self): 254 self._p.communicate('q\n') 255 256 def _read_output(self): 257 pattern = 'qemu-io> ' 258 n = len(pattern) 259 pos = 0 260 s = [] 261 while pos != n: 262 c = self._p.stdout.read(1) 263 # check unexpected EOF 264 assert c != '' 265 s.append(c) 266 if c == pattern[pos]: 267 pos += 1 268 else: 269 pos = 0 270 271 return ''.join(s[:-n]) 272 273 def cmd(self, cmd): 274 # quit command is in close(), '\n' is added automatically 275 assert '\n' not in cmd 276 cmd = cmd.strip() 277 assert cmd not in ('q', 'quit') 278 self._p.stdin.write(cmd + '\n') 279 self._p.stdin.flush() 280 return self._read_output() 281 282 283def qemu_nbd(*args): 284 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 285 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 286 287def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 288 '''Run qemu-nbd in daemon mode and return both the parent's exit code 289 and its output in case of an error''' 290 full_args = qemu_nbd_args + ['--fork'] + list(args) 291 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 292 connect_stderr=False) 293 return returncode, output if returncode else '' 294 295def qemu_nbd_list_log(*args: str) -> str: 296 '''Run qemu-nbd to list remote exports''' 297 full_args = [qemu_nbd_prog, '-L'] + list(args) 298 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 299 log(output, filters=[filter_testfiles, filter_nbd_exports]) 300 return output 301 302@contextmanager 303def qemu_nbd_popen(*args): 304 '''Context manager running qemu-nbd within the context''' 305 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 306 307 assert not os.path.exists(pid_file) 308 309 cmd = list(qemu_nbd_args) 310 cmd.extend(('--persistent', '--pid-file', pid_file)) 311 cmd.extend(args) 312 313 log('Start NBD server') 314 with subprocess.Popen(cmd) as p: 315 try: 316 while not os.path.exists(pid_file): 317 if p.poll() is not None: 318 raise RuntimeError( 319 "qemu-nbd terminated with exit code {}: {}" 320 .format(p.returncode, ' '.join(cmd))) 321 322 time.sleep(0.01) 323 yield 324 finally: 325 if os.path.exists(pid_file): 326 os.remove(pid_file) 327 log('Kill NBD server') 328 p.kill() 329 p.wait() 330 331def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 332 '''Return True if two image files are identical''' 333 return qemu_img('compare', '-f', fmt1, 334 '-F', fmt2, img1, img2) == 0 335 336def create_image(name, size): 337 '''Create a fully-allocated raw image with sector markers''' 338 with open(name, 'wb') as file: 339 i = 0 340 while i < size: 341 sector = struct.pack('>l504xl', i // 512, i // 512) 342 file.write(sector) 343 i = i + 512 344 345def image_size(img): 346 '''Return image's virtual size''' 347 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 348 return json.loads(r)['virtual-size'] 349 350def is_str(val): 351 return isinstance(val, str) 352 353test_dir_re = re.compile(r"%s" % test_dir) 354def filter_test_dir(msg): 355 return test_dir_re.sub("TEST_DIR", msg) 356 357win32_re = re.compile(r"\r") 358def filter_win32(msg): 359 return win32_re.sub("", msg) 360 361qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 362 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 363 r"and [0-9\/.inf]* ops\/sec\)") 364def filter_qemu_io(msg): 365 msg = filter_win32(msg) 366 return qemu_io_re.sub("X ops; XX:XX:XX.X " 367 "(XXX YYY/sec and XXX ops/sec)", msg) 368 369chown_re = re.compile(r"chown [0-9]+:[0-9]+") 370def filter_chown(msg): 371 return chown_re.sub("chown UID:GID", msg) 372 373def filter_qmp_event(event): 374 '''Filter a QMP event dict''' 375 event = dict(event) 376 if 'timestamp' in event: 377 event['timestamp']['seconds'] = 'SECS' 378 event['timestamp']['microseconds'] = 'USECS' 379 return event 380 381def filter_qmp(qmsg, filter_fn): 382 '''Given a string filter, filter a QMP object's values. 383 filter_fn takes a (key, value) pair.''' 384 # Iterate through either lists or dicts; 385 if isinstance(qmsg, list): 386 items = enumerate(qmsg) 387 else: 388 items = qmsg.items() 389 390 for k, v in items: 391 if isinstance(v, (dict, list)): 392 qmsg[k] = filter_qmp(v, filter_fn) 393 else: 394 qmsg[k] = filter_fn(k, v) 395 return qmsg 396 397def filter_testfiles(msg): 398 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 399 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 400 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 401 402def filter_qmp_testfiles(qmsg): 403 def _filter(_key, value): 404 if is_str(value): 405 return filter_testfiles(value) 406 return value 407 return filter_qmp(qmsg, _filter) 408 409def filter_virtio_scsi(output: str) -> str: 410 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 411 412def filter_qmp_virtio_scsi(qmsg): 413 def _filter(_key, value): 414 if is_str(value): 415 return filter_virtio_scsi(value) 416 return value 417 return filter_qmp(qmsg, _filter) 418 419def filter_generated_node_ids(msg): 420 return re.sub("#block[0-9]+", "NODE_NAME", msg) 421 422def filter_img_info(output, filename): 423 lines = [] 424 for line in output.split('\n'): 425 if 'disk size' in line or 'actual-size' in line: 426 continue 427 line = line.replace(filename, 'TEST_IMG') 428 line = filter_testfiles(line) 429 line = line.replace(imgfmt, 'IMGFMT') 430 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 431 line = re.sub('uuid: [-a-f0-9]+', 432 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 433 line) 434 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 435 lines.append(line) 436 return '\n'.join(lines) 437 438def filter_imgfmt(msg): 439 return msg.replace(imgfmt, 'IMGFMT') 440 441def filter_qmp_imgfmt(qmsg): 442 def _filter(_key, value): 443 if is_str(value): 444 return filter_imgfmt(value) 445 return value 446 return filter_qmp(qmsg, _filter) 447 448def filter_nbd_exports(output: str) -> str: 449 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 450 451 452Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 453 454def log(msg: Msg, 455 filters: Iterable[Callable[[Msg], Msg]] = (), 456 indent: Optional[int] = None) -> None: 457 """ 458 Logs either a string message or a JSON serializable message (like QMP). 459 If indent is provided, JSON serializable messages are pretty-printed. 460 """ 461 for flt in filters: 462 msg = flt(msg) 463 if isinstance(msg, (dict, list)): 464 # Don't sort if it's already sorted 465 do_sort = not isinstance(msg, OrderedDict) 466 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 467 else: 468 test_logger.info(msg) 469 470class Timeout: 471 def __init__(self, seconds, errmsg="Timeout"): 472 self.seconds = seconds 473 self.errmsg = errmsg 474 def __enter__(self): 475 signal.signal(signal.SIGALRM, self.timeout) 476 signal.setitimer(signal.ITIMER_REAL, self.seconds) 477 return self 478 def __exit__(self, exc_type, value, traceback): 479 signal.setitimer(signal.ITIMER_REAL, 0) 480 return False 481 def timeout(self, signum, frame): 482 raise Exception(self.errmsg) 483 484def file_pattern(name): 485 return "{0}-{1}".format(os.getpid(), name) 486 487class FilePath: 488 """ 489 Context manager generating multiple file names. The generated files are 490 removed when exiting the context. 491 492 Example usage: 493 494 with FilePath('a.img', 'b.img') as (img_a, img_b): 495 # Use img_a and img_b here... 496 497 # a.img and b.img are automatically removed here. 498 499 By default images are created in iotests.test_dir. To create sockets use 500 iotests.sock_dir: 501 502 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 503 504 For convenience, calling with one argument yields a single file instead of 505 a tuple with one item. 506 507 """ 508 def __init__(self, *names, base_dir=test_dir): 509 self.paths = [os.path.join(base_dir, file_pattern(name)) 510 for name in names] 511 512 def __enter__(self): 513 if len(self.paths) == 1: 514 return self.paths[0] 515 else: 516 return self.paths 517 518 def __exit__(self, exc_type, exc_val, exc_tb): 519 for path in self.paths: 520 try: 521 os.remove(path) 522 except OSError: 523 pass 524 return False 525 526 527def try_remove(img): 528 try: 529 os.remove(img) 530 except OSError: 531 pass 532 533def file_path_remover(): 534 for path in reversed(file_path_remover.paths): 535 try_remove(path) 536 537 538def file_path(*names, base_dir=test_dir): 539 ''' Another way to get auto-generated filename that cleans itself up. 540 541 Use is as simple as: 542 543 img_a, img_b = file_path('a.img', 'b.img') 544 sock = file_path('socket') 545 ''' 546 547 if not hasattr(file_path_remover, 'paths'): 548 file_path_remover.paths = [] 549 atexit.register(file_path_remover) 550 551 paths = [] 552 for name in names: 553 filename = file_pattern(name) 554 path = os.path.join(base_dir, filename) 555 file_path_remover.paths.append(path) 556 paths.append(path) 557 558 return paths[0] if len(paths) == 1 else paths 559 560def remote_filename(path): 561 if imgproto == 'file': 562 return path 563 elif imgproto == 'ssh': 564 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 565 else: 566 raise Exception("Protocol %s not supported" % (imgproto)) 567 568class VM(qtest.QEMUQtestMachine): 569 '''A QEMU VM''' 570 571 def __init__(self, path_suffix=''): 572 name = "qemu%s-%d" % (path_suffix, os.getpid()) 573 timer = 15.0 574 super().__init__(qemu_prog, qemu_opts, name=name, 575 base_temp_dir=test_dir, 576 socket_scm_helper=socket_scm_helper, 577 sock_dir=sock_dir, qmp_timer=timer) 578 self._num_drives = 0 579 580 def add_object(self, opts): 581 self._args.append('-object') 582 self._args.append(opts) 583 return self 584 585 def add_device(self, opts): 586 self._args.append('-device') 587 self._args.append(opts) 588 return self 589 590 def add_drive_raw(self, opts): 591 self._args.append('-drive') 592 self._args.append(opts) 593 return self 594 595 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 596 '''Add a virtio-blk drive to the VM''' 597 options = ['if=%s' % interface, 598 'id=drive%d' % self._num_drives] 599 600 if path is not None: 601 options.append('file=%s' % path) 602 options.append('format=%s' % img_format) 603 options.append('cache=%s' % cachemode) 604 options.append('aio=%s' % aiomode) 605 606 if opts: 607 options.append(opts) 608 609 if img_format == 'luks' and 'key-secret' not in opts: 610 # default luks support 611 if luks_default_secret_object not in self._args: 612 self.add_object(luks_default_secret_object) 613 614 options.append(luks_default_key_secret_opt) 615 616 self._args.append('-drive') 617 self._args.append(','.join(options)) 618 self._num_drives += 1 619 return self 620 621 def add_blockdev(self, opts): 622 self._args.append('-blockdev') 623 if isinstance(opts, str): 624 self._args.append(opts) 625 else: 626 self._args.append(','.join(opts)) 627 return self 628 629 def add_incoming(self, addr): 630 self._args.append('-incoming') 631 self._args.append(addr) 632 return self 633 634 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 635 cmd = 'human-monitor-command' 636 kwargs: Dict[str, Any] = {'command-line': command_line} 637 if use_log: 638 return self.qmp_log(cmd, **kwargs) 639 else: 640 return self.qmp(cmd, **kwargs) 641 642 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 643 """Pause drive r/w operations""" 644 if not event: 645 self.pause_drive(drive, "read_aio") 646 self.pause_drive(drive, "write_aio") 647 return 648 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 649 650 def resume_drive(self, drive: str) -> None: 651 """Resume drive r/w operations""" 652 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 653 654 def hmp_qemu_io(self, drive: str, cmd: str, 655 use_log: bool = False) -> QMPMessage: 656 """Write to a given drive using an HMP command""" 657 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 658 659 def flatten_qmp_object(self, obj, output=None, basestr=''): 660 if output is None: 661 output = dict() 662 if isinstance(obj, list): 663 for i, item in enumerate(obj): 664 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 665 elif isinstance(obj, dict): 666 for key in obj: 667 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 668 else: 669 output[basestr[:-1]] = obj # Strip trailing '.' 670 return output 671 672 def qmp_to_opts(self, obj): 673 obj = self.flatten_qmp_object(obj) 674 output_list = list() 675 for key in obj: 676 output_list += [key + '=' + obj[key]] 677 return ','.join(output_list) 678 679 def get_qmp_events_filtered(self, wait=60.0): 680 result = [] 681 for ev in self.get_qmp_events(wait=wait): 682 result.append(filter_qmp_event(ev)) 683 return result 684 685 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 686 full_cmd = OrderedDict(( 687 ("execute", cmd), 688 ("arguments", ordered_qmp(kwargs)) 689 )) 690 log(full_cmd, filters, indent=indent) 691 result = self.qmp(cmd, **kwargs) 692 log(result, filters, indent=indent) 693 return result 694 695 # Returns None on success, and an error string on failure 696 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 697 pre_finalize=None, cancel=False, wait=60.0): 698 """ 699 run_job moves a job from creation through to dismissal. 700 701 :param job: String. ID of recently-launched job 702 :param auto_finalize: Bool. True if the job was launched with 703 auto_finalize. Defaults to True. 704 :param auto_dismiss: Bool. True if the job was launched with 705 auto_dismiss=True. Defaults to False. 706 :param pre_finalize: Callback. A callable that takes no arguments to be 707 invoked prior to issuing job-finalize, if any. 708 :param cancel: Bool. When true, cancels the job after the pre_finalize 709 callback. 710 :param wait: Float. Timeout value specifying how long to wait for any 711 event, in seconds. Defaults to 60.0. 712 """ 713 match_device = {'data': {'device': job}} 714 match_id = {'data': {'id': job}} 715 events = [ 716 ('BLOCK_JOB_COMPLETED', match_device), 717 ('BLOCK_JOB_CANCELLED', match_device), 718 ('BLOCK_JOB_ERROR', match_device), 719 ('BLOCK_JOB_READY', match_device), 720 ('BLOCK_JOB_PENDING', match_id), 721 ('JOB_STATUS_CHANGE', match_id) 722 ] 723 error = None 724 while True: 725 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 726 if ev['event'] != 'JOB_STATUS_CHANGE': 727 log(ev) 728 continue 729 status = ev['data']['status'] 730 if status == 'aborting': 731 result = self.qmp('query-jobs') 732 for j in result['return']: 733 if j['id'] == job: 734 error = j['error'] 735 log('Job failed: %s' % (j['error'])) 736 elif status == 'ready': 737 self.qmp_log('job-complete', id=job) 738 elif status == 'pending' and not auto_finalize: 739 if pre_finalize: 740 pre_finalize() 741 if cancel: 742 self.qmp_log('job-cancel', id=job) 743 else: 744 self.qmp_log('job-finalize', id=job) 745 elif status == 'concluded' and not auto_dismiss: 746 self.qmp_log('job-dismiss', id=job) 747 elif status == 'null': 748 return error 749 750 # Returns None on success, and an error string on failure 751 def blockdev_create(self, options, job_id='job0', filters=None): 752 if filters is None: 753 filters = [filter_qmp_testfiles] 754 result = self.qmp_log('blockdev-create', filters=filters, 755 job_id=job_id, options=options) 756 757 if 'return' in result: 758 assert result['return'] == {} 759 job_result = self.run_job(job_id) 760 else: 761 job_result = result['error'] 762 763 log("") 764 return job_result 765 766 def enable_migration_events(self, name): 767 log('Enabling migration QMP events on %s...' % name) 768 log(self.qmp('migrate-set-capabilities', capabilities=[ 769 { 770 'capability': 'events', 771 'state': True 772 } 773 ])) 774 775 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 776 while True: 777 event = self.event_wait('MIGRATION') 778 # We use the default timeout, and with a timeout, event_wait() 779 # never returns None 780 assert event 781 782 log(event, filters=[filter_qmp_event]) 783 if event['data']['status'] in ('completed', 'failed'): 784 break 785 786 if event['data']['status'] == 'completed': 787 # The event may occur in finish-migrate, so wait for the expected 788 # post-migration runstate 789 runstate = None 790 while runstate != expect_runstate: 791 runstate = self.qmp('query-status')['return']['status'] 792 return True 793 else: 794 return False 795 796 def node_info(self, node_name): 797 nodes = self.qmp('query-named-block-nodes') 798 for x in nodes['return']: 799 if x['node-name'] == node_name: 800 return x 801 return None 802 803 def query_bitmaps(self): 804 res = self.qmp("query-named-block-nodes") 805 return {device['node-name']: device['dirty-bitmaps'] 806 for device in res['return'] if 'dirty-bitmaps' in device} 807 808 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 809 """ 810 get a specific bitmap from the object returned by query_bitmaps. 811 :param recording: If specified, filter results by the specified value. 812 :param bitmaps: If specified, use it instead of call query_bitmaps() 813 """ 814 if bitmaps is None: 815 bitmaps = self.query_bitmaps() 816 817 for bitmap in bitmaps[node_name]: 818 if bitmap.get('name', '') == bitmap_name: 819 if recording is None or bitmap.get('recording') == recording: 820 return bitmap 821 return None 822 823 def check_bitmap_status(self, node_name, bitmap_name, fields): 824 ret = self.get_bitmap(node_name, bitmap_name) 825 826 return fields.items() <= ret.items() 827 828 def assert_block_path(self, root, path, expected_node, graph=None): 829 """ 830 Check whether the node under the given path in the block graph 831 is @expected_node. 832 833 @root is the node name of the node where the @path is rooted. 834 835 @path is a string that consists of child names separated by 836 slashes. It must begin with a slash. 837 838 Examples for @root + @path: 839 - root="qcow2-node", path="/backing/file" 840 - root="quorum-node", path="/children.2/file" 841 842 Hypothetically, @path could be empty, in which case it would 843 point to @root. However, in practice this case is not useful 844 and hence not allowed. 845 846 @expected_node may be None. (All elements of the path but the 847 leaf must still exist.) 848 849 @graph may be None or the result of an x-debug-query-block-graph 850 call that has already been performed. 851 """ 852 if graph is None: 853 graph = self.qmp('x-debug-query-block-graph')['return'] 854 855 iter_path = iter(path.split('/')) 856 857 # Must start with a / 858 assert next(iter_path) == '' 859 860 node = next((node for node in graph['nodes'] if node['name'] == root), 861 None) 862 863 # An empty @path is not allowed, so the root node must be present 864 assert node is not None, 'Root node %s not found' % root 865 866 for child_name in iter_path: 867 assert node is not None, 'Cannot follow path %s%s' % (root, path) 868 869 try: 870 node_id = next(edge['child'] for edge in graph['edges'] 871 if (edge['parent'] == node['id'] and 872 edge['name'] == child_name)) 873 874 node = next(node for node in graph['nodes'] 875 if node['id'] == node_id) 876 877 except StopIteration: 878 node = None 879 880 if node is None: 881 assert expected_node is None, \ 882 'No node found under %s (but expected %s)' % \ 883 (path, expected_node) 884 else: 885 assert node['name'] == expected_node, \ 886 'Found node %s under %s (but expected %s)' % \ 887 (node['name'], path, expected_node) 888 889index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 890 891class QMPTestCase(unittest.TestCase): 892 '''Abstract base class for QMP test cases''' 893 894 def __init__(self, *args, **kwargs): 895 super().__init__(*args, **kwargs) 896 # Many users of this class set a VM property we rely on heavily 897 # in the methods below. 898 self.vm = None 899 900 def dictpath(self, d, path): 901 '''Traverse a path in a nested dict''' 902 for component in path.split('/'): 903 m = index_re.match(component) 904 if m: 905 component, idx = m.groups() 906 idx = int(idx) 907 908 if not isinstance(d, dict) or component not in d: 909 self.fail(f'failed path traversal for "{path}" in "{d}"') 910 d = d[component] 911 912 if m: 913 if not isinstance(d, list): 914 self.fail(f'path component "{component}" in "{path}" ' 915 f'is not a list in "{d}"') 916 try: 917 d = d[idx] 918 except IndexError: 919 self.fail(f'invalid index "{idx}" in path "{path}" ' 920 f'in "{d}"') 921 return d 922 923 def assert_qmp_absent(self, d, path): 924 try: 925 result = self.dictpath(d, path) 926 except AssertionError: 927 return 928 self.fail('path "%s" has value "%s"' % (path, str(result))) 929 930 def assert_qmp(self, d, path, value): 931 '''Assert that the value for a specific path in a QMP dict 932 matches. When given a list of values, assert that any of 933 them matches.''' 934 935 result = self.dictpath(d, path) 936 937 # [] makes no sense as a list of valid values, so treat it as 938 # an actual single value. 939 if isinstance(value, list) and value != []: 940 for v in value: 941 if result == v: 942 return 943 self.fail('no match for "%s" in %s' % (str(result), str(value))) 944 else: 945 self.assertEqual(result, value, 946 '"%s" is "%s", expected "%s"' 947 % (path, str(result), str(value))) 948 949 def assert_no_active_block_jobs(self): 950 result = self.vm.qmp('query-block-jobs') 951 self.assert_qmp(result, 'return', []) 952 953 def assert_has_block_node(self, node_name=None, file_name=None): 954 """Issue a query-named-block-nodes and assert node_name and/or 955 file_name is present in the result""" 956 def check_equal_or_none(a, b): 957 return a is None or b is None or a == b 958 assert node_name or file_name 959 result = self.vm.qmp('query-named-block-nodes') 960 for x in result["return"]: 961 if check_equal_or_none(x.get("node-name"), node_name) and \ 962 check_equal_or_none(x.get("file"), file_name): 963 return 964 self.fail("Cannot find %s %s in result:\n%s" % 965 (node_name, file_name, result)) 966 967 def assert_json_filename_equal(self, json_filename, reference): 968 '''Asserts that the given filename is a json: filename and that its 969 content is equal to the given reference object''' 970 self.assertEqual(json_filename[:5], 'json:') 971 self.assertEqual( 972 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 973 self.vm.flatten_qmp_object(reference) 974 ) 975 976 def cancel_and_wait(self, drive='drive0', force=False, 977 resume=False, wait=60.0): 978 '''Cancel a block job and wait for it to finish, returning the event''' 979 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 980 self.assert_qmp(result, 'return', {}) 981 982 if resume: 983 self.vm.resume_drive(drive) 984 985 cancelled = False 986 result = None 987 while not cancelled: 988 for event in self.vm.get_qmp_events(wait=wait): 989 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 990 event['event'] == 'BLOCK_JOB_CANCELLED': 991 self.assert_qmp(event, 'data/device', drive) 992 result = event 993 cancelled = True 994 elif event['event'] == 'JOB_STATUS_CHANGE': 995 self.assert_qmp(event, 'data/id', drive) 996 997 998 self.assert_no_active_block_jobs() 999 return result 1000 1001 def wait_until_completed(self, drive='drive0', check_offset=True, 1002 wait=60.0, error=None): 1003 '''Wait for a block job to finish, returning the event''' 1004 while True: 1005 for event in self.vm.get_qmp_events(wait=wait): 1006 if event['event'] == 'BLOCK_JOB_COMPLETED': 1007 self.assert_qmp(event, 'data/device', drive) 1008 if error is None: 1009 self.assert_qmp_absent(event, 'data/error') 1010 if check_offset: 1011 self.assert_qmp(event, 'data/offset', 1012 event['data']['len']) 1013 else: 1014 self.assert_qmp(event, 'data/error', error) 1015 self.assert_no_active_block_jobs() 1016 return event 1017 if event['event'] == 'JOB_STATUS_CHANGE': 1018 self.assert_qmp(event, 'data/id', drive) 1019 1020 def wait_ready(self, drive='drive0'): 1021 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1022 return self.vm.events_wait([ 1023 ('BLOCK_JOB_READY', 1024 {'data': {'type': 'mirror', 'device': drive}}), 1025 ('BLOCK_JOB_READY', 1026 {'data': {'type': 'commit', 'device': drive}}) 1027 ]) 1028 1029 def wait_ready_and_cancel(self, drive='drive0'): 1030 self.wait_ready(drive=drive) 1031 event = self.cancel_and_wait(drive=drive) 1032 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1033 self.assert_qmp(event, 'data/type', 'mirror') 1034 self.assert_qmp(event, 'data/offset', event['data']['len']) 1035 1036 def complete_and_wait(self, drive='drive0', wait_ready=True, 1037 completion_error=None): 1038 '''Complete a block job and wait for it to finish''' 1039 if wait_ready: 1040 self.wait_ready(drive=drive) 1041 1042 result = self.vm.qmp('block-job-complete', device=drive) 1043 self.assert_qmp(result, 'return', {}) 1044 1045 event = self.wait_until_completed(drive=drive, error=completion_error) 1046 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1047 1048 def pause_wait(self, job_id='job0'): 1049 with Timeout(3, "Timeout waiting for job to pause"): 1050 while True: 1051 result = self.vm.qmp('query-block-jobs') 1052 found = False 1053 for job in result['return']: 1054 if job['device'] == job_id: 1055 found = True 1056 if job['paused'] and not job['busy']: 1057 return job 1058 break 1059 assert found 1060 1061 def pause_job(self, job_id='job0', wait=True): 1062 result = self.vm.qmp('block-job-pause', device=job_id) 1063 self.assert_qmp(result, 'return', {}) 1064 if wait: 1065 return self.pause_wait(job_id) 1066 return result 1067 1068 def case_skip(self, reason): 1069 '''Skip this test case''' 1070 case_notrun(reason) 1071 self.skipTest(reason) 1072 1073 1074def notrun(reason): 1075 '''Skip this test suite''' 1076 # Each test in qemu-iotests has a number ("seq") 1077 seq = os.path.basename(sys.argv[0]) 1078 1079 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1080 logger.warning("%s not run: %s", seq, reason) 1081 sys.exit(0) 1082 1083def case_notrun(reason): 1084 '''Mark this test case as not having been run (without actually 1085 skipping it, that is left to the caller). See 1086 QMPTestCase.case_skip() for a variant that actually skips the 1087 current test case.''' 1088 1089 # Each test in qemu-iotests has a number ("seq") 1090 seq = os.path.basename(sys.argv[0]) 1091 1092 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1093 ' [case not run] ' + reason + '\n') 1094 1095def _verify_image_format(supported_fmts: Sequence[str] = (), 1096 unsupported_fmts: Sequence[str] = ()) -> None: 1097 if 'generic' in supported_fmts and \ 1098 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1099 # similar to 1100 # _supported_fmt generic 1101 # for bash tests 1102 supported_fmts = () 1103 1104 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1105 if not_sup or (imgfmt in unsupported_fmts): 1106 notrun('not suitable for this image format: %s' % imgfmt) 1107 1108 if imgfmt == 'luks': 1109 verify_working_luks() 1110 1111def _verify_protocol(supported: Sequence[str] = (), 1112 unsupported: Sequence[str] = ()) -> None: 1113 assert not (supported and unsupported) 1114 1115 if 'generic' in supported: 1116 return 1117 1118 not_sup = supported and (imgproto not in supported) 1119 if not_sup or (imgproto in unsupported): 1120 notrun('not suitable for this protocol: %s' % imgproto) 1121 1122def _verify_platform(supported: Sequence[str] = (), 1123 unsupported: Sequence[str] = ()) -> None: 1124 if any((sys.platform.startswith(x) for x in unsupported)): 1125 notrun('not suitable for this OS: %s' % sys.platform) 1126 1127 if supported: 1128 if not any((sys.platform.startswith(x) for x in supported)): 1129 notrun('not suitable for this OS: %s' % sys.platform) 1130 1131def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1132 if supported_cache_modes and (cachemode not in supported_cache_modes): 1133 notrun('not suitable for this cache mode: %s' % cachemode) 1134 1135def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1136 if supported_aio_modes and (aiomode not in supported_aio_modes): 1137 notrun('not suitable for this aio mode: %s' % aiomode) 1138 1139def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1140 usf_list = list(set(required_formats) - set(supported_formats())) 1141 if usf_list: 1142 notrun(f'formats {usf_list} are not whitelisted') 1143 1144 1145def _verify_virtio_blk() -> None: 1146 out = qemu_pipe('-M', 'none', '-device', 'help') 1147 if 'virtio-blk' not in out: 1148 notrun('Missing virtio-blk in QEMU binary') 1149 1150def _verify_virtio_scsi_pci_or_ccw() -> None: 1151 out = qemu_pipe('-M', 'none', '-device', 'help') 1152 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1153 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1154 1155 1156def supports_quorum(): 1157 return 'quorum' in qemu_img_pipe('--help') 1158 1159def verify_quorum(): 1160 '''Skip test suite if quorum support is not available''' 1161 if not supports_quorum(): 1162 notrun('quorum support missing') 1163 1164def has_working_luks() -> Tuple[bool, str]: 1165 """ 1166 Check whether our LUKS driver can actually create images 1167 (this extends to LUKS encryption for qcow2). 1168 1169 If not, return the reason why. 1170 """ 1171 1172 img_file = f'{test_dir}/luks-test.luks' 1173 (output, status) = \ 1174 qemu_img_pipe_and_status('create', '-f', 'luks', 1175 '--object', luks_default_secret_object, 1176 '-o', luks_default_key_secret_opt, 1177 '-o', 'iter-time=10', 1178 img_file, '1G') 1179 try: 1180 os.remove(img_file) 1181 except OSError: 1182 pass 1183 1184 if status != 0: 1185 reason = output 1186 for line in output.splitlines(): 1187 if img_file + ':' in line: 1188 reason = line.split(img_file + ':', 1)[1].strip() 1189 break 1190 1191 return (False, reason) 1192 else: 1193 return (True, '') 1194 1195def verify_working_luks(): 1196 """ 1197 Skip test suite if LUKS does not work 1198 """ 1199 (working, reason) = has_working_luks() 1200 if not working: 1201 notrun(reason) 1202 1203def qemu_pipe(*args: str) -> str: 1204 """ 1205 Run qemu with an option to print something and exit (e.g. a help option). 1206 1207 :return: QEMU's stdout output. 1208 """ 1209 full_args = [qemu_prog] + qemu_opts + list(args) 1210 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1211 return output 1212 1213def supported_formats(read_only=False): 1214 '''Set 'read_only' to True to check ro-whitelist 1215 Otherwise, rw-whitelist is checked''' 1216 1217 if not hasattr(supported_formats, "formats"): 1218 supported_formats.formats = {} 1219 1220 if read_only not in supported_formats.formats: 1221 format_message = qemu_pipe("-drive", "format=help") 1222 line = 1 if read_only else 0 1223 supported_formats.formats[read_only] = \ 1224 format_message.splitlines()[line].split(":")[1].split() 1225 1226 return supported_formats.formats[read_only] 1227 1228def skip_if_unsupported(required_formats=(), read_only=False): 1229 '''Skip Test Decorator 1230 Runs the test if all the required formats are whitelisted''' 1231 def skip_test_decorator(func): 1232 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1233 **kwargs: Dict[str, Any]) -> None: 1234 if callable(required_formats): 1235 fmts = required_formats(test_case) 1236 else: 1237 fmts = required_formats 1238 1239 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1240 if usf_list: 1241 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1242 test_case.case_skip(msg) 1243 else: 1244 func(test_case, *args, **kwargs) 1245 return func_wrapper 1246 return skip_test_decorator 1247 1248def skip_for_formats(formats: Sequence[str] = ()) \ 1249 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1250 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1251 '''Skip Test Decorator 1252 Skips the test for the given formats''' 1253 def skip_test_decorator(func): 1254 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1255 **kwargs: Dict[str, Any]) -> None: 1256 if imgfmt in formats: 1257 msg = f'{test_case}: Skipped for format {imgfmt}' 1258 test_case.case_skip(msg) 1259 else: 1260 func(test_case, *args, **kwargs) 1261 return func_wrapper 1262 return skip_test_decorator 1263 1264def skip_if_user_is_root(func): 1265 '''Skip Test Decorator 1266 Runs the test only without root permissions''' 1267 def func_wrapper(*args, **kwargs): 1268 if os.getuid() == 0: 1269 case_notrun('{}: cannot be run as root'.format(args[0])) 1270 return None 1271 else: 1272 return func(*args, **kwargs) 1273 return func_wrapper 1274 1275# We need to filter out the time taken from the output so that 1276# qemu-iotest can reliably diff the results against master output, 1277# and hide skipped tests from the reference output. 1278 1279class ReproducibleTestResult(unittest.TextTestResult): 1280 def addSkip(self, test, reason): 1281 # Same as TextTestResult, but print dot instead of "s" 1282 unittest.TestResult.addSkip(self, test, reason) 1283 if self.showAll: 1284 self.stream.writeln("skipped {0!r}".format(reason)) 1285 elif self.dots: 1286 self.stream.write(".") 1287 self.stream.flush() 1288 1289class ReproducibleStreamWrapper: 1290 def __init__(self, stream: TextIO): 1291 self.stream = stream 1292 1293 def __getattr__(self, attr): 1294 if attr in ('stream', '__getstate__'): 1295 raise AttributeError(attr) 1296 return getattr(self.stream, attr) 1297 1298 def write(self, arg=None): 1299 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1300 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1301 self.stream.write(arg) 1302 1303class ReproducibleTestRunner(unittest.TextTestRunner): 1304 def __init__(self, stream: Optional[TextIO] = None, 1305 resultclass: Type[unittest.TestResult] = ReproducibleTestResult, 1306 **kwargs: Any) -> None: 1307 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1308 super().__init__(stream=rstream, # type: ignore 1309 descriptions=True, 1310 resultclass=resultclass, 1311 **kwargs) 1312 1313def execute_unittest(argv: List[str], debug: bool = False) -> None: 1314 """Executes unittests within the calling module.""" 1315 1316 # Some tests have warnings, especially ResourceWarnings for unclosed 1317 # files and sockets. Ignore them for now to ensure reproducibility of 1318 # the test output. 1319 unittest.main(argv=argv, 1320 testRunner=ReproducibleTestRunner, 1321 verbosity=2 if debug else 1, 1322 warnings=None if sys.warnoptions else 'ignore') 1323 1324def execute_setup_common(supported_fmts: Sequence[str] = (), 1325 supported_platforms: Sequence[str] = (), 1326 supported_cache_modes: Sequence[str] = (), 1327 supported_aio_modes: Sequence[str] = (), 1328 unsupported_fmts: Sequence[str] = (), 1329 supported_protocols: Sequence[str] = (), 1330 unsupported_protocols: Sequence[str] = (), 1331 required_fmts: Sequence[str] = ()) -> bool: 1332 """ 1333 Perform necessary setup for either script-style or unittest-style tests. 1334 1335 :return: Bool; Whether or not debug mode has been requested via the CLI. 1336 """ 1337 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1338 1339 debug = '-d' in sys.argv 1340 if debug: 1341 sys.argv.remove('-d') 1342 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1343 1344 _verify_image_format(supported_fmts, unsupported_fmts) 1345 _verify_protocol(supported_protocols, unsupported_protocols) 1346 _verify_platform(supported=supported_platforms) 1347 _verify_cache_mode(supported_cache_modes) 1348 _verify_aio_mode(supported_aio_modes) 1349 _verify_formats(required_fmts) 1350 _verify_virtio_blk() 1351 1352 return debug 1353 1354def execute_test(*args, test_function=None, **kwargs): 1355 """Run either unittest or script-style tests.""" 1356 1357 debug = execute_setup_common(*args, **kwargs) 1358 if not test_function: 1359 execute_unittest(sys.argv, debug) 1360 else: 1361 test_function() 1362 1363def activate_logging(): 1364 """Activate iotests.log() output to stdout for script-style tests.""" 1365 handler = logging.StreamHandler(stream=sys.stdout) 1366 formatter = logging.Formatter('%(message)s') 1367 handler.setFormatter(formatter) 1368 test_logger.addHandler(handler) 1369 test_logger.setLevel(logging.INFO) 1370 test_logger.propagate = False 1371 1372# This is called from script-style iotests without a single point of entry 1373def script_initialize(*args, **kwargs): 1374 """Initialize script-style tests without running any tests.""" 1375 activate_logging() 1376 execute_setup_common(*args, **kwargs) 1377 1378# This is called from script-style iotests with a single point of entry 1379def script_main(test_function, *args, **kwargs): 1380 """Run script-style tests outside of the unittest framework""" 1381 activate_logging() 1382 execute_test(*args, test_function=test_function, **kwargs) 1383 1384# This is called from unittest style iotests 1385def main(*args, **kwargs): 1386 """Run tests using the unittest framework""" 1387 execute_test(*args, **kwargs) 1388