1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20import bz2 21from collections import OrderedDict 22import faulthandler 23import io 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, 35 List, Optional, Sequence, Tuple, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40# pylint: disable=import-error, wrong-import-position 41sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 42from qemu import qtest 43from qemu.qmp import QMPMessage 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 71qemu_nbd_args = [qemu_nbd_prog] 72if os.environ.get('QEMU_NBD_OPTIONS'): 73 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 74 75qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 76qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 77 78imgfmt = os.environ.get('IMGFMT', 'raw') 79imgproto = os.environ.get('IMGPROTO', 'file') 80output_dir = os.environ.get('OUTPUT_DIR', '.') 81 82try: 83 test_dir = os.environ['TEST_DIR'] 84 sock_dir = os.environ['SOCK_DIR'] 85 cachemode = os.environ['CACHEMODE'] 86 aiomode = os.environ['AIOMODE'] 87 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 88except KeyError: 89 # We are using these variables as proxies to indicate that we're 90 # not being run via "check". There may be other things set up by 91 # "check" that individual test cases rely on. 92 sys.stderr.write('Please run this test via the "check" script\n') 93 sys.exit(os.EX_USAGE) 94 95socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 96 97luks_default_secret_object = 'secret,id=keysec0,data=' + \ 98 os.environ.get('IMGKEYSECRET', '') 99luks_default_key_secret_opt = 'key-secret=keysec0' 100 101sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 102 103 104def unarchive_sample_image(sample, fname): 105 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 106 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 107 shutil.copyfileobj(f_in, f_out) 108 109 110def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 111 connect_stderr: bool = True) -> Tuple[str, int]: 112 """ 113 Run a tool and return both its output and its exit code 114 """ 115 stderr = subprocess.STDOUT if connect_stderr else None 116 subp = subprocess.Popen(args, 117 stdout=subprocess.PIPE, 118 stderr=stderr, 119 universal_newlines=True) 120 output = subp.communicate()[0] 121 if subp.returncode < 0: 122 cmd = ' '.join(args) 123 sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n') 124 return (output, subp.returncode) 125 126def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 127 """ 128 Run qemu-img and return both its output and its exit code 129 """ 130 full_args = qemu_img_args + list(args) 131 return qemu_tool_pipe_and_status('qemu-img', full_args) 132 133def qemu_img(*args: str) -> int: 134 '''Run qemu-img and return the exit code''' 135 return qemu_img_pipe_and_status(*args)[1] 136 137def ordered_qmp(qmsg, conv_keys=True): 138 # Dictionaries are not ordered prior to 3.6, therefore: 139 if isinstance(qmsg, list): 140 return [ordered_qmp(atom) for atom in qmsg] 141 if isinstance(qmsg, dict): 142 od = OrderedDict() 143 for k, v in sorted(qmsg.items()): 144 if conv_keys: 145 k = k.replace('_', '-') 146 od[k] = ordered_qmp(v, conv_keys=False) 147 return od 148 return qmsg 149 150def qemu_img_create(*args): 151 args = list(args) 152 153 # default luks support 154 if '-f' in args and args[args.index('-f') + 1] == 'luks': 155 if '-o' in args: 156 i = args.index('-o') 157 if 'key-secret' not in args[i + 1]: 158 args[i + 1].append(luks_default_key_secret_opt) 159 args.insert(i + 2, '--object') 160 args.insert(i + 3, luks_default_secret_object) 161 else: 162 args = ['-o', luks_default_key_secret_opt, 163 '--object', luks_default_secret_object] + args 164 165 args.insert(0, 'create') 166 167 return qemu_img(*args) 168 169def qemu_img_measure(*args): 170 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 171 172def qemu_img_check(*args): 173 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 174 175def qemu_img_verbose(*args): 176 '''Run qemu-img without suppressing its output and return the exit code''' 177 exitcode = subprocess.call(qemu_img_args + list(args)) 178 if exitcode < 0: 179 sys.stderr.write('qemu-img received signal %i: %s\n' 180 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 181 return exitcode 182 183def qemu_img_pipe(*args: str) -> str: 184 '''Run qemu-img and return its output''' 185 return qemu_img_pipe_and_status(*args)[0] 186 187def qemu_img_log(*args): 188 result = qemu_img_pipe(*args) 189 log(result, filters=[filter_testfiles]) 190 return result 191 192def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 193 args = ['info'] 194 if imgopts: 195 args.append('--image-opts') 196 else: 197 args += ['-f', imgfmt] 198 args += extra_args 199 args.append(filename) 200 201 output = qemu_img_pipe(*args) 202 if not filter_path: 203 filter_path = filename 204 log(filter_img_info(output, filter_path)) 205 206def qemu_io(*args): 207 '''Run qemu-io and return the stdout data''' 208 args = qemu_io_args + list(args) 209 return qemu_tool_pipe_and_status('qemu-io', args)[0] 210 211def qemu_io_log(*args): 212 result = qemu_io(*args) 213 log(result, filters=[filter_testfiles, filter_qemu_io]) 214 return result 215 216def qemu_io_silent(*args): 217 '''Run qemu-io and return the exit code, suppressing stdout''' 218 if '-f' in args or '--image-opts' in args: 219 default_args = qemu_io_args_no_fmt 220 else: 221 default_args = qemu_io_args 222 223 args = default_args + list(args) 224 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 225 if exitcode < 0: 226 sys.stderr.write('qemu-io received signal %i: %s\n' % 227 (-exitcode, ' '.join(args))) 228 return exitcode 229 230def qemu_io_silent_check(*args): 231 '''Run qemu-io and return the true if subprocess returned 0''' 232 args = qemu_io_args + list(args) 233 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 234 stderr=subprocess.STDOUT) 235 return exitcode == 0 236 237def get_virtio_scsi_device(): 238 if qemu_default_machine == 's390-ccw-virtio': 239 return 'virtio-scsi-ccw' 240 return 'virtio-scsi-pci' 241 242class QemuIoInteractive: 243 def __init__(self, *args): 244 self.args = qemu_io_args_no_fmt + list(args) 245 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 246 stdout=subprocess.PIPE, 247 stderr=subprocess.STDOUT, 248 universal_newlines=True) 249 out = self._p.stdout.read(9) 250 if out != 'qemu-io> ': 251 # Most probably qemu-io just failed to start. 252 # Let's collect the whole output and exit. 253 out += self._p.stdout.read() 254 self._p.wait(timeout=1) 255 raise ValueError(out) 256 257 def close(self): 258 self._p.communicate('q\n') 259 260 def _read_output(self): 261 pattern = 'qemu-io> ' 262 n = len(pattern) 263 pos = 0 264 s = [] 265 while pos != n: 266 c = self._p.stdout.read(1) 267 # check unexpected EOF 268 assert c != '' 269 s.append(c) 270 if c == pattern[pos]: 271 pos += 1 272 else: 273 pos = 0 274 275 return ''.join(s[:-n]) 276 277 def cmd(self, cmd): 278 # quit command is in close(), '\n' is added automatically 279 assert '\n' not in cmd 280 cmd = cmd.strip() 281 assert cmd not in ('q', 'quit') 282 self._p.stdin.write(cmd + '\n') 283 self._p.stdin.flush() 284 return self._read_output() 285 286 287def qemu_nbd(*args): 288 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 289 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 290 291def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 292 '''Run qemu-nbd in daemon mode and return both the parent's exit code 293 and its output in case of an error''' 294 full_args = qemu_nbd_args + ['--fork'] + list(args) 295 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 296 connect_stderr=False) 297 return returncode, output if returncode else '' 298 299def qemu_nbd_list_log(*args: str) -> str: 300 '''Run qemu-nbd to list remote exports''' 301 full_args = [qemu_nbd_prog, '-L'] + list(args) 302 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 303 log(output, filters=[filter_testfiles, filter_nbd_exports]) 304 return output 305 306@contextmanager 307def qemu_nbd_popen(*args): 308 '''Context manager running qemu-nbd within the context''' 309 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 310 311 assert not os.path.exists(pid_file) 312 313 cmd = list(qemu_nbd_args) 314 cmd.extend(('--persistent', '--pid-file', pid_file)) 315 cmd.extend(args) 316 317 log('Start NBD server') 318 p = subprocess.Popen(cmd) 319 try: 320 while not os.path.exists(pid_file): 321 if p.poll() is not None: 322 raise RuntimeError( 323 "qemu-nbd terminated with exit code {}: {}" 324 .format(p.returncode, ' '.join(cmd))) 325 326 time.sleep(0.01) 327 yield 328 finally: 329 if os.path.exists(pid_file): 330 os.remove(pid_file) 331 log('Kill NBD server') 332 p.kill() 333 p.wait() 334 335def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 336 '''Return True if two image files are identical''' 337 return qemu_img('compare', '-f', fmt1, 338 '-F', fmt2, img1, img2) == 0 339 340def create_image(name, size): 341 '''Create a fully-allocated raw image with sector markers''' 342 file = open(name, 'wb') 343 i = 0 344 while i < size: 345 sector = struct.pack('>l504xl', i // 512, i // 512) 346 file.write(sector) 347 i = i + 512 348 file.close() 349 350def image_size(img): 351 '''Return image's virtual size''' 352 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 353 return json.loads(r)['virtual-size'] 354 355def is_str(val): 356 return isinstance(val, str) 357 358test_dir_re = re.compile(r"%s" % test_dir) 359def filter_test_dir(msg): 360 return test_dir_re.sub("TEST_DIR", msg) 361 362win32_re = re.compile(r"\r") 363def filter_win32(msg): 364 return win32_re.sub("", msg) 365 366qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 367 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 368 r"and [0-9\/.inf]* ops\/sec\)") 369def filter_qemu_io(msg): 370 msg = filter_win32(msg) 371 return qemu_io_re.sub("X ops; XX:XX:XX.X " 372 "(XXX YYY/sec and XXX ops/sec)", msg) 373 374chown_re = re.compile(r"chown [0-9]+:[0-9]+") 375def filter_chown(msg): 376 return chown_re.sub("chown UID:GID", msg) 377 378def filter_qmp_event(event): 379 '''Filter a QMP event dict''' 380 event = dict(event) 381 if 'timestamp' in event: 382 event['timestamp']['seconds'] = 'SECS' 383 event['timestamp']['microseconds'] = 'USECS' 384 return event 385 386def filter_qmp(qmsg, filter_fn): 387 '''Given a string filter, filter a QMP object's values. 388 filter_fn takes a (key, value) pair.''' 389 # Iterate through either lists or dicts; 390 if isinstance(qmsg, list): 391 items = enumerate(qmsg) 392 else: 393 items = qmsg.items() 394 395 for k, v in items: 396 if isinstance(v, (dict, list)): 397 qmsg[k] = filter_qmp(v, filter_fn) 398 else: 399 qmsg[k] = filter_fn(k, v) 400 return qmsg 401 402def filter_testfiles(msg): 403 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 404 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 405 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 406 407def filter_qmp_testfiles(qmsg): 408 def _filter(_key, value): 409 if is_str(value): 410 return filter_testfiles(value) 411 return value 412 return filter_qmp(qmsg, _filter) 413 414def filter_virtio_scsi(output: str) -> str: 415 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 416 417def filter_qmp_virtio_scsi(qmsg): 418 def _filter(_key, value): 419 if is_str(value): 420 return filter_virtio_scsi(value) 421 return value 422 return filter_qmp(qmsg, _filter) 423 424def filter_generated_node_ids(msg): 425 return re.sub("#block[0-9]+", "NODE_NAME", msg) 426 427def filter_img_info(output, filename): 428 lines = [] 429 for line in output.split('\n'): 430 if 'disk size' in line or 'actual-size' in line: 431 continue 432 line = line.replace(filename, 'TEST_IMG') 433 line = filter_testfiles(line) 434 line = line.replace(imgfmt, 'IMGFMT') 435 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 436 line = re.sub('uuid: [-a-f0-9]+', 437 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 438 line) 439 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 440 lines.append(line) 441 return '\n'.join(lines) 442 443def filter_imgfmt(msg): 444 return msg.replace(imgfmt, 'IMGFMT') 445 446def filter_qmp_imgfmt(qmsg): 447 def _filter(_key, value): 448 if is_str(value): 449 return filter_imgfmt(value) 450 return value 451 return filter_qmp(qmsg, _filter) 452 453def filter_nbd_exports(output: str) -> str: 454 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 455 456 457Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 458 459def log(msg: Msg, 460 filters: Iterable[Callable[[Msg], Msg]] = (), 461 indent: Optional[int] = None) -> None: 462 """ 463 Logs either a string message or a JSON serializable message (like QMP). 464 If indent is provided, JSON serializable messages are pretty-printed. 465 """ 466 for flt in filters: 467 msg = flt(msg) 468 if isinstance(msg, (dict, list)): 469 # Don't sort if it's already sorted 470 do_sort = not isinstance(msg, OrderedDict) 471 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 472 else: 473 test_logger.info(msg) 474 475class Timeout: 476 def __init__(self, seconds, errmsg="Timeout"): 477 self.seconds = seconds 478 self.errmsg = errmsg 479 def __enter__(self): 480 signal.signal(signal.SIGALRM, self.timeout) 481 signal.setitimer(signal.ITIMER_REAL, self.seconds) 482 return self 483 def __exit__(self, exc_type, value, traceback): 484 signal.setitimer(signal.ITIMER_REAL, 0) 485 return False 486 def timeout(self, signum, frame): 487 raise Exception(self.errmsg) 488 489def file_pattern(name): 490 return "{0}-{1}".format(os.getpid(), name) 491 492class FilePath: 493 """ 494 Context manager generating multiple file names. The generated files are 495 removed when exiting the context. 496 497 Example usage: 498 499 with FilePath('a.img', 'b.img') as (img_a, img_b): 500 # Use img_a and img_b here... 501 502 # a.img and b.img are automatically removed here. 503 504 By default images are created in iotests.test_dir. To create sockets use 505 iotests.sock_dir: 506 507 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 508 509 For convenience, calling with one argument yields a single file instead of 510 a tuple with one item. 511 512 """ 513 def __init__(self, *names, base_dir=test_dir): 514 self.paths = [os.path.join(base_dir, file_pattern(name)) 515 for name in names] 516 517 def __enter__(self): 518 if len(self.paths) == 1: 519 return self.paths[0] 520 else: 521 return self.paths 522 523 def __exit__(self, exc_type, exc_val, exc_tb): 524 for path in self.paths: 525 try: 526 os.remove(path) 527 except OSError: 528 pass 529 return False 530 531 532def try_remove(img): 533 try: 534 os.remove(img) 535 except OSError: 536 pass 537 538def file_path_remover(): 539 for path in reversed(file_path_remover.paths): 540 try_remove(path) 541 542 543def file_path(*names, base_dir=test_dir): 544 ''' Another way to get auto-generated filename that cleans itself up. 545 546 Use is as simple as: 547 548 img_a, img_b = file_path('a.img', 'b.img') 549 sock = file_path('socket') 550 ''' 551 552 if not hasattr(file_path_remover, 'paths'): 553 file_path_remover.paths = [] 554 atexit.register(file_path_remover) 555 556 paths = [] 557 for name in names: 558 filename = file_pattern(name) 559 path = os.path.join(base_dir, filename) 560 file_path_remover.paths.append(path) 561 paths.append(path) 562 563 return paths[0] if len(paths) == 1 else paths 564 565def remote_filename(path): 566 if imgproto == 'file': 567 return path 568 elif imgproto == 'ssh': 569 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 570 else: 571 raise Exception("Protocol %s not supported" % (imgproto)) 572 573class VM(qtest.QEMUQtestMachine): 574 '''A QEMU VM''' 575 576 def __init__(self, path_suffix=''): 577 name = "qemu%s-%d" % (path_suffix, os.getpid()) 578 super().__init__(qemu_prog, qemu_opts, name=name, 579 test_dir=test_dir, 580 socket_scm_helper=socket_scm_helper, 581 sock_dir=sock_dir) 582 self._num_drives = 0 583 584 def add_object(self, opts): 585 self._args.append('-object') 586 self._args.append(opts) 587 return self 588 589 def add_device(self, opts): 590 self._args.append('-device') 591 self._args.append(opts) 592 return self 593 594 def add_drive_raw(self, opts): 595 self._args.append('-drive') 596 self._args.append(opts) 597 return self 598 599 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 600 '''Add a virtio-blk drive to the VM''' 601 options = ['if=%s' % interface, 602 'id=drive%d' % self._num_drives] 603 604 if path is not None: 605 options.append('file=%s' % path) 606 options.append('format=%s' % img_format) 607 options.append('cache=%s' % cachemode) 608 options.append('aio=%s' % aiomode) 609 610 if opts: 611 options.append(opts) 612 613 if img_format == 'luks' and 'key-secret' not in opts: 614 # default luks support 615 if luks_default_secret_object not in self._args: 616 self.add_object(luks_default_secret_object) 617 618 options.append(luks_default_key_secret_opt) 619 620 self._args.append('-drive') 621 self._args.append(','.join(options)) 622 self._num_drives += 1 623 return self 624 625 def add_blockdev(self, opts): 626 self._args.append('-blockdev') 627 if isinstance(opts, str): 628 self._args.append(opts) 629 else: 630 self._args.append(','.join(opts)) 631 return self 632 633 def add_incoming(self, addr): 634 self._args.append('-incoming') 635 self._args.append(addr) 636 return self 637 638 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 639 cmd = 'human-monitor-command' 640 kwargs: Dict[str, Any] = {'command-line': command_line} 641 if use_log: 642 return self.qmp_log(cmd, **kwargs) 643 else: 644 return self.qmp(cmd, **kwargs) 645 646 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 647 """Pause drive r/w operations""" 648 if not event: 649 self.pause_drive(drive, "read_aio") 650 self.pause_drive(drive, "write_aio") 651 return 652 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 653 654 def resume_drive(self, drive: str) -> None: 655 """Resume drive r/w operations""" 656 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 657 658 def hmp_qemu_io(self, drive: str, cmd: str, 659 use_log: bool = False) -> QMPMessage: 660 """Write to a given drive using an HMP command""" 661 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 662 663 def flatten_qmp_object(self, obj, output=None, basestr=''): 664 if output is None: 665 output = dict() 666 if isinstance(obj, list): 667 for i, item in enumerate(obj): 668 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 669 elif isinstance(obj, dict): 670 for key in obj: 671 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 672 else: 673 output[basestr[:-1]] = obj # Strip trailing '.' 674 return output 675 676 def qmp_to_opts(self, obj): 677 obj = self.flatten_qmp_object(obj) 678 output_list = list() 679 for key in obj: 680 output_list += [key + '=' + obj[key]] 681 return ','.join(output_list) 682 683 def get_qmp_events_filtered(self, wait=60.0): 684 result = [] 685 for ev in self.get_qmp_events(wait=wait): 686 result.append(filter_qmp_event(ev)) 687 return result 688 689 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 690 full_cmd = OrderedDict(( 691 ("execute", cmd), 692 ("arguments", ordered_qmp(kwargs)) 693 )) 694 log(full_cmd, filters, indent=indent) 695 result = self.qmp(cmd, **kwargs) 696 log(result, filters, indent=indent) 697 return result 698 699 # Returns None on success, and an error string on failure 700 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 701 pre_finalize=None, cancel=False, wait=60.0): 702 """ 703 run_job moves a job from creation through to dismissal. 704 705 :param job: String. ID of recently-launched job 706 :param auto_finalize: Bool. True if the job was launched with 707 auto_finalize. Defaults to True. 708 :param auto_dismiss: Bool. True if the job was launched with 709 auto_dismiss=True. Defaults to False. 710 :param pre_finalize: Callback. A callable that takes no arguments to be 711 invoked prior to issuing job-finalize, if any. 712 :param cancel: Bool. When true, cancels the job after the pre_finalize 713 callback. 714 :param wait: Float. Timeout value specifying how long to wait for any 715 event, in seconds. Defaults to 60.0. 716 """ 717 match_device = {'data': {'device': job}} 718 match_id = {'data': {'id': job}} 719 events = [ 720 ('BLOCK_JOB_COMPLETED', match_device), 721 ('BLOCK_JOB_CANCELLED', match_device), 722 ('BLOCK_JOB_ERROR', match_device), 723 ('BLOCK_JOB_READY', match_device), 724 ('BLOCK_JOB_PENDING', match_id), 725 ('JOB_STATUS_CHANGE', match_id) 726 ] 727 error = None 728 while True: 729 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 730 if ev['event'] != 'JOB_STATUS_CHANGE': 731 log(ev) 732 continue 733 status = ev['data']['status'] 734 if status == 'aborting': 735 result = self.qmp('query-jobs') 736 for j in result['return']: 737 if j['id'] == job: 738 error = j['error'] 739 log('Job failed: %s' % (j['error'])) 740 elif status == 'ready': 741 self.qmp_log('job-complete', id=job) 742 elif status == 'pending' and not auto_finalize: 743 if pre_finalize: 744 pre_finalize() 745 if cancel: 746 self.qmp_log('job-cancel', id=job) 747 else: 748 self.qmp_log('job-finalize', id=job) 749 elif status == 'concluded' and not auto_dismiss: 750 self.qmp_log('job-dismiss', id=job) 751 elif status == 'null': 752 return error 753 754 # Returns None on success, and an error string on failure 755 def blockdev_create(self, options, job_id='job0', filters=None): 756 if filters is None: 757 filters = [filter_qmp_testfiles] 758 result = self.qmp_log('blockdev-create', filters=filters, 759 job_id=job_id, options=options) 760 761 if 'return' in result: 762 assert result['return'] == {} 763 job_result = self.run_job(job_id) 764 else: 765 job_result = result['error'] 766 767 log("") 768 return job_result 769 770 def enable_migration_events(self, name): 771 log('Enabling migration QMP events on %s...' % name) 772 log(self.qmp('migrate-set-capabilities', capabilities=[ 773 { 774 'capability': 'events', 775 'state': True 776 } 777 ])) 778 779 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 780 while True: 781 event = self.event_wait('MIGRATION') 782 # We use the default timeout, and with a timeout, event_wait() 783 # never returns None 784 assert event 785 786 log(event, filters=[filter_qmp_event]) 787 if event['data']['status'] in ('completed', 'failed'): 788 break 789 790 if event['data']['status'] == 'completed': 791 # The event may occur in finish-migrate, so wait for the expected 792 # post-migration runstate 793 runstate = None 794 while runstate != expect_runstate: 795 runstate = self.qmp('query-status')['return']['status'] 796 return True 797 else: 798 return False 799 800 def node_info(self, node_name): 801 nodes = self.qmp('query-named-block-nodes') 802 for x in nodes['return']: 803 if x['node-name'] == node_name: 804 return x 805 return None 806 807 def query_bitmaps(self): 808 res = self.qmp("query-named-block-nodes") 809 return {device['node-name']: device['dirty-bitmaps'] 810 for device in res['return'] if 'dirty-bitmaps' in device} 811 812 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 813 """ 814 get a specific bitmap from the object returned by query_bitmaps. 815 :param recording: If specified, filter results by the specified value. 816 :param bitmaps: If specified, use it instead of call query_bitmaps() 817 """ 818 if bitmaps is None: 819 bitmaps = self.query_bitmaps() 820 821 for bitmap in bitmaps[node_name]: 822 if bitmap.get('name', '') == bitmap_name: 823 if recording is None or bitmap.get('recording') == recording: 824 return bitmap 825 return None 826 827 def check_bitmap_status(self, node_name, bitmap_name, fields): 828 ret = self.get_bitmap(node_name, bitmap_name) 829 830 return fields.items() <= ret.items() 831 832 def assert_block_path(self, root, path, expected_node, graph=None): 833 """ 834 Check whether the node under the given path in the block graph 835 is @expected_node. 836 837 @root is the node name of the node where the @path is rooted. 838 839 @path is a string that consists of child names separated by 840 slashes. It must begin with a slash. 841 842 Examples for @root + @path: 843 - root="qcow2-node", path="/backing/file" 844 - root="quorum-node", path="/children.2/file" 845 846 Hypothetically, @path could be empty, in which case it would 847 point to @root. However, in practice this case is not useful 848 and hence not allowed. 849 850 @expected_node may be None. (All elements of the path but the 851 leaf must still exist.) 852 853 @graph may be None or the result of an x-debug-query-block-graph 854 call that has already been performed. 855 """ 856 if graph is None: 857 graph = self.qmp('x-debug-query-block-graph')['return'] 858 859 iter_path = iter(path.split('/')) 860 861 # Must start with a / 862 assert next(iter_path) == '' 863 864 node = next((node for node in graph['nodes'] if node['name'] == root), 865 None) 866 867 # An empty @path is not allowed, so the root node must be present 868 assert node is not None, 'Root node %s not found' % root 869 870 for child_name in iter_path: 871 assert node is not None, 'Cannot follow path %s%s' % (root, path) 872 873 try: 874 node_id = next(edge['child'] for edge in graph['edges'] 875 if (edge['parent'] == node['id'] and 876 edge['name'] == child_name)) 877 878 node = next(node for node in graph['nodes'] 879 if node['id'] == node_id) 880 881 except StopIteration: 882 node = None 883 884 if node is None: 885 assert expected_node is None, \ 886 'No node found under %s (but expected %s)' % \ 887 (path, expected_node) 888 else: 889 assert node['name'] == expected_node, \ 890 'Found node %s under %s (but expected %s)' % \ 891 (node['name'], path, expected_node) 892 893index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 894 895class QMPTestCase(unittest.TestCase): 896 '''Abstract base class for QMP test cases''' 897 898 def __init__(self, *args, **kwargs): 899 super().__init__(*args, **kwargs) 900 # Many users of this class set a VM property we rely on heavily 901 # in the methods below. 902 self.vm = None 903 904 def dictpath(self, d, path): 905 '''Traverse a path in a nested dict''' 906 for component in path.split('/'): 907 m = index_re.match(component) 908 if m: 909 component, idx = m.groups() 910 idx = int(idx) 911 912 if not isinstance(d, dict) or component not in d: 913 self.fail(f'failed path traversal for "{path}" in "{d}"') 914 d = d[component] 915 916 if m: 917 if not isinstance(d, list): 918 self.fail(f'path component "{component}" in "{path}" ' 919 f'is not a list in "{d}"') 920 try: 921 d = d[idx] 922 except IndexError: 923 self.fail(f'invalid index "{idx}" in path "{path}" ' 924 f'in "{d}"') 925 return d 926 927 def assert_qmp_absent(self, d, path): 928 try: 929 result = self.dictpath(d, path) 930 except AssertionError: 931 return 932 self.fail('path "%s" has value "%s"' % (path, str(result))) 933 934 def assert_qmp(self, d, path, value): 935 '''Assert that the value for a specific path in a QMP dict 936 matches. When given a list of values, assert that any of 937 them matches.''' 938 939 result = self.dictpath(d, path) 940 941 # [] makes no sense as a list of valid values, so treat it as 942 # an actual single value. 943 if isinstance(value, list) and value != []: 944 for v in value: 945 if result == v: 946 return 947 self.fail('no match for "%s" in %s' % (str(result), str(value))) 948 else: 949 self.assertEqual(result, value, 950 '"%s" is "%s", expected "%s"' 951 % (path, str(result), str(value))) 952 953 def assert_no_active_block_jobs(self): 954 result = self.vm.qmp('query-block-jobs') 955 self.assert_qmp(result, 'return', []) 956 957 def assert_has_block_node(self, node_name=None, file_name=None): 958 """Issue a query-named-block-nodes and assert node_name and/or 959 file_name is present in the result""" 960 def check_equal_or_none(a, b): 961 return a is None or b is None or a == b 962 assert node_name or file_name 963 result = self.vm.qmp('query-named-block-nodes') 964 for x in result["return"]: 965 if check_equal_or_none(x.get("node-name"), node_name) and \ 966 check_equal_or_none(x.get("file"), file_name): 967 return 968 self.fail("Cannot find %s %s in result:\n%s" % 969 (node_name, file_name, result)) 970 971 def assert_json_filename_equal(self, json_filename, reference): 972 '''Asserts that the given filename is a json: filename and that its 973 content is equal to the given reference object''' 974 self.assertEqual(json_filename[:5], 'json:') 975 self.assertEqual( 976 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 977 self.vm.flatten_qmp_object(reference) 978 ) 979 980 def cancel_and_wait(self, drive='drive0', force=False, 981 resume=False, wait=60.0): 982 '''Cancel a block job and wait for it to finish, returning the event''' 983 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 984 self.assert_qmp(result, 'return', {}) 985 986 if resume: 987 self.vm.resume_drive(drive) 988 989 cancelled = False 990 result = None 991 while not cancelled: 992 for event in self.vm.get_qmp_events(wait=wait): 993 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 994 event['event'] == 'BLOCK_JOB_CANCELLED': 995 self.assert_qmp(event, 'data/device', drive) 996 result = event 997 cancelled = True 998 elif event['event'] == 'JOB_STATUS_CHANGE': 999 self.assert_qmp(event, 'data/id', drive) 1000 1001 1002 self.assert_no_active_block_jobs() 1003 return result 1004 1005 def wait_until_completed(self, drive='drive0', check_offset=True, 1006 wait=60.0, error=None): 1007 '''Wait for a block job to finish, returning the event''' 1008 while True: 1009 for event in self.vm.get_qmp_events(wait=wait): 1010 if event['event'] == 'BLOCK_JOB_COMPLETED': 1011 self.assert_qmp(event, 'data/device', drive) 1012 if error is None: 1013 self.assert_qmp_absent(event, 'data/error') 1014 if check_offset: 1015 self.assert_qmp(event, 'data/offset', 1016 event['data']['len']) 1017 else: 1018 self.assert_qmp(event, 'data/error', error) 1019 self.assert_no_active_block_jobs() 1020 return event 1021 if event['event'] == 'JOB_STATUS_CHANGE': 1022 self.assert_qmp(event, 'data/id', drive) 1023 1024 def wait_ready(self, drive='drive0'): 1025 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1026 return self.vm.events_wait([ 1027 ('BLOCK_JOB_READY', 1028 {'data': {'type': 'mirror', 'device': drive}}), 1029 ('BLOCK_JOB_READY', 1030 {'data': {'type': 'commit', 'device': drive}}) 1031 ]) 1032 1033 def wait_ready_and_cancel(self, drive='drive0'): 1034 self.wait_ready(drive=drive) 1035 event = self.cancel_and_wait(drive=drive) 1036 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1037 self.assert_qmp(event, 'data/type', 'mirror') 1038 self.assert_qmp(event, 'data/offset', event['data']['len']) 1039 1040 def complete_and_wait(self, drive='drive0', wait_ready=True, 1041 completion_error=None): 1042 '''Complete a block job and wait for it to finish''' 1043 if wait_ready: 1044 self.wait_ready(drive=drive) 1045 1046 result = self.vm.qmp('block-job-complete', device=drive) 1047 self.assert_qmp(result, 'return', {}) 1048 1049 event = self.wait_until_completed(drive=drive, error=completion_error) 1050 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1051 1052 def pause_wait(self, job_id='job0'): 1053 with Timeout(3, "Timeout waiting for job to pause"): 1054 while True: 1055 result = self.vm.qmp('query-block-jobs') 1056 found = False 1057 for job in result['return']: 1058 if job['device'] == job_id: 1059 found = True 1060 if job['paused'] and not job['busy']: 1061 return job 1062 break 1063 assert found 1064 1065 def pause_job(self, job_id='job0', wait=True): 1066 result = self.vm.qmp('block-job-pause', device=job_id) 1067 self.assert_qmp(result, 'return', {}) 1068 if wait: 1069 return self.pause_wait(job_id) 1070 return result 1071 1072 def case_skip(self, reason): 1073 '''Skip this test case''' 1074 case_notrun(reason) 1075 self.skipTest(reason) 1076 1077 1078def notrun(reason): 1079 '''Skip this test suite''' 1080 # Each test in qemu-iotests has a number ("seq") 1081 seq = os.path.basename(sys.argv[0]) 1082 1083 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1084 logger.warning("%s not run: %s", seq, reason) 1085 sys.exit(0) 1086 1087def case_notrun(reason): 1088 '''Mark this test case as not having been run (without actually 1089 skipping it, that is left to the caller). See 1090 QMPTestCase.case_skip() for a variant that actually skips the 1091 current test case.''' 1092 1093 # Each test in qemu-iotests has a number ("seq") 1094 seq = os.path.basename(sys.argv[0]) 1095 1096 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1097 ' [case not run] ' + reason + '\n') 1098 1099def _verify_image_format(supported_fmts: Sequence[str] = (), 1100 unsupported_fmts: Sequence[str] = ()) -> None: 1101 if 'generic' in supported_fmts and \ 1102 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1103 # similar to 1104 # _supported_fmt generic 1105 # for bash tests 1106 supported_fmts = () 1107 1108 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1109 if not_sup or (imgfmt in unsupported_fmts): 1110 notrun('not suitable for this image format: %s' % imgfmt) 1111 1112 if imgfmt == 'luks': 1113 verify_working_luks() 1114 1115def _verify_protocol(supported: Sequence[str] = (), 1116 unsupported: Sequence[str] = ()) -> None: 1117 assert not (supported and unsupported) 1118 1119 if 'generic' in supported: 1120 return 1121 1122 not_sup = supported and (imgproto not in supported) 1123 if not_sup or (imgproto in unsupported): 1124 notrun('not suitable for this protocol: %s' % imgproto) 1125 1126def _verify_platform(supported: Sequence[str] = (), 1127 unsupported: Sequence[str] = ()) -> None: 1128 if any((sys.platform.startswith(x) for x in unsupported)): 1129 notrun('not suitable for this OS: %s' % sys.platform) 1130 1131 if supported: 1132 if not any((sys.platform.startswith(x) for x in supported)): 1133 notrun('not suitable for this OS: %s' % sys.platform) 1134 1135def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1136 if supported_cache_modes and (cachemode not in supported_cache_modes): 1137 notrun('not suitable for this cache mode: %s' % cachemode) 1138 1139def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1140 if supported_aio_modes and (aiomode not in supported_aio_modes): 1141 notrun('not suitable for this aio mode: %s' % aiomode) 1142 1143def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1144 usf_list = list(set(required_formats) - set(supported_formats())) 1145 if usf_list: 1146 notrun(f'formats {usf_list} are not whitelisted') 1147 1148 1149def _verify_virtio_blk() -> None: 1150 out = qemu_pipe('-M', 'none', '-device', 'help') 1151 if 'virtio-blk' not in out: 1152 notrun('Missing virtio-blk in QEMU binary') 1153 1154 1155def supports_quorum(): 1156 return 'quorum' in qemu_img_pipe('--help') 1157 1158def verify_quorum(): 1159 '''Skip test suite if quorum support is not available''' 1160 if not supports_quorum(): 1161 notrun('quorum support missing') 1162 1163def has_working_luks() -> Tuple[bool, str]: 1164 """ 1165 Check whether our LUKS driver can actually create images 1166 (this extends to LUKS encryption for qcow2). 1167 1168 If not, return the reason why. 1169 """ 1170 1171 img_file = f'{test_dir}/luks-test.luks' 1172 (output, status) = \ 1173 qemu_img_pipe_and_status('create', '-f', 'luks', 1174 '--object', luks_default_secret_object, 1175 '-o', luks_default_key_secret_opt, 1176 '-o', 'iter-time=10', 1177 img_file, '1G') 1178 try: 1179 os.remove(img_file) 1180 except OSError: 1181 pass 1182 1183 if status != 0: 1184 reason = output 1185 for line in output.splitlines(): 1186 if img_file + ':' in line: 1187 reason = line.split(img_file + ':', 1)[1].strip() 1188 break 1189 1190 return (False, reason) 1191 else: 1192 return (True, '') 1193 1194def verify_working_luks(): 1195 """ 1196 Skip test suite if LUKS does not work 1197 """ 1198 (working, reason) = has_working_luks() 1199 if not working: 1200 notrun(reason) 1201 1202def qemu_pipe(*args: str) -> str: 1203 """ 1204 Run qemu with an option to print something and exit (e.g. a help option). 1205 1206 :return: QEMU's stdout output. 1207 """ 1208 full_args = [qemu_prog] + qemu_opts + list(args) 1209 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1210 return output 1211 1212def supported_formats(read_only=False): 1213 '''Set 'read_only' to True to check ro-whitelist 1214 Otherwise, rw-whitelist is checked''' 1215 1216 if not hasattr(supported_formats, "formats"): 1217 supported_formats.formats = {} 1218 1219 if read_only not in supported_formats.formats: 1220 format_message = qemu_pipe("-drive", "format=help") 1221 line = 1 if read_only else 0 1222 supported_formats.formats[read_only] = \ 1223 format_message.splitlines()[line].split(":")[1].split() 1224 1225 return supported_formats.formats[read_only] 1226 1227def skip_if_unsupported(required_formats=(), read_only=False): 1228 '''Skip Test Decorator 1229 Runs the test if all the required formats are whitelisted''' 1230 def skip_test_decorator(func): 1231 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1232 **kwargs: Dict[str, Any]) -> None: 1233 if callable(required_formats): 1234 fmts = required_formats(test_case) 1235 else: 1236 fmts = required_formats 1237 1238 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1239 if usf_list: 1240 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1241 test_case.case_skip(msg) 1242 else: 1243 func(test_case, *args, **kwargs) 1244 return func_wrapper 1245 return skip_test_decorator 1246 1247def skip_for_formats(formats: Sequence[str] = ()) \ 1248 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1249 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1250 '''Skip Test Decorator 1251 Skips the test for the given formats''' 1252 def skip_test_decorator(func): 1253 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1254 **kwargs: Dict[str, Any]) -> None: 1255 if imgfmt in formats: 1256 msg = f'{test_case}: Skipped for format {imgfmt}' 1257 test_case.case_skip(msg) 1258 else: 1259 func(test_case, *args, **kwargs) 1260 return func_wrapper 1261 return skip_test_decorator 1262 1263def skip_if_user_is_root(func): 1264 '''Skip Test Decorator 1265 Runs the test only without root permissions''' 1266 def func_wrapper(*args, **kwargs): 1267 if os.getuid() == 0: 1268 case_notrun('{}: cannot be run as root'.format(args[0])) 1269 return None 1270 else: 1271 return func(*args, **kwargs) 1272 return func_wrapper 1273 1274def execute_unittest(debug=False): 1275 """Executes unittests within the calling module.""" 1276 1277 verbosity = 2 if debug else 1 1278 1279 if debug: 1280 output = sys.stdout 1281 else: 1282 # We need to filter out the time taken from the output so that 1283 # qemu-iotest can reliably diff the results against master output. 1284 output = io.StringIO() 1285 1286 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1287 verbosity=verbosity) 1288 try: 1289 # unittest.main() will use sys.exit(); so expect a SystemExit 1290 # exception 1291 unittest.main(testRunner=runner) 1292 finally: 1293 # We need to filter out the time taken from the output so that 1294 # qemu-iotest can reliably diff the results against master output. 1295 if not debug: 1296 out = output.getvalue() 1297 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1298 1299 # Hide skipped tests from the reference output 1300 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1301 out_first_line, out_rest = out.split('\n', 1) 1302 out = out_first_line.replace('s', '.') + '\n' + out_rest 1303 1304 sys.stderr.write(out) 1305 1306def execute_setup_common(supported_fmts: Sequence[str] = (), 1307 supported_platforms: Sequence[str] = (), 1308 supported_cache_modes: Sequence[str] = (), 1309 supported_aio_modes: Sequence[str] = (), 1310 unsupported_fmts: Sequence[str] = (), 1311 supported_protocols: Sequence[str] = (), 1312 unsupported_protocols: Sequence[str] = (), 1313 required_fmts: Sequence[str] = ()) -> bool: 1314 """ 1315 Perform necessary setup for either script-style or unittest-style tests. 1316 1317 :return: Bool; Whether or not debug mode has been requested via the CLI. 1318 """ 1319 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1320 1321 debug = '-d' in sys.argv 1322 if debug: 1323 sys.argv.remove('-d') 1324 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1325 1326 _verify_image_format(supported_fmts, unsupported_fmts) 1327 _verify_protocol(supported_protocols, unsupported_protocols) 1328 _verify_platform(supported=supported_platforms) 1329 _verify_cache_mode(supported_cache_modes) 1330 _verify_aio_mode(supported_aio_modes) 1331 _verify_formats(required_fmts) 1332 _verify_virtio_blk() 1333 1334 return debug 1335 1336def execute_test(*args, test_function=None, **kwargs): 1337 """Run either unittest or script-style tests.""" 1338 1339 debug = execute_setup_common(*args, **kwargs) 1340 if not test_function: 1341 execute_unittest(debug) 1342 else: 1343 test_function() 1344 1345def activate_logging(): 1346 """Activate iotests.log() output to stdout for script-style tests.""" 1347 handler = logging.StreamHandler(stream=sys.stdout) 1348 formatter = logging.Formatter('%(message)s') 1349 handler.setFormatter(formatter) 1350 test_logger.addHandler(handler) 1351 test_logger.setLevel(logging.INFO) 1352 test_logger.propagate = False 1353 1354# This is called from script-style iotests without a single point of entry 1355def script_initialize(*args, **kwargs): 1356 """Initialize script-style tests without running any tests.""" 1357 activate_logging() 1358 execute_setup_common(*args, **kwargs) 1359 1360# This is called from script-style iotests with a single point of entry 1361def script_main(test_function, *args, **kwargs): 1362 """Run script-style tests outside of the unittest framework""" 1363 activate_logging() 1364 execute_test(*args, test_function=test_function, **kwargs) 1365 1366# This is called from unittest style iotests 1367def main(*args, **kwargs): 1368 """Run tests using the unittest framework""" 1369 execute_test(*args, **kwargs) 1370