1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 77 78gdb_qemu_env = os.environ.get('GDB_OPTIONS') 79qemu_gdb = [] 80if gdb_qemu_env: 81 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 82 83qemu_print = os.environ.get('PRINT_QEMU', False) 84 85imgfmt = os.environ.get('IMGFMT', 'raw') 86imgproto = os.environ.get('IMGPROTO', 'file') 87output_dir = os.environ.get('OUTPUT_DIR', '.') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 210 """ 211 Run qemu-img and return both its output and its exit code 212 """ 213 is_create = bool(args and args[0] == 'create') 214 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 215 return qemu_tool_pipe_and_status('qemu-img', full_args, 216 drop_successful_output=is_create) 217 218def qemu_img(*args: str) -> int: 219 '''Run qemu-img and return the exit code''' 220 return qemu_img_pipe_and_status(*args)[1] 221 222def ordered_qmp(qmsg, conv_keys=True): 223 # Dictionaries are not ordered prior to 3.6, therefore: 224 if isinstance(qmsg, list): 225 return [ordered_qmp(atom) for atom in qmsg] 226 if isinstance(qmsg, dict): 227 od = OrderedDict() 228 for k, v in sorted(qmsg.items()): 229 if conv_keys: 230 k = k.replace('_', '-') 231 od[k] = ordered_qmp(v, conv_keys=False) 232 return od 233 return qmsg 234 235def qemu_img_create(*args): 236 return qemu_img('create', *args) 237 238def qemu_img_measure(*args): 239 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 240 241def qemu_img_check(*args): 242 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 243 244def qemu_img_pipe(*args: str) -> str: 245 '''Run qemu-img and return its output''' 246 return qemu_img_pipe_and_status(*args)[0] 247 248def qemu_img_log(*args): 249 result = qemu_img_pipe(*args) 250 log(result, filters=[filter_testfiles]) 251 return result 252 253def img_info_log(filename, filter_path=None, use_image_opts=False, 254 extra_args=()): 255 args = ['info'] 256 if use_image_opts: 257 args.append('--image-opts') 258 else: 259 args += ['-f', imgfmt] 260 args += extra_args 261 args.append(filename) 262 263 output = qemu_img_pipe(*args) 264 if not filter_path: 265 filter_path = filename 266 log(filter_img_info(output, filter_path)) 267 268def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 269 if '-f' in args or '--image-opts' in args: 270 return qemu_io_args_no_fmt + list(args) 271 else: 272 return qemu_io_args + list(args) 273 274def qemu_io_popen(*args): 275 return qemu_tool_popen(qemu_io_wrap_args(args)) 276 277def qemu_io(*args): 278 '''Run qemu-io and return the stdout data''' 279 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0] 280 281def qemu_io_log(*args): 282 result = qemu_io(*args) 283 log(result, filters=[filter_testfiles, filter_qemu_io]) 284 return result 285 286def qemu_io_silent(*args): 287 '''Run qemu-io and return the exit code, suppressing stdout''' 288 args = qemu_io_wrap_args(args) 289 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 290 if result.returncode < 0: 291 sys.stderr.write('qemu-io received signal %i: %s\n' % 292 (-result.returncode, ' '.join(args))) 293 return result.returncode 294 295def qemu_io_silent_check(*args): 296 '''Run qemu-io and return the true if subprocess returned 0''' 297 args = qemu_io_wrap_args(args) 298 result = subprocess.run(args, stdout=subprocess.DEVNULL, 299 stderr=subprocess.STDOUT, check=False) 300 return result.returncode == 0 301 302class QemuIoInteractive: 303 def __init__(self, *args): 304 self.args = qemu_io_wrap_args(args) 305 # We need to keep the Popen objext around, and not 306 # close it immediately. Therefore, disable the pylint check: 307 # pylint: disable=consider-using-with 308 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 309 stdout=subprocess.PIPE, 310 stderr=subprocess.STDOUT, 311 universal_newlines=True) 312 out = self._p.stdout.read(9) 313 if out != 'qemu-io> ': 314 # Most probably qemu-io just failed to start. 315 # Let's collect the whole output and exit. 316 out += self._p.stdout.read() 317 self._p.wait(timeout=1) 318 raise ValueError(out) 319 320 def close(self): 321 self._p.communicate('q\n') 322 323 def _read_output(self): 324 pattern = 'qemu-io> ' 325 n = len(pattern) 326 pos = 0 327 s = [] 328 while pos != n: 329 c = self._p.stdout.read(1) 330 # check unexpected EOF 331 assert c != '' 332 s.append(c) 333 if c == pattern[pos]: 334 pos += 1 335 else: 336 pos = 0 337 338 return ''.join(s[:-n]) 339 340 def cmd(self, cmd): 341 # quit command is in close(), '\n' is added automatically 342 assert '\n' not in cmd 343 cmd = cmd.strip() 344 assert cmd not in ('q', 'quit') 345 self._p.stdin.write(cmd + '\n') 346 self._p.stdin.flush() 347 return self._read_output() 348 349 350class QemuStorageDaemon: 351 def __init__(self, *args: str, instance_id: str = 'a'): 352 assert '--pidfile' not in args 353 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 354 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 355 356 # Cannot use with here, we want the subprocess to stay around 357 # pylint: disable=consider-using-with 358 self._p = subprocess.Popen(all_args) 359 while not os.path.exists(self.pidfile): 360 if self._p.poll() is not None: 361 cmd = ' '.join(all_args) 362 raise RuntimeError( 363 'qemu-storage-daemon terminated with exit code ' + 364 f'{self._p.returncode}: {cmd}') 365 366 time.sleep(0.01) 367 368 with open(self.pidfile, encoding='utf-8') as f: 369 self._pid = int(f.read().strip()) 370 371 assert self._pid == self._p.pid 372 373 def stop(self, kill_signal=15): 374 self._p.send_signal(kill_signal) 375 self._p.wait() 376 self._p = None 377 378 try: 379 os.remove(self.pidfile) 380 except OSError: 381 pass 382 383 def __del__(self): 384 if self._p is not None: 385 self.stop(kill_signal=9) 386 387 388def qemu_nbd(*args): 389 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 390 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 391 392def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 393 '''Run qemu-nbd in daemon mode and return both the parent's exit code 394 and its output in case of an error''' 395 full_args = qemu_nbd_args + ['--fork'] + list(args) 396 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 397 connect_stderr=False) 398 return returncode, output if returncode else '' 399 400def qemu_nbd_list_log(*args: str) -> str: 401 '''Run qemu-nbd to list remote exports''' 402 full_args = [qemu_nbd_prog, '-L'] + list(args) 403 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 404 log(output, filters=[filter_testfiles, filter_nbd_exports]) 405 return output 406 407@contextmanager 408def qemu_nbd_popen(*args): 409 '''Context manager running qemu-nbd within the context''' 410 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 411 412 assert not os.path.exists(pid_file) 413 414 cmd = list(qemu_nbd_args) 415 cmd.extend(('--persistent', '--pid-file', pid_file)) 416 cmd.extend(args) 417 418 log('Start NBD server') 419 with subprocess.Popen(cmd) as p: 420 try: 421 while not os.path.exists(pid_file): 422 if p.poll() is not None: 423 raise RuntimeError( 424 "qemu-nbd terminated with exit code {}: {}" 425 .format(p.returncode, ' '.join(cmd))) 426 427 time.sleep(0.01) 428 yield 429 finally: 430 if os.path.exists(pid_file): 431 os.remove(pid_file) 432 log('Kill NBD server') 433 p.kill() 434 p.wait() 435 436def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 437 '''Return True if two image files are identical''' 438 return qemu_img('compare', '-f', fmt1, 439 '-F', fmt2, img1, img2) == 0 440 441def create_image(name, size): 442 '''Create a fully-allocated raw image with sector markers''' 443 with open(name, 'wb') as file: 444 i = 0 445 while i < size: 446 sector = struct.pack('>l504xl', i // 512, i // 512) 447 file.write(sector) 448 i = i + 512 449 450def image_size(img): 451 '''Return image's virtual size''' 452 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 453 return json.loads(r)['virtual-size'] 454 455def is_str(val): 456 return isinstance(val, str) 457 458test_dir_re = re.compile(r"%s" % test_dir) 459def filter_test_dir(msg): 460 return test_dir_re.sub("TEST_DIR", msg) 461 462win32_re = re.compile(r"\r") 463def filter_win32(msg): 464 return win32_re.sub("", msg) 465 466qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 467 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 468 r"and [0-9\/.inf]* ops\/sec\)") 469def filter_qemu_io(msg): 470 msg = filter_win32(msg) 471 return qemu_io_re.sub("X ops; XX:XX:XX.X " 472 "(XXX YYY/sec and XXX ops/sec)", msg) 473 474chown_re = re.compile(r"chown [0-9]+:[0-9]+") 475def filter_chown(msg): 476 return chown_re.sub("chown UID:GID", msg) 477 478def filter_qmp_event(event): 479 '''Filter a QMP event dict''' 480 event = dict(event) 481 if 'timestamp' in event: 482 event['timestamp']['seconds'] = 'SECS' 483 event['timestamp']['microseconds'] = 'USECS' 484 return event 485 486def filter_qmp(qmsg, filter_fn): 487 '''Given a string filter, filter a QMP object's values. 488 filter_fn takes a (key, value) pair.''' 489 # Iterate through either lists or dicts; 490 if isinstance(qmsg, list): 491 items = enumerate(qmsg) 492 else: 493 items = qmsg.items() 494 495 for k, v in items: 496 if isinstance(v, (dict, list)): 497 qmsg[k] = filter_qmp(v, filter_fn) 498 else: 499 qmsg[k] = filter_fn(k, v) 500 return qmsg 501 502def filter_testfiles(msg): 503 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 504 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 505 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 506 507def filter_qmp_testfiles(qmsg): 508 def _filter(_key, value): 509 if is_str(value): 510 return filter_testfiles(value) 511 return value 512 return filter_qmp(qmsg, _filter) 513 514def filter_virtio_scsi(output: str) -> str: 515 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 516 517def filter_qmp_virtio_scsi(qmsg): 518 def _filter(_key, value): 519 if is_str(value): 520 return filter_virtio_scsi(value) 521 return value 522 return filter_qmp(qmsg, _filter) 523 524def filter_generated_node_ids(msg): 525 return re.sub("#block[0-9]+", "NODE_NAME", msg) 526 527def filter_img_info(output, filename): 528 lines = [] 529 for line in output.split('\n'): 530 if 'disk size' in line or 'actual-size' in line: 531 continue 532 line = line.replace(filename, 'TEST_IMG') 533 line = filter_testfiles(line) 534 line = line.replace(imgfmt, 'IMGFMT') 535 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 536 line = re.sub('uuid: [-a-f0-9]+', 537 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 538 line) 539 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 540 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 541 line) 542 lines.append(line) 543 return '\n'.join(lines) 544 545def filter_imgfmt(msg): 546 return msg.replace(imgfmt, 'IMGFMT') 547 548def filter_qmp_imgfmt(qmsg): 549 def _filter(_key, value): 550 if is_str(value): 551 return filter_imgfmt(value) 552 return value 553 return filter_qmp(qmsg, _filter) 554 555def filter_nbd_exports(output: str) -> str: 556 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 557 558 559Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 560 561def log(msg: Msg, 562 filters: Iterable[Callable[[Msg], Msg]] = (), 563 indent: Optional[int] = None) -> None: 564 """ 565 Logs either a string message or a JSON serializable message (like QMP). 566 If indent is provided, JSON serializable messages are pretty-printed. 567 """ 568 for flt in filters: 569 msg = flt(msg) 570 if isinstance(msg, (dict, list)): 571 # Don't sort if it's already sorted 572 do_sort = not isinstance(msg, OrderedDict) 573 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 574 else: 575 test_logger.info(msg) 576 577class Timeout: 578 def __init__(self, seconds, errmsg="Timeout"): 579 self.seconds = seconds 580 self.errmsg = errmsg 581 def __enter__(self): 582 if qemu_gdb or qemu_valgrind: 583 return self 584 signal.signal(signal.SIGALRM, self.timeout) 585 signal.setitimer(signal.ITIMER_REAL, self.seconds) 586 return self 587 def __exit__(self, exc_type, value, traceback): 588 if qemu_gdb or qemu_valgrind: 589 return False 590 signal.setitimer(signal.ITIMER_REAL, 0) 591 return False 592 def timeout(self, signum, frame): 593 raise Exception(self.errmsg) 594 595def file_pattern(name): 596 return "{0}-{1}".format(os.getpid(), name) 597 598class FilePath: 599 """ 600 Context manager generating multiple file names. The generated files are 601 removed when exiting the context. 602 603 Example usage: 604 605 with FilePath('a.img', 'b.img') as (img_a, img_b): 606 # Use img_a and img_b here... 607 608 # a.img and b.img are automatically removed here. 609 610 By default images are created in iotests.test_dir. To create sockets use 611 iotests.sock_dir: 612 613 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 614 615 For convenience, calling with one argument yields a single file instead of 616 a tuple with one item. 617 618 """ 619 def __init__(self, *names, base_dir=test_dir): 620 self.paths = [os.path.join(base_dir, file_pattern(name)) 621 for name in names] 622 623 def __enter__(self): 624 if len(self.paths) == 1: 625 return self.paths[0] 626 else: 627 return self.paths 628 629 def __exit__(self, exc_type, exc_val, exc_tb): 630 for path in self.paths: 631 try: 632 os.remove(path) 633 except OSError: 634 pass 635 return False 636 637 638def try_remove(img): 639 try: 640 os.remove(img) 641 except OSError: 642 pass 643 644def file_path_remover(): 645 for path in reversed(file_path_remover.paths): 646 try_remove(path) 647 648 649def file_path(*names, base_dir=test_dir): 650 ''' Another way to get auto-generated filename that cleans itself up. 651 652 Use is as simple as: 653 654 img_a, img_b = file_path('a.img', 'b.img') 655 sock = file_path('socket') 656 ''' 657 658 if not hasattr(file_path_remover, 'paths'): 659 file_path_remover.paths = [] 660 atexit.register(file_path_remover) 661 662 paths = [] 663 for name in names: 664 filename = file_pattern(name) 665 path = os.path.join(base_dir, filename) 666 file_path_remover.paths.append(path) 667 paths.append(path) 668 669 return paths[0] if len(paths) == 1 else paths 670 671def remote_filename(path): 672 if imgproto == 'file': 673 return path 674 elif imgproto == 'ssh': 675 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 676 else: 677 raise Exception("Protocol %s not supported" % (imgproto)) 678 679class VM(qtest.QEMUQtestMachine): 680 '''A QEMU VM''' 681 682 def __init__(self, path_suffix=''): 683 name = "qemu%s-%d" % (path_suffix, os.getpid()) 684 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 685 if qemu_gdb and qemu_valgrind: 686 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 687 sys.exit(1) 688 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 689 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 690 name=name, 691 base_temp_dir=test_dir, 692 sock_dir=sock_dir, qmp_timer=timer) 693 self._num_drives = 0 694 695 def _post_shutdown(self) -> None: 696 super()._post_shutdown() 697 if not qemu_valgrind or not self._popen: 698 return 699 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 700 if self.exitcode() == 99: 701 with open(valgrind_filename, encoding='utf-8') as f: 702 print(f.read()) 703 else: 704 os.remove(valgrind_filename) 705 706 def _pre_launch(self) -> None: 707 super()._pre_launch() 708 if qemu_print: 709 # set QEMU binary output to stdout 710 self._close_qemu_log_file() 711 712 def add_object(self, opts): 713 self._args.append('-object') 714 self._args.append(opts) 715 return self 716 717 def add_device(self, opts): 718 self._args.append('-device') 719 self._args.append(opts) 720 return self 721 722 def add_drive_raw(self, opts): 723 self._args.append('-drive') 724 self._args.append(opts) 725 return self 726 727 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 728 '''Add a virtio-blk drive to the VM''' 729 options = ['if=%s' % interface, 730 'id=drive%d' % self._num_drives] 731 732 if path is not None: 733 options.append('file=%s' % path) 734 options.append('format=%s' % img_format) 735 options.append('cache=%s' % cachemode) 736 options.append('aio=%s' % aiomode) 737 738 if opts: 739 options.append(opts) 740 741 if img_format == 'luks' and 'key-secret' not in opts: 742 # default luks support 743 if luks_default_secret_object not in self._args: 744 self.add_object(luks_default_secret_object) 745 746 options.append(luks_default_key_secret_opt) 747 748 self._args.append('-drive') 749 self._args.append(','.join(options)) 750 self._num_drives += 1 751 return self 752 753 def add_blockdev(self, opts): 754 self._args.append('-blockdev') 755 if isinstance(opts, str): 756 self._args.append(opts) 757 else: 758 self._args.append(','.join(opts)) 759 return self 760 761 def add_incoming(self, addr): 762 self._args.append('-incoming') 763 self._args.append(addr) 764 return self 765 766 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 767 cmd = 'human-monitor-command' 768 kwargs: Dict[str, Any] = {'command-line': command_line} 769 if use_log: 770 return self.qmp_log(cmd, **kwargs) 771 else: 772 return self.qmp(cmd, **kwargs) 773 774 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 775 """Pause drive r/w operations""" 776 if not event: 777 self.pause_drive(drive, "read_aio") 778 self.pause_drive(drive, "write_aio") 779 return 780 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 781 782 def resume_drive(self, drive: str) -> None: 783 """Resume drive r/w operations""" 784 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 785 786 def hmp_qemu_io(self, drive: str, cmd: str, 787 use_log: bool = False, qdev: bool = False) -> QMPMessage: 788 """Write to a given drive using an HMP command""" 789 d = '-d ' if qdev else '' 790 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 791 792 def flatten_qmp_object(self, obj, output=None, basestr=''): 793 if output is None: 794 output = {} 795 if isinstance(obj, list): 796 for i, item in enumerate(obj): 797 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 798 elif isinstance(obj, dict): 799 for key in obj: 800 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 801 else: 802 output[basestr[:-1]] = obj # Strip trailing '.' 803 return output 804 805 def qmp_to_opts(self, obj): 806 obj = self.flatten_qmp_object(obj) 807 output_list = [] 808 for key in obj: 809 output_list += [key + '=' + obj[key]] 810 return ','.join(output_list) 811 812 def get_qmp_events_filtered(self, wait=60.0): 813 result = [] 814 for ev in self.get_qmp_events(wait=wait): 815 result.append(filter_qmp_event(ev)) 816 return result 817 818 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 819 full_cmd = OrderedDict(( 820 ("execute", cmd), 821 ("arguments", ordered_qmp(kwargs)) 822 )) 823 log(full_cmd, filters, indent=indent) 824 result = self.qmp(cmd, **kwargs) 825 log(result, filters, indent=indent) 826 return result 827 828 # Returns None on success, and an error string on failure 829 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 830 pre_finalize=None, cancel=False, wait=60.0): 831 """ 832 run_job moves a job from creation through to dismissal. 833 834 :param job: String. ID of recently-launched job 835 :param auto_finalize: Bool. True if the job was launched with 836 auto_finalize. Defaults to True. 837 :param auto_dismiss: Bool. True if the job was launched with 838 auto_dismiss=True. Defaults to False. 839 :param pre_finalize: Callback. A callable that takes no arguments to be 840 invoked prior to issuing job-finalize, if any. 841 :param cancel: Bool. When true, cancels the job after the pre_finalize 842 callback. 843 :param wait: Float. Timeout value specifying how long to wait for any 844 event, in seconds. Defaults to 60.0. 845 """ 846 match_device = {'data': {'device': job}} 847 match_id = {'data': {'id': job}} 848 events = [ 849 ('BLOCK_JOB_COMPLETED', match_device), 850 ('BLOCK_JOB_CANCELLED', match_device), 851 ('BLOCK_JOB_ERROR', match_device), 852 ('BLOCK_JOB_READY', match_device), 853 ('BLOCK_JOB_PENDING', match_id), 854 ('JOB_STATUS_CHANGE', match_id) 855 ] 856 error = None 857 while True: 858 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 859 if ev['event'] != 'JOB_STATUS_CHANGE': 860 log(ev) 861 continue 862 status = ev['data']['status'] 863 if status == 'aborting': 864 result = self.qmp('query-jobs') 865 for j in result['return']: 866 if j['id'] == job: 867 error = j['error'] 868 log('Job failed: %s' % (j['error'])) 869 elif status == 'ready': 870 self.qmp_log('job-complete', id=job) 871 elif status == 'pending' and not auto_finalize: 872 if pre_finalize: 873 pre_finalize() 874 if cancel: 875 self.qmp_log('job-cancel', id=job) 876 else: 877 self.qmp_log('job-finalize', id=job) 878 elif status == 'concluded' and not auto_dismiss: 879 self.qmp_log('job-dismiss', id=job) 880 elif status == 'null': 881 return error 882 883 # Returns None on success, and an error string on failure 884 def blockdev_create(self, options, job_id='job0', filters=None): 885 if filters is None: 886 filters = [filter_qmp_testfiles] 887 result = self.qmp_log('blockdev-create', filters=filters, 888 job_id=job_id, options=options) 889 890 if 'return' in result: 891 assert result['return'] == {} 892 job_result = self.run_job(job_id) 893 else: 894 job_result = result['error'] 895 896 log("") 897 return job_result 898 899 def enable_migration_events(self, name): 900 log('Enabling migration QMP events on %s...' % name) 901 log(self.qmp('migrate-set-capabilities', capabilities=[ 902 { 903 'capability': 'events', 904 'state': True 905 } 906 ])) 907 908 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 909 while True: 910 event = self.event_wait('MIGRATION') 911 # We use the default timeout, and with a timeout, event_wait() 912 # never returns None 913 assert event 914 915 log(event, filters=[filter_qmp_event]) 916 if event['data']['status'] in ('completed', 'failed'): 917 break 918 919 if event['data']['status'] == 'completed': 920 # The event may occur in finish-migrate, so wait for the expected 921 # post-migration runstate 922 runstate = None 923 while runstate != expect_runstate: 924 runstate = self.qmp('query-status')['return']['status'] 925 return True 926 else: 927 return False 928 929 def node_info(self, node_name): 930 nodes = self.qmp('query-named-block-nodes') 931 for x in nodes['return']: 932 if x['node-name'] == node_name: 933 return x 934 return None 935 936 def query_bitmaps(self): 937 res = self.qmp("query-named-block-nodes") 938 return {device['node-name']: device['dirty-bitmaps'] 939 for device in res['return'] if 'dirty-bitmaps' in device} 940 941 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 942 """ 943 get a specific bitmap from the object returned by query_bitmaps. 944 :param recording: If specified, filter results by the specified value. 945 :param bitmaps: If specified, use it instead of call query_bitmaps() 946 """ 947 if bitmaps is None: 948 bitmaps = self.query_bitmaps() 949 950 for bitmap in bitmaps[node_name]: 951 if bitmap.get('name', '') == bitmap_name: 952 if recording is None or bitmap.get('recording') == recording: 953 return bitmap 954 return None 955 956 def check_bitmap_status(self, node_name, bitmap_name, fields): 957 ret = self.get_bitmap(node_name, bitmap_name) 958 959 return fields.items() <= ret.items() 960 961 def assert_block_path(self, root, path, expected_node, graph=None): 962 """ 963 Check whether the node under the given path in the block graph 964 is @expected_node. 965 966 @root is the node name of the node where the @path is rooted. 967 968 @path is a string that consists of child names separated by 969 slashes. It must begin with a slash. 970 971 Examples for @root + @path: 972 - root="qcow2-node", path="/backing/file" 973 - root="quorum-node", path="/children.2/file" 974 975 Hypothetically, @path could be empty, in which case it would 976 point to @root. However, in practice this case is not useful 977 and hence not allowed. 978 979 @expected_node may be None. (All elements of the path but the 980 leaf must still exist.) 981 982 @graph may be None or the result of an x-debug-query-block-graph 983 call that has already been performed. 984 """ 985 if graph is None: 986 graph = self.qmp('x-debug-query-block-graph')['return'] 987 988 iter_path = iter(path.split('/')) 989 990 # Must start with a / 991 assert next(iter_path) == '' 992 993 node = next((node for node in graph['nodes'] if node['name'] == root), 994 None) 995 996 # An empty @path is not allowed, so the root node must be present 997 assert node is not None, 'Root node %s not found' % root 998 999 for child_name in iter_path: 1000 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1001 1002 try: 1003 node_id = next(edge['child'] for edge in graph['edges'] 1004 if (edge['parent'] == node['id'] and 1005 edge['name'] == child_name)) 1006 1007 node = next(node for node in graph['nodes'] 1008 if node['id'] == node_id) 1009 1010 except StopIteration: 1011 node = None 1012 1013 if node is None: 1014 assert expected_node is None, \ 1015 'No node found under %s (but expected %s)' % \ 1016 (path, expected_node) 1017 else: 1018 assert node['name'] == expected_node, \ 1019 'Found node %s under %s (but expected %s)' % \ 1020 (node['name'], path, expected_node) 1021 1022index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1023 1024class QMPTestCase(unittest.TestCase): 1025 '''Abstract base class for QMP test cases''' 1026 1027 def __init__(self, *args, **kwargs): 1028 super().__init__(*args, **kwargs) 1029 # Many users of this class set a VM property we rely on heavily 1030 # in the methods below. 1031 self.vm = None 1032 1033 def dictpath(self, d, path): 1034 '''Traverse a path in a nested dict''' 1035 for component in path.split('/'): 1036 m = index_re.match(component) 1037 if m: 1038 component, idx = m.groups() 1039 idx = int(idx) 1040 1041 if not isinstance(d, dict) or component not in d: 1042 self.fail(f'failed path traversal for "{path}" in "{d}"') 1043 d = d[component] 1044 1045 if m: 1046 if not isinstance(d, list): 1047 self.fail(f'path component "{component}" in "{path}" ' 1048 f'is not a list in "{d}"') 1049 try: 1050 d = d[idx] 1051 except IndexError: 1052 self.fail(f'invalid index "{idx}" in path "{path}" ' 1053 f'in "{d}"') 1054 return d 1055 1056 def assert_qmp_absent(self, d, path): 1057 try: 1058 result = self.dictpath(d, path) 1059 except AssertionError: 1060 return 1061 self.fail('path "%s" has value "%s"' % (path, str(result))) 1062 1063 def assert_qmp(self, d, path, value): 1064 '''Assert that the value for a specific path in a QMP dict 1065 matches. When given a list of values, assert that any of 1066 them matches.''' 1067 1068 result = self.dictpath(d, path) 1069 1070 # [] makes no sense as a list of valid values, so treat it as 1071 # an actual single value. 1072 if isinstance(value, list) and value != []: 1073 for v in value: 1074 if result == v: 1075 return 1076 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1077 else: 1078 self.assertEqual(result, value, 1079 '"%s" is "%s", expected "%s"' 1080 % (path, str(result), str(value))) 1081 1082 def assert_no_active_block_jobs(self): 1083 result = self.vm.qmp('query-block-jobs') 1084 self.assert_qmp(result, 'return', []) 1085 1086 def assert_has_block_node(self, node_name=None, file_name=None): 1087 """Issue a query-named-block-nodes and assert node_name and/or 1088 file_name is present in the result""" 1089 def check_equal_or_none(a, b): 1090 return a is None or b is None or a == b 1091 assert node_name or file_name 1092 result = self.vm.qmp('query-named-block-nodes') 1093 for x in result["return"]: 1094 if check_equal_or_none(x.get("node-name"), node_name) and \ 1095 check_equal_or_none(x.get("file"), file_name): 1096 return 1097 self.fail("Cannot find %s %s in result:\n%s" % 1098 (node_name, file_name, result)) 1099 1100 def assert_json_filename_equal(self, json_filename, reference): 1101 '''Asserts that the given filename is a json: filename and that its 1102 content is equal to the given reference object''' 1103 self.assertEqual(json_filename[:5], 'json:') 1104 self.assertEqual( 1105 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1106 self.vm.flatten_qmp_object(reference) 1107 ) 1108 1109 def cancel_and_wait(self, drive='drive0', force=False, 1110 resume=False, wait=60.0): 1111 '''Cancel a block job and wait for it to finish, returning the event''' 1112 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1113 self.assert_qmp(result, 'return', {}) 1114 1115 if resume: 1116 self.vm.resume_drive(drive) 1117 1118 cancelled = False 1119 result = None 1120 while not cancelled: 1121 for event in self.vm.get_qmp_events(wait=wait): 1122 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1123 event['event'] == 'BLOCK_JOB_CANCELLED': 1124 self.assert_qmp(event, 'data/device', drive) 1125 result = event 1126 cancelled = True 1127 elif event['event'] == 'JOB_STATUS_CHANGE': 1128 self.assert_qmp(event, 'data/id', drive) 1129 1130 1131 self.assert_no_active_block_jobs() 1132 return result 1133 1134 def wait_until_completed(self, drive='drive0', check_offset=True, 1135 wait=60.0, error=None): 1136 '''Wait for a block job to finish, returning the event''' 1137 while True: 1138 for event in self.vm.get_qmp_events(wait=wait): 1139 if event['event'] == 'BLOCK_JOB_COMPLETED': 1140 self.assert_qmp(event, 'data/device', drive) 1141 if error is None: 1142 self.assert_qmp_absent(event, 'data/error') 1143 if check_offset: 1144 self.assert_qmp(event, 'data/offset', 1145 event['data']['len']) 1146 else: 1147 self.assert_qmp(event, 'data/error', error) 1148 self.assert_no_active_block_jobs() 1149 return event 1150 if event['event'] == 'JOB_STATUS_CHANGE': 1151 self.assert_qmp(event, 'data/id', drive) 1152 1153 def wait_ready(self, drive='drive0'): 1154 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1155 return self.vm.events_wait([ 1156 ('BLOCK_JOB_READY', 1157 {'data': {'type': 'mirror', 'device': drive}}), 1158 ('BLOCK_JOB_READY', 1159 {'data': {'type': 'commit', 'device': drive}}) 1160 ]) 1161 1162 def wait_ready_and_cancel(self, drive='drive0'): 1163 self.wait_ready(drive=drive) 1164 event = self.cancel_and_wait(drive=drive) 1165 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1166 self.assert_qmp(event, 'data/type', 'mirror') 1167 self.assert_qmp(event, 'data/offset', event['data']['len']) 1168 1169 def complete_and_wait(self, drive='drive0', wait_ready=True, 1170 completion_error=None): 1171 '''Complete a block job and wait for it to finish''' 1172 if wait_ready: 1173 self.wait_ready(drive=drive) 1174 1175 result = self.vm.qmp('block-job-complete', device=drive) 1176 self.assert_qmp(result, 'return', {}) 1177 1178 event = self.wait_until_completed(drive=drive, error=completion_error) 1179 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1180 1181 def pause_wait(self, job_id='job0'): 1182 with Timeout(3, "Timeout waiting for job to pause"): 1183 while True: 1184 result = self.vm.qmp('query-block-jobs') 1185 found = False 1186 for job in result['return']: 1187 if job['device'] == job_id: 1188 found = True 1189 if job['paused'] and not job['busy']: 1190 return job 1191 break 1192 assert found 1193 1194 def pause_job(self, job_id='job0', wait=True): 1195 result = self.vm.qmp('block-job-pause', device=job_id) 1196 self.assert_qmp(result, 'return', {}) 1197 if wait: 1198 return self.pause_wait(job_id) 1199 return result 1200 1201 def case_skip(self, reason): 1202 '''Skip this test case''' 1203 case_notrun(reason) 1204 self.skipTest(reason) 1205 1206 1207def notrun(reason): 1208 '''Skip this test suite''' 1209 # Each test in qemu-iotests has a number ("seq") 1210 seq = os.path.basename(sys.argv[0]) 1211 1212 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1213 as outfile: 1214 outfile.write(reason + '\n') 1215 logger.warning("%s not run: %s", seq, reason) 1216 sys.exit(0) 1217 1218def case_notrun(reason): 1219 '''Mark this test case as not having been run (without actually 1220 skipping it, that is left to the caller). See 1221 QMPTestCase.case_skip() for a variant that actually skips the 1222 current test case.''' 1223 1224 # Each test in qemu-iotests has a number ("seq") 1225 seq = os.path.basename(sys.argv[0]) 1226 1227 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1228 as outfile: 1229 outfile.write(' [case not run] ' + reason + '\n') 1230 1231def _verify_image_format(supported_fmts: Sequence[str] = (), 1232 unsupported_fmts: Sequence[str] = ()) -> None: 1233 if 'generic' in supported_fmts and \ 1234 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1235 # similar to 1236 # _supported_fmt generic 1237 # for bash tests 1238 supported_fmts = () 1239 1240 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1241 if not_sup or (imgfmt in unsupported_fmts): 1242 notrun('not suitable for this image format: %s' % imgfmt) 1243 1244 if imgfmt == 'luks': 1245 verify_working_luks() 1246 1247def _verify_protocol(supported: Sequence[str] = (), 1248 unsupported: Sequence[str] = ()) -> None: 1249 assert not (supported and unsupported) 1250 1251 if 'generic' in supported: 1252 return 1253 1254 not_sup = supported and (imgproto not in supported) 1255 if not_sup or (imgproto in unsupported): 1256 notrun('not suitable for this protocol: %s' % imgproto) 1257 1258def _verify_platform(supported: Sequence[str] = (), 1259 unsupported: Sequence[str] = ()) -> None: 1260 if any((sys.platform.startswith(x) for x in unsupported)): 1261 notrun('not suitable for this OS: %s' % sys.platform) 1262 1263 if supported: 1264 if not any((sys.platform.startswith(x) for x in supported)): 1265 notrun('not suitable for this OS: %s' % sys.platform) 1266 1267def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1268 if supported_cache_modes and (cachemode not in supported_cache_modes): 1269 notrun('not suitable for this cache mode: %s' % cachemode) 1270 1271def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1272 if supported_aio_modes and (aiomode not in supported_aio_modes): 1273 notrun('not suitable for this aio mode: %s' % aiomode) 1274 1275def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1276 usf_list = list(set(required_formats) - set(supported_formats())) 1277 if usf_list: 1278 notrun(f'formats {usf_list} are not whitelisted') 1279 1280 1281def _verify_virtio_blk() -> None: 1282 out = qemu_pipe('-M', 'none', '-device', 'help') 1283 if 'virtio-blk' not in out: 1284 notrun('Missing virtio-blk in QEMU binary') 1285 1286def _verify_virtio_scsi_pci_or_ccw() -> None: 1287 out = qemu_pipe('-M', 'none', '-device', 'help') 1288 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1289 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1290 1291 1292def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1293 imgopts = os.environ.get('IMGOPTS') 1294 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1295 # but it supported only for bash tests. We don't have a concept of global 1296 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1297 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1298 unsup = list(unsupported) + ['$'] 1299 if imgopts and any(x in imgopts for x in unsup): 1300 notrun(f'not suitable for this imgopts: {imgopts}') 1301 1302 1303def supports_quorum(): 1304 return 'quorum' in qemu_img_pipe('--help') 1305 1306def verify_quorum(): 1307 '''Skip test suite if quorum support is not available''' 1308 if not supports_quorum(): 1309 notrun('quorum support missing') 1310 1311def has_working_luks() -> Tuple[bool, str]: 1312 """ 1313 Check whether our LUKS driver can actually create images 1314 (this extends to LUKS encryption for qcow2). 1315 1316 If not, return the reason why. 1317 """ 1318 1319 img_file = f'{test_dir}/luks-test.luks' 1320 (output, status) = \ 1321 qemu_img_pipe_and_status('create', '-f', 'luks', 1322 '--object', luks_default_secret_object, 1323 '-o', luks_default_key_secret_opt, 1324 '-o', 'iter-time=10', 1325 img_file, '1G') 1326 try: 1327 os.remove(img_file) 1328 except OSError: 1329 pass 1330 1331 if status != 0: 1332 reason = output 1333 for line in output.splitlines(): 1334 if img_file + ':' in line: 1335 reason = line.split(img_file + ':', 1)[1].strip() 1336 break 1337 1338 return (False, reason) 1339 else: 1340 return (True, '') 1341 1342def verify_working_luks(): 1343 """ 1344 Skip test suite if LUKS does not work 1345 """ 1346 (working, reason) = has_working_luks() 1347 if not working: 1348 notrun(reason) 1349 1350def qemu_pipe(*args: str) -> str: 1351 """ 1352 Run qemu with an option to print something and exit (e.g. a help option). 1353 1354 :return: QEMU's stdout output. 1355 """ 1356 full_args = [qemu_prog] + qemu_opts + list(args) 1357 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1358 return output 1359 1360def supported_formats(read_only=False): 1361 '''Set 'read_only' to True to check ro-whitelist 1362 Otherwise, rw-whitelist is checked''' 1363 1364 if not hasattr(supported_formats, "formats"): 1365 supported_formats.formats = {} 1366 1367 if read_only not in supported_formats.formats: 1368 format_message = qemu_pipe("-drive", "format=help") 1369 line = 1 if read_only else 0 1370 supported_formats.formats[read_only] = \ 1371 format_message.splitlines()[line].split(":")[1].split() 1372 1373 return supported_formats.formats[read_only] 1374 1375def skip_if_unsupported(required_formats=(), read_only=False): 1376 '''Skip Test Decorator 1377 Runs the test if all the required formats are whitelisted''' 1378 def skip_test_decorator(func): 1379 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1380 **kwargs: Dict[str, Any]) -> None: 1381 if callable(required_formats): 1382 fmts = required_formats(test_case) 1383 else: 1384 fmts = required_formats 1385 1386 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1387 if usf_list: 1388 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1389 test_case.case_skip(msg) 1390 else: 1391 func(test_case, *args, **kwargs) 1392 return func_wrapper 1393 return skip_test_decorator 1394 1395def skip_for_formats(formats: Sequence[str] = ()) \ 1396 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1397 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1398 '''Skip Test Decorator 1399 Skips the test for the given formats''' 1400 def skip_test_decorator(func): 1401 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1402 **kwargs: Dict[str, Any]) -> None: 1403 if imgfmt in formats: 1404 msg = f'{test_case}: Skipped for format {imgfmt}' 1405 test_case.case_skip(msg) 1406 else: 1407 func(test_case, *args, **kwargs) 1408 return func_wrapper 1409 return skip_test_decorator 1410 1411def skip_if_user_is_root(func): 1412 '''Skip Test Decorator 1413 Runs the test only without root permissions''' 1414 def func_wrapper(*args, **kwargs): 1415 if os.getuid() == 0: 1416 case_notrun('{}: cannot be run as root'.format(args[0])) 1417 return None 1418 else: 1419 return func(*args, **kwargs) 1420 return func_wrapper 1421 1422# We need to filter out the time taken from the output so that 1423# qemu-iotest can reliably diff the results against master output, 1424# and hide skipped tests from the reference output. 1425 1426class ReproducibleTestResult(unittest.TextTestResult): 1427 def addSkip(self, test, reason): 1428 # Same as TextTestResult, but print dot instead of "s" 1429 unittest.TestResult.addSkip(self, test, reason) 1430 if self.showAll: 1431 self.stream.writeln("skipped {0!r}".format(reason)) 1432 elif self.dots: 1433 self.stream.write(".") 1434 self.stream.flush() 1435 1436class ReproducibleStreamWrapper: 1437 def __init__(self, stream: TextIO): 1438 self.stream = stream 1439 1440 def __getattr__(self, attr): 1441 if attr in ('stream', '__getstate__'): 1442 raise AttributeError(attr) 1443 return getattr(self.stream, attr) 1444 1445 def write(self, arg=None): 1446 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1447 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1448 self.stream.write(arg) 1449 1450class ReproducibleTestRunner(unittest.TextTestRunner): 1451 def __init__(self, stream: Optional[TextIO] = None, 1452 resultclass: Type[unittest.TestResult] = 1453 ReproducibleTestResult, 1454 **kwargs: Any) -> None: 1455 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1456 super().__init__(stream=rstream, # type: ignore 1457 descriptions=True, 1458 resultclass=resultclass, 1459 **kwargs) 1460 1461def execute_unittest(argv: List[str], debug: bool = False) -> None: 1462 """Executes unittests within the calling module.""" 1463 1464 # Some tests have warnings, especially ResourceWarnings for unclosed 1465 # files and sockets. Ignore them for now to ensure reproducibility of 1466 # the test output. 1467 unittest.main(argv=argv, 1468 testRunner=ReproducibleTestRunner, 1469 verbosity=2 if debug else 1, 1470 warnings=None if sys.warnoptions else 'ignore') 1471 1472def execute_setup_common(supported_fmts: Sequence[str] = (), 1473 supported_platforms: Sequence[str] = (), 1474 supported_cache_modes: Sequence[str] = (), 1475 supported_aio_modes: Sequence[str] = (), 1476 unsupported_fmts: Sequence[str] = (), 1477 supported_protocols: Sequence[str] = (), 1478 unsupported_protocols: Sequence[str] = (), 1479 required_fmts: Sequence[str] = (), 1480 unsupported_imgopts: Sequence[str] = ()) -> bool: 1481 """ 1482 Perform necessary setup for either script-style or unittest-style tests. 1483 1484 :return: Bool; Whether or not debug mode has been requested via the CLI. 1485 """ 1486 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1487 1488 debug = '-d' in sys.argv 1489 if debug: 1490 sys.argv.remove('-d') 1491 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1492 1493 _verify_image_format(supported_fmts, unsupported_fmts) 1494 _verify_protocol(supported_protocols, unsupported_protocols) 1495 _verify_platform(supported=supported_platforms) 1496 _verify_cache_mode(supported_cache_modes) 1497 _verify_aio_mode(supported_aio_modes) 1498 _verify_formats(required_fmts) 1499 _verify_virtio_blk() 1500 _verify_imgopts(unsupported_imgopts) 1501 1502 return debug 1503 1504def execute_test(*args, test_function=None, **kwargs): 1505 """Run either unittest or script-style tests.""" 1506 1507 debug = execute_setup_common(*args, **kwargs) 1508 if not test_function: 1509 execute_unittest(sys.argv, debug) 1510 else: 1511 test_function() 1512 1513def activate_logging(): 1514 """Activate iotests.log() output to stdout for script-style tests.""" 1515 handler = logging.StreamHandler(stream=sys.stdout) 1516 formatter = logging.Formatter('%(message)s') 1517 handler.setFormatter(formatter) 1518 test_logger.addHandler(handler) 1519 test_logger.setLevel(logging.INFO) 1520 test_logger.propagate = False 1521 1522# This is called from script-style iotests without a single point of entry 1523def script_initialize(*args, **kwargs): 1524 """Initialize script-style tests without running any tests.""" 1525 activate_logging() 1526 execute_setup_common(*args, **kwargs) 1527 1528# This is called from script-style iotests with a single point of entry 1529def script_main(test_function, *args, **kwargs): 1530 """Run script-style tests outside of the unittest framework""" 1531 activate_logging() 1532 execute_test(*args, test_function=test_function, **kwargs) 1533 1534# This is called from unittest style iotests 1535def main(*args, **kwargs): 1536 """Run tests using the unittest framework""" 1537 execute_test(*args, **kwargs) 1538