1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, drop_child_info: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path, drop_child_info)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd(cmd, args) 464 465 def stop(self, kill_signal=15): 466 self._p.send_signal(kill_signal) 467 self._p.wait() 468 self._p = None 469 470 if self._qmp: 471 self._qmp.close() 472 473 if self._qmpsock is not None: 474 try: 475 os.remove(self._qmpsock) 476 except OSError: 477 pass 478 try: 479 os.remove(self.pidfile) 480 except OSError: 481 pass 482 483 def __del__(self): 484 if self._p is not None: 485 self.stop(kill_signal=9) 486 487 488def qemu_nbd(*args): 489 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 490 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 491 492def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 493 '''Run qemu-nbd in daemon mode and return both the parent's exit code 494 and its output in case of an error''' 495 full_args = qemu_nbd_args + ['--fork'] + list(args) 496 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 497 connect_stderr=False) 498 return returncode, output if returncode else '' 499 500def qemu_nbd_list_log(*args: str) -> str: 501 '''Run qemu-nbd to list remote exports''' 502 full_args = [qemu_nbd_prog, '-L'] + list(args) 503 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 504 log(output, filters=[filter_testfiles, filter_nbd_exports]) 505 return output 506 507@contextmanager 508def qemu_nbd_popen(*args): 509 '''Context manager running qemu-nbd within the context''' 510 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 511 512 assert not os.path.exists(pid_file) 513 514 cmd = list(qemu_nbd_args) 515 cmd.extend(('--persistent', '--pid-file', pid_file)) 516 cmd.extend(args) 517 518 log('Start NBD server') 519 with subprocess.Popen(cmd) as p: 520 try: 521 while not os.path.exists(pid_file): 522 if p.poll() is not None: 523 raise RuntimeError( 524 "qemu-nbd terminated with exit code {}: {}" 525 .format(p.returncode, ' '.join(cmd))) 526 527 time.sleep(0.01) 528 yield 529 finally: 530 if os.path.exists(pid_file): 531 os.remove(pid_file) 532 log('Kill NBD server') 533 p.kill() 534 p.wait() 535 536def compare_images(img1: str, img2: str, 537 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 538 """ 539 Compare two images with QEMU_IMG; return True if they are identical. 540 541 :raise CalledProcessError: 542 when qemu-img crashes or returns a status code of anything other 543 than 0 (identical) or 1 (different). 544 """ 545 try: 546 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 547 return True 548 except subprocess.CalledProcessError as exc: 549 if exc.returncode == 1: 550 return False 551 raise 552 553def create_image(name, size): 554 '''Create a fully-allocated raw image with sector markers''' 555 with open(name, 'wb') as file: 556 i = 0 557 while i < size: 558 sector = struct.pack('>l504xl', i // 512, i // 512) 559 file.write(sector) 560 i = i + 512 561 562def image_size(img: str) -> int: 563 """Return image's virtual size""" 564 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 565 if not isinstance(value, int): 566 type_name = type(value).__name__ 567 raise TypeError("Expected 'int' for 'virtual-size', " 568 f"got '{value}' of type '{type_name}'") 569 return value 570 571def is_str(val): 572 return isinstance(val, str) 573 574test_dir_re = re.compile(r"%s" % test_dir) 575def filter_test_dir(msg): 576 return test_dir_re.sub("TEST_DIR", msg) 577 578win32_re = re.compile(r"\r") 579def filter_win32(msg): 580 return win32_re.sub("", msg) 581 582qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 583 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 584 r"and [0-9\/.inf]* ops\/sec\)") 585def filter_qemu_io(msg): 586 msg = filter_win32(msg) 587 return qemu_io_re.sub("X ops; XX:XX:XX.X " 588 "(XXX YYY/sec and XXX ops/sec)", msg) 589 590chown_re = re.compile(r"chown [0-9]+:[0-9]+") 591def filter_chown(msg): 592 return chown_re.sub("chown UID:GID", msg) 593 594def filter_qmp_event(event): 595 '''Filter a QMP event dict''' 596 event = dict(event) 597 if 'timestamp' in event: 598 event['timestamp']['seconds'] = 'SECS' 599 event['timestamp']['microseconds'] = 'USECS' 600 return event 601 602def filter_qmp(qmsg, filter_fn): 603 '''Given a string filter, filter a QMP object's values. 604 filter_fn takes a (key, value) pair.''' 605 # Iterate through either lists or dicts; 606 if isinstance(qmsg, list): 607 items = enumerate(qmsg) 608 elif isinstance(qmsg, dict): 609 items = qmsg.items() 610 else: 611 return filter_fn(None, qmsg) 612 613 for k, v in items: 614 if isinstance(v, (dict, list)): 615 qmsg[k] = filter_qmp(v, filter_fn) 616 else: 617 qmsg[k] = filter_fn(k, v) 618 return qmsg 619 620def filter_testfiles(msg): 621 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 622 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 623 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 624 625def filter_qmp_testfiles(qmsg): 626 def _filter(_key, value): 627 if is_str(value): 628 return filter_testfiles(value) 629 return value 630 return filter_qmp(qmsg, _filter) 631 632def filter_virtio_scsi(output: str) -> str: 633 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 634 635def filter_qmp_virtio_scsi(qmsg): 636 def _filter(_key, value): 637 if is_str(value): 638 return filter_virtio_scsi(value) 639 return value 640 return filter_qmp(qmsg, _filter) 641 642def filter_generated_node_ids(msg): 643 return re.sub("#block[0-9]+", "NODE_NAME", msg) 644 645def filter_img_info(output: str, filename: str, 646 drop_child_info: bool = True) -> str: 647 lines = [] 648 drop_indented = False 649 for line in output.split('\n'): 650 if 'disk size' in line or 'actual-size' in line: 651 continue 652 653 # Drop child node info 654 if drop_indented: 655 if line.startswith(' '): 656 continue 657 drop_indented = False 658 if drop_child_info and "Child node '/" in line: 659 drop_indented = True 660 continue 661 662 line = line.replace(filename, 'TEST_IMG') 663 line = filter_testfiles(line) 664 line = line.replace(imgfmt, 'IMGFMT') 665 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 666 line = re.sub('uuid: [-a-f0-9]+', 667 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 668 line) 669 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 670 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 671 line) 672 lines.append(line) 673 return '\n'.join(lines) 674 675def filter_imgfmt(msg): 676 return msg.replace(imgfmt, 'IMGFMT') 677 678def filter_qmp_imgfmt(qmsg): 679 def _filter(_key, value): 680 if is_str(value): 681 return filter_imgfmt(value) 682 return value 683 return filter_qmp(qmsg, _filter) 684 685def filter_nbd_exports(output: str) -> str: 686 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 687 688 689Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 690 691def log(msg: Msg, 692 filters: Iterable[Callable[[Msg], Msg]] = (), 693 indent: Optional[int] = None) -> None: 694 """ 695 Logs either a string message or a JSON serializable message (like QMP). 696 If indent is provided, JSON serializable messages are pretty-printed. 697 """ 698 for flt in filters: 699 msg = flt(msg) 700 if isinstance(msg, (dict, list)): 701 # Don't sort if it's already sorted 702 do_sort = not isinstance(msg, OrderedDict) 703 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 704 else: 705 test_logger.info(msg) 706 707class Timeout: 708 def __init__(self, seconds, errmsg="Timeout"): 709 self.seconds = seconds 710 self.errmsg = errmsg 711 def __enter__(self): 712 if qemu_gdb or qemu_valgrind: 713 return self 714 signal.signal(signal.SIGALRM, self.timeout) 715 signal.setitimer(signal.ITIMER_REAL, self.seconds) 716 return self 717 def __exit__(self, exc_type, value, traceback): 718 if qemu_gdb or qemu_valgrind: 719 return False 720 signal.setitimer(signal.ITIMER_REAL, 0) 721 return False 722 def timeout(self, signum, frame): 723 raise TimeoutError(self.errmsg) 724 725def file_pattern(name): 726 return "{0}-{1}".format(os.getpid(), name) 727 728class FilePath: 729 """ 730 Context manager generating multiple file names. The generated files are 731 removed when exiting the context. 732 733 Example usage: 734 735 with FilePath('a.img', 'b.img') as (img_a, img_b): 736 # Use img_a and img_b here... 737 738 # a.img and b.img are automatically removed here. 739 740 By default images are created in iotests.test_dir. To create sockets use 741 iotests.sock_dir: 742 743 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 744 745 For convenience, calling with one argument yields a single file instead of 746 a tuple with one item. 747 748 """ 749 def __init__(self, *names, base_dir=test_dir): 750 self.paths = [os.path.join(base_dir, file_pattern(name)) 751 for name in names] 752 753 def __enter__(self): 754 if len(self.paths) == 1: 755 return self.paths[0] 756 else: 757 return self.paths 758 759 def __exit__(self, exc_type, exc_val, exc_tb): 760 for path in self.paths: 761 try: 762 os.remove(path) 763 except OSError: 764 pass 765 return False 766 767 768def try_remove(img): 769 try: 770 os.remove(img) 771 except OSError: 772 pass 773 774def file_path_remover(): 775 for path in reversed(file_path_remover.paths): 776 try_remove(path) 777 778 779def file_path(*names, base_dir=test_dir): 780 ''' Another way to get auto-generated filename that cleans itself up. 781 782 Use is as simple as: 783 784 img_a, img_b = file_path('a.img', 'b.img') 785 sock = file_path('socket') 786 ''' 787 788 if not hasattr(file_path_remover, 'paths'): 789 file_path_remover.paths = [] 790 atexit.register(file_path_remover) 791 792 paths = [] 793 for name in names: 794 filename = file_pattern(name) 795 path = os.path.join(base_dir, filename) 796 file_path_remover.paths.append(path) 797 paths.append(path) 798 799 return paths[0] if len(paths) == 1 else paths 800 801def remote_filename(path): 802 if imgproto == 'file': 803 return path 804 elif imgproto == 'ssh': 805 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 806 else: 807 raise ValueError("Protocol %s not supported" % (imgproto)) 808 809class VM(qtest.QEMUQtestMachine): 810 '''A QEMU VM''' 811 812 def __init__(self, path_suffix=''): 813 name = "qemu%s-%d" % (path_suffix, os.getpid()) 814 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 815 if qemu_gdb and qemu_valgrind: 816 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 817 sys.exit(1) 818 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 819 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 820 name=name, 821 base_temp_dir=test_dir, 822 sock_dir=sock_dir, qmp_timer=timer) 823 self._num_drives = 0 824 825 def _post_shutdown(self) -> None: 826 super()._post_shutdown() 827 if not qemu_valgrind or not self._popen: 828 return 829 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 830 if self.exitcode() == 99: 831 with open(valgrind_filename, encoding='utf-8') as f: 832 print(f.read()) 833 else: 834 os.remove(valgrind_filename) 835 836 def _pre_launch(self) -> None: 837 super()._pre_launch() 838 if qemu_print: 839 # set QEMU binary output to stdout 840 self._close_qemu_log_file() 841 842 def add_object(self, opts): 843 self._args.append('-object') 844 self._args.append(opts) 845 return self 846 847 def add_device(self, opts): 848 self._args.append('-device') 849 self._args.append(opts) 850 return self 851 852 def add_drive_raw(self, opts): 853 self._args.append('-drive') 854 self._args.append(opts) 855 return self 856 857 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 858 '''Add a virtio-blk drive to the VM''' 859 options = ['if=%s' % interface, 860 'id=drive%d' % self._num_drives] 861 862 if path is not None: 863 options.append('file=%s' % path) 864 options.append('format=%s' % img_format) 865 options.append('cache=%s' % cachemode) 866 options.append('aio=%s' % aiomode) 867 868 if opts: 869 options.append(opts) 870 871 if img_format == 'luks' and 'key-secret' not in opts: 872 # default luks support 873 if luks_default_secret_object not in self._args: 874 self.add_object(luks_default_secret_object) 875 876 options.append(luks_default_key_secret_opt) 877 878 self._args.append('-drive') 879 self._args.append(','.join(options)) 880 self._num_drives += 1 881 return self 882 883 def add_blockdev(self, opts): 884 self._args.append('-blockdev') 885 if isinstance(opts, str): 886 self._args.append(opts) 887 else: 888 self._args.append(','.join(opts)) 889 return self 890 891 def add_incoming(self, addr): 892 self._args.append('-incoming') 893 self._args.append(addr) 894 return self 895 896 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 897 cmd = 'human-monitor-command' 898 kwargs: Dict[str, Any] = {'command-line': command_line} 899 if use_log: 900 return self.qmp_log(cmd, **kwargs) 901 else: 902 return self.qmp(cmd, **kwargs) 903 904 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 905 """Pause drive r/w operations""" 906 if not event: 907 self.pause_drive(drive, "read_aio") 908 self.pause_drive(drive, "write_aio") 909 return 910 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 911 912 def resume_drive(self, drive: str) -> None: 913 """Resume drive r/w operations""" 914 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 915 916 def hmp_qemu_io(self, drive: str, cmd: str, 917 use_log: bool = False, qdev: bool = False) -> QMPMessage: 918 """Write to a given drive using an HMP command""" 919 d = '-d ' if qdev else '' 920 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 921 922 def flatten_qmp_object(self, obj, output=None, basestr=''): 923 if output is None: 924 output = {} 925 if isinstance(obj, list): 926 for i, item in enumerate(obj): 927 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 928 elif isinstance(obj, dict): 929 for key in obj: 930 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 931 else: 932 output[basestr[:-1]] = obj # Strip trailing '.' 933 return output 934 935 def qmp_to_opts(self, obj): 936 obj = self.flatten_qmp_object(obj) 937 output_list = [] 938 for key in obj: 939 output_list += [key + '=' + obj[key]] 940 return ','.join(output_list) 941 942 def get_qmp_events_filtered(self, wait=60.0): 943 result = [] 944 for ev in self.get_qmp_events(wait=wait): 945 result.append(filter_qmp_event(ev)) 946 return result 947 948 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 949 full_cmd = OrderedDict(( 950 ("execute", cmd), 951 ("arguments", ordered_qmp(kwargs)) 952 )) 953 log(full_cmd, filters, indent=indent) 954 result = self.qmp(cmd, **kwargs) 955 log(result, filters, indent=indent) 956 return result 957 958 # Returns None on success, and an error string on failure 959 def run_job(self, job: str, auto_finalize: bool = True, 960 auto_dismiss: bool = False, 961 pre_finalize: Optional[Callable[[], None]] = None, 962 cancel: bool = False, wait: float = 60.0, 963 filters: Iterable[Callable[[Any], Any]] = (), 964 ) -> Optional[str]: 965 """ 966 run_job moves a job from creation through to dismissal. 967 968 :param job: String. ID of recently-launched job 969 :param auto_finalize: Bool. True if the job was launched with 970 auto_finalize. Defaults to True. 971 :param auto_dismiss: Bool. True if the job was launched with 972 auto_dismiss=True. Defaults to False. 973 :param pre_finalize: Callback. A callable that takes no arguments to be 974 invoked prior to issuing job-finalize, if any. 975 :param cancel: Bool. When true, cancels the job after the pre_finalize 976 callback. 977 :param wait: Float. Timeout value specifying how long to wait for any 978 event, in seconds. Defaults to 60.0. 979 """ 980 match_device = {'data': {'device': job}} 981 match_id = {'data': {'id': job}} 982 events = [ 983 ('BLOCK_JOB_COMPLETED', match_device), 984 ('BLOCK_JOB_CANCELLED', match_device), 985 ('BLOCK_JOB_ERROR', match_device), 986 ('BLOCK_JOB_READY', match_device), 987 ('BLOCK_JOB_PENDING', match_id), 988 ('JOB_STATUS_CHANGE', match_id) 989 ] 990 error = None 991 while True: 992 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 993 if ev['event'] != 'JOB_STATUS_CHANGE': 994 log(ev, filters=filters) 995 continue 996 status = ev['data']['status'] 997 if status == 'aborting': 998 result = self.qmp('query-jobs') 999 for j in result['return']: 1000 if j['id'] == job: 1001 error = j['error'] 1002 log('Job failed: %s' % (j['error']), filters=filters) 1003 elif status == 'ready': 1004 self.qmp_log('job-complete', id=job, filters=filters) 1005 elif status == 'pending' and not auto_finalize: 1006 if pre_finalize: 1007 pre_finalize() 1008 if cancel: 1009 self.qmp_log('job-cancel', id=job, filters=filters) 1010 else: 1011 self.qmp_log('job-finalize', id=job, filters=filters) 1012 elif status == 'concluded' and not auto_dismiss: 1013 self.qmp_log('job-dismiss', id=job, filters=filters) 1014 elif status == 'null': 1015 return error 1016 1017 # Returns None on success, and an error string on failure 1018 def blockdev_create(self, options, job_id='job0', filters=None): 1019 if filters is None: 1020 filters = [filter_qmp_testfiles] 1021 result = self.qmp_log('blockdev-create', filters=filters, 1022 job_id=job_id, options=options) 1023 1024 if 'return' in result: 1025 assert result['return'] == {} 1026 job_result = self.run_job(job_id, filters=filters) 1027 else: 1028 job_result = result['error'] 1029 1030 log("") 1031 return job_result 1032 1033 def enable_migration_events(self, name): 1034 log('Enabling migration QMP events on %s...' % name) 1035 log(self.qmp('migrate-set-capabilities', capabilities=[ 1036 { 1037 'capability': 'events', 1038 'state': True 1039 } 1040 ])) 1041 1042 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1043 while True: 1044 event = self.event_wait('MIGRATION') 1045 # We use the default timeout, and with a timeout, event_wait() 1046 # never returns None 1047 assert event 1048 1049 log(event, filters=[filter_qmp_event]) 1050 if event['data']['status'] in ('completed', 'failed'): 1051 break 1052 1053 if event['data']['status'] == 'completed': 1054 # The event may occur in finish-migrate, so wait for the expected 1055 # post-migration runstate 1056 runstate = None 1057 while runstate != expect_runstate: 1058 runstate = self.qmp('query-status')['return']['status'] 1059 return True 1060 else: 1061 return False 1062 1063 def node_info(self, node_name): 1064 nodes = self.qmp('query-named-block-nodes') 1065 for x in nodes['return']: 1066 if x['node-name'] == node_name: 1067 return x 1068 return None 1069 1070 def query_bitmaps(self): 1071 res = self.qmp("query-named-block-nodes") 1072 return {device['node-name']: device['dirty-bitmaps'] 1073 for device in res['return'] if 'dirty-bitmaps' in device} 1074 1075 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1076 """ 1077 get a specific bitmap from the object returned by query_bitmaps. 1078 :param recording: If specified, filter results by the specified value. 1079 :param bitmaps: If specified, use it instead of call query_bitmaps() 1080 """ 1081 if bitmaps is None: 1082 bitmaps = self.query_bitmaps() 1083 1084 for bitmap in bitmaps[node_name]: 1085 if bitmap.get('name', '') == bitmap_name: 1086 if recording is None or bitmap.get('recording') == recording: 1087 return bitmap 1088 return None 1089 1090 def check_bitmap_status(self, node_name, bitmap_name, fields): 1091 ret = self.get_bitmap(node_name, bitmap_name) 1092 1093 return fields.items() <= ret.items() 1094 1095 def assert_block_path(self, root, path, expected_node, graph=None): 1096 """ 1097 Check whether the node under the given path in the block graph 1098 is @expected_node. 1099 1100 @root is the node name of the node where the @path is rooted. 1101 1102 @path is a string that consists of child names separated by 1103 slashes. It must begin with a slash. 1104 1105 Examples for @root + @path: 1106 - root="qcow2-node", path="/backing/file" 1107 - root="quorum-node", path="/children.2/file" 1108 1109 Hypothetically, @path could be empty, in which case it would 1110 point to @root. However, in practice this case is not useful 1111 and hence not allowed. 1112 1113 @expected_node may be None. (All elements of the path but the 1114 leaf must still exist.) 1115 1116 @graph may be None or the result of an x-debug-query-block-graph 1117 call that has already been performed. 1118 """ 1119 if graph is None: 1120 graph = self.qmp('x-debug-query-block-graph')['return'] 1121 1122 iter_path = iter(path.split('/')) 1123 1124 # Must start with a / 1125 assert next(iter_path) == '' 1126 1127 node = next((node for node in graph['nodes'] if node['name'] == root), 1128 None) 1129 1130 # An empty @path is not allowed, so the root node must be present 1131 assert node is not None, 'Root node %s not found' % root 1132 1133 for child_name in iter_path: 1134 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1135 1136 try: 1137 node_id = next(edge['child'] for edge in graph['edges'] 1138 if (edge['parent'] == node['id'] and 1139 edge['name'] == child_name)) 1140 1141 node = next(node for node in graph['nodes'] 1142 if node['id'] == node_id) 1143 1144 except StopIteration: 1145 node = None 1146 1147 if node is None: 1148 assert expected_node is None, \ 1149 'No node found under %s (but expected %s)' % \ 1150 (path, expected_node) 1151 else: 1152 assert node['name'] == expected_node, \ 1153 'Found node %s under %s (but expected %s)' % \ 1154 (node['name'], path, expected_node) 1155 1156index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1157 1158class QMPTestCase(unittest.TestCase): 1159 '''Abstract base class for QMP test cases''' 1160 1161 def __init__(self, *args, **kwargs): 1162 super().__init__(*args, **kwargs) 1163 # Many users of this class set a VM property we rely on heavily 1164 # in the methods below. 1165 self.vm = None 1166 1167 def dictpath(self, d, path): 1168 '''Traverse a path in a nested dict''' 1169 for component in path.split('/'): 1170 m = index_re.match(component) 1171 if m: 1172 component, idx = m.groups() 1173 idx = int(idx) 1174 1175 if not isinstance(d, dict) or component not in d: 1176 self.fail(f'failed path traversal for "{path}" in "{d}"') 1177 d = d[component] 1178 1179 if m: 1180 if not isinstance(d, list): 1181 self.fail(f'path component "{component}" in "{path}" ' 1182 f'is not a list in "{d}"') 1183 try: 1184 d = d[idx] 1185 except IndexError: 1186 self.fail(f'invalid index "{idx}" in path "{path}" ' 1187 f'in "{d}"') 1188 return d 1189 1190 def assert_qmp_absent(self, d, path): 1191 try: 1192 result = self.dictpath(d, path) 1193 except AssertionError: 1194 return 1195 self.fail('path "%s" has value "%s"' % (path, str(result))) 1196 1197 def assert_qmp(self, d, path, value): 1198 '''Assert that the value for a specific path in a QMP dict 1199 matches. When given a list of values, assert that any of 1200 them matches.''' 1201 1202 result = self.dictpath(d, path) 1203 1204 # [] makes no sense as a list of valid values, so treat it as 1205 # an actual single value. 1206 if isinstance(value, list) and value != []: 1207 for v in value: 1208 if result == v: 1209 return 1210 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1211 else: 1212 self.assertEqual(result, value, 1213 '"%s" is "%s", expected "%s"' 1214 % (path, str(result), str(value))) 1215 1216 def assert_no_active_block_jobs(self): 1217 result = self.vm.qmp('query-block-jobs') 1218 self.assert_qmp(result, 'return', []) 1219 1220 def assert_has_block_node(self, node_name=None, file_name=None): 1221 """Issue a query-named-block-nodes and assert node_name and/or 1222 file_name is present in the result""" 1223 def check_equal_or_none(a, b): 1224 return a is None or b is None or a == b 1225 assert node_name or file_name 1226 result = self.vm.qmp('query-named-block-nodes') 1227 for x in result["return"]: 1228 if check_equal_or_none(x.get("node-name"), node_name) and \ 1229 check_equal_or_none(x.get("file"), file_name): 1230 return 1231 self.fail("Cannot find %s %s in result:\n%s" % 1232 (node_name, file_name, result)) 1233 1234 def assert_json_filename_equal(self, json_filename, reference): 1235 '''Asserts that the given filename is a json: filename and that its 1236 content is equal to the given reference object''' 1237 self.assertEqual(json_filename[:5], 'json:') 1238 self.assertEqual( 1239 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1240 self.vm.flatten_qmp_object(reference) 1241 ) 1242 1243 def cancel_and_wait(self, drive='drive0', force=False, 1244 resume=False, wait=60.0): 1245 '''Cancel a block job and wait for it to finish, returning the event''' 1246 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1247 self.assert_qmp(result, 'return', {}) 1248 1249 if resume: 1250 self.vm.resume_drive(drive) 1251 1252 cancelled = False 1253 result = None 1254 while not cancelled: 1255 for event in self.vm.get_qmp_events(wait=wait): 1256 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1257 event['event'] == 'BLOCK_JOB_CANCELLED': 1258 self.assert_qmp(event, 'data/device', drive) 1259 result = event 1260 cancelled = True 1261 elif event['event'] == 'JOB_STATUS_CHANGE': 1262 self.assert_qmp(event, 'data/id', drive) 1263 1264 1265 self.assert_no_active_block_jobs() 1266 return result 1267 1268 def wait_until_completed(self, drive='drive0', check_offset=True, 1269 wait=60.0, error=None): 1270 '''Wait for a block job to finish, returning the event''' 1271 while True: 1272 for event in self.vm.get_qmp_events(wait=wait): 1273 if event['event'] == 'BLOCK_JOB_COMPLETED': 1274 self.assert_qmp(event, 'data/device', drive) 1275 if error is None: 1276 self.assert_qmp_absent(event, 'data/error') 1277 if check_offset: 1278 self.assert_qmp(event, 'data/offset', 1279 event['data']['len']) 1280 else: 1281 self.assert_qmp(event, 'data/error', error) 1282 self.assert_no_active_block_jobs() 1283 return event 1284 if event['event'] == 'JOB_STATUS_CHANGE': 1285 self.assert_qmp(event, 'data/id', drive) 1286 1287 def wait_ready(self, drive='drive0'): 1288 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1289 return self.vm.events_wait([ 1290 ('BLOCK_JOB_READY', 1291 {'data': {'type': 'mirror', 'device': drive}}), 1292 ('BLOCK_JOB_READY', 1293 {'data': {'type': 'commit', 'device': drive}}) 1294 ]) 1295 1296 def wait_ready_and_cancel(self, drive='drive0'): 1297 self.wait_ready(drive=drive) 1298 event = self.cancel_and_wait(drive=drive) 1299 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1300 self.assert_qmp(event, 'data/type', 'mirror') 1301 self.assert_qmp(event, 'data/offset', event['data']['len']) 1302 1303 def complete_and_wait(self, drive='drive0', wait_ready=True, 1304 completion_error=None): 1305 '''Complete a block job and wait for it to finish''' 1306 if wait_ready: 1307 self.wait_ready(drive=drive) 1308 1309 result = self.vm.qmp('block-job-complete', device=drive) 1310 self.assert_qmp(result, 'return', {}) 1311 1312 event = self.wait_until_completed(drive=drive, error=completion_error) 1313 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1314 1315 def pause_wait(self, job_id='job0'): 1316 with Timeout(3, "Timeout waiting for job to pause"): 1317 while True: 1318 result = self.vm.qmp('query-block-jobs') 1319 found = False 1320 for job in result['return']: 1321 if job['device'] == job_id: 1322 found = True 1323 if job['paused'] and not job['busy']: 1324 return job 1325 break 1326 assert found 1327 1328 def pause_job(self, job_id='job0', wait=True): 1329 result = self.vm.qmp('block-job-pause', device=job_id) 1330 self.assert_qmp(result, 'return', {}) 1331 if wait: 1332 return self.pause_wait(job_id) 1333 return result 1334 1335 def case_skip(self, reason): 1336 '''Skip this test case''' 1337 case_notrun(reason) 1338 self.skipTest(reason) 1339 1340 1341def notrun(reason): 1342 '''Skip this test suite''' 1343 # Each test in qemu-iotests has a number ("seq") 1344 seq = os.path.basename(sys.argv[0]) 1345 1346 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1347 as outfile: 1348 outfile.write(reason + '\n') 1349 logger.warning("%s not run: %s", seq, reason) 1350 sys.exit(0) 1351 1352def case_notrun(reason): 1353 '''Mark this test case as not having been run (without actually 1354 skipping it, that is left to the caller). See 1355 QMPTestCase.case_skip() for a variant that actually skips the 1356 current test case.''' 1357 1358 # Each test in qemu-iotests has a number ("seq") 1359 seq = os.path.basename(sys.argv[0]) 1360 1361 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1362 as outfile: 1363 outfile.write(' [case not run] ' + reason + '\n') 1364 1365def _verify_image_format(supported_fmts: Sequence[str] = (), 1366 unsupported_fmts: Sequence[str] = ()) -> None: 1367 if 'generic' in supported_fmts and \ 1368 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1369 # similar to 1370 # _supported_fmt generic 1371 # for bash tests 1372 supported_fmts = () 1373 1374 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1375 if not_sup or (imgfmt in unsupported_fmts): 1376 notrun('not suitable for this image format: %s' % imgfmt) 1377 1378 if imgfmt == 'luks': 1379 verify_working_luks() 1380 1381def _verify_protocol(supported: Sequence[str] = (), 1382 unsupported: Sequence[str] = ()) -> None: 1383 assert not (supported and unsupported) 1384 1385 if 'generic' in supported: 1386 return 1387 1388 not_sup = supported and (imgproto not in supported) 1389 if not_sup or (imgproto in unsupported): 1390 notrun('not suitable for this protocol: %s' % imgproto) 1391 1392def _verify_platform(supported: Sequence[str] = (), 1393 unsupported: Sequence[str] = ()) -> None: 1394 if any((sys.platform.startswith(x) for x in unsupported)): 1395 notrun('not suitable for this OS: %s' % sys.platform) 1396 1397 if supported: 1398 if not any((sys.platform.startswith(x) for x in supported)): 1399 notrun('not suitable for this OS: %s' % sys.platform) 1400 1401def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1402 if supported_cache_modes and (cachemode not in supported_cache_modes): 1403 notrun('not suitable for this cache mode: %s' % cachemode) 1404 1405def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1406 if supported_aio_modes and (aiomode not in supported_aio_modes): 1407 notrun('not suitable for this aio mode: %s' % aiomode) 1408 1409def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1410 usf_list = list(set(required_formats) - set(supported_formats())) 1411 if usf_list: 1412 notrun(f'formats {usf_list} are not whitelisted') 1413 1414 1415def _verify_virtio_blk() -> None: 1416 out = qemu_pipe('-M', 'none', '-device', 'help') 1417 if 'virtio-blk' not in out: 1418 notrun('Missing virtio-blk in QEMU binary') 1419 1420def _verify_virtio_scsi_pci_or_ccw() -> None: 1421 out = qemu_pipe('-M', 'none', '-device', 'help') 1422 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1423 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1424 1425 1426def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1427 imgopts = os.environ.get('IMGOPTS') 1428 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1429 # but it supported only for bash tests. We don't have a concept of global 1430 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1431 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1432 unsup = list(unsupported) + ['$'] 1433 if imgopts and any(x in imgopts for x in unsup): 1434 notrun(f'not suitable for this imgopts: {imgopts}') 1435 1436 1437def supports_quorum() -> bool: 1438 return 'quorum' in qemu_img('--help').stdout 1439 1440def verify_quorum(): 1441 '''Skip test suite if quorum support is not available''' 1442 if not supports_quorum(): 1443 notrun('quorum support missing') 1444 1445def has_working_luks() -> Tuple[bool, str]: 1446 """ 1447 Check whether our LUKS driver can actually create images 1448 (this extends to LUKS encryption for qcow2). 1449 1450 If not, return the reason why. 1451 """ 1452 1453 img_file = f'{test_dir}/luks-test.luks' 1454 res = qemu_img('create', '-f', 'luks', 1455 '--object', luks_default_secret_object, 1456 '-o', luks_default_key_secret_opt, 1457 '-o', 'iter-time=10', 1458 img_file, '1G', 1459 check=False) 1460 try: 1461 os.remove(img_file) 1462 except OSError: 1463 pass 1464 1465 if res.returncode: 1466 reason = res.stdout 1467 for line in res.stdout.splitlines(): 1468 if img_file + ':' in line: 1469 reason = line.split(img_file + ':', 1)[1].strip() 1470 break 1471 1472 return (False, reason) 1473 else: 1474 return (True, '') 1475 1476def verify_working_luks(): 1477 """ 1478 Skip test suite if LUKS does not work 1479 """ 1480 (working, reason) = has_working_luks() 1481 if not working: 1482 notrun(reason) 1483 1484def supports_qcow2_zstd_compression() -> bool: 1485 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1486 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1487 img_file, '0', 1488 check=False) 1489 try: 1490 os.remove(img_file) 1491 except OSError: 1492 pass 1493 1494 if res.returncode == 1 and \ 1495 "'compression-type' does not accept value 'zstd'" in res.stdout: 1496 return False 1497 else: 1498 return True 1499 1500def verify_qcow2_zstd_compression(): 1501 if not supports_qcow2_zstd_compression(): 1502 notrun('zstd compression not supported') 1503 1504def qemu_pipe(*args: str) -> str: 1505 """ 1506 Run qemu with an option to print something and exit (e.g. a help option). 1507 1508 :return: QEMU's stdout output. 1509 """ 1510 full_args = [qemu_prog] + qemu_opts + list(args) 1511 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1512 return output 1513 1514def supported_formats(read_only=False): 1515 '''Set 'read_only' to True to check ro-whitelist 1516 Otherwise, rw-whitelist is checked''' 1517 1518 if not hasattr(supported_formats, "formats"): 1519 supported_formats.formats = {} 1520 1521 if read_only not in supported_formats.formats: 1522 format_message = qemu_pipe("-drive", "format=help") 1523 line = 1 if read_only else 0 1524 supported_formats.formats[read_only] = \ 1525 format_message.splitlines()[line].split(":")[1].split() 1526 1527 return supported_formats.formats[read_only] 1528 1529def skip_if_unsupported(required_formats=(), read_only=False): 1530 '''Skip Test Decorator 1531 Runs the test if all the required formats are whitelisted''' 1532 def skip_test_decorator(func): 1533 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1534 **kwargs: Dict[str, Any]) -> None: 1535 if callable(required_formats): 1536 fmts = required_formats(test_case) 1537 else: 1538 fmts = required_formats 1539 1540 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1541 if usf_list: 1542 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1543 test_case.case_skip(msg) 1544 else: 1545 func(test_case, *args, **kwargs) 1546 return func_wrapper 1547 return skip_test_decorator 1548 1549def skip_for_formats(formats: Sequence[str] = ()) \ 1550 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1551 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1552 '''Skip Test Decorator 1553 Skips the test for the given formats''' 1554 def skip_test_decorator(func): 1555 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1556 **kwargs: Dict[str, Any]) -> None: 1557 if imgfmt in formats: 1558 msg = f'{test_case}: Skipped for format {imgfmt}' 1559 test_case.case_skip(msg) 1560 else: 1561 func(test_case, *args, **kwargs) 1562 return func_wrapper 1563 return skip_test_decorator 1564 1565def skip_if_user_is_root(func): 1566 '''Skip Test Decorator 1567 Runs the test only without root permissions''' 1568 def func_wrapper(*args, **kwargs): 1569 if os.getuid() == 0: 1570 case_notrun('{}: cannot be run as root'.format(args[0])) 1571 return None 1572 else: 1573 return func(*args, **kwargs) 1574 return func_wrapper 1575 1576# We need to filter out the time taken from the output so that 1577# qemu-iotest can reliably diff the results against master output, 1578# and hide skipped tests from the reference output. 1579 1580class ReproducibleTestResult(unittest.TextTestResult): 1581 def addSkip(self, test, reason): 1582 # Same as TextTestResult, but print dot instead of "s" 1583 unittest.TestResult.addSkip(self, test, reason) 1584 if self.showAll: 1585 self.stream.writeln("skipped {0!r}".format(reason)) 1586 elif self.dots: 1587 self.stream.write(".") 1588 self.stream.flush() 1589 1590class ReproducibleStreamWrapper: 1591 def __init__(self, stream: TextIO): 1592 self.stream = stream 1593 1594 def __getattr__(self, attr): 1595 if attr in ('stream', '__getstate__'): 1596 raise AttributeError(attr) 1597 return getattr(self.stream, attr) 1598 1599 def write(self, arg=None): 1600 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1601 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1602 self.stream.write(arg) 1603 1604class ReproducibleTestRunner(unittest.TextTestRunner): 1605 def __init__(self, stream: Optional[TextIO] = None, 1606 resultclass: Type[unittest.TestResult] = 1607 ReproducibleTestResult, 1608 **kwargs: Any) -> None: 1609 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1610 super().__init__(stream=rstream, # type: ignore 1611 descriptions=True, 1612 resultclass=resultclass, 1613 **kwargs) 1614 1615def execute_unittest(argv: List[str], debug: bool = False) -> None: 1616 """Executes unittests within the calling module.""" 1617 1618 # Some tests have warnings, especially ResourceWarnings for unclosed 1619 # files and sockets. Ignore them for now to ensure reproducibility of 1620 # the test output. 1621 unittest.main(argv=argv, 1622 testRunner=ReproducibleTestRunner, 1623 verbosity=2 if debug else 1, 1624 warnings=None if sys.warnoptions else 'ignore') 1625 1626def execute_setup_common(supported_fmts: Sequence[str] = (), 1627 supported_platforms: Sequence[str] = (), 1628 supported_cache_modes: Sequence[str] = (), 1629 supported_aio_modes: Sequence[str] = (), 1630 unsupported_fmts: Sequence[str] = (), 1631 supported_protocols: Sequence[str] = (), 1632 unsupported_protocols: Sequence[str] = (), 1633 required_fmts: Sequence[str] = (), 1634 unsupported_imgopts: Sequence[str] = ()) -> bool: 1635 """ 1636 Perform necessary setup for either script-style or unittest-style tests. 1637 1638 :return: Bool; Whether or not debug mode has been requested via the CLI. 1639 """ 1640 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1641 1642 debug = '-d' in sys.argv 1643 if debug: 1644 sys.argv.remove('-d') 1645 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1646 1647 _verify_image_format(supported_fmts, unsupported_fmts) 1648 _verify_protocol(supported_protocols, unsupported_protocols) 1649 _verify_platform(supported=supported_platforms) 1650 _verify_cache_mode(supported_cache_modes) 1651 _verify_aio_mode(supported_aio_modes) 1652 _verify_formats(required_fmts) 1653 _verify_virtio_blk() 1654 _verify_imgopts(unsupported_imgopts) 1655 1656 return debug 1657 1658def execute_test(*args, test_function=None, **kwargs): 1659 """Run either unittest or script-style tests.""" 1660 1661 debug = execute_setup_common(*args, **kwargs) 1662 if not test_function: 1663 execute_unittest(sys.argv, debug) 1664 else: 1665 test_function() 1666 1667def activate_logging(): 1668 """Activate iotests.log() output to stdout for script-style tests.""" 1669 handler = logging.StreamHandler(stream=sys.stdout) 1670 formatter = logging.Formatter('%(message)s') 1671 handler.setFormatter(formatter) 1672 test_logger.addHandler(handler) 1673 test_logger.setLevel(logging.INFO) 1674 test_logger.propagate = False 1675 1676# This is called from script-style iotests without a single point of entry 1677def script_initialize(*args, **kwargs): 1678 """Initialize script-style tests without running any tests.""" 1679 activate_logging() 1680 execute_setup_common(*args, **kwargs) 1681 1682# This is called from script-style iotests with a single point of entry 1683def script_main(test_function, *args, **kwargs): 1684 """Run script-style tests outside of the unittest framework""" 1685 activate_logging() 1686 execute_test(*args, test_function=test_function, **kwargs) 1687 1688# This is called from unittest style iotests 1689def main(*args, **kwargs): 1690 """Run tests using the unittest framework""" 1691 execute_test(*args, **kwargs) 1692