1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.aqmp.legacy import QEMUMonitorProtocol 41from qemu.machine import qtest 42from qemu.qmp import QMPMessage 43from qemu.utils import VerboseProcessError 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 71qemu_nbd_args = [qemu_nbd_prog] 72if os.environ.get('QEMU_NBD_OPTIONS'): 73 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 74 75qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 76qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 77 78qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 79 80gdb_qemu_env = os.environ.get('GDB_OPTIONS') 81qemu_gdb = [] 82if gdb_qemu_env: 83 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 84 85qemu_print = os.environ.get('PRINT_QEMU', False) 86 87imgfmt = os.environ.get('IMGFMT', 'raw') 88imgproto = os.environ.get('IMGPROTO', 'file') 89 90try: 91 test_dir = os.environ['TEST_DIR'] 92 sock_dir = os.environ['SOCK_DIR'] 93 cachemode = os.environ['CACHEMODE'] 94 aiomode = os.environ['AIOMODE'] 95 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 96except KeyError: 97 # We are using these variables as proxies to indicate that we're 98 # not being run via "check". There may be other things set up by 99 # "check" that individual test cases rely on. 100 sys.stderr.write('Please run this test via the "check" script\n') 101 sys.exit(os.EX_USAGE) 102 103qemu_valgrind = [] 104if os.environ.get('VALGRIND_QEMU') == "y" and \ 105 os.environ.get('NO_VALGRIND') != "y": 106 valgrind_logfile = "--log-file=" + test_dir 107 # %p allows to put the valgrind process PID, since 108 # we don't know it a priori (subprocess.Popen is 109 # not yet invoked) 110 valgrind_logfile += "/%p.valgrind" 111 112 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 113 114luks_default_secret_object = 'secret,id=keysec0,data=' + \ 115 os.environ.get('IMGKEYSECRET', '') 116luks_default_key_secret_opt = 'key-secret=keysec0' 117 118sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 119 120 121@contextmanager 122def change_log_level( 123 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 124 """ 125 Utility function for temporarily changing the log level of a logger. 126 127 This can be used to silence errors that are expected or uninteresting. 128 """ 129 _logger = logging.getLogger(logger_name) 130 current_level = _logger.level 131 _logger.setLevel(level) 132 133 try: 134 yield 135 finally: 136 _logger.setLevel(current_level) 137 138 139def unarchive_sample_image(sample, fname): 140 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 141 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 142 shutil.copyfileobj(f_in, f_out) 143 144 145def qemu_tool_popen(args: Sequence[str], 146 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 147 stderr = subprocess.STDOUT if connect_stderr else None 148 # pylint: disable=consider-using-with 149 return subprocess.Popen(args, 150 stdout=subprocess.PIPE, 151 stderr=stderr, 152 universal_newlines=True) 153 154 155def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 156 connect_stderr: bool = True, 157 drop_successful_output: bool = False) \ 158 -> Tuple[str, int]: 159 """ 160 Run a tool and return both its output and its exit code 161 """ 162 with qemu_tool_popen(args, connect_stderr) as subp: 163 output = subp.communicate()[0] 164 if subp.returncode < 0: 165 cmd = ' '.join(args) 166 sys.stderr.write(f'{tool} received signal \ 167 {-subp.returncode}: {cmd}\n') 168 if drop_successful_output and subp.returncode == 0: 169 output = '' 170 return (output, subp.returncode) 171 172def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 173 if not args or args[0] != 'create': 174 return list(args) 175 args = args[1:] 176 177 p = argparse.ArgumentParser(allow_abbrev=False) 178 # -o option may be specified several times 179 p.add_argument('-o', action='append', default=[]) 180 p.add_argument('-f') 181 parsed, remaining = p.parse_known_args(args) 182 183 opts_list = parsed.o 184 185 result = ['create'] 186 if parsed.f is not None: 187 result += ['-f', parsed.f] 188 189 # IMGOPTS most probably contain options specific for the selected format, 190 # like extended_l2 or compression_type for qcow2. Test may want to create 191 # additional images in other formats that doesn't support these options. 192 # So, use IMGOPTS only for images created in imgfmt format. 193 imgopts = os.environ.get('IMGOPTS') 194 if imgopts and parsed.f == imgfmt: 195 opts_list.insert(0, imgopts) 196 197 # default luks support 198 if parsed.f == 'luks' and \ 199 all('key-secret' not in opts for opts in opts_list): 200 result += ['--object', luks_default_secret_object] 201 opts_list.append(luks_default_key_secret_opt) 202 203 for opts in opts_list: 204 result += ['-o', opts] 205 206 result += remaining 207 208 return result 209 210def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run qemu_img and return the status code and console output. 214 215 This function always prepends QEMU_IMG_OPTIONS and may further alter 216 the args for 'create' commands. 217 218 :param args: command-line arguments to qemu-img. 219 :param check: Enforce a return code of zero. 220 :param combine_stdio: set to False to keep stdout/stderr separated. 221 222 :raise VerboseProcessError: 223 When the return code is negative, or on any non-zero exit code 224 when 'check=True' was provided (the default). This exception has 225 'stdout', 'stderr', and 'returncode' properties that may be 226 inspected to show greater detail. If this exception is not 227 handled, the command-line, return code, and all console output 228 will be included at the bottom of the stack trace. 229 230 :return: 231 a CompletedProcess. This object has args, returncode, and stdout 232 properties. If streams are not combined, it will also have a 233 stderr property. 234 """ 235 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 236 237 subp = subprocess.run( 238 full_args, 239 stdout=subprocess.PIPE, 240 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 241 universal_newlines=True, 242 check=False 243 ) 244 245 if check and subp.returncode or (subp.returncode < 0): 246 raise VerboseProcessError( 247 subp.returncode, full_args, 248 output=subp.stdout, 249 stderr=subp.stderr, 250 ) 251 252 return subp 253 254 255def ordered_qmp(qmsg, conv_keys=True): 256 # Dictionaries are not ordered prior to 3.6, therefore: 257 if isinstance(qmsg, list): 258 return [ordered_qmp(atom) for atom in qmsg] 259 if isinstance(qmsg, dict): 260 od = OrderedDict() 261 for k, v in sorted(qmsg.items()): 262 if conv_keys: 263 k = k.replace('_', '-') 264 od[k] = ordered_qmp(v, conv_keys=False) 265 return od 266 return qmsg 267 268def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 269 return qemu_img('create', *args) 270 271def qemu_img_json(*args: str) -> Any: 272 """ 273 Run qemu-img and return its output as deserialized JSON. 274 275 :raise CalledProcessError: 276 When qemu-img crashes, or returns a non-zero exit code without 277 producing a valid JSON document to stdout. 278 :raise JSONDecoderError: 279 When qemu-img returns 0, but failed to produce a valid JSON document. 280 281 :return: A deserialized JSON object; probably a dict[str, Any]. 282 """ 283 try: 284 res = qemu_img(*args, combine_stdio=False) 285 except subprocess.CalledProcessError as exc: 286 # Terminated due to signal. Don't bother. 287 if exc.returncode < 0: 288 raise 289 290 # Commands like 'check' can return failure (exit codes 2 and 3) 291 # to indicate command completion, but with errors found. For 292 # multi-command flexibility, ignore the exact error codes and 293 # *try* to load JSON. 294 try: 295 return json.loads(exc.stdout) 296 except json.JSONDecodeError: 297 # Nope. This thing is toast. Raise the /process/ error. 298 pass 299 raise 300 301 return json.loads(res.stdout) 302 303def qemu_img_measure(*args: str) -> Any: 304 return qemu_img_json("measure", "--output", "json", *args) 305 306def qemu_img_check(*args: str) -> Any: 307 return qemu_img_json("check", "--output", "json", *args) 308 309def qemu_img_info(*args: str) -> Any: 310 return qemu_img_json('info', "--output", "json", *args) 311 312def qemu_img_map(*args: str) -> Any: 313 return qemu_img_json('map', "--output", "json", *args) 314 315def qemu_img_log(*args: str, check: bool = True 316 ) -> 'subprocess.CompletedProcess[str]': 317 result = qemu_img(*args, check=check) 318 log(result.stdout, filters=[filter_testfiles]) 319 return result 320 321def img_info_log(filename: str, filter_path: Optional[str] = None, 322 use_image_opts: bool = False, extra_args: Sequence[str] = (), 323 check: bool = True, 324 ) -> None: 325 args = ['info'] 326 if use_image_opts: 327 args.append('--image-opts') 328 else: 329 args += ['-f', imgfmt] 330 args += extra_args 331 args.append(filename) 332 333 output = qemu_img(*args, check=check).stdout 334 if not filter_path: 335 filter_path = filename 336 log(filter_img_info(output, filter_path)) 337 338def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 339 if '-f' in args or '--image-opts' in args: 340 return qemu_io_args_no_fmt + list(args) 341 else: 342 return qemu_io_args + list(args) 343 344def qemu_io_popen(*args): 345 return qemu_tool_popen(qemu_io_wrap_args(args)) 346 347def qemu_io(*args): 348 '''Run qemu-io and return the stdout data''' 349 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0] 350 351def qemu_io_pipe_and_status(*args): 352 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args)) 353 354def qemu_io_log(*args): 355 result = qemu_io(*args) 356 log(result, filters=[filter_testfiles, filter_qemu_io]) 357 return result 358 359def qemu_io_silent(*args): 360 '''Run qemu-io and return the exit code, suppressing stdout''' 361 args = qemu_io_wrap_args(args) 362 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 363 if result.returncode < 0: 364 sys.stderr.write('qemu-io received signal %i: %s\n' % 365 (-result.returncode, ' '.join(args))) 366 return result.returncode 367 368def qemu_io_silent_check(*args): 369 '''Run qemu-io and return the true if subprocess returned 0''' 370 args = qemu_io_wrap_args(args) 371 result = subprocess.run(args, stdout=subprocess.DEVNULL, 372 stderr=subprocess.STDOUT, check=False) 373 return result.returncode == 0 374 375class QemuIoInteractive: 376 def __init__(self, *args): 377 self.args = qemu_io_wrap_args(args) 378 # We need to keep the Popen objext around, and not 379 # close it immediately. Therefore, disable the pylint check: 380 # pylint: disable=consider-using-with 381 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 382 stdout=subprocess.PIPE, 383 stderr=subprocess.STDOUT, 384 universal_newlines=True) 385 out = self._p.stdout.read(9) 386 if out != 'qemu-io> ': 387 # Most probably qemu-io just failed to start. 388 # Let's collect the whole output and exit. 389 out += self._p.stdout.read() 390 self._p.wait(timeout=1) 391 raise ValueError(out) 392 393 def close(self): 394 self._p.communicate('q\n') 395 396 def _read_output(self): 397 pattern = 'qemu-io> ' 398 n = len(pattern) 399 pos = 0 400 s = [] 401 while pos != n: 402 c = self._p.stdout.read(1) 403 # check unexpected EOF 404 assert c != '' 405 s.append(c) 406 if c == pattern[pos]: 407 pos += 1 408 else: 409 pos = 0 410 411 return ''.join(s[:-n]) 412 413 def cmd(self, cmd): 414 # quit command is in close(), '\n' is added automatically 415 assert '\n' not in cmd 416 cmd = cmd.strip() 417 assert cmd not in ('q', 'quit') 418 self._p.stdin.write(cmd + '\n') 419 self._p.stdin.flush() 420 return self._read_output() 421 422 423class QemuStorageDaemon: 424 _qmp: Optional[QEMUMonitorProtocol] = None 425 _qmpsock: Optional[str] = None 426 # Python < 3.8 would complain if this type were not a string literal 427 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 428 _p: 'Optional[subprocess.Popen[bytes]]' = None 429 430 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 431 assert '--pidfile' not in args 432 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 433 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 434 435 if qmp: 436 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 437 all_args += ['--chardev', 438 f'socket,id=qmp-sock,path={self._qmpsock}', 439 '--monitor', 'qmp-sock'] 440 441 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 442 443 # Cannot use with here, we want the subprocess to stay around 444 # pylint: disable=consider-using-with 445 self._p = subprocess.Popen(all_args) 446 if self._qmp is not None: 447 self._qmp.accept() 448 while not os.path.exists(self.pidfile): 449 if self._p.poll() is not None: 450 cmd = ' '.join(all_args) 451 raise RuntimeError( 452 'qemu-storage-daemon terminated with exit code ' + 453 f'{self._p.returncode}: {cmd}') 454 455 time.sleep(0.01) 456 457 with open(self.pidfile, encoding='utf-8') as f: 458 self._pid = int(f.read().strip()) 459 460 assert self._pid == self._p.pid 461 462 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 463 -> QMPMessage: 464 assert self._qmp is not None 465 return self._qmp.cmd(cmd, args) 466 467 def stop(self, kill_signal=15): 468 self._p.send_signal(kill_signal) 469 self._p.wait() 470 self._p = None 471 472 if self._qmp: 473 self._qmp.close() 474 475 if self._qmpsock is not None: 476 try: 477 os.remove(self._qmpsock) 478 except OSError: 479 pass 480 try: 481 os.remove(self.pidfile) 482 except OSError: 483 pass 484 485 def __del__(self): 486 if self._p is not None: 487 self.stop(kill_signal=9) 488 489 490def qemu_nbd(*args): 491 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 492 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 493 494def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 495 '''Run qemu-nbd in daemon mode and return both the parent's exit code 496 and its output in case of an error''' 497 full_args = qemu_nbd_args + ['--fork'] + list(args) 498 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 499 connect_stderr=False) 500 return returncode, output if returncode else '' 501 502def qemu_nbd_list_log(*args: str) -> str: 503 '''Run qemu-nbd to list remote exports''' 504 full_args = [qemu_nbd_prog, '-L'] + list(args) 505 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 506 log(output, filters=[filter_testfiles, filter_nbd_exports]) 507 return output 508 509@contextmanager 510def qemu_nbd_popen(*args): 511 '''Context manager running qemu-nbd within the context''' 512 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 513 514 assert not os.path.exists(pid_file) 515 516 cmd = list(qemu_nbd_args) 517 cmd.extend(('--persistent', '--pid-file', pid_file)) 518 cmd.extend(args) 519 520 log('Start NBD server') 521 with subprocess.Popen(cmd) as p: 522 try: 523 while not os.path.exists(pid_file): 524 if p.poll() is not None: 525 raise RuntimeError( 526 "qemu-nbd terminated with exit code {}: {}" 527 .format(p.returncode, ' '.join(cmd))) 528 529 time.sleep(0.01) 530 yield 531 finally: 532 if os.path.exists(pid_file): 533 os.remove(pid_file) 534 log('Kill NBD server') 535 p.kill() 536 p.wait() 537 538def compare_images(img1: str, img2: str, 539 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 540 """ 541 Compare two images with QEMU_IMG; return True if they are identical. 542 543 :raise CalledProcessError: 544 when qemu-img crashes or returns a status code of anything other 545 than 0 (identical) or 1 (different). 546 """ 547 try: 548 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 549 return True 550 except subprocess.CalledProcessError as exc: 551 if exc.returncode == 1: 552 return False 553 raise 554 555def create_image(name, size): 556 '''Create a fully-allocated raw image with sector markers''' 557 with open(name, 'wb') as file: 558 i = 0 559 while i < size: 560 sector = struct.pack('>l504xl', i // 512, i // 512) 561 file.write(sector) 562 i = i + 512 563 564def image_size(img: str) -> int: 565 """Return image's virtual size""" 566 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 567 if not isinstance(value, int): 568 type_name = type(value).__name__ 569 raise TypeError("Expected 'int' for 'virtual-size', " 570 f"got '{value}' of type '{type_name}'") 571 return value 572 573def is_str(val): 574 return isinstance(val, str) 575 576test_dir_re = re.compile(r"%s" % test_dir) 577def filter_test_dir(msg): 578 return test_dir_re.sub("TEST_DIR", msg) 579 580win32_re = re.compile(r"\r") 581def filter_win32(msg): 582 return win32_re.sub("", msg) 583 584qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 585 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 586 r"and [0-9\/.inf]* ops\/sec\)") 587def filter_qemu_io(msg): 588 msg = filter_win32(msg) 589 return qemu_io_re.sub("X ops; XX:XX:XX.X " 590 "(XXX YYY/sec and XXX ops/sec)", msg) 591 592chown_re = re.compile(r"chown [0-9]+:[0-9]+") 593def filter_chown(msg): 594 return chown_re.sub("chown UID:GID", msg) 595 596def filter_qmp_event(event): 597 '''Filter a QMP event dict''' 598 event = dict(event) 599 if 'timestamp' in event: 600 event['timestamp']['seconds'] = 'SECS' 601 event['timestamp']['microseconds'] = 'USECS' 602 return event 603 604def filter_qmp(qmsg, filter_fn): 605 '''Given a string filter, filter a QMP object's values. 606 filter_fn takes a (key, value) pair.''' 607 # Iterate through either lists or dicts; 608 if isinstance(qmsg, list): 609 items = enumerate(qmsg) 610 elif isinstance(qmsg, dict): 611 items = qmsg.items() 612 else: 613 return filter_fn(None, qmsg) 614 615 for k, v in items: 616 if isinstance(v, (dict, list)): 617 qmsg[k] = filter_qmp(v, filter_fn) 618 else: 619 qmsg[k] = filter_fn(k, v) 620 return qmsg 621 622def filter_testfiles(msg): 623 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 624 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 625 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 626 627def filter_qmp_testfiles(qmsg): 628 def _filter(_key, value): 629 if is_str(value): 630 return filter_testfiles(value) 631 return value 632 return filter_qmp(qmsg, _filter) 633 634def filter_virtio_scsi(output: str) -> str: 635 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 636 637def filter_qmp_virtio_scsi(qmsg): 638 def _filter(_key, value): 639 if is_str(value): 640 return filter_virtio_scsi(value) 641 return value 642 return filter_qmp(qmsg, _filter) 643 644def filter_generated_node_ids(msg): 645 return re.sub("#block[0-9]+", "NODE_NAME", msg) 646 647def filter_img_info(output, filename): 648 lines = [] 649 for line in output.split('\n'): 650 if 'disk size' in line or 'actual-size' in line: 651 continue 652 line = line.replace(filename, 'TEST_IMG') 653 line = filter_testfiles(line) 654 line = line.replace(imgfmt, 'IMGFMT') 655 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 656 line = re.sub('uuid: [-a-f0-9]+', 657 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 658 line) 659 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 660 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 661 line) 662 lines.append(line) 663 return '\n'.join(lines) 664 665def filter_imgfmt(msg): 666 return msg.replace(imgfmt, 'IMGFMT') 667 668def filter_qmp_imgfmt(qmsg): 669 def _filter(_key, value): 670 if is_str(value): 671 return filter_imgfmt(value) 672 return value 673 return filter_qmp(qmsg, _filter) 674 675def filter_nbd_exports(output: str) -> str: 676 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 677 678 679Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 680 681def log(msg: Msg, 682 filters: Iterable[Callable[[Msg], Msg]] = (), 683 indent: Optional[int] = None) -> None: 684 """ 685 Logs either a string message or a JSON serializable message (like QMP). 686 If indent is provided, JSON serializable messages are pretty-printed. 687 """ 688 for flt in filters: 689 msg = flt(msg) 690 if isinstance(msg, (dict, list)): 691 # Don't sort if it's already sorted 692 do_sort = not isinstance(msg, OrderedDict) 693 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 694 else: 695 test_logger.info(msg) 696 697class Timeout: 698 def __init__(self, seconds, errmsg="Timeout"): 699 self.seconds = seconds 700 self.errmsg = errmsg 701 def __enter__(self): 702 if qemu_gdb or qemu_valgrind: 703 return self 704 signal.signal(signal.SIGALRM, self.timeout) 705 signal.setitimer(signal.ITIMER_REAL, self.seconds) 706 return self 707 def __exit__(self, exc_type, value, traceback): 708 if qemu_gdb or qemu_valgrind: 709 return False 710 signal.setitimer(signal.ITIMER_REAL, 0) 711 return False 712 def timeout(self, signum, frame): 713 raise Exception(self.errmsg) 714 715def file_pattern(name): 716 return "{0}-{1}".format(os.getpid(), name) 717 718class FilePath: 719 """ 720 Context manager generating multiple file names. The generated files are 721 removed when exiting the context. 722 723 Example usage: 724 725 with FilePath('a.img', 'b.img') as (img_a, img_b): 726 # Use img_a and img_b here... 727 728 # a.img and b.img are automatically removed here. 729 730 By default images are created in iotests.test_dir. To create sockets use 731 iotests.sock_dir: 732 733 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 734 735 For convenience, calling with one argument yields a single file instead of 736 a tuple with one item. 737 738 """ 739 def __init__(self, *names, base_dir=test_dir): 740 self.paths = [os.path.join(base_dir, file_pattern(name)) 741 for name in names] 742 743 def __enter__(self): 744 if len(self.paths) == 1: 745 return self.paths[0] 746 else: 747 return self.paths 748 749 def __exit__(self, exc_type, exc_val, exc_tb): 750 for path in self.paths: 751 try: 752 os.remove(path) 753 except OSError: 754 pass 755 return False 756 757 758def try_remove(img): 759 try: 760 os.remove(img) 761 except OSError: 762 pass 763 764def file_path_remover(): 765 for path in reversed(file_path_remover.paths): 766 try_remove(path) 767 768 769def file_path(*names, base_dir=test_dir): 770 ''' Another way to get auto-generated filename that cleans itself up. 771 772 Use is as simple as: 773 774 img_a, img_b = file_path('a.img', 'b.img') 775 sock = file_path('socket') 776 ''' 777 778 if not hasattr(file_path_remover, 'paths'): 779 file_path_remover.paths = [] 780 atexit.register(file_path_remover) 781 782 paths = [] 783 for name in names: 784 filename = file_pattern(name) 785 path = os.path.join(base_dir, filename) 786 file_path_remover.paths.append(path) 787 paths.append(path) 788 789 return paths[0] if len(paths) == 1 else paths 790 791def remote_filename(path): 792 if imgproto == 'file': 793 return path 794 elif imgproto == 'ssh': 795 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 796 else: 797 raise Exception("Protocol %s not supported" % (imgproto)) 798 799class VM(qtest.QEMUQtestMachine): 800 '''A QEMU VM''' 801 802 def __init__(self, path_suffix=''): 803 name = "qemu%s-%d" % (path_suffix, os.getpid()) 804 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 805 if qemu_gdb and qemu_valgrind: 806 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 807 sys.exit(1) 808 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 809 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 810 name=name, 811 base_temp_dir=test_dir, 812 sock_dir=sock_dir, qmp_timer=timer) 813 self._num_drives = 0 814 815 def _post_shutdown(self) -> None: 816 super()._post_shutdown() 817 if not qemu_valgrind or not self._popen: 818 return 819 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 820 if self.exitcode() == 99: 821 with open(valgrind_filename, encoding='utf-8') as f: 822 print(f.read()) 823 else: 824 os.remove(valgrind_filename) 825 826 def _pre_launch(self) -> None: 827 super()._pre_launch() 828 if qemu_print: 829 # set QEMU binary output to stdout 830 self._close_qemu_log_file() 831 832 def add_object(self, opts): 833 self._args.append('-object') 834 self._args.append(opts) 835 return self 836 837 def add_device(self, opts): 838 self._args.append('-device') 839 self._args.append(opts) 840 return self 841 842 def add_drive_raw(self, opts): 843 self._args.append('-drive') 844 self._args.append(opts) 845 return self 846 847 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 848 '''Add a virtio-blk drive to the VM''' 849 options = ['if=%s' % interface, 850 'id=drive%d' % self._num_drives] 851 852 if path is not None: 853 options.append('file=%s' % path) 854 options.append('format=%s' % img_format) 855 options.append('cache=%s' % cachemode) 856 options.append('aio=%s' % aiomode) 857 858 if opts: 859 options.append(opts) 860 861 if img_format == 'luks' and 'key-secret' not in opts: 862 # default luks support 863 if luks_default_secret_object not in self._args: 864 self.add_object(luks_default_secret_object) 865 866 options.append(luks_default_key_secret_opt) 867 868 self._args.append('-drive') 869 self._args.append(','.join(options)) 870 self._num_drives += 1 871 return self 872 873 def add_blockdev(self, opts): 874 self._args.append('-blockdev') 875 if isinstance(opts, str): 876 self._args.append(opts) 877 else: 878 self._args.append(','.join(opts)) 879 return self 880 881 def add_incoming(self, addr): 882 self._args.append('-incoming') 883 self._args.append(addr) 884 return self 885 886 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 887 cmd = 'human-monitor-command' 888 kwargs: Dict[str, Any] = {'command-line': command_line} 889 if use_log: 890 return self.qmp_log(cmd, **kwargs) 891 else: 892 return self.qmp(cmd, **kwargs) 893 894 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 895 """Pause drive r/w operations""" 896 if not event: 897 self.pause_drive(drive, "read_aio") 898 self.pause_drive(drive, "write_aio") 899 return 900 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 901 902 def resume_drive(self, drive: str) -> None: 903 """Resume drive r/w operations""" 904 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 905 906 def hmp_qemu_io(self, drive: str, cmd: str, 907 use_log: bool = False, qdev: bool = False) -> QMPMessage: 908 """Write to a given drive using an HMP command""" 909 d = '-d ' if qdev else '' 910 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 911 912 def flatten_qmp_object(self, obj, output=None, basestr=''): 913 if output is None: 914 output = {} 915 if isinstance(obj, list): 916 for i, item in enumerate(obj): 917 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 918 elif isinstance(obj, dict): 919 for key in obj: 920 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 921 else: 922 output[basestr[:-1]] = obj # Strip trailing '.' 923 return output 924 925 def qmp_to_opts(self, obj): 926 obj = self.flatten_qmp_object(obj) 927 output_list = [] 928 for key in obj: 929 output_list += [key + '=' + obj[key]] 930 return ','.join(output_list) 931 932 def get_qmp_events_filtered(self, wait=60.0): 933 result = [] 934 for ev in self.get_qmp_events(wait=wait): 935 result.append(filter_qmp_event(ev)) 936 return result 937 938 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 939 full_cmd = OrderedDict(( 940 ("execute", cmd), 941 ("arguments", ordered_qmp(kwargs)) 942 )) 943 log(full_cmd, filters, indent=indent) 944 result = self.qmp(cmd, **kwargs) 945 log(result, filters, indent=indent) 946 return result 947 948 # Returns None on success, and an error string on failure 949 def run_job(self, job: str, auto_finalize: bool = True, 950 auto_dismiss: bool = False, 951 pre_finalize: Optional[Callable[[], None]] = None, 952 cancel: bool = False, wait: float = 60.0, 953 filters: Iterable[Callable[[Any], Any]] = (), 954 ) -> Optional[str]: 955 """ 956 run_job moves a job from creation through to dismissal. 957 958 :param job: String. ID of recently-launched job 959 :param auto_finalize: Bool. True if the job was launched with 960 auto_finalize. Defaults to True. 961 :param auto_dismiss: Bool. True if the job was launched with 962 auto_dismiss=True. Defaults to False. 963 :param pre_finalize: Callback. A callable that takes no arguments to be 964 invoked prior to issuing job-finalize, if any. 965 :param cancel: Bool. When true, cancels the job after the pre_finalize 966 callback. 967 :param wait: Float. Timeout value specifying how long to wait for any 968 event, in seconds. Defaults to 60.0. 969 """ 970 match_device = {'data': {'device': job}} 971 match_id = {'data': {'id': job}} 972 events = [ 973 ('BLOCK_JOB_COMPLETED', match_device), 974 ('BLOCK_JOB_CANCELLED', match_device), 975 ('BLOCK_JOB_ERROR', match_device), 976 ('BLOCK_JOB_READY', match_device), 977 ('BLOCK_JOB_PENDING', match_id), 978 ('JOB_STATUS_CHANGE', match_id) 979 ] 980 error = None 981 while True: 982 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 983 if ev['event'] != 'JOB_STATUS_CHANGE': 984 log(ev, filters=filters) 985 continue 986 status = ev['data']['status'] 987 if status == 'aborting': 988 result = self.qmp('query-jobs') 989 for j in result['return']: 990 if j['id'] == job: 991 error = j['error'] 992 log('Job failed: %s' % (j['error']), filters=filters) 993 elif status == 'ready': 994 self.qmp_log('job-complete', id=job, filters=filters) 995 elif status == 'pending' and not auto_finalize: 996 if pre_finalize: 997 pre_finalize() 998 if cancel: 999 self.qmp_log('job-cancel', id=job, filters=filters) 1000 else: 1001 self.qmp_log('job-finalize', id=job, filters=filters) 1002 elif status == 'concluded' and not auto_dismiss: 1003 self.qmp_log('job-dismiss', id=job, filters=filters) 1004 elif status == 'null': 1005 return error 1006 1007 # Returns None on success, and an error string on failure 1008 def blockdev_create(self, options, job_id='job0', filters=None): 1009 if filters is None: 1010 filters = [filter_qmp_testfiles] 1011 result = self.qmp_log('blockdev-create', filters=filters, 1012 job_id=job_id, options=options) 1013 1014 if 'return' in result: 1015 assert result['return'] == {} 1016 job_result = self.run_job(job_id, filters=filters) 1017 else: 1018 job_result = result['error'] 1019 1020 log("") 1021 return job_result 1022 1023 def enable_migration_events(self, name): 1024 log('Enabling migration QMP events on %s...' % name) 1025 log(self.qmp('migrate-set-capabilities', capabilities=[ 1026 { 1027 'capability': 'events', 1028 'state': True 1029 } 1030 ])) 1031 1032 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1033 while True: 1034 event = self.event_wait('MIGRATION') 1035 # We use the default timeout, and with a timeout, event_wait() 1036 # never returns None 1037 assert event 1038 1039 log(event, filters=[filter_qmp_event]) 1040 if event['data']['status'] in ('completed', 'failed'): 1041 break 1042 1043 if event['data']['status'] == 'completed': 1044 # The event may occur in finish-migrate, so wait for the expected 1045 # post-migration runstate 1046 runstate = None 1047 while runstate != expect_runstate: 1048 runstate = self.qmp('query-status')['return']['status'] 1049 return True 1050 else: 1051 return False 1052 1053 def node_info(self, node_name): 1054 nodes = self.qmp('query-named-block-nodes') 1055 for x in nodes['return']: 1056 if x['node-name'] == node_name: 1057 return x 1058 return None 1059 1060 def query_bitmaps(self): 1061 res = self.qmp("query-named-block-nodes") 1062 return {device['node-name']: device['dirty-bitmaps'] 1063 for device in res['return'] if 'dirty-bitmaps' in device} 1064 1065 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1066 """ 1067 get a specific bitmap from the object returned by query_bitmaps. 1068 :param recording: If specified, filter results by the specified value. 1069 :param bitmaps: If specified, use it instead of call query_bitmaps() 1070 """ 1071 if bitmaps is None: 1072 bitmaps = self.query_bitmaps() 1073 1074 for bitmap in bitmaps[node_name]: 1075 if bitmap.get('name', '') == bitmap_name: 1076 if recording is None or bitmap.get('recording') == recording: 1077 return bitmap 1078 return None 1079 1080 def check_bitmap_status(self, node_name, bitmap_name, fields): 1081 ret = self.get_bitmap(node_name, bitmap_name) 1082 1083 return fields.items() <= ret.items() 1084 1085 def assert_block_path(self, root, path, expected_node, graph=None): 1086 """ 1087 Check whether the node under the given path in the block graph 1088 is @expected_node. 1089 1090 @root is the node name of the node where the @path is rooted. 1091 1092 @path is a string that consists of child names separated by 1093 slashes. It must begin with a slash. 1094 1095 Examples for @root + @path: 1096 - root="qcow2-node", path="/backing/file" 1097 - root="quorum-node", path="/children.2/file" 1098 1099 Hypothetically, @path could be empty, in which case it would 1100 point to @root. However, in practice this case is not useful 1101 and hence not allowed. 1102 1103 @expected_node may be None. (All elements of the path but the 1104 leaf must still exist.) 1105 1106 @graph may be None or the result of an x-debug-query-block-graph 1107 call that has already been performed. 1108 """ 1109 if graph is None: 1110 graph = self.qmp('x-debug-query-block-graph')['return'] 1111 1112 iter_path = iter(path.split('/')) 1113 1114 # Must start with a / 1115 assert next(iter_path) == '' 1116 1117 node = next((node for node in graph['nodes'] if node['name'] == root), 1118 None) 1119 1120 # An empty @path is not allowed, so the root node must be present 1121 assert node is not None, 'Root node %s not found' % root 1122 1123 for child_name in iter_path: 1124 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1125 1126 try: 1127 node_id = next(edge['child'] for edge in graph['edges'] 1128 if (edge['parent'] == node['id'] and 1129 edge['name'] == child_name)) 1130 1131 node = next(node for node in graph['nodes'] 1132 if node['id'] == node_id) 1133 1134 except StopIteration: 1135 node = None 1136 1137 if node is None: 1138 assert expected_node is None, \ 1139 'No node found under %s (but expected %s)' % \ 1140 (path, expected_node) 1141 else: 1142 assert node['name'] == expected_node, \ 1143 'Found node %s under %s (but expected %s)' % \ 1144 (node['name'], path, expected_node) 1145 1146index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1147 1148class QMPTestCase(unittest.TestCase): 1149 '''Abstract base class for QMP test cases''' 1150 1151 def __init__(self, *args, **kwargs): 1152 super().__init__(*args, **kwargs) 1153 # Many users of this class set a VM property we rely on heavily 1154 # in the methods below. 1155 self.vm = None 1156 1157 def dictpath(self, d, path): 1158 '''Traverse a path in a nested dict''' 1159 for component in path.split('/'): 1160 m = index_re.match(component) 1161 if m: 1162 component, idx = m.groups() 1163 idx = int(idx) 1164 1165 if not isinstance(d, dict) or component not in d: 1166 self.fail(f'failed path traversal for "{path}" in "{d}"') 1167 d = d[component] 1168 1169 if m: 1170 if not isinstance(d, list): 1171 self.fail(f'path component "{component}" in "{path}" ' 1172 f'is not a list in "{d}"') 1173 try: 1174 d = d[idx] 1175 except IndexError: 1176 self.fail(f'invalid index "{idx}" in path "{path}" ' 1177 f'in "{d}"') 1178 return d 1179 1180 def assert_qmp_absent(self, d, path): 1181 try: 1182 result = self.dictpath(d, path) 1183 except AssertionError: 1184 return 1185 self.fail('path "%s" has value "%s"' % (path, str(result))) 1186 1187 def assert_qmp(self, d, path, value): 1188 '''Assert that the value for a specific path in a QMP dict 1189 matches. When given a list of values, assert that any of 1190 them matches.''' 1191 1192 result = self.dictpath(d, path) 1193 1194 # [] makes no sense as a list of valid values, so treat it as 1195 # an actual single value. 1196 if isinstance(value, list) and value != []: 1197 for v in value: 1198 if result == v: 1199 return 1200 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1201 else: 1202 self.assertEqual(result, value, 1203 '"%s" is "%s", expected "%s"' 1204 % (path, str(result), str(value))) 1205 1206 def assert_no_active_block_jobs(self): 1207 result = self.vm.qmp('query-block-jobs') 1208 self.assert_qmp(result, 'return', []) 1209 1210 def assert_has_block_node(self, node_name=None, file_name=None): 1211 """Issue a query-named-block-nodes and assert node_name and/or 1212 file_name is present in the result""" 1213 def check_equal_or_none(a, b): 1214 return a is None or b is None or a == b 1215 assert node_name or file_name 1216 result = self.vm.qmp('query-named-block-nodes') 1217 for x in result["return"]: 1218 if check_equal_or_none(x.get("node-name"), node_name) and \ 1219 check_equal_or_none(x.get("file"), file_name): 1220 return 1221 self.fail("Cannot find %s %s in result:\n%s" % 1222 (node_name, file_name, result)) 1223 1224 def assert_json_filename_equal(self, json_filename, reference): 1225 '''Asserts that the given filename is a json: filename and that its 1226 content is equal to the given reference object''' 1227 self.assertEqual(json_filename[:5], 'json:') 1228 self.assertEqual( 1229 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1230 self.vm.flatten_qmp_object(reference) 1231 ) 1232 1233 def cancel_and_wait(self, drive='drive0', force=False, 1234 resume=False, wait=60.0): 1235 '''Cancel a block job and wait for it to finish, returning the event''' 1236 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1237 self.assert_qmp(result, 'return', {}) 1238 1239 if resume: 1240 self.vm.resume_drive(drive) 1241 1242 cancelled = False 1243 result = None 1244 while not cancelled: 1245 for event in self.vm.get_qmp_events(wait=wait): 1246 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1247 event['event'] == 'BLOCK_JOB_CANCELLED': 1248 self.assert_qmp(event, 'data/device', drive) 1249 result = event 1250 cancelled = True 1251 elif event['event'] == 'JOB_STATUS_CHANGE': 1252 self.assert_qmp(event, 'data/id', drive) 1253 1254 1255 self.assert_no_active_block_jobs() 1256 return result 1257 1258 def wait_until_completed(self, drive='drive0', check_offset=True, 1259 wait=60.0, error=None): 1260 '''Wait for a block job to finish, returning the event''' 1261 while True: 1262 for event in self.vm.get_qmp_events(wait=wait): 1263 if event['event'] == 'BLOCK_JOB_COMPLETED': 1264 self.assert_qmp(event, 'data/device', drive) 1265 if error is None: 1266 self.assert_qmp_absent(event, 'data/error') 1267 if check_offset: 1268 self.assert_qmp(event, 'data/offset', 1269 event['data']['len']) 1270 else: 1271 self.assert_qmp(event, 'data/error', error) 1272 self.assert_no_active_block_jobs() 1273 return event 1274 if event['event'] == 'JOB_STATUS_CHANGE': 1275 self.assert_qmp(event, 'data/id', drive) 1276 1277 def wait_ready(self, drive='drive0'): 1278 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1279 return self.vm.events_wait([ 1280 ('BLOCK_JOB_READY', 1281 {'data': {'type': 'mirror', 'device': drive}}), 1282 ('BLOCK_JOB_READY', 1283 {'data': {'type': 'commit', 'device': drive}}) 1284 ]) 1285 1286 def wait_ready_and_cancel(self, drive='drive0'): 1287 self.wait_ready(drive=drive) 1288 event = self.cancel_and_wait(drive=drive) 1289 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1290 self.assert_qmp(event, 'data/type', 'mirror') 1291 self.assert_qmp(event, 'data/offset', event['data']['len']) 1292 1293 def complete_and_wait(self, drive='drive0', wait_ready=True, 1294 completion_error=None): 1295 '''Complete a block job and wait for it to finish''' 1296 if wait_ready: 1297 self.wait_ready(drive=drive) 1298 1299 result = self.vm.qmp('block-job-complete', device=drive) 1300 self.assert_qmp(result, 'return', {}) 1301 1302 event = self.wait_until_completed(drive=drive, error=completion_error) 1303 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1304 1305 def pause_wait(self, job_id='job0'): 1306 with Timeout(3, "Timeout waiting for job to pause"): 1307 while True: 1308 result = self.vm.qmp('query-block-jobs') 1309 found = False 1310 for job in result['return']: 1311 if job['device'] == job_id: 1312 found = True 1313 if job['paused'] and not job['busy']: 1314 return job 1315 break 1316 assert found 1317 1318 def pause_job(self, job_id='job0', wait=True): 1319 result = self.vm.qmp('block-job-pause', device=job_id) 1320 self.assert_qmp(result, 'return', {}) 1321 if wait: 1322 return self.pause_wait(job_id) 1323 return result 1324 1325 def case_skip(self, reason): 1326 '''Skip this test case''' 1327 case_notrun(reason) 1328 self.skipTest(reason) 1329 1330 1331def notrun(reason): 1332 '''Skip this test suite''' 1333 # Each test in qemu-iotests has a number ("seq") 1334 seq = os.path.basename(sys.argv[0]) 1335 1336 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1337 as outfile: 1338 outfile.write(reason + '\n') 1339 logger.warning("%s not run: %s", seq, reason) 1340 sys.exit(0) 1341 1342def case_notrun(reason): 1343 '''Mark this test case as not having been run (without actually 1344 skipping it, that is left to the caller). See 1345 QMPTestCase.case_skip() for a variant that actually skips the 1346 current test case.''' 1347 1348 # Each test in qemu-iotests has a number ("seq") 1349 seq = os.path.basename(sys.argv[0]) 1350 1351 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1352 as outfile: 1353 outfile.write(' [case not run] ' + reason + '\n') 1354 1355def _verify_image_format(supported_fmts: Sequence[str] = (), 1356 unsupported_fmts: Sequence[str] = ()) -> None: 1357 if 'generic' in supported_fmts and \ 1358 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1359 # similar to 1360 # _supported_fmt generic 1361 # for bash tests 1362 supported_fmts = () 1363 1364 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1365 if not_sup or (imgfmt in unsupported_fmts): 1366 notrun('not suitable for this image format: %s' % imgfmt) 1367 1368 if imgfmt == 'luks': 1369 verify_working_luks() 1370 1371def _verify_protocol(supported: Sequence[str] = (), 1372 unsupported: Sequence[str] = ()) -> None: 1373 assert not (supported and unsupported) 1374 1375 if 'generic' in supported: 1376 return 1377 1378 not_sup = supported and (imgproto not in supported) 1379 if not_sup or (imgproto in unsupported): 1380 notrun('not suitable for this protocol: %s' % imgproto) 1381 1382def _verify_platform(supported: Sequence[str] = (), 1383 unsupported: Sequence[str] = ()) -> None: 1384 if any((sys.platform.startswith(x) for x in unsupported)): 1385 notrun('not suitable for this OS: %s' % sys.platform) 1386 1387 if supported: 1388 if not any((sys.platform.startswith(x) for x in supported)): 1389 notrun('not suitable for this OS: %s' % sys.platform) 1390 1391def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1392 if supported_cache_modes and (cachemode not in supported_cache_modes): 1393 notrun('not suitable for this cache mode: %s' % cachemode) 1394 1395def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1396 if supported_aio_modes and (aiomode not in supported_aio_modes): 1397 notrun('not suitable for this aio mode: %s' % aiomode) 1398 1399def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1400 usf_list = list(set(required_formats) - set(supported_formats())) 1401 if usf_list: 1402 notrun(f'formats {usf_list} are not whitelisted') 1403 1404 1405def _verify_virtio_blk() -> None: 1406 out = qemu_pipe('-M', 'none', '-device', 'help') 1407 if 'virtio-blk' not in out: 1408 notrun('Missing virtio-blk in QEMU binary') 1409 1410def _verify_virtio_scsi_pci_or_ccw() -> None: 1411 out = qemu_pipe('-M', 'none', '-device', 'help') 1412 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1413 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1414 1415 1416def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1417 imgopts = os.environ.get('IMGOPTS') 1418 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1419 # but it supported only for bash tests. We don't have a concept of global 1420 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1421 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1422 unsup = list(unsupported) + ['$'] 1423 if imgopts and any(x in imgopts for x in unsup): 1424 notrun(f'not suitable for this imgopts: {imgopts}') 1425 1426 1427def supports_quorum() -> bool: 1428 return 'quorum' in qemu_img('--help').stdout 1429 1430def verify_quorum(): 1431 '''Skip test suite if quorum support is not available''' 1432 if not supports_quorum(): 1433 notrun('quorum support missing') 1434 1435def has_working_luks() -> Tuple[bool, str]: 1436 """ 1437 Check whether our LUKS driver can actually create images 1438 (this extends to LUKS encryption for qcow2). 1439 1440 If not, return the reason why. 1441 """ 1442 1443 img_file = f'{test_dir}/luks-test.luks' 1444 res = qemu_img('create', '-f', 'luks', 1445 '--object', luks_default_secret_object, 1446 '-o', luks_default_key_secret_opt, 1447 '-o', 'iter-time=10', 1448 img_file, '1G', 1449 check=False) 1450 try: 1451 os.remove(img_file) 1452 except OSError: 1453 pass 1454 1455 if res.returncode: 1456 reason = res.stdout 1457 for line in res.stdout.splitlines(): 1458 if img_file + ':' in line: 1459 reason = line.split(img_file + ':', 1)[1].strip() 1460 break 1461 1462 return (False, reason) 1463 else: 1464 return (True, '') 1465 1466def verify_working_luks(): 1467 """ 1468 Skip test suite if LUKS does not work 1469 """ 1470 (working, reason) = has_working_luks() 1471 if not working: 1472 notrun(reason) 1473 1474def qemu_pipe(*args: str) -> str: 1475 """ 1476 Run qemu with an option to print something and exit (e.g. a help option). 1477 1478 :return: QEMU's stdout output. 1479 """ 1480 full_args = [qemu_prog] + qemu_opts + list(args) 1481 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1482 return output 1483 1484def supported_formats(read_only=False): 1485 '''Set 'read_only' to True to check ro-whitelist 1486 Otherwise, rw-whitelist is checked''' 1487 1488 if not hasattr(supported_formats, "formats"): 1489 supported_formats.formats = {} 1490 1491 if read_only not in supported_formats.formats: 1492 format_message = qemu_pipe("-drive", "format=help") 1493 line = 1 if read_only else 0 1494 supported_formats.formats[read_only] = \ 1495 format_message.splitlines()[line].split(":")[1].split() 1496 1497 return supported_formats.formats[read_only] 1498 1499def skip_if_unsupported(required_formats=(), read_only=False): 1500 '''Skip Test Decorator 1501 Runs the test if all the required formats are whitelisted''' 1502 def skip_test_decorator(func): 1503 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1504 **kwargs: Dict[str, Any]) -> None: 1505 if callable(required_formats): 1506 fmts = required_formats(test_case) 1507 else: 1508 fmts = required_formats 1509 1510 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1511 if usf_list: 1512 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1513 test_case.case_skip(msg) 1514 else: 1515 func(test_case, *args, **kwargs) 1516 return func_wrapper 1517 return skip_test_decorator 1518 1519def skip_for_formats(formats: Sequence[str] = ()) \ 1520 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1521 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1522 '''Skip Test Decorator 1523 Skips the test for the given formats''' 1524 def skip_test_decorator(func): 1525 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1526 **kwargs: Dict[str, Any]) -> None: 1527 if imgfmt in formats: 1528 msg = f'{test_case}: Skipped for format {imgfmt}' 1529 test_case.case_skip(msg) 1530 else: 1531 func(test_case, *args, **kwargs) 1532 return func_wrapper 1533 return skip_test_decorator 1534 1535def skip_if_user_is_root(func): 1536 '''Skip Test Decorator 1537 Runs the test only without root permissions''' 1538 def func_wrapper(*args, **kwargs): 1539 if os.getuid() == 0: 1540 case_notrun('{}: cannot be run as root'.format(args[0])) 1541 return None 1542 else: 1543 return func(*args, **kwargs) 1544 return func_wrapper 1545 1546# We need to filter out the time taken from the output so that 1547# qemu-iotest can reliably diff the results against master output, 1548# and hide skipped tests from the reference output. 1549 1550class ReproducibleTestResult(unittest.TextTestResult): 1551 def addSkip(self, test, reason): 1552 # Same as TextTestResult, but print dot instead of "s" 1553 unittest.TestResult.addSkip(self, test, reason) 1554 if self.showAll: 1555 self.stream.writeln("skipped {0!r}".format(reason)) 1556 elif self.dots: 1557 self.stream.write(".") 1558 self.stream.flush() 1559 1560class ReproducibleStreamWrapper: 1561 def __init__(self, stream: TextIO): 1562 self.stream = stream 1563 1564 def __getattr__(self, attr): 1565 if attr in ('stream', '__getstate__'): 1566 raise AttributeError(attr) 1567 return getattr(self.stream, attr) 1568 1569 def write(self, arg=None): 1570 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1571 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1572 self.stream.write(arg) 1573 1574class ReproducibleTestRunner(unittest.TextTestRunner): 1575 def __init__(self, stream: Optional[TextIO] = None, 1576 resultclass: Type[unittest.TestResult] = 1577 ReproducibleTestResult, 1578 **kwargs: Any) -> None: 1579 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1580 super().__init__(stream=rstream, # type: ignore 1581 descriptions=True, 1582 resultclass=resultclass, 1583 **kwargs) 1584 1585def execute_unittest(argv: List[str], debug: bool = False) -> None: 1586 """Executes unittests within the calling module.""" 1587 1588 # Some tests have warnings, especially ResourceWarnings for unclosed 1589 # files and sockets. Ignore them for now to ensure reproducibility of 1590 # the test output. 1591 unittest.main(argv=argv, 1592 testRunner=ReproducibleTestRunner, 1593 verbosity=2 if debug else 1, 1594 warnings=None if sys.warnoptions else 'ignore') 1595 1596def execute_setup_common(supported_fmts: Sequence[str] = (), 1597 supported_platforms: Sequence[str] = (), 1598 supported_cache_modes: Sequence[str] = (), 1599 supported_aio_modes: Sequence[str] = (), 1600 unsupported_fmts: Sequence[str] = (), 1601 supported_protocols: Sequence[str] = (), 1602 unsupported_protocols: Sequence[str] = (), 1603 required_fmts: Sequence[str] = (), 1604 unsupported_imgopts: Sequence[str] = ()) -> bool: 1605 """ 1606 Perform necessary setup for either script-style or unittest-style tests. 1607 1608 :return: Bool; Whether or not debug mode has been requested via the CLI. 1609 """ 1610 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1611 1612 debug = '-d' in sys.argv 1613 if debug: 1614 sys.argv.remove('-d') 1615 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1616 1617 _verify_image_format(supported_fmts, unsupported_fmts) 1618 _verify_protocol(supported_protocols, unsupported_protocols) 1619 _verify_platform(supported=supported_platforms) 1620 _verify_cache_mode(supported_cache_modes) 1621 _verify_aio_mode(supported_aio_modes) 1622 _verify_formats(required_fmts) 1623 _verify_virtio_blk() 1624 _verify_imgopts(unsupported_imgopts) 1625 1626 return debug 1627 1628def execute_test(*args, test_function=None, **kwargs): 1629 """Run either unittest or script-style tests.""" 1630 1631 debug = execute_setup_common(*args, **kwargs) 1632 if not test_function: 1633 execute_unittest(sys.argv, debug) 1634 else: 1635 test_function() 1636 1637def activate_logging(): 1638 """Activate iotests.log() output to stdout for script-style tests.""" 1639 handler = logging.StreamHandler(stream=sys.stdout) 1640 formatter = logging.Formatter('%(message)s') 1641 handler.setFormatter(formatter) 1642 test_logger.addHandler(handler) 1643 test_logger.setLevel(logging.INFO) 1644 test_logger.propagate = False 1645 1646# This is called from script-style iotests without a single point of entry 1647def script_initialize(*args, **kwargs): 1648 """Initialize script-style tests without running any tests.""" 1649 activate_logging() 1650 execute_setup_common(*args, **kwargs) 1651 1652# This is called from script-style iotests with a single point of entry 1653def script_main(test_function, *args, **kwargs): 1654 """Run script-style tests outside of the unittest framework""" 1655 activate_logging() 1656 execute_test(*args, test_function=test_function, **kwargs) 1657 1658# This is called from unittest style iotests 1659def main(*args, **kwargs): 1660 """Run tests using the unittest framework""" 1661 execute_test(*args, **kwargs) 1662