1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import json 24import logging 25import os 26import re 27import shutil 28import signal 29import struct 30import subprocess 31import sys 32import time 33from typing import (Any, Callable, Dict, Iterable, 34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 35import unittest 36 37from contextlib import contextmanager 38 39# pylint: disable=import-error, wrong-import-position 40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 41from qemu import qtest 42from qemu.qmp import QMPMessage 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79output_dir = os.environ.get('OUTPUT_DIR', '.') 80 81try: 82 test_dir = os.environ['TEST_DIR'] 83 sock_dir = os.environ['SOCK_DIR'] 84 cachemode = os.environ['CACHEMODE'] 85 aiomode = os.environ['AIOMODE'] 86 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 87except KeyError: 88 # We are using these variables as proxies to indicate that we're 89 # not being run via "check". There may be other things set up by 90 # "check" that individual test cases rely on. 91 sys.stderr.write('Please run this test via the "check" script\n') 92 sys.exit(os.EX_USAGE) 93 94socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 95 96luks_default_secret_object = 'secret,id=keysec0,data=' + \ 97 os.environ.get('IMGKEYSECRET', '') 98luks_default_key_secret_opt = 'key-secret=keysec0' 99 100sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 101 102 103def unarchive_sample_image(sample, fname): 104 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 105 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 106 shutil.copyfileobj(f_in, f_out) 107 108 109def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 110 connect_stderr: bool = True) -> Tuple[str, int]: 111 """ 112 Run a tool and return both its output and its exit code 113 """ 114 stderr = subprocess.STDOUT if connect_stderr else None 115 subp = subprocess.Popen(args, 116 stdout=subprocess.PIPE, 117 stderr=stderr, 118 universal_newlines=True) 119 output = subp.communicate()[0] 120 if subp.returncode < 0: 121 cmd = ' '.join(args) 122 sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n') 123 return (output, subp.returncode) 124 125def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 126 """ 127 Run qemu-img and return both its output and its exit code 128 """ 129 full_args = qemu_img_args + list(args) 130 return qemu_tool_pipe_and_status('qemu-img', full_args) 131 132def qemu_img(*args: str) -> int: 133 '''Run qemu-img and return the exit code''' 134 return qemu_img_pipe_and_status(*args)[1] 135 136def ordered_qmp(qmsg, conv_keys=True): 137 # Dictionaries are not ordered prior to 3.6, therefore: 138 if isinstance(qmsg, list): 139 return [ordered_qmp(atom) for atom in qmsg] 140 if isinstance(qmsg, dict): 141 od = OrderedDict() 142 for k, v in sorted(qmsg.items()): 143 if conv_keys: 144 k = k.replace('_', '-') 145 od[k] = ordered_qmp(v, conv_keys=False) 146 return od 147 return qmsg 148 149def qemu_img_create(*args): 150 args = list(args) 151 152 # default luks support 153 if '-f' in args and args[args.index('-f') + 1] == 'luks': 154 if '-o' in args: 155 i = args.index('-o') 156 if 'key-secret' not in args[i + 1]: 157 args[i + 1].append(luks_default_key_secret_opt) 158 args.insert(i + 2, '--object') 159 args.insert(i + 3, luks_default_secret_object) 160 else: 161 args = ['-o', luks_default_key_secret_opt, 162 '--object', luks_default_secret_object] + args 163 164 args.insert(0, 'create') 165 166 return qemu_img(*args) 167 168def qemu_img_measure(*args): 169 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 170 171def qemu_img_check(*args): 172 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 173 174def qemu_img_verbose(*args): 175 '''Run qemu-img without suppressing its output and return the exit code''' 176 exitcode = subprocess.call(qemu_img_args + list(args)) 177 if exitcode < 0: 178 sys.stderr.write('qemu-img received signal %i: %s\n' 179 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 180 return exitcode 181 182def qemu_img_pipe(*args: str) -> str: 183 '''Run qemu-img and return its output''' 184 return qemu_img_pipe_and_status(*args)[0] 185 186def qemu_img_log(*args): 187 result = qemu_img_pipe(*args) 188 log(result, filters=[filter_testfiles]) 189 return result 190 191def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 192 args = ['info'] 193 if imgopts: 194 args.append('--image-opts') 195 else: 196 args += ['-f', imgfmt] 197 args += extra_args 198 args.append(filename) 199 200 output = qemu_img_pipe(*args) 201 if not filter_path: 202 filter_path = filename 203 log(filter_img_info(output, filter_path)) 204 205def qemu_io(*args): 206 '''Run qemu-io and return the stdout data''' 207 args = qemu_io_args + list(args) 208 return qemu_tool_pipe_and_status('qemu-io', args)[0] 209 210def qemu_io_log(*args): 211 result = qemu_io(*args) 212 log(result, filters=[filter_testfiles, filter_qemu_io]) 213 return result 214 215def qemu_io_silent(*args): 216 '''Run qemu-io and return the exit code, suppressing stdout''' 217 if '-f' in args or '--image-opts' in args: 218 default_args = qemu_io_args_no_fmt 219 else: 220 default_args = qemu_io_args 221 222 args = default_args + list(args) 223 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 224 if exitcode < 0: 225 sys.stderr.write('qemu-io received signal %i: %s\n' % 226 (-exitcode, ' '.join(args))) 227 return exitcode 228 229def qemu_io_silent_check(*args): 230 '''Run qemu-io and return the true if subprocess returned 0''' 231 args = qemu_io_args + list(args) 232 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 233 stderr=subprocess.STDOUT) 234 return exitcode == 0 235 236class QemuIoInteractive: 237 def __init__(self, *args): 238 self.args = qemu_io_args_no_fmt + list(args) 239 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 240 stdout=subprocess.PIPE, 241 stderr=subprocess.STDOUT, 242 universal_newlines=True) 243 out = self._p.stdout.read(9) 244 if out != 'qemu-io> ': 245 # Most probably qemu-io just failed to start. 246 # Let's collect the whole output and exit. 247 out += self._p.stdout.read() 248 self._p.wait(timeout=1) 249 raise ValueError(out) 250 251 def close(self): 252 self._p.communicate('q\n') 253 254 def _read_output(self): 255 pattern = 'qemu-io> ' 256 n = len(pattern) 257 pos = 0 258 s = [] 259 while pos != n: 260 c = self._p.stdout.read(1) 261 # check unexpected EOF 262 assert c != '' 263 s.append(c) 264 if c == pattern[pos]: 265 pos += 1 266 else: 267 pos = 0 268 269 return ''.join(s[:-n]) 270 271 def cmd(self, cmd): 272 # quit command is in close(), '\n' is added automatically 273 assert '\n' not in cmd 274 cmd = cmd.strip() 275 assert cmd not in ('q', 'quit') 276 self._p.stdin.write(cmd + '\n') 277 self._p.stdin.flush() 278 return self._read_output() 279 280 281def qemu_nbd(*args): 282 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 283 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 284 285def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 286 '''Run qemu-nbd in daemon mode and return both the parent's exit code 287 and its output in case of an error''' 288 full_args = qemu_nbd_args + ['--fork'] + list(args) 289 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 290 connect_stderr=False) 291 return returncode, output if returncode else '' 292 293def qemu_nbd_list_log(*args: str) -> str: 294 '''Run qemu-nbd to list remote exports''' 295 full_args = [qemu_nbd_prog, '-L'] + list(args) 296 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 297 log(output, filters=[filter_testfiles, filter_nbd_exports]) 298 return output 299 300@contextmanager 301def qemu_nbd_popen(*args): 302 '''Context manager running qemu-nbd within the context''' 303 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 304 305 assert not os.path.exists(pid_file) 306 307 cmd = list(qemu_nbd_args) 308 cmd.extend(('--persistent', '--pid-file', pid_file)) 309 cmd.extend(args) 310 311 log('Start NBD server') 312 p = subprocess.Popen(cmd) 313 try: 314 while not os.path.exists(pid_file): 315 if p.poll() is not None: 316 raise RuntimeError( 317 "qemu-nbd terminated with exit code {}: {}" 318 .format(p.returncode, ' '.join(cmd))) 319 320 time.sleep(0.01) 321 yield 322 finally: 323 if os.path.exists(pid_file): 324 os.remove(pid_file) 325 log('Kill NBD server') 326 p.kill() 327 p.wait() 328 329def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 330 '''Return True if two image files are identical''' 331 return qemu_img('compare', '-f', fmt1, 332 '-F', fmt2, img1, img2) == 0 333 334def create_image(name, size): 335 '''Create a fully-allocated raw image with sector markers''' 336 file = open(name, 'wb') 337 i = 0 338 while i < size: 339 sector = struct.pack('>l504xl', i // 512, i // 512) 340 file.write(sector) 341 i = i + 512 342 file.close() 343 344def image_size(img): 345 '''Return image's virtual size''' 346 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 347 return json.loads(r)['virtual-size'] 348 349def is_str(val): 350 return isinstance(val, str) 351 352test_dir_re = re.compile(r"%s" % test_dir) 353def filter_test_dir(msg): 354 return test_dir_re.sub("TEST_DIR", msg) 355 356win32_re = re.compile(r"\r") 357def filter_win32(msg): 358 return win32_re.sub("", msg) 359 360qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 361 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 362 r"and [0-9\/.inf]* ops\/sec\)") 363def filter_qemu_io(msg): 364 msg = filter_win32(msg) 365 return qemu_io_re.sub("X ops; XX:XX:XX.X " 366 "(XXX YYY/sec and XXX ops/sec)", msg) 367 368chown_re = re.compile(r"chown [0-9]+:[0-9]+") 369def filter_chown(msg): 370 return chown_re.sub("chown UID:GID", msg) 371 372def filter_qmp_event(event): 373 '''Filter a QMP event dict''' 374 event = dict(event) 375 if 'timestamp' in event: 376 event['timestamp']['seconds'] = 'SECS' 377 event['timestamp']['microseconds'] = 'USECS' 378 return event 379 380def filter_qmp(qmsg, filter_fn): 381 '''Given a string filter, filter a QMP object's values. 382 filter_fn takes a (key, value) pair.''' 383 # Iterate through either lists or dicts; 384 if isinstance(qmsg, list): 385 items = enumerate(qmsg) 386 else: 387 items = qmsg.items() 388 389 for k, v in items: 390 if isinstance(v, (dict, list)): 391 qmsg[k] = filter_qmp(v, filter_fn) 392 else: 393 qmsg[k] = filter_fn(k, v) 394 return qmsg 395 396def filter_testfiles(msg): 397 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 398 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 399 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 400 401def filter_qmp_testfiles(qmsg): 402 def _filter(_key, value): 403 if is_str(value): 404 return filter_testfiles(value) 405 return value 406 return filter_qmp(qmsg, _filter) 407 408def filter_virtio_scsi(output: str) -> str: 409 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 410 411def filter_qmp_virtio_scsi(qmsg): 412 def _filter(_key, value): 413 if is_str(value): 414 return filter_virtio_scsi(value) 415 return value 416 return filter_qmp(qmsg, _filter) 417 418def filter_generated_node_ids(msg): 419 return re.sub("#block[0-9]+", "NODE_NAME", msg) 420 421def filter_img_info(output, filename): 422 lines = [] 423 for line in output.split('\n'): 424 if 'disk size' in line or 'actual-size' in line: 425 continue 426 line = line.replace(filename, 'TEST_IMG') 427 line = filter_testfiles(line) 428 line = line.replace(imgfmt, 'IMGFMT') 429 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 430 line = re.sub('uuid: [-a-f0-9]+', 431 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 432 line) 433 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 434 lines.append(line) 435 return '\n'.join(lines) 436 437def filter_imgfmt(msg): 438 return msg.replace(imgfmt, 'IMGFMT') 439 440def filter_qmp_imgfmt(qmsg): 441 def _filter(_key, value): 442 if is_str(value): 443 return filter_imgfmt(value) 444 return value 445 return filter_qmp(qmsg, _filter) 446 447def filter_nbd_exports(output: str) -> str: 448 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 449 450 451Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 452 453def log(msg: Msg, 454 filters: Iterable[Callable[[Msg], Msg]] = (), 455 indent: Optional[int] = None) -> None: 456 """ 457 Logs either a string message or a JSON serializable message (like QMP). 458 If indent is provided, JSON serializable messages are pretty-printed. 459 """ 460 for flt in filters: 461 msg = flt(msg) 462 if isinstance(msg, (dict, list)): 463 # Don't sort if it's already sorted 464 do_sort = not isinstance(msg, OrderedDict) 465 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 466 else: 467 test_logger.info(msg) 468 469class Timeout: 470 def __init__(self, seconds, errmsg="Timeout"): 471 self.seconds = seconds 472 self.errmsg = errmsg 473 def __enter__(self): 474 signal.signal(signal.SIGALRM, self.timeout) 475 signal.setitimer(signal.ITIMER_REAL, self.seconds) 476 return self 477 def __exit__(self, exc_type, value, traceback): 478 signal.setitimer(signal.ITIMER_REAL, 0) 479 return False 480 def timeout(self, signum, frame): 481 raise Exception(self.errmsg) 482 483def file_pattern(name): 484 return "{0}-{1}".format(os.getpid(), name) 485 486class FilePath: 487 """ 488 Context manager generating multiple file names. The generated files are 489 removed when exiting the context. 490 491 Example usage: 492 493 with FilePath('a.img', 'b.img') as (img_a, img_b): 494 # Use img_a and img_b here... 495 496 # a.img and b.img are automatically removed here. 497 498 By default images are created in iotests.test_dir. To create sockets use 499 iotests.sock_dir: 500 501 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 502 503 For convenience, calling with one argument yields a single file instead of 504 a tuple with one item. 505 506 """ 507 def __init__(self, *names, base_dir=test_dir): 508 self.paths = [os.path.join(base_dir, file_pattern(name)) 509 for name in names] 510 511 def __enter__(self): 512 if len(self.paths) == 1: 513 return self.paths[0] 514 else: 515 return self.paths 516 517 def __exit__(self, exc_type, exc_val, exc_tb): 518 for path in self.paths: 519 try: 520 os.remove(path) 521 except OSError: 522 pass 523 return False 524 525 526def try_remove(img): 527 try: 528 os.remove(img) 529 except OSError: 530 pass 531 532def file_path_remover(): 533 for path in reversed(file_path_remover.paths): 534 try_remove(path) 535 536 537def file_path(*names, base_dir=test_dir): 538 ''' Another way to get auto-generated filename that cleans itself up. 539 540 Use is as simple as: 541 542 img_a, img_b = file_path('a.img', 'b.img') 543 sock = file_path('socket') 544 ''' 545 546 if not hasattr(file_path_remover, 'paths'): 547 file_path_remover.paths = [] 548 atexit.register(file_path_remover) 549 550 paths = [] 551 for name in names: 552 filename = file_pattern(name) 553 path = os.path.join(base_dir, filename) 554 file_path_remover.paths.append(path) 555 paths.append(path) 556 557 return paths[0] if len(paths) == 1 else paths 558 559def remote_filename(path): 560 if imgproto == 'file': 561 return path 562 elif imgproto == 'ssh': 563 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 564 else: 565 raise Exception("Protocol %s not supported" % (imgproto)) 566 567class VM(qtest.QEMUQtestMachine): 568 '''A QEMU VM''' 569 570 def __init__(self, path_suffix=''): 571 name = "qemu%s-%d" % (path_suffix, os.getpid()) 572 super().__init__(qemu_prog, qemu_opts, name=name, 573 test_dir=test_dir, 574 socket_scm_helper=socket_scm_helper, 575 sock_dir=sock_dir) 576 self._num_drives = 0 577 578 def add_object(self, opts): 579 self._args.append('-object') 580 self._args.append(opts) 581 return self 582 583 def add_device(self, opts): 584 self._args.append('-device') 585 self._args.append(opts) 586 return self 587 588 def add_drive_raw(self, opts): 589 self._args.append('-drive') 590 self._args.append(opts) 591 return self 592 593 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 594 '''Add a virtio-blk drive to the VM''' 595 options = ['if=%s' % interface, 596 'id=drive%d' % self._num_drives] 597 598 if path is not None: 599 options.append('file=%s' % path) 600 options.append('format=%s' % img_format) 601 options.append('cache=%s' % cachemode) 602 options.append('aio=%s' % aiomode) 603 604 if opts: 605 options.append(opts) 606 607 if img_format == 'luks' and 'key-secret' not in opts: 608 # default luks support 609 if luks_default_secret_object not in self._args: 610 self.add_object(luks_default_secret_object) 611 612 options.append(luks_default_key_secret_opt) 613 614 self._args.append('-drive') 615 self._args.append(','.join(options)) 616 self._num_drives += 1 617 return self 618 619 def add_blockdev(self, opts): 620 self._args.append('-blockdev') 621 if isinstance(opts, str): 622 self._args.append(opts) 623 else: 624 self._args.append(','.join(opts)) 625 return self 626 627 def add_incoming(self, addr): 628 self._args.append('-incoming') 629 self._args.append(addr) 630 return self 631 632 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 633 cmd = 'human-monitor-command' 634 kwargs: Dict[str, Any] = {'command-line': command_line} 635 if use_log: 636 return self.qmp_log(cmd, **kwargs) 637 else: 638 return self.qmp(cmd, **kwargs) 639 640 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 641 """Pause drive r/w operations""" 642 if not event: 643 self.pause_drive(drive, "read_aio") 644 self.pause_drive(drive, "write_aio") 645 return 646 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 647 648 def resume_drive(self, drive: str) -> None: 649 """Resume drive r/w operations""" 650 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 651 652 def hmp_qemu_io(self, drive: str, cmd: str, 653 use_log: bool = False) -> QMPMessage: 654 """Write to a given drive using an HMP command""" 655 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 656 657 def flatten_qmp_object(self, obj, output=None, basestr=''): 658 if output is None: 659 output = dict() 660 if isinstance(obj, list): 661 for i, item in enumerate(obj): 662 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 663 elif isinstance(obj, dict): 664 for key in obj: 665 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 666 else: 667 output[basestr[:-1]] = obj # Strip trailing '.' 668 return output 669 670 def qmp_to_opts(self, obj): 671 obj = self.flatten_qmp_object(obj) 672 output_list = list() 673 for key in obj: 674 output_list += [key + '=' + obj[key]] 675 return ','.join(output_list) 676 677 def get_qmp_events_filtered(self, wait=60.0): 678 result = [] 679 for ev in self.get_qmp_events(wait=wait): 680 result.append(filter_qmp_event(ev)) 681 return result 682 683 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 684 full_cmd = OrderedDict(( 685 ("execute", cmd), 686 ("arguments", ordered_qmp(kwargs)) 687 )) 688 log(full_cmd, filters, indent=indent) 689 result = self.qmp(cmd, **kwargs) 690 log(result, filters, indent=indent) 691 return result 692 693 # Returns None on success, and an error string on failure 694 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 695 pre_finalize=None, cancel=False, wait=60.0): 696 """ 697 run_job moves a job from creation through to dismissal. 698 699 :param job: String. ID of recently-launched job 700 :param auto_finalize: Bool. True if the job was launched with 701 auto_finalize. Defaults to True. 702 :param auto_dismiss: Bool. True if the job was launched with 703 auto_dismiss=True. Defaults to False. 704 :param pre_finalize: Callback. A callable that takes no arguments to be 705 invoked prior to issuing job-finalize, if any. 706 :param cancel: Bool. When true, cancels the job after the pre_finalize 707 callback. 708 :param wait: Float. Timeout value specifying how long to wait for any 709 event, in seconds. Defaults to 60.0. 710 """ 711 match_device = {'data': {'device': job}} 712 match_id = {'data': {'id': job}} 713 events = [ 714 ('BLOCK_JOB_COMPLETED', match_device), 715 ('BLOCK_JOB_CANCELLED', match_device), 716 ('BLOCK_JOB_ERROR', match_device), 717 ('BLOCK_JOB_READY', match_device), 718 ('BLOCK_JOB_PENDING', match_id), 719 ('JOB_STATUS_CHANGE', match_id) 720 ] 721 error = None 722 while True: 723 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 724 if ev['event'] != 'JOB_STATUS_CHANGE': 725 log(ev) 726 continue 727 status = ev['data']['status'] 728 if status == 'aborting': 729 result = self.qmp('query-jobs') 730 for j in result['return']: 731 if j['id'] == job: 732 error = j['error'] 733 log('Job failed: %s' % (j['error'])) 734 elif status == 'ready': 735 self.qmp_log('job-complete', id=job) 736 elif status == 'pending' and not auto_finalize: 737 if pre_finalize: 738 pre_finalize() 739 if cancel: 740 self.qmp_log('job-cancel', id=job) 741 else: 742 self.qmp_log('job-finalize', id=job) 743 elif status == 'concluded' and not auto_dismiss: 744 self.qmp_log('job-dismiss', id=job) 745 elif status == 'null': 746 return error 747 748 # Returns None on success, and an error string on failure 749 def blockdev_create(self, options, job_id='job0', filters=None): 750 if filters is None: 751 filters = [filter_qmp_testfiles] 752 result = self.qmp_log('blockdev-create', filters=filters, 753 job_id=job_id, options=options) 754 755 if 'return' in result: 756 assert result['return'] == {} 757 job_result = self.run_job(job_id) 758 else: 759 job_result = result['error'] 760 761 log("") 762 return job_result 763 764 def enable_migration_events(self, name): 765 log('Enabling migration QMP events on %s...' % name) 766 log(self.qmp('migrate-set-capabilities', capabilities=[ 767 { 768 'capability': 'events', 769 'state': True 770 } 771 ])) 772 773 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 774 while True: 775 event = self.event_wait('MIGRATION') 776 # We use the default timeout, and with a timeout, event_wait() 777 # never returns None 778 assert event 779 780 log(event, filters=[filter_qmp_event]) 781 if event['data']['status'] in ('completed', 'failed'): 782 break 783 784 if event['data']['status'] == 'completed': 785 # The event may occur in finish-migrate, so wait for the expected 786 # post-migration runstate 787 runstate = None 788 while runstate != expect_runstate: 789 runstate = self.qmp('query-status')['return']['status'] 790 return True 791 else: 792 return False 793 794 def node_info(self, node_name): 795 nodes = self.qmp('query-named-block-nodes') 796 for x in nodes['return']: 797 if x['node-name'] == node_name: 798 return x 799 return None 800 801 def query_bitmaps(self): 802 res = self.qmp("query-named-block-nodes") 803 return {device['node-name']: device['dirty-bitmaps'] 804 for device in res['return'] if 'dirty-bitmaps' in device} 805 806 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 807 """ 808 get a specific bitmap from the object returned by query_bitmaps. 809 :param recording: If specified, filter results by the specified value. 810 :param bitmaps: If specified, use it instead of call query_bitmaps() 811 """ 812 if bitmaps is None: 813 bitmaps = self.query_bitmaps() 814 815 for bitmap in bitmaps[node_name]: 816 if bitmap.get('name', '') == bitmap_name: 817 if recording is None or bitmap.get('recording') == recording: 818 return bitmap 819 return None 820 821 def check_bitmap_status(self, node_name, bitmap_name, fields): 822 ret = self.get_bitmap(node_name, bitmap_name) 823 824 return fields.items() <= ret.items() 825 826 def assert_block_path(self, root, path, expected_node, graph=None): 827 """ 828 Check whether the node under the given path in the block graph 829 is @expected_node. 830 831 @root is the node name of the node where the @path is rooted. 832 833 @path is a string that consists of child names separated by 834 slashes. It must begin with a slash. 835 836 Examples for @root + @path: 837 - root="qcow2-node", path="/backing/file" 838 - root="quorum-node", path="/children.2/file" 839 840 Hypothetically, @path could be empty, in which case it would 841 point to @root. However, in practice this case is not useful 842 and hence not allowed. 843 844 @expected_node may be None. (All elements of the path but the 845 leaf must still exist.) 846 847 @graph may be None or the result of an x-debug-query-block-graph 848 call that has already been performed. 849 """ 850 if graph is None: 851 graph = self.qmp('x-debug-query-block-graph')['return'] 852 853 iter_path = iter(path.split('/')) 854 855 # Must start with a / 856 assert next(iter_path) == '' 857 858 node = next((node for node in graph['nodes'] if node['name'] == root), 859 None) 860 861 # An empty @path is not allowed, so the root node must be present 862 assert node is not None, 'Root node %s not found' % root 863 864 for child_name in iter_path: 865 assert node is not None, 'Cannot follow path %s%s' % (root, path) 866 867 try: 868 node_id = next(edge['child'] for edge in graph['edges'] 869 if (edge['parent'] == node['id'] and 870 edge['name'] == child_name)) 871 872 node = next(node for node in graph['nodes'] 873 if node['id'] == node_id) 874 875 except StopIteration: 876 node = None 877 878 if node is None: 879 assert expected_node is None, \ 880 'No node found under %s (but expected %s)' % \ 881 (path, expected_node) 882 else: 883 assert node['name'] == expected_node, \ 884 'Found node %s under %s (but expected %s)' % \ 885 (node['name'], path, expected_node) 886 887index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 888 889class QMPTestCase(unittest.TestCase): 890 '''Abstract base class for QMP test cases''' 891 892 def __init__(self, *args, **kwargs): 893 super().__init__(*args, **kwargs) 894 # Many users of this class set a VM property we rely on heavily 895 # in the methods below. 896 self.vm = None 897 898 def dictpath(self, d, path): 899 '''Traverse a path in a nested dict''' 900 for component in path.split('/'): 901 m = index_re.match(component) 902 if m: 903 component, idx = m.groups() 904 idx = int(idx) 905 906 if not isinstance(d, dict) or component not in d: 907 self.fail(f'failed path traversal for "{path}" in "{d}"') 908 d = d[component] 909 910 if m: 911 if not isinstance(d, list): 912 self.fail(f'path component "{component}" in "{path}" ' 913 f'is not a list in "{d}"') 914 try: 915 d = d[idx] 916 except IndexError: 917 self.fail(f'invalid index "{idx}" in path "{path}" ' 918 f'in "{d}"') 919 return d 920 921 def assert_qmp_absent(self, d, path): 922 try: 923 result = self.dictpath(d, path) 924 except AssertionError: 925 return 926 self.fail('path "%s" has value "%s"' % (path, str(result))) 927 928 def assert_qmp(self, d, path, value): 929 '''Assert that the value for a specific path in a QMP dict 930 matches. When given a list of values, assert that any of 931 them matches.''' 932 933 result = self.dictpath(d, path) 934 935 # [] makes no sense as a list of valid values, so treat it as 936 # an actual single value. 937 if isinstance(value, list) and value != []: 938 for v in value: 939 if result == v: 940 return 941 self.fail('no match for "%s" in %s' % (str(result), str(value))) 942 else: 943 self.assertEqual(result, value, 944 '"%s" is "%s", expected "%s"' 945 % (path, str(result), str(value))) 946 947 def assert_no_active_block_jobs(self): 948 result = self.vm.qmp('query-block-jobs') 949 self.assert_qmp(result, 'return', []) 950 951 def assert_has_block_node(self, node_name=None, file_name=None): 952 """Issue a query-named-block-nodes and assert node_name and/or 953 file_name is present in the result""" 954 def check_equal_or_none(a, b): 955 return a is None or b is None or a == b 956 assert node_name or file_name 957 result = self.vm.qmp('query-named-block-nodes') 958 for x in result["return"]: 959 if check_equal_or_none(x.get("node-name"), node_name) and \ 960 check_equal_or_none(x.get("file"), file_name): 961 return 962 self.fail("Cannot find %s %s in result:\n%s" % 963 (node_name, file_name, result)) 964 965 def assert_json_filename_equal(self, json_filename, reference): 966 '''Asserts that the given filename is a json: filename and that its 967 content is equal to the given reference object''' 968 self.assertEqual(json_filename[:5], 'json:') 969 self.assertEqual( 970 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 971 self.vm.flatten_qmp_object(reference) 972 ) 973 974 def cancel_and_wait(self, drive='drive0', force=False, 975 resume=False, wait=60.0): 976 '''Cancel a block job and wait for it to finish, returning the event''' 977 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 978 self.assert_qmp(result, 'return', {}) 979 980 if resume: 981 self.vm.resume_drive(drive) 982 983 cancelled = False 984 result = None 985 while not cancelled: 986 for event in self.vm.get_qmp_events(wait=wait): 987 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 988 event['event'] == 'BLOCK_JOB_CANCELLED': 989 self.assert_qmp(event, 'data/device', drive) 990 result = event 991 cancelled = True 992 elif event['event'] == 'JOB_STATUS_CHANGE': 993 self.assert_qmp(event, 'data/id', drive) 994 995 996 self.assert_no_active_block_jobs() 997 return result 998 999 def wait_until_completed(self, drive='drive0', check_offset=True, 1000 wait=60.0, error=None): 1001 '''Wait for a block job to finish, returning the event''' 1002 while True: 1003 for event in self.vm.get_qmp_events(wait=wait): 1004 if event['event'] == 'BLOCK_JOB_COMPLETED': 1005 self.assert_qmp(event, 'data/device', drive) 1006 if error is None: 1007 self.assert_qmp_absent(event, 'data/error') 1008 if check_offset: 1009 self.assert_qmp(event, 'data/offset', 1010 event['data']['len']) 1011 else: 1012 self.assert_qmp(event, 'data/error', error) 1013 self.assert_no_active_block_jobs() 1014 return event 1015 if event['event'] == 'JOB_STATUS_CHANGE': 1016 self.assert_qmp(event, 'data/id', drive) 1017 1018 def wait_ready(self, drive='drive0'): 1019 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1020 return self.vm.events_wait([ 1021 ('BLOCK_JOB_READY', 1022 {'data': {'type': 'mirror', 'device': drive}}), 1023 ('BLOCK_JOB_READY', 1024 {'data': {'type': 'commit', 'device': drive}}) 1025 ]) 1026 1027 def wait_ready_and_cancel(self, drive='drive0'): 1028 self.wait_ready(drive=drive) 1029 event = self.cancel_and_wait(drive=drive) 1030 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1031 self.assert_qmp(event, 'data/type', 'mirror') 1032 self.assert_qmp(event, 'data/offset', event['data']['len']) 1033 1034 def complete_and_wait(self, drive='drive0', wait_ready=True, 1035 completion_error=None): 1036 '''Complete a block job and wait for it to finish''' 1037 if wait_ready: 1038 self.wait_ready(drive=drive) 1039 1040 result = self.vm.qmp('block-job-complete', device=drive) 1041 self.assert_qmp(result, 'return', {}) 1042 1043 event = self.wait_until_completed(drive=drive, error=completion_error) 1044 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1045 1046 def pause_wait(self, job_id='job0'): 1047 with Timeout(3, "Timeout waiting for job to pause"): 1048 while True: 1049 result = self.vm.qmp('query-block-jobs') 1050 found = False 1051 for job in result['return']: 1052 if job['device'] == job_id: 1053 found = True 1054 if job['paused'] and not job['busy']: 1055 return job 1056 break 1057 assert found 1058 1059 def pause_job(self, job_id='job0', wait=True): 1060 result = self.vm.qmp('block-job-pause', device=job_id) 1061 self.assert_qmp(result, 'return', {}) 1062 if wait: 1063 return self.pause_wait(job_id) 1064 return result 1065 1066 def case_skip(self, reason): 1067 '''Skip this test case''' 1068 case_notrun(reason) 1069 self.skipTest(reason) 1070 1071 1072def notrun(reason): 1073 '''Skip this test suite''' 1074 # Each test in qemu-iotests has a number ("seq") 1075 seq = os.path.basename(sys.argv[0]) 1076 1077 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1078 logger.warning("%s not run: %s", seq, reason) 1079 sys.exit(0) 1080 1081def case_notrun(reason): 1082 '''Mark this test case as not having been run (without actually 1083 skipping it, that is left to the caller). See 1084 QMPTestCase.case_skip() for a variant that actually skips the 1085 current test case.''' 1086 1087 # Each test in qemu-iotests has a number ("seq") 1088 seq = os.path.basename(sys.argv[0]) 1089 1090 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1091 ' [case not run] ' + reason + '\n') 1092 1093def _verify_image_format(supported_fmts: Sequence[str] = (), 1094 unsupported_fmts: Sequence[str] = ()) -> None: 1095 if 'generic' in supported_fmts and \ 1096 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1097 # similar to 1098 # _supported_fmt generic 1099 # for bash tests 1100 supported_fmts = () 1101 1102 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1103 if not_sup or (imgfmt in unsupported_fmts): 1104 notrun('not suitable for this image format: %s' % imgfmt) 1105 1106 if imgfmt == 'luks': 1107 verify_working_luks() 1108 1109def _verify_protocol(supported: Sequence[str] = (), 1110 unsupported: Sequence[str] = ()) -> None: 1111 assert not (supported and unsupported) 1112 1113 if 'generic' in supported: 1114 return 1115 1116 not_sup = supported and (imgproto not in supported) 1117 if not_sup or (imgproto in unsupported): 1118 notrun('not suitable for this protocol: %s' % imgproto) 1119 1120def _verify_platform(supported: Sequence[str] = (), 1121 unsupported: Sequence[str] = ()) -> None: 1122 if any((sys.platform.startswith(x) for x in unsupported)): 1123 notrun('not suitable for this OS: %s' % sys.platform) 1124 1125 if supported: 1126 if not any((sys.platform.startswith(x) for x in supported)): 1127 notrun('not suitable for this OS: %s' % sys.platform) 1128 1129def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1130 if supported_cache_modes and (cachemode not in supported_cache_modes): 1131 notrun('not suitable for this cache mode: %s' % cachemode) 1132 1133def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1134 if supported_aio_modes and (aiomode not in supported_aio_modes): 1135 notrun('not suitable for this aio mode: %s' % aiomode) 1136 1137def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1138 usf_list = list(set(required_formats) - set(supported_formats())) 1139 if usf_list: 1140 notrun(f'formats {usf_list} are not whitelisted') 1141 1142 1143def _verify_virtio_blk() -> None: 1144 out = qemu_pipe('-M', 'none', '-device', 'help') 1145 if 'virtio-blk' not in out: 1146 notrun('Missing virtio-blk in QEMU binary') 1147 1148def _verify_virtio_scsi_pci_or_ccw() -> None: 1149 out = qemu_pipe('-M', 'none', '-device', 'help') 1150 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1151 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1152 1153 1154def supports_quorum(): 1155 return 'quorum' in qemu_img_pipe('--help') 1156 1157def verify_quorum(): 1158 '''Skip test suite if quorum support is not available''' 1159 if not supports_quorum(): 1160 notrun('quorum support missing') 1161 1162def has_working_luks() -> Tuple[bool, str]: 1163 """ 1164 Check whether our LUKS driver can actually create images 1165 (this extends to LUKS encryption for qcow2). 1166 1167 If not, return the reason why. 1168 """ 1169 1170 img_file = f'{test_dir}/luks-test.luks' 1171 (output, status) = \ 1172 qemu_img_pipe_and_status('create', '-f', 'luks', 1173 '--object', luks_default_secret_object, 1174 '-o', luks_default_key_secret_opt, 1175 '-o', 'iter-time=10', 1176 img_file, '1G') 1177 try: 1178 os.remove(img_file) 1179 except OSError: 1180 pass 1181 1182 if status != 0: 1183 reason = output 1184 for line in output.splitlines(): 1185 if img_file + ':' in line: 1186 reason = line.split(img_file + ':', 1)[1].strip() 1187 break 1188 1189 return (False, reason) 1190 else: 1191 return (True, '') 1192 1193def verify_working_luks(): 1194 """ 1195 Skip test suite if LUKS does not work 1196 """ 1197 (working, reason) = has_working_luks() 1198 if not working: 1199 notrun(reason) 1200 1201def qemu_pipe(*args: str) -> str: 1202 """ 1203 Run qemu with an option to print something and exit (e.g. a help option). 1204 1205 :return: QEMU's stdout output. 1206 """ 1207 full_args = [qemu_prog] + qemu_opts + list(args) 1208 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1209 return output 1210 1211def supported_formats(read_only=False): 1212 '''Set 'read_only' to True to check ro-whitelist 1213 Otherwise, rw-whitelist is checked''' 1214 1215 if not hasattr(supported_formats, "formats"): 1216 supported_formats.formats = {} 1217 1218 if read_only not in supported_formats.formats: 1219 format_message = qemu_pipe("-drive", "format=help") 1220 line = 1 if read_only else 0 1221 supported_formats.formats[read_only] = \ 1222 format_message.splitlines()[line].split(":")[1].split() 1223 1224 return supported_formats.formats[read_only] 1225 1226def skip_if_unsupported(required_formats=(), read_only=False): 1227 '''Skip Test Decorator 1228 Runs the test if all the required formats are whitelisted''' 1229 def skip_test_decorator(func): 1230 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1231 **kwargs: Dict[str, Any]) -> None: 1232 if callable(required_formats): 1233 fmts = required_formats(test_case) 1234 else: 1235 fmts = required_formats 1236 1237 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1238 if usf_list: 1239 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1240 test_case.case_skip(msg) 1241 else: 1242 func(test_case, *args, **kwargs) 1243 return func_wrapper 1244 return skip_test_decorator 1245 1246def skip_for_formats(formats: Sequence[str] = ()) \ 1247 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1248 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1249 '''Skip Test Decorator 1250 Skips the test for the given formats''' 1251 def skip_test_decorator(func): 1252 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1253 **kwargs: Dict[str, Any]) -> None: 1254 if imgfmt in formats: 1255 msg = f'{test_case}: Skipped for format {imgfmt}' 1256 test_case.case_skip(msg) 1257 else: 1258 func(test_case, *args, **kwargs) 1259 return func_wrapper 1260 return skip_test_decorator 1261 1262def skip_if_user_is_root(func): 1263 '''Skip Test Decorator 1264 Runs the test only without root permissions''' 1265 def func_wrapper(*args, **kwargs): 1266 if os.getuid() == 0: 1267 case_notrun('{}: cannot be run as root'.format(args[0])) 1268 return None 1269 else: 1270 return func(*args, **kwargs) 1271 return func_wrapper 1272 1273# We need to filter out the time taken from the output so that 1274# qemu-iotest can reliably diff the results against master output, 1275# and hide skipped tests from the reference output. 1276 1277class ReproducibleTestResult(unittest.TextTestResult): 1278 def addSkip(self, test, reason): 1279 # Same as TextTestResult, but print dot instead of "s" 1280 unittest.TestResult.addSkip(self, test, reason) 1281 if self.showAll: 1282 self.stream.writeln("skipped {0!r}".format(reason)) 1283 elif self.dots: 1284 self.stream.write(".") 1285 self.stream.flush() 1286 1287class ReproducibleStreamWrapper: 1288 def __init__(self, stream: TextIO): 1289 self.stream = stream 1290 1291 def __getattr__(self, attr): 1292 if attr in ('stream', '__getstate__'): 1293 raise AttributeError(attr) 1294 return getattr(self.stream, attr) 1295 1296 def write(self, arg=None): 1297 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1298 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1299 self.stream.write(arg) 1300 1301class ReproducibleTestRunner(unittest.TextTestRunner): 1302 def __init__(self, stream: Optional[TextIO] = None, 1303 resultclass: Type[unittest.TestResult] = ReproducibleTestResult, 1304 **kwargs: Any) -> None: 1305 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1306 super().__init__(stream=rstream, # type: ignore 1307 descriptions=True, 1308 resultclass=resultclass, 1309 **kwargs) 1310 1311def execute_unittest(argv: List[str], debug: bool = False) -> None: 1312 """Executes unittests within the calling module.""" 1313 1314 # Some tests have warnings, especially ResourceWarnings for unclosed 1315 # files and sockets. Ignore them for now to ensure reproducibility of 1316 # the test output. 1317 unittest.main(argv=argv, 1318 testRunner=ReproducibleTestRunner, 1319 verbosity=2 if debug else 1, 1320 warnings=None if sys.warnoptions else 'ignore') 1321 1322def execute_setup_common(supported_fmts: Sequence[str] = (), 1323 supported_platforms: Sequence[str] = (), 1324 supported_cache_modes: Sequence[str] = (), 1325 supported_aio_modes: Sequence[str] = (), 1326 unsupported_fmts: Sequence[str] = (), 1327 supported_protocols: Sequence[str] = (), 1328 unsupported_protocols: Sequence[str] = (), 1329 required_fmts: Sequence[str] = ()) -> bool: 1330 """ 1331 Perform necessary setup for either script-style or unittest-style tests. 1332 1333 :return: Bool; Whether or not debug mode has been requested via the CLI. 1334 """ 1335 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1336 1337 debug = '-d' in sys.argv 1338 if debug: 1339 sys.argv.remove('-d') 1340 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1341 1342 _verify_image_format(supported_fmts, unsupported_fmts) 1343 _verify_protocol(supported_protocols, unsupported_protocols) 1344 _verify_platform(supported=supported_platforms) 1345 _verify_cache_mode(supported_cache_modes) 1346 _verify_aio_mode(supported_aio_modes) 1347 _verify_formats(required_fmts) 1348 _verify_virtio_blk() 1349 1350 return debug 1351 1352def execute_test(*args, test_function=None, **kwargs): 1353 """Run either unittest or script-style tests.""" 1354 1355 debug = execute_setup_common(*args, **kwargs) 1356 if not test_function: 1357 execute_unittest(sys.argv, debug) 1358 else: 1359 test_function() 1360 1361def activate_logging(): 1362 """Activate iotests.log() output to stdout for script-style tests.""" 1363 handler = logging.StreamHandler(stream=sys.stdout) 1364 formatter = logging.Formatter('%(message)s') 1365 handler.setFormatter(formatter) 1366 test_logger.addHandler(handler) 1367 test_logger.setLevel(logging.INFO) 1368 test_logger.propagate = False 1369 1370# This is called from script-style iotests without a single point of entry 1371def script_initialize(*args, **kwargs): 1372 """Initialize script-style tests without running any tests.""" 1373 activate_logging() 1374 execute_setup_common(*args, **kwargs) 1375 1376# This is called from script-style iotests with a single point of entry 1377def script_main(test_function, *args, **kwargs): 1378 """Run script-style tests outside of the unittest framework""" 1379 activate_logging() 1380 execute_test(*args, test_function=test_function, **kwargs) 1381 1382# This is called from unittest style iotests 1383def main(*args, **kwargs): 1384 """Run tests using the unittest framework""" 1385 execute_test(*args, **kwargs) 1386