1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, drop_child_info: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path, drop_child_info)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd_raw(cmd, args) 464 465 def get_qmp(self) -> QEMUMonitorProtocol: 466 assert self._qmp is not None 467 return self._qmp 468 469 def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 470 -> QMPReturnValue: 471 assert self._qmp is not None 472 return self._qmp.cmd(cmd, **(args or {})) 473 474 def stop(self, kill_signal=15): 475 self._p.send_signal(kill_signal) 476 self._p.wait() 477 self._p = None 478 479 if self._qmp: 480 self._qmp.close() 481 482 if self._qmpsock is not None: 483 try: 484 os.remove(self._qmpsock) 485 except OSError: 486 pass 487 try: 488 os.remove(self.pidfile) 489 except OSError: 490 pass 491 492 def __del__(self): 493 if self._p is not None: 494 self.stop(kill_signal=9) 495 496 497def qemu_nbd(*args): 498 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 499 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 500 501def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 502 '''Run qemu-nbd in daemon mode and return both the parent's exit code 503 and its output in case of an error''' 504 full_args = qemu_nbd_args + ['--fork'] + list(args) 505 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 506 connect_stderr=False) 507 return returncode, output if returncode else '' 508 509def qemu_nbd_list_log(*args: str) -> str: 510 '''Run qemu-nbd to list remote exports''' 511 full_args = [qemu_nbd_prog, '-L'] + list(args) 512 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 513 log(output, filters=[filter_testfiles, filter_nbd_exports]) 514 return output 515 516@contextmanager 517def qemu_nbd_popen(*args): 518 '''Context manager running qemu-nbd within the context''' 519 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 520 521 assert not os.path.exists(pid_file) 522 523 cmd = list(qemu_nbd_args) 524 cmd.extend(('--persistent', '--pid-file', pid_file)) 525 cmd.extend(args) 526 527 log('Start NBD server') 528 with subprocess.Popen(cmd) as p: 529 try: 530 while not os.path.exists(pid_file): 531 if p.poll() is not None: 532 raise RuntimeError( 533 "qemu-nbd terminated with exit code {}: {}" 534 .format(p.returncode, ' '.join(cmd))) 535 536 time.sleep(0.01) 537 yield 538 finally: 539 if os.path.exists(pid_file): 540 os.remove(pid_file) 541 log('Kill NBD server') 542 p.kill() 543 p.wait() 544 545def compare_images(img1: str, img2: str, 546 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 547 """ 548 Compare two images with QEMU_IMG; return True if they are identical. 549 550 :raise CalledProcessError: 551 when qemu-img crashes or returns a status code of anything other 552 than 0 (identical) or 1 (different). 553 """ 554 try: 555 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 556 return True 557 except subprocess.CalledProcessError as exc: 558 if exc.returncode == 1: 559 return False 560 raise 561 562def create_image(name, size): 563 '''Create a fully-allocated raw image with sector markers''' 564 with open(name, 'wb') as file: 565 i = 0 566 while i < size: 567 sector = struct.pack('>l504xl', i // 512, i // 512) 568 file.write(sector) 569 i = i + 512 570 571def image_size(img: str) -> int: 572 """Return image's virtual size""" 573 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 574 if not isinstance(value, int): 575 type_name = type(value).__name__ 576 raise TypeError("Expected 'int' for 'virtual-size', " 577 f"got '{value}' of type '{type_name}'") 578 return value 579 580def is_str(val): 581 return isinstance(val, str) 582 583test_dir_re = re.compile(r"%s" % test_dir) 584def filter_test_dir(msg): 585 return test_dir_re.sub("TEST_DIR", msg) 586 587win32_re = re.compile(r"\r") 588def filter_win32(msg): 589 return win32_re.sub("", msg) 590 591qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 592 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 593 r"and [0-9\/.inf]* ops\/sec\)") 594def filter_qemu_io(msg): 595 msg = filter_win32(msg) 596 return qemu_io_re.sub("X ops; XX:XX:XX.X " 597 "(XXX YYY/sec and XXX ops/sec)", msg) 598 599chown_re = re.compile(r"chown [0-9]+:[0-9]+") 600def filter_chown(msg): 601 return chown_re.sub("chown UID:GID", msg) 602 603def filter_qmp_event(event): 604 '''Filter a QMP event dict''' 605 event = dict(event) 606 if 'timestamp' in event: 607 event['timestamp']['seconds'] = 'SECS' 608 event['timestamp']['microseconds'] = 'USECS' 609 return event 610 611def filter_qmp(qmsg, filter_fn): 612 '''Given a string filter, filter a QMP object's values. 613 filter_fn takes a (key, value) pair.''' 614 # Iterate through either lists or dicts; 615 if isinstance(qmsg, list): 616 items = enumerate(qmsg) 617 elif isinstance(qmsg, dict): 618 items = qmsg.items() 619 else: 620 return filter_fn(None, qmsg) 621 622 for k, v in items: 623 if isinstance(v, (dict, list)): 624 qmsg[k] = filter_qmp(v, filter_fn) 625 else: 626 qmsg[k] = filter_fn(k, v) 627 return qmsg 628 629def filter_testfiles(msg): 630 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 631 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 632 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 633 634def filter_qmp_testfiles(qmsg): 635 def _filter(_key, value): 636 if is_str(value): 637 return filter_testfiles(value) 638 return value 639 return filter_qmp(qmsg, _filter) 640 641def filter_virtio_scsi(output: str) -> str: 642 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 643 644def filter_qmp_virtio_scsi(qmsg): 645 def _filter(_key, value): 646 if is_str(value): 647 return filter_virtio_scsi(value) 648 return value 649 return filter_qmp(qmsg, _filter) 650 651def filter_generated_node_ids(msg): 652 return re.sub("#block[0-9]+", "NODE_NAME", msg) 653 654def filter_qmp_generated_node_ids(qmsg): 655 def _filter(_key, value): 656 if is_str(value): 657 return filter_generated_node_ids(value) 658 return value 659 return filter_qmp(qmsg, _filter) 660 661def filter_img_info(output: str, filename: str, 662 drop_child_info: bool = True) -> str: 663 lines = [] 664 drop_indented = False 665 for line in output.split('\n'): 666 if 'disk size' in line or 'actual-size' in line: 667 continue 668 669 # Drop child node info 670 if drop_indented: 671 if line.startswith(' '): 672 continue 673 drop_indented = False 674 if drop_child_info and "Child node '/" in line: 675 drop_indented = True 676 continue 677 678 line = line.replace(filename, 'TEST_IMG') 679 line = filter_testfiles(line) 680 line = line.replace(imgfmt, 'IMGFMT') 681 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 682 line = re.sub('uuid: [-a-f0-9]+', 683 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 684 line) 685 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 686 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 687 line) 688 lines.append(line) 689 return '\n'.join(lines) 690 691def filter_imgfmt(msg): 692 return msg.replace(imgfmt, 'IMGFMT') 693 694def filter_qmp_imgfmt(qmsg): 695 def _filter(_key, value): 696 if is_str(value): 697 return filter_imgfmt(value) 698 return value 699 return filter_qmp(qmsg, _filter) 700 701def filter_nbd_exports(output: str) -> str: 702 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 703 704 705Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 706 707def log(msg: Msg, 708 filters: Iterable[Callable[[Msg], Msg]] = (), 709 indent: Optional[int] = None) -> None: 710 """ 711 Logs either a string message or a JSON serializable message (like QMP). 712 If indent is provided, JSON serializable messages are pretty-printed. 713 """ 714 for flt in filters: 715 msg = flt(msg) 716 if isinstance(msg, (dict, list)): 717 # Don't sort if it's already sorted 718 do_sort = not isinstance(msg, OrderedDict) 719 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 720 else: 721 test_logger.info(msg) 722 723class Timeout: 724 def __init__(self, seconds, errmsg="Timeout"): 725 self.seconds = seconds 726 self.errmsg = errmsg 727 def __enter__(self): 728 if qemu_gdb or qemu_valgrind: 729 return self 730 signal.signal(signal.SIGALRM, self.timeout) 731 signal.setitimer(signal.ITIMER_REAL, self.seconds) 732 return self 733 def __exit__(self, exc_type, value, traceback): 734 if qemu_gdb or qemu_valgrind: 735 return False 736 signal.setitimer(signal.ITIMER_REAL, 0) 737 return False 738 def timeout(self, signum, frame): 739 raise TimeoutError(self.errmsg) 740 741def file_pattern(name): 742 return "{0}-{1}".format(os.getpid(), name) 743 744class FilePath: 745 """ 746 Context manager generating multiple file names. The generated files are 747 removed when exiting the context. 748 749 Example usage: 750 751 with FilePath('a.img', 'b.img') as (img_a, img_b): 752 # Use img_a and img_b here... 753 754 # a.img and b.img are automatically removed here. 755 756 By default images are created in iotests.test_dir. To create sockets use 757 iotests.sock_dir: 758 759 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 760 761 For convenience, calling with one argument yields a single file instead of 762 a tuple with one item. 763 764 """ 765 def __init__(self, *names, base_dir=test_dir): 766 self.paths = [os.path.join(base_dir, file_pattern(name)) 767 for name in names] 768 769 def __enter__(self): 770 if len(self.paths) == 1: 771 return self.paths[0] 772 else: 773 return self.paths 774 775 def __exit__(self, exc_type, exc_val, exc_tb): 776 for path in self.paths: 777 try: 778 os.remove(path) 779 except OSError: 780 pass 781 return False 782 783 784def try_remove(img): 785 try: 786 os.remove(img) 787 except OSError: 788 pass 789 790def file_path_remover(): 791 for path in reversed(file_path_remover.paths): 792 try_remove(path) 793 794 795def file_path(*names, base_dir=test_dir): 796 ''' Another way to get auto-generated filename that cleans itself up. 797 798 Use is as simple as: 799 800 img_a, img_b = file_path('a.img', 'b.img') 801 sock = file_path('socket') 802 ''' 803 804 if not hasattr(file_path_remover, 'paths'): 805 file_path_remover.paths = [] 806 atexit.register(file_path_remover) 807 808 paths = [] 809 for name in names: 810 filename = file_pattern(name) 811 path = os.path.join(base_dir, filename) 812 file_path_remover.paths.append(path) 813 paths.append(path) 814 815 return paths[0] if len(paths) == 1 else paths 816 817def remote_filename(path): 818 if imgproto == 'file': 819 return path 820 elif imgproto == 'ssh': 821 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 822 else: 823 raise ValueError("Protocol %s not supported" % (imgproto)) 824 825class VM(qtest.QEMUQtestMachine): 826 '''A QEMU VM''' 827 828 def __init__(self, path_suffix=''): 829 name = "qemu%s-%d" % (path_suffix, os.getpid()) 830 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 831 if qemu_gdb and qemu_valgrind: 832 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 833 sys.exit(1) 834 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 835 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 836 name=name, 837 base_temp_dir=test_dir, 838 qmp_timer=timer) 839 self._num_drives = 0 840 841 def _post_shutdown(self) -> None: 842 super()._post_shutdown() 843 if not qemu_valgrind or not self._popen: 844 return 845 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 846 if self.exitcode() == 99: 847 with open(valgrind_filename, encoding='utf-8') as f: 848 print(f.read()) 849 else: 850 os.remove(valgrind_filename) 851 852 def _pre_launch(self) -> None: 853 super()._pre_launch() 854 if qemu_print: 855 # set QEMU binary output to stdout 856 self._close_qemu_log_file() 857 858 def add_object(self, opts): 859 self._args.append('-object') 860 self._args.append(opts) 861 return self 862 863 def add_device(self, opts): 864 self._args.append('-device') 865 self._args.append(opts) 866 return self 867 868 def add_drive_raw(self, opts): 869 self._args.append('-drive') 870 self._args.append(opts) 871 return self 872 873 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 874 '''Add a virtio-blk drive to the VM''' 875 options = ['if=%s' % interface, 876 'id=drive%d' % self._num_drives] 877 878 if path is not None: 879 options.append('file=%s' % path) 880 options.append('format=%s' % img_format) 881 options.append('cache=%s' % cachemode) 882 options.append('aio=%s' % aiomode) 883 884 if opts: 885 options.append(opts) 886 887 if img_format == 'luks' and 'key-secret' not in opts: 888 # default luks support 889 if luks_default_secret_object not in self._args: 890 self.add_object(luks_default_secret_object) 891 892 options.append(luks_default_key_secret_opt) 893 894 self._args.append('-drive') 895 self._args.append(','.join(options)) 896 self._num_drives += 1 897 return self 898 899 def add_blockdev(self, opts): 900 self._args.append('-blockdev') 901 if isinstance(opts, str): 902 self._args.append(opts) 903 else: 904 self._args.append(','.join(opts)) 905 return self 906 907 def add_incoming(self, addr): 908 self._args.append('-incoming') 909 self._args.append(addr) 910 return self 911 912 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 913 cmd = 'human-monitor-command' 914 kwargs: Dict[str, Any] = {'command-line': command_line} 915 if use_log: 916 return self.qmp_log(cmd, **kwargs) 917 else: 918 return self.qmp(cmd, **kwargs) 919 920 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 921 """Pause drive r/w operations""" 922 if not event: 923 self.pause_drive(drive, "read_aio") 924 self.pause_drive(drive, "write_aio") 925 return 926 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 927 928 def resume_drive(self, drive: str) -> None: 929 """Resume drive r/w operations""" 930 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 931 932 def hmp_qemu_io(self, drive: str, cmd: str, 933 use_log: bool = False, qdev: bool = False) -> QMPMessage: 934 """Write to a given drive using an HMP command""" 935 d = '-d ' if qdev else '' 936 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 937 938 def flatten_qmp_object(self, obj, output=None, basestr=''): 939 if output is None: 940 output = {} 941 if isinstance(obj, list): 942 for i, item in enumerate(obj): 943 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 944 elif isinstance(obj, dict): 945 for key in obj: 946 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 947 else: 948 output[basestr[:-1]] = obj # Strip trailing '.' 949 return output 950 951 def qmp_to_opts(self, obj): 952 obj = self.flatten_qmp_object(obj) 953 output_list = [] 954 for key in obj: 955 output_list += [key + '=' + obj[key]] 956 return ','.join(output_list) 957 958 def get_qmp_events_filtered(self, wait=60.0): 959 result = [] 960 for ev in self.get_qmp_events(wait=wait): 961 result.append(filter_qmp_event(ev)) 962 return result 963 964 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 965 full_cmd = OrderedDict(( 966 ("execute", cmd), 967 ("arguments", ordered_qmp(kwargs)) 968 )) 969 log(full_cmd, filters, indent=indent) 970 result = self.qmp(cmd, **kwargs) 971 log(result, filters, indent=indent) 972 return result 973 974 # Returns None on success, and an error string on failure 975 def run_job(self, job: str, auto_finalize: bool = True, 976 auto_dismiss: bool = False, 977 pre_finalize: Optional[Callable[[], None]] = None, 978 cancel: bool = False, wait: float = 60.0, 979 filters: Iterable[Callable[[Any], Any]] = (), 980 ) -> Optional[str]: 981 """ 982 run_job moves a job from creation through to dismissal. 983 984 :param job: String. ID of recently-launched job 985 :param auto_finalize: Bool. True if the job was launched with 986 auto_finalize. Defaults to True. 987 :param auto_dismiss: Bool. True if the job was launched with 988 auto_dismiss=True. Defaults to False. 989 :param pre_finalize: Callback. A callable that takes no arguments to be 990 invoked prior to issuing job-finalize, if any. 991 :param cancel: Bool. When true, cancels the job after the pre_finalize 992 callback. 993 :param wait: Float. Timeout value specifying how long to wait for any 994 event, in seconds. Defaults to 60.0. 995 """ 996 match_device = {'data': {'device': job}} 997 match_id = {'data': {'id': job}} 998 events = [ 999 ('BLOCK_JOB_COMPLETED', match_device), 1000 ('BLOCK_JOB_CANCELLED', match_device), 1001 ('BLOCK_JOB_ERROR', match_device), 1002 ('BLOCK_JOB_READY', match_device), 1003 ('BLOCK_JOB_PENDING', match_id), 1004 ('JOB_STATUS_CHANGE', match_id) 1005 ] 1006 error = None 1007 while True: 1008 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 1009 if ev['event'] != 'JOB_STATUS_CHANGE': 1010 log(ev, filters=filters) 1011 continue 1012 status = ev['data']['status'] 1013 if status == 'aborting': 1014 result = self.qmp('query-jobs') 1015 for j in result['return']: 1016 if j['id'] == job: 1017 error = j['error'] 1018 log('Job failed: %s' % (j['error']), filters=filters) 1019 elif status == 'ready': 1020 self.qmp_log('job-complete', id=job, filters=filters) 1021 elif status == 'pending' and not auto_finalize: 1022 if pre_finalize: 1023 pre_finalize() 1024 if cancel: 1025 self.qmp_log('job-cancel', id=job, filters=filters) 1026 else: 1027 self.qmp_log('job-finalize', id=job, filters=filters) 1028 elif status == 'concluded' and not auto_dismiss: 1029 self.qmp_log('job-dismiss', id=job, filters=filters) 1030 elif status == 'null': 1031 return error 1032 1033 # Returns None on success, and an error string on failure 1034 def blockdev_create(self, options, job_id='job0', filters=None): 1035 if filters is None: 1036 filters = [filter_qmp_testfiles] 1037 result = self.qmp_log('blockdev-create', filters=filters, 1038 job_id=job_id, options=options) 1039 1040 if 'return' in result: 1041 assert result['return'] == {} 1042 job_result = self.run_job(job_id, filters=filters) 1043 else: 1044 job_result = result['error'] 1045 1046 log("") 1047 return job_result 1048 1049 def enable_migration_events(self, name): 1050 log('Enabling migration QMP events on %s...' % name) 1051 log(self.qmp('migrate-set-capabilities', capabilities=[ 1052 { 1053 'capability': 'events', 1054 'state': True 1055 } 1056 ])) 1057 1058 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1059 while True: 1060 event = self.event_wait('MIGRATION') 1061 # We use the default timeout, and with a timeout, event_wait() 1062 # never returns None 1063 assert event 1064 1065 log(event, filters=[filter_qmp_event]) 1066 if event['data']['status'] in ('completed', 'failed'): 1067 break 1068 1069 if event['data']['status'] == 'completed': 1070 # The event may occur in finish-migrate, so wait for the expected 1071 # post-migration runstate 1072 runstate = None 1073 while runstate != expect_runstate: 1074 runstate = self.qmp('query-status')['return']['status'] 1075 return True 1076 else: 1077 return False 1078 1079 def node_info(self, node_name): 1080 nodes = self.qmp('query-named-block-nodes') 1081 for x in nodes['return']: 1082 if x['node-name'] == node_name: 1083 return x 1084 return None 1085 1086 def query_bitmaps(self): 1087 res = self.qmp("query-named-block-nodes") 1088 return {device['node-name']: device['dirty-bitmaps'] 1089 for device in res['return'] if 'dirty-bitmaps' in device} 1090 1091 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1092 """ 1093 get a specific bitmap from the object returned by query_bitmaps. 1094 :param recording: If specified, filter results by the specified value. 1095 :param bitmaps: If specified, use it instead of call query_bitmaps() 1096 """ 1097 if bitmaps is None: 1098 bitmaps = self.query_bitmaps() 1099 1100 for bitmap in bitmaps[node_name]: 1101 if bitmap.get('name', '') == bitmap_name: 1102 if recording is None or bitmap.get('recording') == recording: 1103 return bitmap 1104 return None 1105 1106 def check_bitmap_status(self, node_name, bitmap_name, fields): 1107 ret = self.get_bitmap(node_name, bitmap_name) 1108 1109 return fields.items() <= ret.items() 1110 1111 def assert_block_path(self, root, path, expected_node, graph=None): 1112 """ 1113 Check whether the node under the given path in the block graph 1114 is @expected_node. 1115 1116 @root is the node name of the node where the @path is rooted. 1117 1118 @path is a string that consists of child names separated by 1119 slashes. It must begin with a slash. 1120 1121 Examples for @root + @path: 1122 - root="qcow2-node", path="/backing/file" 1123 - root="quorum-node", path="/children.2/file" 1124 1125 Hypothetically, @path could be empty, in which case it would 1126 point to @root. However, in practice this case is not useful 1127 and hence not allowed. 1128 1129 @expected_node may be None. (All elements of the path but the 1130 leaf must still exist.) 1131 1132 @graph may be None or the result of an x-debug-query-block-graph 1133 call that has already been performed. 1134 """ 1135 if graph is None: 1136 graph = self.qmp('x-debug-query-block-graph')['return'] 1137 1138 iter_path = iter(path.split('/')) 1139 1140 # Must start with a / 1141 assert next(iter_path) == '' 1142 1143 node = next((node for node in graph['nodes'] if node['name'] == root), 1144 None) 1145 1146 # An empty @path is not allowed, so the root node must be present 1147 assert node is not None, 'Root node %s not found' % root 1148 1149 for child_name in iter_path: 1150 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1151 1152 try: 1153 node_id = next(edge['child'] for edge in graph['edges'] 1154 if (edge['parent'] == node['id'] and 1155 edge['name'] == child_name)) 1156 1157 node = next(node for node in graph['nodes'] 1158 if node['id'] == node_id) 1159 1160 except StopIteration: 1161 node = None 1162 1163 if node is None: 1164 assert expected_node is None, \ 1165 'No node found under %s (but expected %s)' % \ 1166 (path, expected_node) 1167 else: 1168 assert node['name'] == expected_node, \ 1169 'Found node %s under %s (but expected %s)' % \ 1170 (node['name'], path, expected_node) 1171 1172index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1173 1174class QMPTestCase(unittest.TestCase): 1175 '''Abstract base class for QMP test cases''' 1176 1177 def __init__(self, *args, **kwargs): 1178 super().__init__(*args, **kwargs) 1179 # Many users of this class set a VM property we rely on heavily 1180 # in the methods below. 1181 self.vm = None 1182 1183 def dictpath(self, d, path): 1184 '''Traverse a path in a nested dict''' 1185 for component in path.split('/'): 1186 m = index_re.match(component) 1187 if m: 1188 component, idx = m.groups() 1189 idx = int(idx) 1190 1191 if not isinstance(d, dict) or component not in d: 1192 self.fail(f'failed path traversal for "{path}" in "{d}"') 1193 d = d[component] 1194 1195 if m: 1196 if not isinstance(d, list): 1197 self.fail(f'path component "{component}" in "{path}" ' 1198 f'is not a list in "{d}"') 1199 try: 1200 d = d[idx] 1201 except IndexError: 1202 self.fail(f'invalid index "{idx}" in path "{path}" ' 1203 f'in "{d}"') 1204 return d 1205 1206 def assert_qmp_absent(self, d, path): 1207 try: 1208 result = self.dictpath(d, path) 1209 except AssertionError: 1210 return 1211 self.fail('path "%s" has value "%s"' % (path, str(result))) 1212 1213 def assert_qmp(self, d, path, value): 1214 '''Assert that the value for a specific path in a QMP dict 1215 matches. When given a list of values, assert that any of 1216 them matches.''' 1217 1218 result = self.dictpath(d, path) 1219 1220 # [] makes no sense as a list of valid values, so treat it as 1221 # an actual single value. 1222 if isinstance(value, list) and value != []: 1223 for v in value: 1224 if result == v: 1225 return 1226 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1227 else: 1228 self.assertEqual(result, value, 1229 '"%s" is "%s", expected "%s"' 1230 % (path, str(result), str(value))) 1231 1232 def assert_no_active_block_jobs(self): 1233 result = self.vm.qmp('query-block-jobs') 1234 self.assert_qmp(result, 'return', []) 1235 1236 def assert_has_block_node(self, node_name=None, file_name=None): 1237 """Issue a query-named-block-nodes and assert node_name and/or 1238 file_name is present in the result""" 1239 def check_equal_or_none(a, b): 1240 return a is None or b is None or a == b 1241 assert node_name or file_name 1242 result = self.vm.qmp('query-named-block-nodes') 1243 for x in result["return"]: 1244 if check_equal_or_none(x.get("node-name"), node_name) and \ 1245 check_equal_or_none(x.get("file"), file_name): 1246 return 1247 self.fail("Cannot find %s %s in result:\n%s" % 1248 (node_name, file_name, result)) 1249 1250 def assert_json_filename_equal(self, json_filename, reference): 1251 '''Asserts that the given filename is a json: filename and that its 1252 content is equal to the given reference object''' 1253 self.assertEqual(json_filename[:5], 'json:') 1254 self.assertEqual( 1255 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1256 self.vm.flatten_qmp_object(reference) 1257 ) 1258 1259 def cancel_and_wait(self, drive='drive0', force=False, 1260 resume=False, wait=60.0): 1261 '''Cancel a block job and wait for it to finish, returning the event''' 1262 self.vm.cmd('block-job-cancel', device=drive, force=force) 1263 1264 if resume: 1265 self.vm.resume_drive(drive) 1266 1267 cancelled = False 1268 result = None 1269 while not cancelled: 1270 for event in self.vm.get_qmp_events(wait=wait): 1271 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1272 event['event'] == 'BLOCK_JOB_CANCELLED': 1273 self.assert_qmp(event, 'data/device', drive) 1274 result = event 1275 cancelled = True 1276 elif event['event'] == 'JOB_STATUS_CHANGE': 1277 self.assert_qmp(event, 'data/id', drive) 1278 1279 1280 self.assert_no_active_block_jobs() 1281 return result 1282 1283 def wait_until_completed(self, drive='drive0', check_offset=True, 1284 wait=60.0, error=None): 1285 '''Wait for a block job to finish, returning the event''' 1286 while True: 1287 for event in self.vm.get_qmp_events(wait=wait): 1288 if event['event'] == 'BLOCK_JOB_COMPLETED': 1289 self.assert_qmp(event, 'data/device', drive) 1290 if error is None: 1291 self.assert_qmp_absent(event, 'data/error') 1292 if check_offset: 1293 self.assert_qmp(event, 'data/offset', 1294 event['data']['len']) 1295 else: 1296 self.assert_qmp(event, 'data/error', error) 1297 self.assert_no_active_block_jobs() 1298 return event 1299 if event['event'] == 'JOB_STATUS_CHANGE': 1300 self.assert_qmp(event, 'data/id', drive) 1301 1302 def wait_ready(self, drive='drive0'): 1303 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1304 return self.vm.events_wait([ 1305 ('BLOCK_JOB_READY', 1306 {'data': {'type': 'mirror', 'device': drive}}), 1307 ('BLOCK_JOB_READY', 1308 {'data': {'type': 'commit', 'device': drive}}) 1309 ]) 1310 1311 def wait_ready_and_cancel(self, drive='drive0'): 1312 self.wait_ready(drive=drive) 1313 event = self.cancel_and_wait(drive=drive) 1314 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1315 self.assert_qmp(event, 'data/type', 'mirror') 1316 self.assert_qmp(event, 'data/offset', event['data']['len']) 1317 1318 def complete_and_wait(self, drive='drive0', wait_ready=True, 1319 completion_error=None): 1320 '''Complete a block job and wait for it to finish''' 1321 if wait_ready: 1322 self.wait_ready(drive=drive) 1323 1324 self.vm.cmd('block-job-complete', device=drive) 1325 1326 event = self.wait_until_completed(drive=drive, error=completion_error) 1327 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1328 1329 def pause_wait(self, job_id='job0'): 1330 with Timeout(3, "Timeout waiting for job to pause"): 1331 while True: 1332 result = self.vm.qmp('query-block-jobs') 1333 found = False 1334 for job in result['return']: 1335 if job['device'] == job_id: 1336 found = True 1337 if job['paused'] and not job['busy']: 1338 return job 1339 break 1340 assert found 1341 1342 def pause_job(self, job_id='job0', wait=True): 1343 self.vm.cmd('block-job-pause', device=job_id) 1344 if wait: 1345 self.pause_wait(job_id) 1346 1347 def case_skip(self, reason): 1348 '''Skip this test case''' 1349 case_notrun(reason) 1350 self.skipTest(reason) 1351 1352 1353def notrun(reason): 1354 '''Skip this test suite''' 1355 # Each test in qemu-iotests has a number ("seq") 1356 seq = os.path.basename(sys.argv[0]) 1357 1358 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1359 as outfile: 1360 outfile.write(reason + '\n') 1361 logger.warning("%s not run: %s", seq, reason) 1362 sys.exit(0) 1363 1364def case_notrun(reason): 1365 '''Mark this test case as not having been run (without actually 1366 skipping it, that is left to the caller). See 1367 QMPTestCase.case_skip() for a variant that actually skips the 1368 current test case.''' 1369 1370 # Each test in qemu-iotests has a number ("seq") 1371 seq = os.path.basename(sys.argv[0]) 1372 1373 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1374 as outfile: 1375 outfile.write(' [case not run] ' + reason + '\n') 1376 1377def _verify_image_format(supported_fmts: Sequence[str] = (), 1378 unsupported_fmts: Sequence[str] = ()) -> None: 1379 if 'generic' in supported_fmts and \ 1380 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1381 # similar to 1382 # _supported_fmt generic 1383 # for bash tests 1384 supported_fmts = () 1385 1386 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1387 if not_sup or (imgfmt in unsupported_fmts): 1388 notrun('not suitable for this image format: %s' % imgfmt) 1389 1390 if imgfmt == 'luks': 1391 verify_working_luks() 1392 1393def _verify_protocol(supported: Sequence[str] = (), 1394 unsupported: Sequence[str] = ()) -> None: 1395 assert not (supported and unsupported) 1396 1397 if 'generic' in supported: 1398 return 1399 1400 not_sup = supported and (imgproto not in supported) 1401 if not_sup or (imgproto in unsupported): 1402 notrun('not suitable for this protocol: %s' % imgproto) 1403 1404def _verify_platform(supported: Sequence[str] = (), 1405 unsupported: Sequence[str] = ()) -> None: 1406 if any((sys.platform.startswith(x) for x in unsupported)): 1407 notrun('not suitable for this OS: %s' % sys.platform) 1408 1409 if supported: 1410 if not any((sys.platform.startswith(x) for x in supported)): 1411 notrun('not suitable for this OS: %s' % sys.platform) 1412 1413def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1414 if supported_cache_modes and (cachemode not in supported_cache_modes): 1415 notrun('not suitable for this cache mode: %s' % cachemode) 1416 1417def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1418 if supported_aio_modes and (aiomode not in supported_aio_modes): 1419 notrun('not suitable for this aio mode: %s' % aiomode) 1420 1421def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1422 usf_list = list(set(required_formats) - set(supported_formats())) 1423 if usf_list: 1424 notrun(f'formats {usf_list} are not whitelisted') 1425 1426 1427def _verify_virtio_blk() -> None: 1428 out = qemu_pipe('-M', 'none', '-device', 'help') 1429 if 'virtio-blk' not in out: 1430 notrun('Missing virtio-blk in QEMU binary') 1431 1432def verify_virtio_scsi_pci_or_ccw() -> None: 1433 out = qemu_pipe('-M', 'none', '-device', 'help') 1434 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1435 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1436 1437 1438def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1439 imgopts = os.environ.get('IMGOPTS') 1440 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1441 # but it supported only for bash tests. We don't have a concept of global 1442 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1443 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1444 unsup = list(unsupported) + ['$'] 1445 if imgopts and any(x in imgopts for x in unsup): 1446 notrun(f'not suitable for this imgopts: {imgopts}') 1447 1448 1449def supports_quorum() -> bool: 1450 return 'quorum' in qemu_img('--help').stdout 1451 1452def verify_quorum(): 1453 '''Skip test suite if quorum support is not available''' 1454 if not supports_quorum(): 1455 notrun('quorum support missing') 1456 1457def has_working_luks() -> Tuple[bool, str]: 1458 """ 1459 Check whether our LUKS driver can actually create images 1460 (this extends to LUKS encryption for qcow2). 1461 1462 If not, return the reason why. 1463 """ 1464 1465 img_file = f'{test_dir}/luks-test.luks' 1466 res = qemu_img('create', '-f', 'luks', 1467 '--object', luks_default_secret_object, 1468 '-o', luks_default_key_secret_opt, 1469 '-o', 'iter-time=10', 1470 img_file, '1G', 1471 check=False) 1472 try: 1473 os.remove(img_file) 1474 except OSError: 1475 pass 1476 1477 if res.returncode: 1478 reason = res.stdout 1479 for line in res.stdout.splitlines(): 1480 if img_file + ':' in line: 1481 reason = line.split(img_file + ':', 1)[1].strip() 1482 break 1483 1484 return (False, reason) 1485 else: 1486 return (True, '') 1487 1488def verify_working_luks(): 1489 """ 1490 Skip test suite if LUKS does not work 1491 """ 1492 (working, reason) = has_working_luks() 1493 if not working: 1494 notrun(reason) 1495 1496def supports_qcow2_zstd_compression() -> bool: 1497 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1498 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1499 img_file, '0', 1500 check=False) 1501 try: 1502 os.remove(img_file) 1503 except OSError: 1504 pass 1505 1506 if res.returncode == 1 and \ 1507 "'compression-type' does not accept value 'zstd'" in res.stdout: 1508 return False 1509 else: 1510 return True 1511 1512def verify_qcow2_zstd_compression(): 1513 if not supports_qcow2_zstd_compression(): 1514 notrun('zstd compression not supported') 1515 1516def qemu_pipe(*args: str) -> str: 1517 """ 1518 Run qemu with an option to print something and exit (e.g. a help option). 1519 1520 :return: QEMU's stdout output. 1521 """ 1522 full_args = [qemu_prog] + qemu_opts + list(args) 1523 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1524 return output 1525 1526def supported_formats(read_only=False): 1527 '''Set 'read_only' to True to check ro-whitelist 1528 Otherwise, rw-whitelist is checked''' 1529 1530 if not hasattr(supported_formats, "formats"): 1531 supported_formats.formats = {} 1532 1533 if read_only not in supported_formats.formats: 1534 format_message = qemu_pipe("-drive", "format=help") 1535 line = 1 if read_only else 0 1536 supported_formats.formats[read_only] = \ 1537 format_message.splitlines()[line].split(":")[1].split() 1538 1539 return supported_formats.formats[read_only] 1540 1541def skip_if_unsupported(required_formats=(), read_only=False): 1542 '''Skip Test Decorator 1543 Runs the test if all the required formats are whitelisted''' 1544 def skip_test_decorator(func): 1545 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1546 **kwargs: Dict[str, Any]) -> None: 1547 if callable(required_formats): 1548 fmts = required_formats(test_case) 1549 else: 1550 fmts = required_formats 1551 1552 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1553 if usf_list: 1554 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1555 test_case.case_skip(msg) 1556 else: 1557 func(test_case, *args, **kwargs) 1558 return func_wrapper 1559 return skip_test_decorator 1560 1561def skip_for_formats(formats: Sequence[str] = ()) \ 1562 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1563 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1564 '''Skip Test Decorator 1565 Skips the test for the given formats''' 1566 def skip_test_decorator(func): 1567 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1568 **kwargs: Dict[str, Any]) -> None: 1569 if imgfmt in formats: 1570 msg = f'{test_case}: Skipped for format {imgfmt}' 1571 test_case.case_skip(msg) 1572 else: 1573 func(test_case, *args, **kwargs) 1574 return func_wrapper 1575 return skip_test_decorator 1576 1577def skip_if_user_is_root(func): 1578 '''Skip Test Decorator 1579 Runs the test only without root permissions''' 1580 def func_wrapper(*args, **kwargs): 1581 if os.getuid() == 0: 1582 case_notrun('{}: cannot be run as root'.format(args[0])) 1583 return None 1584 else: 1585 return func(*args, **kwargs) 1586 return func_wrapper 1587 1588# We need to filter out the time taken from the output so that 1589# qemu-iotest can reliably diff the results against master output, 1590# and hide skipped tests from the reference output. 1591 1592class ReproducibleTestResult(unittest.TextTestResult): 1593 def addSkip(self, test, reason): 1594 # Same as TextTestResult, but print dot instead of "s" 1595 unittest.TestResult.addSkip(self, test, reason) 1596 if self.showAll: 1597 self.stream.writeln("skipped {0!r}".format(reason)) 1598 elif self.dots: 1599 self.stream.write(".") 1600 self.stream.flush() 1601 1602class ReproducibleStreamWrapper: 1603 def __init__(self, stream: TextIO): 1604 self.stream = stream 1605 1606 def __getattr__(self, attr): 1607 if attr in ('stream', '__getstate__'): 1608 raise AttributeError(attr) 1609 return getattr(self.stream, attr) 1610 1611 def write(self, arg=None): 1612 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1613 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1614 self.stream.write(arg) 1615 1616class ReproducibleTestRunner(unittest.TextTestRunner): 1617 def __init__( 1618 self, 1619 stream: Optional[TextIO] = None, 1620 resultclass: Type[unittest.TextTestResult] = 1621 ReproducibleTestResult, 1622 **kwargs: Any 1623 ) -> None: 1624 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1625 super().__init__(stream=rstream, # type: ignore 1626 descriptions=True, 1627 resultclass=resultclass, 1628 **kwargs) 1629 1630def execute_unittest(argv: List[str], debug: bool = False) -> None: 1631 """Executes unittests within the calling module.""" 1632 1633 # Some tests have warnings, especially ResourceWarnings for unclosed 1634 # files and sockets. Ignore them for now to ensure reproducibility of 1635 # the test output. 1636 unittest.main(argv=argv, 1637 testRunner=ReproducibleTestRunner, 1638 verbosity=2 if debug else 1, 1639 warnings=None if sys.warnoptions else 'ignore') 1640 1641def execute_setup_common(supported_fmts: Sequence[str] = (), 1642 supported_platforms: Sequence[str] = (), 1643 supported_cache_modes: Sequence[str] = (), 1644 supported_aio_modes: Sequence[str] = (), 1645 unsupported_fmts: Sequence[str] = (), 1646 supported_protocols: Sequence[str] = (), 1647 unsupported_protocols: Sequence[str] = (), 1648 required_fmts: Sequence[str] = (), 1649 unsupported_imgopts: Sequence[str] = ()) -> bool: 1650 """ 1651 Perform necessary setup for either script-style or unittest-style tests. 1652 1653 :return: Bool; Whether or not debug mode has been requested via the CLI. 1654 """ 1655 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1656 1657 debug = '-d' in sys.argv 1658 if debug: 1659 sys.argv.remove('-d') 1660 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1661 1662 _verify_image_format(supported_fmts, unsupported_fmts) 1663 _verify_protocol(supported_protocols, unsupported_protocols) 1664 _verify_platform(supported=supported_platforms) 1665 _verify_cache_mode(supported_cache_modes) 1666 _verify_aio_mode(supported_aio_modes) 1667 _verify_formats(required_fmts) 1668 _verify_virtio_blk() 1669 _verify_imgopts(unsupported_imgopts) 1670 1671 return debug 1672 1673def execute_test(*args, test_function=None, **kwargs): 1674 """Run either unittest or script-style tests.""" 1675 1676 debug = execute_setup_common(*args, **kwargs) 1677 if not test_function: 1678 execute_unittest(sys.argv, debug) 1679 else: 1680 test_function() 1681 1682def activate_logging(): 1683 """Activate iotests.log() output to stdout for script-style tests.""" 1684 handler = logging.StreamHandler(stream=sys.stdout) 1685 formatter = logging.Formatter('%(message)s') 1686 handler.setFormatter(formatter) 1687 test_logger.addHandler(handler) 1688 test_logger.setLevel(logging.INFO) 1689 test_logger.propagate = False 1690 1691# This is called from script-style iotests without a single point of entry 1692def script_initialize(*args, **kwargs): 1693 """Initialize script-style tests without running any tests.""" 1694 activate_logging() 1695 execute_setup_common(*args, **kwargs) 1696 1697# This is called from script-style iotests with a single point of entry 1698def script_main(test_function, *args, **kwargs): 1699 """Run script-style tests outside of the unittest framework""" 1700 activate_logging() 1701 execute_test(*args, test_function=test_function, **kwargs) 1702 1703# This is called from unittest style iotests 1704def main(*args, **kwargs): 1705 """Run tests using the unittest framework""" 1706 execute_test(*args, **kwargs) 1707