1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, drop_child_info: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path, drop_child_info)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd(cmd, args) 464 465 def get_qmp(self) -> QEMUMonitorProtocol: 466 assert self._qmp is not None 467 return self._qmp 468 469 def stop(self, kill_signal=15): 470 self._p.send_signal(kill_signal) 471 self._p.wait() 472 self._p = None 473 474 if self._qmp: 475 self._qmp.close() 476 477 if self._qmpsock is not None: 478 try: 479 os.remove(self._qmpsock) 480 except OSError: 481 pass 482 try: 483 os.remove(self.pidfile) 484 except OSError: 485 pass 486 487 def __del__(self): 488 if self._p is not None: 489 self.stop(kill_signal=9) 490 491 492def qemu_nbd(*args): 493 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 494 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 495 496def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 497 '''Run qemu-nbd in daemon mode and return both the parent's exit code 498 and its output in case of an error''' 499 full_args = qemu_nbd_args + ['--fork'] + list(args) 500 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 501 connect_stderr=False) 502 return returncode, output if returncode else '' 503 504def qemu_nbd_list_log(*args: str) -> str: 505 '''Run qemu-nbd to list remote exports''' 506 full_args = [qemu_nbd_prog, '-L'] + list(args) 507 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 508 log(output, filters=[filter_testfiles, filter_nbd_exports]) 509 return output 510 511@contextmanager 512def qemu_nbd_popen(*args): 513 '''Context manager running qemu-nbd within the context''' 514 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 515 516 assert not os.path.exists(pid_file) 517 518 cmd = list(qemu_nbd_args) 519 cmd.extend(('--persistent', '--pid-file', pid_file)) 520 cmd.extend(args) 521 522 log('Start NBD server') 523 with subprocess.Popen(cmd) as p: 524 try: 525 while not os.path.exists(pid_file): 526 if p.poll() is not None: 527 raise RuntimeError( 528 "qemu-nbd terminated with exit code {}: {}" 529 .format(p.returncode, ' '.join(cmd))) 530 531 time.sleep(0.01) 532 yield 533 finally: 534 if os.path.exists(pid_file): 535 os.remove(pid_file) 536 log('Kill NBD server') 537 p.kill() 538 p.wait() 539 540def compare_images(img1: str, img2: str, 541 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 542 """ 543 Compare two images with QEMU_IMG; return True if they are identical. 544 545 :raise CalledProcessError: 546 when qemu-img crashes or returns a status code of anything other 547 than 0 (identical) or 1 (different). 548 """ 549 try: 550 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 551 return True 552 except subprocess.CalledProcessError as exc: 553 if exc.returncode == 1: 554 return False 555 raise 556 557def create_image(name, size): 558 '''Create a fully-allocated raw image with sector markers''' 559 with open(name, 'wb') as file: 560 i = 0 561 while i < size: 562 sector = struct.pack('>l504xl', i // 512, i // 512) 563 file.write(sector) 564 i = i + 512 565 566def image_size(img: str) -> int: 567 """Return image's virtual size""" 568 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 569 if not isinstance(value, int): 570 type_name = type(value).__name__ 571 raise TypeError("Expected 'int' for 'virtual-size', " 572 f"got '{value}' of type '{type_name}'") 573 return value 574 575def is_str(val): 576 return isinstance(val, str) 577 578test_dir_re = re.compile(r"%s" % test_dir) 579def filter_test_dir(msg): 580 return test_dir_re.sub("TEST_DIR", msg) 581 582win32_re = re.compile(r"\r") 583def filter_win32(msg): 584 return win32_re.sub("", msg) 585 586qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 587 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 588 r"and [0-9\/.inf]* ops\/sec\)") 589def filter_qemu_io(msg): 590 msg = filter_win32(msg) 591 return qemu_io_re.sub("X ops; XX:XX:XX.X " 592 "(XXX YYY/sec and XXX ops/sec)", msg) 593 594chown_re = re.compile(r"chown [0-9]+:[0-9]+") 595def filter_chown(msg): 596 return chown_re.sub("chown UID:GID", msg) 597 598def filter_qmp_event(event): 599 '''Filter a QMP event dict''' 600 event = dict(event) 601 if 'timestamp' in event: 602 event['timestamp']['seconds'] = 'SECS' 603 event['timestamp']['microseconds'] = 'USECS' 604 return event 605 606def filter_qmp(qmsg, filter_fn): 607 '''Given a string filter, filter a QMP object's values. 608 filter_fn takes a (key, value) pair.''' 609 # Iterate through either lists or dicts; 610 if isinstance(qmsg, list): 611 items = enumerate(qmsg) 612 elif isinstance(qmsg, dict): 613 items = qmsg.items() 614 else: 615 return filter_fn(None, qmsg) 616 617 for k, v in items: 618 if isinstance(v, (dict, list)): 619 qmsg[k] = filter_qmp(v, filter_fn) 620 else: 621 qmsg[k] = filter_fn(k, v) 622 return qmsg 623 624def filter_testfiles(msg): 625 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 626 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 627 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 628 629def filter_qmp_testfiles(qmsg): 630 def _filter(_key, value): 631 if is_str(value): 632 return filter_testfiles(value) 633 return value 634 return filter_qmp(qmsg, _filter) 635 636def filter_virtio_scsi(output: str) -> str: 637 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 638 639def filter_qmp_virtio_scsi(qmsg): 640 def _filter(_key, value): 641 if is_str(value): 642 return filter_virtio_scsi(value) 643 return value 644 return filter_qmp(qmsg, _filter) 645 646def filter_generated_node_ids(msg): 647 return re.sub("#block[0-9]+", "NODE_NAME", msg) 648 649def filter_img_info(output: str, filename: str, 650 drop_child_info: bool = True) -> str: 651 lines = [] 652 drop_indented = False 653 for line in output.split('\n'): 654 if 'disk size' in line or 'actual-size' in line: 655 continue 656 657 # Drop child node info 658 if drop_indented: 659 if line.startswith(' '): 660 continue 661 drop_indented = False 662 if drop_child_info and "Child node '/" in line: 663 drop_indented = True 664 continue 665 666 line = line.replace(filename, 'TEST_IMG') 667 line = filter_testfiles(line) 668 line = line.replace(imgfmt, 'IMGFMT') 669 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 670 line = re.sub('uuid: [-a-f0-9]+', 671 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 672 line) 673 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 674 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 675 line) 676 lines.append(line) 677 return '\n'.join(lines) 678 679def filter_imgfmt(msg): 680 return msg.replace(imgfmt, 'IMGFMT') 681 682def filter_qmp_imgfmt(qmsg): 683 def _filter(_key, value): 684 if is_str(value): 685 return filter_imgfmt(value) 686 return value 687 return filter_qmp(qmsg, _filter) 688 689def filter_nbd_exports(output: str) -> str: 690 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 691 692 693Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 694 695def log(msg: Msg, 696 filters: Iterable[Callable[[Msg], Msg]] = (), 697 indent: Optional[int] = None) -> None: 698 """ 699 Logs either a string message or a JSON serializable message (like QMP). 700 If indent is provided, JSON serializable messages are pretty-printed. 701 """ 702 for flt in filters: 703 msg = flt(msg) 704 if isinstance(msg, (dict, list)): 705 # Don't sort if it's already sorted 706 do_sort = not isinstance(msg, OrderedDict) 707 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 708 else: 709 test_logger.info(msg) 710 711class Timeout: 712 def __init__(self, seconds, errmsg="Timeout"): 713 self.seconds = seconds 714 self.errmsg = errmsg 715 def __enter__(self): 716 if qemu_gdb or qemu_valgrind: 717 return self 718 signal.signal(signal.SIGALRM, self.timeout) 719 signal.setitimer(signal.ITIMER_REAL, self.seconds) 720 return self 721 def __exit__(self, exc_type, value, traceback): 722 if qemu_gdb or qemu_valgrind: 723 return False 724 signal.setitimer(signal.ITIMER_REAL, 0) 725 return False 726 def timeout(self, signum, frame): 727 raise TimeoutError(self.errmsg) 728 729def file_pattern(name): 730 return "{0}-{1}".format(os.getpid(), name) 731 732class FilePath: 733 """ 734 Context manager generating multiple file names. The generated files are 735 removed when exiting the context. 736 737 Example usage: 738 739 with FilePath('a.img', 'b.img') as (img_a, img_b): 740 # Use img_a and img_b here... 741 742 # a.img and b.img are automatically removed here. 743 744 By default images are created in iotests.test_dir. To create sockets use 745 iotests.sock_dir: 746 747 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 748 749 For convenience, calling with one argument yields a single file instead of 750 a tuple with one item. 751 752 """ 753 def __init__(self, *names, base_dir=test_dir): 754 self.paths = [os.path.join(base_dir, file_pattern(name)) 755 for name in names] 756 757 def __enter__(self): 758 if len(self.paths) == 1: 759 return self.paths[0] 760 else: 761 return self.paths 762 763 def __exit__(self, exc_type, exc_val, exc_tb): 764 for path in self.paths: 765 try: 766 os.remove(path) 767 except OSError: 768 pass 769 return False 770 771 772def try_remove(img): 773 try: 774 os.remove(img) 775 except OSError: 776 pass 777 778def file_path_remover(): 779 for path in reversed(file_path_remover.paths): 780 try_remove(path) 781 782 783def file_path(*names, base_dir=test_dir): 784 ''' Another way to get auto-generated filename that cleans itself up. 785 786 Use is as simple as: 787 788 img_a, img_b = file_path('a.img', 'b.img') 789 sock = file_path('socket') 790 ''' 791 792 if not hasattr(file_path_remover, 'paths'): 793 file_path_remover.paths = [] 794 atexit.register(file_path_remover) 795 796 paths = [] 797 for name in names: 798 filename = file_pattern(name) 799 path = os.path.join(base_dir, filename) 800 file_path_remover.paths.append(path) 801 paths.append(path) 802 803 return paths[0] if len(paths) == 1 else paths 804 805def remote_filename(path): 806 if imgproto == 'file': 807 return path 808 elif imgproto == 'ssh': 809 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 810 else: 811 raise ValueError("Protocol %s not supported" % (imgproto)) 812 813class VM(qtest.QEMUQtestMachine): 814 '''A QEMU VM''' 815 816 def __init__(self, path_suffix=''): 817 name = "qemu%s-%d" % (path_suffix, os.getpid()) 818 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 819 if qemu_gdb and qemu_valgrind: 820 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 821 sys.exit(1) 822 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 823 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 824 name=name, 825 base_temp_dir=test_dir, 826 sock_dir=sock_dir, qmp_timer=timer) 827 self._num_drives = 0 828 829 def _post_shutdown(self) -> None: 830 super()._post_shutdown() 831 if not qemu_valgrind or not self._popen: 832 return 833 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 834 if self.exitcode() == 99: 835 with open(valgrind_filename, encoding='utf-8') as f: 836 print(f.read()) 837 else: 838 os.remove(valgrind_filename) 839 840 def _pre_launch(self) -> None: 841 super()._pre_launch() 842 if qemu_print: 843 # set QEMU binary output to stdout 844 self._close_qemu_log_file() 845 846 def add_object(self, opts): 847 self._args.append('-object') 848 self._args.append(opts) 849 return self 850 851 def add_device(self, opts): 852 self._args.append('-device') 853 self._args.append(opts) 854 return self 855 856 def add_drive_raw(self, opts): 857 self._args.append('-drive') 858 self._args.append(opts) 859 return self 860 861 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 862 '''Add a virtio-blk drive to the VM''' 863 options = ['if=%s' % interface, 864 'id=drive%d' % self._num_drives] 865 866 if path is not None: 867 options.append('file=%s' % path) 868 options.append('format=%s' % img_format) 869 options.append('cache=%s' % cachemode) 870 options.append('aio=%s' % aiomode) 871 872 if opts: 873 options.append(opts) 874 875 if img_format == 'luks' and 'key-secret' not in opts: 876 # default luks support 877 if luks_default_secret_object not in self._args: 878 self.add_object(luks_default_secret_object) 879 880 options.append(luks_default_key_secret_opt) 881 882 self._args.append('-drive') 883 self._args.append(','.join(options)) 884 self._num_drives += 1 885 return self 886 887 def add_blockdev(self, opts): 888 self._args.append('-blockdev') 889 if isinstance(opts, str): 890 self._args.append(opts) 891 else: 892 self._args.append(','.join(opts)) 893 return self 894 895 def add_incoming(self, addr): 896 self._args.append('-incoming') 897 self._args.append(addr) 898 return self 899 900 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 901 cmd = 'human-monitor-command' 902 kwargs: Dict[str, Any] = {'command-line': command_line} 903 if use_log: 904 return self.qmp_log(cmd, **kwargs) 905 else: 906 return self.qmp(cmd, **kwargs) 907 908 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 909 """Pause drive r/w operations""" 910 if not event: 911 self.pause_drive(drive, "read_aio") 912 self.pause_drive(drive, "write_aio") 913 return 914 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 915 916 def resume_drive(self, drive: str) -> None: 917 """Resume drive r/w operations""" 918 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 919 920 def hmp_qemu_io(self, drive: str, cmd: str, 921 use_log: bool = False, qdev: bool = False) -> QMPMessage: 922 """Write to a given drive using an HMP command""" 923 d = '-d ' if qdev else '' 924 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 925 926 def flatten_qmp_object(self, obj, output=None, basestr=''): 927 if output is None: 928 output = {} 929 if isinstance(obj, list): 930 for i, item in enumerate(obj): 931 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 932 elif isinstance(obj, dict): 933 for key in obj: 934 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 935 else: 936 output[basestr[:-1]] = obj # Strip trailing '.' 937 return output 938 939 def qmp_to_opts(self, obj): 940 obj = self.flatten_qmp_object(obj) 941 output_list = [] 942 for key in obj: 943 output_list += [key + '=' + obj[key]] 944 return ','.join(output_list) 945 946 def get_qmp_events_filtered(self, wait=60.0): 947 result = [] 948 for ev in self.get_qmp_events(wait=wait): 949 result.append(filter_qmp_event(ev)) 950 return result 951 952 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 953 full_cmd = OrderedDict(( 954 ("execute", cmd), 955 ("arguments", ordered_qmp(kwargs)) 956 )) 957 log(full_cmd, filters, indent=indent) 958 result = self.qmp(cmd, **kwargs) 959 log(result, filters, indent=indent) 960 return result 961 962 # Returns None on success, and an error string on failure 963 def run_job(self, job: str, auto_finalize: bool = True, 964 auto_dismiss: bool = False, 965 pre_finalize: Optional[Callable[[], None]] = None, 966 cancel: bool = False, wait: float = 60.0, 967 filters: Iterable[Callable[[Any], Any]] = (), 968 ) -> Optional[str]: 969 """ 970 run_job moves a job from creation through to dismissal. 971 972 :param job: String. ID of recently-launched job 973 :param auto_finalize: Bool. True if the job was launched with 974 auto_finalize. Defaults to True. 975 :param auto_dismiss: Bool. True if the job was launched with 976 auto_dismiss=True. Defaults to False. 977 :param pre_finalize: Callback. A callable that takes no arguments to be 978 invoked prior to issuing job-finalize, if any. 979 :param cancel: Bool. When true, cancels the job after the pre_finalize 980 callback. 981 :param wait: Float. Timeout value specifying how long to wait for any 982 event, in seconds. Defaults to 60.0. 983 """ 984 match_device = {'data': {'device': job}} 985 match_id = {'data': {'id': job}} 986 events = [ 987 ('BLOCK_JOB_COMPLETED', match_device), 988 ('BLOCK_JOB_CANCELLED', match_device), 989 ('BLOCK_JOB_ERROR', match_device), 990 ('BLOCK_JOB_READY', match_device), 991 ('BLOCK_JOB_PENDING', match_id), 992 ('JOB_STATUS_CHANGE', match_id) 993 ] 994 error = None 995 while True: 996 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 997 if ev['event'] != 'JOB_STATUS_CHANGE': 998 log(ev, filters=filters) 999 continue 1000 status = ev['data']['status'] 1001 if status == 'aborting': 1002 result = self.qmp('query-jobs') 1003 for j in result['return']: 1004 if j['id'] == job: 1005 error = j['error'] 1006 log('Job failed: %s' % (j['error']), filters=filters) 1007 elif status == 'ready': 1008 self.qmp_log('job-complete', id=job, filters=filters) 1009 elif status == 'pending' and not auto_finalize: 1010 if pre_finalize: 1011 pre_finalize() 1012 if cancel: 1013 self.qmp_log('job-cancel', id=job, filters=filters) 1014 else: 1015 self.qmp_log('job-finalize', id=job, filters=filters) 1016 elif status == 'concluded' and not auto_dismiss: 1017 self.qmp_log('job-dismiss', id=job, filters=filters) 1018 elif status == 'null': 1019 return error 1020 1021 # Returns None on success, and an error string on failure 1022 def blockdev_create(self, options, job_id='job0', filters=None): 1023 if filters is None: 1024 filters = [filter_qmp_testfiles] 1025 result = self.qmp_log('blockdev-create', filters=filters, 1026 job_id=job_id, options=options) 1027 1028 if 'return' in result: 1029 assert result['return'] == {} 1030 job_result = self.run_job(job_id, filters=filters) 1031 else: 1032 job_result = result['error'] 1033 1034 log("") 1035 return job_result 1036 1037 def enable_migration_events(self, name): 1038 log('Enabling migration QMP events on %s...' % name) 1039 log(self.qmp('migrate-set-capabilities', capabilities=[ 1040 { 1041 'capability': 'events', 1042 'state': True 1043 } 1044 ])) 1045 1046 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1047 while True: 1048 event = self.event_wait('MIGRATION') 1049 # We use the default timeout, and with a timeout, event_wait() 1050 # never returns None 1051 assert event 1052 1053 log(event, filters=[filter_qmp_event]) 1054 if event['data']['status'] in ('completed', 'failed'): 1055 break 1056 1057 if event['data']['status'] == 'completed': 1058 # The event may occur in finish-migrate, so wait for the expected 1059 # post-migration runstate 1060 runstate = None 1061 while runstate != expect_runstate: 1062 runstate = self.qmp('query-status')['return']['status'] 1063 return True 1064 else: 1065 return False 1066 1067 def node_info(self, node_name): 1068 nodes = self.qmp('query-named-block-nodes') 1069 for x in nodes['return']: 1070 if x['node-name'] == node_name: 1071 return x 1072 return None 1073 1074 def query_bitmaps(self): 1075 res = self.qmp("query-named-block-nodes") 1076 return {device['node-name']: device['dirty-bitmaps'] 1077 for device in res['return'] if 'dirty-bitmaps' in device} 1078 1079 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1080 """ 1081 get a specific bitmap from the object returned by query_bitmaps. 1082 :param recording: If specified, filter results by the specified value. 1083 :param bitmaps: If specified, use it instead of call query_bitmaps() 1084 """ 1085 if bitmaps is None: 1086 bitmaps = self.query_bitmaps() 1087 1088 for bitmap in bitmaps[node_name]: 1089 if bitmap.get('name', '') == bitmap_name: 1090 if recording is None or bitmap.get('recording') == recording: 1091 return bitmap 1092 return None 1093 1094 def check_bitmap_status(self, node_name, bitmap_name, fields): 1095 ret = self.get_bitmap(node_name, bitmap_name) 1096 1097 return fields.items() <= ret.items() 1098 1099 def assert_block_path(self, root, path, expected_node, graph=None): 1100 """ 1101 Check whether the node under the given path in the block graph 1102 is @expected_node. 1103 1104 @root is the node name of the node where the @path is rooted. 1105 1106 @path is a string that consists of child names separated by 1107 slashes. It must begin with a slash. 1108 1109 Examples for @root + @path: 1110 - root="qcow2-node", path="/backing/file" 1111 - root="quorum-node", path="/children.2/file" 1112 1113 Hypothetically, @path could be empty, in which case it would 1114 point to @root. However, in practice this case is not useful 1115 and hence not allowed. 1116 1117 @expected_node may be None. (All elements of the path but the 1118 leaf must still exist.) 1119 1120 @graph may be None or the result of an x-debug-query-block-graph 1121 call that has already been performed. 1122 """ 1123 if graph is None: 1124 graph = self.qmp('x-debug-query-block-graph')['return'] 1125 1126 iter_path = iter(path.split('/')) 1127 1128 # Must start with a / 1129 assert next(iter_path) == '' 1130 1131 node = next((node for node in graph['nodes'] if node['name'] == root), 1132 None) 1133 1134 # An empty @path is not allowed, so the root node must be present 1135 assert node is not None, 'Root node %s not found' % root 1136 1137 for child_name in iter_path: 1138 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1139 1140 try: 1141 node_id = next(edge['child'] for edge in graph['edges'] 1142 if (edge['parent'] == node['id'] and 1143 edge['name'] == child_name)) 1144 1145 node = next(node for node in graph['nodes'] 1146 if node['id'] == node_id) 1147 1148 except StopIteration: 1149 node = None 1150 1151 if node is None: 1152 assert expected_node is None, \ 1153 'No node found under %s (but expected %s)' % \ 1154 (path, expected_node) 1155 else: 1156 assert node['name'] == expected_node, \ 1157 'Found node %s under %s (but expected %s)' % \ 1158 (node['name'], path, expected_node) 1159 1160index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1161 1162class QMPTestCase(unittest.TestCase): 1163 '''Abstract base class for QMP test cases''' 1164 1165 def __init__(self, *args, **kwargs): 1166 super().__init__(*args, **kwargs) 1167 # Many users of this class set a VM property we rely on heavily 1168 # in the methods below. 1169 self.vm = None 1170 1171 def dictpath(self, d, path): 1172 '''Traverse a path in a nested dict''' 1173 for component in path.split('/'): 1174 m = index_re.match(component) 1175 if m: 1176 component, idx = m.groups() 1177 idx = int(idx) 1178 1179 if not isinstance(d, dict) or component not in d: 1180 self.fail(f'failed path traversal for "{path}" in "{d}"') 1181 d = d[component] 1182 1183 if m: 1184 if not isinstance(d, list): 1185 self.fail(f'path component "{component}" in "{path}" ' 1186 f'is not a list in "{d}"') 1187 try: 1188 d = d[idx] 1189 except IndexError: 1190 self.fail(f'invalid index "{idx}" in path "{path}" ' 1191 f'in "{d}"') 1192 return d 1193 1194 def assert_qmp_absent(self, d, path): 1195 try: 1196 result = self.dictpath(d, path) 1197 except AssertionError: 1198 return 1199 self.fail('path "%s" has value "%s"' % (path, str(result))) 1200 1201 def assert_qmp(self, d, path, value): 1202 '''Assert that the value for a specific path in a QMP dict 1203 matches. When given a list of values, assert that any of 1204 them matches.''' 1205 1206 result = self.dictpath(d, path) 1207 1208 # [] makes no sense as a list of valid values, so treat it as 1209 # an actual single value. 1210 if isinstance(value, list) and value != []: 1211 for v in value: 1212 if result == v: 1213 return 1214 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1215 else: 1216 self.assertEqual(result, value, 1217 '"%s" is "%s", expected "%s"' 1218 % (path, str(result), str(value))) 1219 1220 def assert_no_active_block_jobs(self): 1221 result = self.vm.qmp('query-block-jobs') 1222 self.assert_qmp(result, 'return', []) 1223 1224 def assert_has_block_node(self, node_name=None, file_name=None): 1225 """Issue a query-named-block-nodes and assert node_name and/or 1226 file_name is present in the result""" 1227 def check_equal_or_none(a, b): 1228 return a is None or b is None or a == b 1229 assert node_name or file_name 1230 result = self.vm.qmp('query-named-block-nodes') 1231 for x in result["return"]: 1232 if check_equal_or_none(x.get("node-name"), node_name) and \ 1233 check_equal_or_none(x.get("file"), file_name): 1234 return 1235 self.fail("Cannot find %s %s in result:\n%s" % 1236 (node_name, file_name, result)) 1237 1238 def assert_json_filename_equal(self, json_filename, reference): 1239 '''Asserts that the given filename is a json: filename and that its 1240 content is equal to the given reference object''' 1241 self.assertEqual(json_filename[:5], 'json:') 1242 self.assertEqual( 1243 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1244 self.vm.flatten_qmp_object(reference) 1245 ) 1246 1247 def cancel_and_wait(self, drive='drive0', force=False, 1248 resume=False, wait=60.0): 1249 '''Cancel a block job and wait for it to finish, returning the event''' 1250 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1251 self.assert_qmp(result, 'return', {}) 1252 1253 if resume: 1254 self.vm.resume_drive(drive) 1255 1256 cancelled = False 1257 result = None 1258 while not cancelled: 1259 for event in self.vm.get_qmp_events(wait=wait): 1260 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1261 event['event'] == 'BLOCK_JOB_CANCELLED': 1262 self.assert_qmp(event, 'data/device', drive) 1263 result = event 1264 cancelled = True 1265 elif event['event'] == 'JOB_STATUS_CHANGE': 1266 self.assert_qmp(event, 'data/id', drive) 1267 1268 1269 self.assert_no_active_block_jobs() 1270 return result 1271 1272 def wait_until_completed(self, drive='drive0', check_offset=True, 1273 wait=60.0, error=None): 1274 '''Wait for a block job to finish, returning the event''' 1275 while True: 1276 for event in self.vm.get_qmp_events(wait=wait): 1277 if event['event'] == 'BLOCK_JOB_COMPLETED': 1278 self.assert_qmp(event, 'data/device', drive) 1279 if error is None: 1280 self.assert_qmp_absent(event, 'data/error') 1281 if check_offset: 1282 self.assert_qmp(event, 'data/offset', 1283 event['data']['len']) 1284 else: 1285 self.assert_qmp(event, 'data/error', error) 1286 self.assert_no_active_block_jobs() 1287 return event 1288 if event['event'] == 'JOB_STATUS_CHANGE': 1289 self.assert_qmp(event, 'data/id', drive) 1290 1291 def wait_ready(self, drive='drive0'): 1292 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1293 return self.vm.events_wait([ 1294 ('BLOCK_JOB_READY', 1295 {'data': {'type': 'mirror', 'device': drive}}), 1296 ('BLOCK_JOB_READY', 1297 {'data': {'type': 'commit', 'device': drive}}) 1298 ]) 1299 1300 def wait_ready_and_cancel(self, drive='drive0'): 1301 self.wait_ready(drive=drive) 1302 event = self.cancel_and_wait(drive=drive) 1303 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1304 self.assert_qmp(event, 'data/type', 'mirror') 1305 self.assert_qmp(event, 'data/offset', event['data']['len']) 1306 1307 def complete_and_wait(self, drive='drive0', wait_ready=True, 1308 completion_error=None): 1309 '''Complete a block job and wait for it to finish''' 1310 if wait_ready: 1311 self.wait_ready(drive=drive) 1312 1313 result = self.vm.qmp('block-job-complete', device=drive) 1314 self.assert_qmp(result, 'return', {}) 1315 1316 event = self.wait_until_completed(drive=drive, error=completion_error) 1317 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1318 1319 def pause_wait(self, job_id='job0'): 1320 with Timeout(3, "Timeout waiting for job to pause"): 1321 while True: 1322 result = self.vm.qmp('query-block-jobs') 1323 found = False 1324 for job in result['return']: 1325 if job['device'] == job_id: 1326 found = True 1327 if job['paused'] and not job['busy']: 1328 return job 1329 break 1330 assert found 1331 1332 def pause_job(self, job_id='job0', wait=True): 1333 result = self.vm.qmp('block-job-pause', device=job_id) 1334 self.assert_qmp(result, 'return', {}) 1335 if wait: 1336 return self.pause_wait(job_id) 1337 return result 1338 1339 def case_skip(self, reason): 1340 '''Skip this test case''' 1341 case_notrun(reason) 1342 self.skipTest(reason) 1343 1344 1345def notrun(reason): 1346 '''Skip this test suite''' 1347 # Each test in qemu-iotests has a number ("seq") 1348 seq = os.path.basename(sys.argv[0]) 1349 1350 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1351 as outfile: 1352 outfile.write(reason + '\n') 1353 logger.warning("%s not run: %s", seq, reason) 1354 sys.exit(0) 1355 1356def case_notrun(reason): 1357 '''Mark this test case as not having been run (without actually 1358 skipping it, that is left to the caller). See 1359 QMPTestCase.case_skip() for a variant that actually skips the 1360 current test case.''' 1361 1362 # Each test in qemu-iotests has a number ("seq") 1363 seq = os.path.basename(sys.argv[0]) 1364 1365 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1366 as outfile: 1367 outfile.write(' [case not run] ' + reason + '\n') 1368 1369def _verify_image_format(supported_fmts: Sequence[str] = (), 1370 unsupported_fmts: Sequence[str] = ()) -> None: 1371 if 'generic' in supported_fmts and \ 1372 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1373 # similar to 1374 # _supported_fmt generic 1375 # for bash tests 1376 supported_fmts = () 1377 1378 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1379 if not_sup or (imgfmt in unsupported_fmts): 1380 notrun('not suitable for this image format: %s' % imgfmt) 1381 1382 if imgfmt == 'luks': 1383 verify_working_luks() 1384 1385def _verify_protocol(supported: Sequence[str] = (), 1386 unsupported: Sequence[str] = ()) -> None: 1387 assert not (supported and unsupported) 1388 1389 if 'generic' in supported: 1390 return 1391 1392 not_sup = supported and (imgproto not in supported) 1393 if not_sup or (imgproto in unsupported): 1394 notrun('not suitable for this protocol: %s' % imgproto) 1395 1396def _verify_platform(supported: Sequence[str] = (), 1397 unsupported: Sequence[str] = ()) -> None: 1398 if any((sys.platform.startswith(x) for x in unsupported)): 1399 notrun('not suitable for this OS: %s' % sys.platform) 1400 1401 if supported: 1402 if not any((sys.platform.startswith(x) for x in supported)): 1403 notrun('not suitable for this OS: %s' % sys.platform) 1404 1405def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1406 if supported_cache_modes and (cachemode not in supported_cache_modes): 1407 notrun('not suitable for this cache mode: %s' % cachemode) 1408 1409def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1410 if supported_aio_modes and (aiomode not in supported_aio_modes): 1411 notrun('not suitable for this aio mode: %s' % aiomode) 1412 1413def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1414 usf_list = list(set(required_formats) - set(supported_formats())) 1415 if usf_list: 1416 notrun(f'formats {usf_list} are not whitelisted') 1417 1418 1419def _verify_virtio_blk() -> None: 1420 out = qemu_pipe('-M', 'none', '-device', 'help') 1421 if 'virtio-blk' not in out: 1422 notrun('Missing virtio-blk in QEMU binary') 1423 1424def verify_virtio_scsi_pci_or_ccw() -> None: 1425 out = qemu_pipe('-M', 'none', '-device', 'help') 1426 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1427 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1428 1429 1430def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1431 imgopts = os.environ.get('IMGOPTS') 1432 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1433 # but it supported only for bash tests. We don't have a concept of global 1434 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1435 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1436 unsup = list(unsupported) + ['$'] 1437 if imgopts and any(x in imgopts for x in unsup): 1438 notrun(f'not suitable for this imgopts: {imgopts}') 1439 1440 1441def supports_quorum() -> bool: 1442 return 'quorum' in qemu_img('--help').stdout 1443 1444def verify_quorum(): 1445 '''Skip test suite if quorum support is not available''' 1446 if not supports_quorum(): 1447 notrun('quorum support missing') 1448 1449def has_working_luks() -> Tuple[bool, str]: 1450 """ 1451 Check whether our LUKS driver can actually create images 1452 (this extends to LUKS encryption for qcow2). 1453 1454 If not, return the reason why. 1455 """ 1456 1457 img_file = f'{test_dir}/luks-test.luks' 1458 res = qemu_img('create', '-f', 'luks', 1459 '--object', luks_default_secret_object, 1460 '-o', luks_default_key_secret_opt, 1461 '-o', 'iter-time=10', 1462 img_file, '1G', 1463 check=False) 1464 try: 1465 os.remove(img_file) 1466 except OSError: 1467 pass 1468 1469 if res.returncode: 1470 reason = res.stdout 1471 for line in res.stdout.splitlines(): 1472 if img_file + ':' in line: 1473 reason = line.split(img_file + ':', 1)[1].strip() 1474 break 1475 1476 return (False, reason) 1477 else: 1478 return (True, '') 1479 1480def verify_working_luks(): 1481 """ 1482 Skip test suite if LUKS does not work 1483 """ 1484 (working, reason) = has_working_luks() 1485 if not working: 1486 notrun(reason) 1487 1488def supports_qcow2_zstd_compression() -> bool: 1489 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1490 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1491 img_file, '0', 1492 check=False) 1493 try: 1494 os.remove(img_file) 1495 except OSError: 1496 pass 1497 1498 if res.returncode == 1 and \ 1499 "'compression-type' does not accept value 'zstd'" in res.stdout: 1500 return False 1501 else: 1502 return True 1503 1504def verify_qcow2_zstd_compression(): 1505 if not supports_qcow2_zstd_compression(): 1506 notrun('zstd compression not supported') 1507 1508def qemu_pipe(*args: str) -> str: 1509 """ 1510 Run qemu with an option to print something and exit (e.g. a help option). 1511 1512 :return: QEMU's stdout output. 1513 """ 1514 full_args = [qemu_prog] + qemu_opts + list(args) 1515 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1516 return output 1517 1518def supported_formats(read_only=False): 1519 '''Set 'read_only' to True to check ro-whitelist 1520 Otherwise, rw-whitelist is checked''' 1521 1522 if not hasattr(supported_formats, "formats"): 1523 supported_formats.formats = {} 1524 1525 if read_only not in supported_formats.formats: 1526 format_message = qemu_pipe("-drive", "format=help") 1527 line = 1 if read_only else 0 1528 supported_formats.formats[read_only] = \ 1529 format_message.splitlines()[line].split(":")[1].split() 1530 1531 return supported_formats.formats[read_only] 1532 1533def skip_if_unsupported(required_formats=(), read_only=False): 1534 '''Skip Test Decorator 1535 Runs the test if all the required formats are whitelisted''' 1536 def skip_test_decorator(func): 1537 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1538 **kwargs: Dict[str, Any]) -> None: 1539 if callable(required_formats): 1540 fmts = required_formats(test_case) 1541 else: 1542 fmts = required_formats 1543 1544 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1545 if usf_list: 1546 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1547 test_case.case_skip(msg) 1548 else: 1549 func(test_case, *args, **kwargs) 1550 return func_wrapper 1551 return skip_test_decorator 1552 1553def skip_for_formats(formats: Sequence[str] = ()) \ 1554 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1555 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1556 '''Skip Test Decorator 1557 Skips the test for the given formats''' 1558 def skip_test_decorator(func): 1559 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1560 **kwargs: Dict[str, Any]) -> None: 1561 if imgfmt in formats: 1562 msg = f'{test_case}: Skipped for format {imgfmt}' 1563 test_case.case_skip(msg) 1564 else: 1565 func(test_case, *args, **kwargs) 1566 return func_wrapper 1567 return skip_test_decorator 1568 1569def skip_if_user_is_root(func): 1570 '''Skip Test Decorator 1571 Runs the test only without root permissions''' 1572 def func_wrapper(*args, **kwargs): 1573 if os.getuid() == 0: 1574 case_notrun('{}: cannot be run as root'.format(args[0])) 1575 return None 1576 else: 1577 return func(*args, **kwargs) 1578 return func_wrapper 1579 1580# We need to filter out the time taken from the output so that 1581# qemu-iotest can reliably diff the results against master output, 1582# and hide skipped tests from the reference output. 1583 1584class ReproducibleTestResult(unittest.TextTestResult): 1585 def addSkip(self, test, reason): 1586 # Same as TextTestResult, but print dot instead of "s" 1587 unittest.TestResult.addSkip(self, test, reason) 1588 if self.showAll: 1589 self.stream.writeln("skipped {0!r}".format(reason)) 1590 elif self.dots: 1591 self.stream.write(".") 1592 self.stream.flush() 1593 1594class ReproducibleStreamWrapper: 1595 def __init__(self, stream: TextIO): 1596 self.stream = stream 1597 1598 def __getattr__(self, attr): 1599 if attr in ('stream', '__getstate__'): 1600 raise AttributeError(attr) 1601 return getattr(self.stream, attr) 1602 1603 def write(self, arg=None): 1604 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1605 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1606 self.stream.write(arg) 1607 1608class ReproducibleTestRunner(unittest.TextTestRunner): 1609 def __init__(self, stream: Optional[TextIO] = None, 1610 resultclass: Type[unittest.TestResult] = 1611 ReproducibleTestResult, 1612 **kwargs: Any) -> None: 1613 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1614 super().__init__(stream=rstream, # type: ignore 1615 descriptions=True, 1616 resultclass=resultclass, 1617 **kwargs) 1618 1619def execute_unittest(argv: List[str], debug: bool = False) -> None: 1620 """Executes unittests within the calling module.""" 1621 1622 # Some tests have warnings, especially ResourceWarnings for unclosed 1623 # files and sockets. Ignore them for now to ensure reproducibility of 1624 # the test output. 1625 unittest.main(argv=argv, 1626 testRunner=ReproducibleTestRunner, 1627 verbosity=2 if debug else 1, 1628 warnings=None if sys.warnoptions else 'ignore') 1629 1630def execute_setup_common(supported_fmts: Sequence[str] = (), 1631 supported_platforms: Sequence[str] = (), 1632 supported_cache_modes: Sequence[str] = (), 1633 supported_aio_modes: Sequence[str] = (), 1634 unsupported_fmts: Sequence[str] = (), 1635 supported_protocols: Sequence[str] = (), 1636 unsupported_protocols: Sequence[str] = (), 1637 required_fmts: Sequence[str] = (), 1638 unsupported_imgopts: Sequence[str] = ()) -> bool: 1639 """ 1640 Perform necessary setup for either script-style or unittest-style tests. 1641 1642 :return: Bool; Whether or not debug mode has been requested via the CLI. 1643 """ 1644 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1645 1646 debug = '-d' in sys.argv 1647 if debug: 1648 sys.argv.remove('-d') 1649 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1650 1651 _verify_image_format(supported_fmts, unsupported_fmts) 1652 _verify_protocol(supported_protocols, unsupported_protocols) 1653 _verify_platform(supported=supported_platforms) 1654 _verify_cache_mode(supported_cache_modes) 1655 _verify_aio_mode(supported_aio_modes) 1656 _verify_formats(required_fmts) 1657 _verify_virtio_blk() 1658 _verify_imgopts(unsupported_imgopts) 1659 1660 return debug 1661 1662def execute_test(*args, test_function=None, **kwargs): 1663 """Run either unittest or script-style tests.""" 1664 1665 debug = execute_setup_common(*args, **kwargs) 1666 if not test_function: 1667 execute_unittest(sys.argv, debug) 1668 else: 1669 test_function() 1670 1671def activate_logging(): 1672 """Activate iotests.log() output to stdout for script-style tests.""" 1673 handler = logging.StreamHandler(stream=sys.stdout) 1674 formatter = logging.Formatter('%(message)s') 1675 handler.setFormatter(formatter) 1676 test_logger.addHandler(handler) 1677 test_logger.setLevel(logging.INFO) 1678 test_logger.propagate = False 1679 1680# This is called from script-style iotests without a single point of entry 1681def script_initialize(*args, **kwargs): 1682 """Initialize script-style tests without running any tests.""" 1683 activate_logging() 1684 execute_setup_common(*args, **kwargs) 1685 1686# This is called from script-style iotests with a single point of entry 1687def script_main(test_function, *args, **kwargs): 1688 """Run script-style tests outside of the unittest framework""" 1689 activate_logging() 1690 execute_test(*args, test_function=test_function, **kwargs) 1691 1692# This is called from unittest style iotests 1693def main(*args, **kwargs): 1694 """Run tests using the unittest framework""" 1695 execute_test(*args, **kwargs) 1696