1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QMPReturnValue, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, drop_child_info: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path, drop_child_info)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd_raw(cmd, args) 464 465 def get_qmp(self) -> QEMUMonitorProtocol: 466 assert self._qmp is not None 467 return self._qmp 468 469 def cmd(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 470 -> QMPReturnValue: 471 assert self._qmp is not None 472 return self._qmp.cmd(cmd, **(args or {})) 473 474 def stop(self, kill_signal=15): 475 self._p.send_signal(kill_signal) 476 self._p.wait() 477 self._p = None 478 479 if self._qmp: 480 self._qmp.close() 481 482 if self._qmpsock is not None: 483 try: 484 os.remove(self._qmpsock) 485 except OSError: 486 pass 487 try: 488 os.remove(self.pidfile) 489 except OSError: 490 pass 491 492 def __del__(self): 493 if self._p is not None: 494 self.stop(kill_signal=9) 495 496 497def qemu_nbd(*args): 498 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 499 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 500 501def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 502 '''Run qemu-nbd in daemon mode and return both the parent's exit code 503 and its output in case of an error''' 504 full_args = qemu_nbd_args + ['--fork'] + list(args) 505 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 506 connect_stderr=False) 507 return returncode, output if returncode else '' 508 509def qemu_nbd_list_log(*args: str) -> str: 510 '''Run qemu-nbd to list remote exports''' 511 full_args = [qemu_nbd_prog, '-L'] + list(args) 512 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 513 log(output, filters=[filter_testfiles, filter_nbd_exports]) 514 return output 515 516@contextmanager 517def qemu_nbd_popen(*args): 518 '''Context manager running qemu-nbd within the context''' 519 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 520 521 assert not os.path.exists(pid_file) 522 523 cmd = list(qemu_nbd_args) 524 cmd.extend(('--persistent', '--pid-file', pid_file)) 525 cmd.extend(args) 526 527 log('Start NBD server') 528 with subprocess.Popen(cmd) as p: 529 try: 530 while not os.path.exists(pid_file): 531 if p.poll() is not None: 532 raise RuntimeError( 533 "qemu-nbd terminated with exit code {}: {}" 534 .format(p.returncode, ' '.join(cmd))) 535 536 time.sleep(0.01) 537 yield 538 finally: 539 if os.path.exists(pid_file): 540 os.remove(pid_file) 541 log('Kill NBD server') 542 p.kill() 543 p.wait() 544 545def compare_images(img1: str, img2: str, 546 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 547 """ 548 Compare two images with QEMU_IMG; return True if they are identical. 549 550 :raise CalledProcessError: 551 when qemu-img crashes or returns a status code of anything other 552 than 0 (identical) or 1 (different). 553 """ 554 try: 555 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 556 return True 557 except subprocess.CalledProcessError as exc: 558 if exc.returncode == 1: 559 return False 560 raise 561 562def create_image(name, size): 563 '''Create a fully-allocated raw image with sector markers''' 564 with open(name, 'wb') as file: 565 i = 0 566 while i < size: 567 sector = struct.pack('>l504xl', i // 512, i // 512) 568 file.write(sector) 569 i = i + 512 570 571def image_size(img: str) -> int: 572 """Return image's virtual size""" 573 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 574 if not isinstance(value, int): 575 type_name = type(value).__name__ 576 raise TypeError("Expected 'int' for 'virtual-size', " 577 f"got '{value}' of type '{type_name}'") 578 return value 579 580def is_str(val): 581 return isinstance(val, str) 582 583test_dir_re = re.compile(r"%s" % test_dir) 584def filter_test_dir(msg): 585 return test_dir_re.sub("TEST_DIR", msg) 586 587win32_re = re.compile(r"\r") 588def filter_win32(msg): 589 return win32_re.sub("", msg) 590 591qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 592 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 593 r"and [0-9\/.inf]* ops\/sec\)") 594def filter_qemu_io(msg): 595 msg = filter_win32(msg) 596 return qemu_io_re.sub("X ops; XX:XX:XX.X " 597 "(XXX YYY/sec and XXX ops/sec)", msg) 598 599chown_re = re.compile(r"chown [0-9]+:[0-9]+") 600def filter_chown(msg): 601 return chown_re.sub("chown UID:GID", msg) 602 603def filter_qmp_event(event): 604 '''Filter a QMP event dict''' 605 event = dict(event) 606 if 'timestamp' in event: 607 event['timestamp']['seconds'] = 'SECS' 608 event['timestamp']['microseconds'] = 'USECS' 609 return event 610 611def filter_qmp(qmsg, filter_fn): 612 '''Given a string filter, filter a QMP object's values. 613 filter_fn takes a (key, value) pair.''' 614 # Iterate through either lists or dicts; 615 if isinstance(qmsg, list): 616 items = enumerate(qmsg) 617 elif isinstance(qmsg, dict): 618 items = qmsg.items() 619 else: 620 return filter_fn(None, qmsg) 621 622 for k, v in items: 623 if isinstance(v, (dict, list)): 624 qmsg[k] = filter_qmp(v, filter_fn) 625 else: 626 qmsg[k] = filter_fn(k, v) 627 return qmsg 628 629def filter_testfiles(msg): 630 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 631 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 632 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 633 634def filter_qmp_testfiles(qmsg): 635 def _filter(_key, value): 636 if is_str(value): 637 return filter_testfiles(value) 638 return value 639 return filter_qmp(qmsg, _filter) 640 641def filter_virtio_scsi(output: str) -> str: 642 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 643 644def filter_qmp_virtio_scsi(qmsg): 645 def _filter(_key, value): 646 if is_str(value): 647 return filter_virtio_scsi(value) 648 return value 649 return filter_qmp(qmsg, _filter) 650 651def filter_generated_node_ids(msg): 652 return re.sub("#block[0-9]+", "NODE_NAME", msg) 653 654def filter_img_info(output: str, filename: str, 655 drop_child_info: bool = True) -> str: 656 lines = [] 657 drop_indented = False 658 for line in output.split('\n'): 659 if 'disk size' in line or 'actual-size' in line: 660 continue 661 662 # Drop child node info 663 if drop_indented: 664 if line.startswith(' '): 665 continue 666 drop_indented = False 667 if drop_child_info and "Child node '/" in line: 668 drop_indented = True 669 continue 670 671 line = line.replace(filename, 'TEST_IMG') 672 line = filter_testfiles(line) 673 line = line.replace(imgfmt, 'IMGFMT') 674 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 675 line = re.sub('uuid: [-a-f0-9]+', 676 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 677 line) 678 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 679 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 680 line) 681 lines.append(line) 682 return '\n'.join(lines) 683 684def filter_imgfmt(msg): 685 return msg.replace(imgfmt, 'IMGFMT') 686 687def filter_qmp_imgfmt(qmsg): 688 def _filter(_key, value): 689 if is_str(value): 690 return filter_imgfmt(value) 691 return value 692 return filter_qmp(qmsg, _filter) 693 694def filter_nbd_exports(output: str) -> str: 695 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 696 697 698Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 699 700def log(msg: Msg, 701 filters: Iterable[Callable[[Msg], Msg]] = (), 702 indent: Optional[int] = None) -> None: 703 """ 704 Logs either a string message or a JSON serializable message (like QMP). 705 If indent is provided, JSON serializable messages are pretty-printed. 706 """ 707 for flt in filters: 708 msg = flt(msg) 709 if isinstance(msg, (dict, list)): 710 # Don't sort if it's already sorted 711 do_sort = not isinstance(msg, OrderedDict) 712 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 713 else: 714 test_logger.info(msg) 715 716class Timeout: 717 def __init__(self, seconds, errmsg="Timeout"): 718 self.seconds = seconds 719 self.errmsg = errmsg 720 def __enter__(self): 721 if qemu_gdb or qemu_valgrind: 722 return self 723 signal.signal(signal.SIGALRM, self.timeout) 724 signal.setitimer(signal.ITIMER_REAL, self.seconds) 725 return self 726 def __exit__(self, exc_type, value, traceback): 727 if qemu_gdb or qemu_valgrind: 728 return False 729 signal.setitimer(signal.ITIMER_REAL, 0) 730 return False 731 def timeout(self, signum, frame): 732 raise TimeoutError(self.errmsg) 733 734def file_pattern(name): 735 return "{0}-{1}".format(os.getpid(), name) 736 737class FilePath: 738 """ 739 Context manager generating multiple file names. The generated files are 740 removed when exiting the context. 741 742 Example usage: 743 744 with FilePath('a.img', 'b.img') as (img_a, img_b): 745 # Use img_a and img_b here... 746 747 # a.img and b.img are automatically removed here. 748 749 By default images are created in iotests.test_dir. To create sockets use 750 iotests.sock_dir: 751 752 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 753 754 For convenience, calling with one argument yields a single file instead of 755 a tuple with one item. 756 757 """ 758 def __init__(self, *names, base_dir=test_dir): 759 self.paths = [os.path.join(base_dir, file_pattern(name)) 760 for name in names] 761 762 def __enter__(self): 763 if len(self.paths) == 1: 764 return self.paths[0] 765 else: 766 return self.paths 767 768 def __exit__(self, exc_type, exc_val, exc_tb): 769 for path in self.paths: 770 try: 771 os.remove(path) 772 except OSError: 773 pass 774 return False 775 776 777def try_remove(img): 778 try: 779 os.remove(img) 780 except OSError: 781 pass 782 783def file_path_remover(): 784 for path in reversed(file_path_remover.paths): 785 try_remove(path) 786 787 788def file_path(*names, base_dir=test_dir): 789 ''' Another way to get auto-generated filename that cleans itself up. 790 791 Use is as simple as: 792 793 img_a, img_b = file_path('a.img', 'b.img') 794 sock = file_path('socket') 795 ''' 796 797 if not hasattr(file_path_remover, 'paths'): 798 file_path_remover.paths = [] 799 atexit.register(file_path_remover) 800 801 paths = [] 802 for name in names: 803 filename = file_pattern(name) 804 path = os.path.join(base_dir, filename) 805 file_path_remover.paths.append(path) 806 paths.append(path) 807 808 return paths[0] if len(paths) == 1 else paths 809 810def remote_filename(path): 811 if imgproto == 'file': 812 return path 813 elif imgproto == 'ssh': 814 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 815 else: 816 raise ValueError("Protocol %s not supported" % (imgproto)) 817 818class VM(qtest.QEMUQtestMachine): 819 '''A QEMU VM''' 820 821 def __init__(self, path_suffix=''): 822 name = "qemu%s-%d" % (path_suffix, os.getpid()) 823 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 824 if qemu_gdb and qemu_valgrind: 825 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 826 sys.exit(1) 827 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 828 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 829 name=name, 830 base_temp_dir=test_dir, 831 qmp_timer=timer) 832 self._num_drives = 0 833 834 def _post_shutdown(self) -> None: 835 super()._post_shutdown() 836 if not qemu_valgrind or not self._popen: 837 return 838 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 839 if self.exitcode() == 99: 840 with open(valgrind_filename, encoding='utf-8') as f: 841 print(f.read()) 842 else: 843 os.remove(valgrind_filename) 844 845 def _pre_launch(self) -> None: 846 super()._pre_launch() 847 if qemu_print: 848 # set QEMU binary output to stdout 849 self._close_qemu_log_file() 850 851 def add_object(self, opts): 852 self._args.append('-object') 853 self._args.append(opts) 854 return self 855 856 def add_device(self, opts): 857 self._args.append('-device') 858 self._args.append(opts) 859 return self 860 861 def add_drive_raw(self, opts): 862 self._args.append('-drive') 863 self._args.append(opts) 864 return self 865 866 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 867 '''Add a virtio-blk drive to the VM''' 868 options = ['if=%s' % interface, 869 'id=drive%d' % self._num_drives] 870 871 if path is not None: 872 options.append('file=%s' % path) 873 options.append('format=%s' % img_format) 874 options.append('cache=%s' % cachemode) 875 options.append('aio=%s' % aiomode) 876 877 if opts: 878 options.append(opts) 879 880 if img_format == 'luks' and 'key-secret' not in opts: 881 # default luks support 882 if luks_default_secret_object not in self._args: 883 self.add_object(luks_default_secret_object) 884 885 options.append(luks_default_key_secret_opt) 886 887 self._args.append('-drive') 888 self._args.append(','.join(options)) 889 self._num_drives += 1 890 return self 891 892 def add_blockdev(self, opts): 893 self._args.append('-blockdev') 894 if isinstance(opts, str): 895 self._args.append(opts) 896 else: 897 self._args.append(','.join(opts)) 898 return self 899 900 def add_incoming(self, addr): 901 self._args.append('-incoming') 902 self._args.append(addr) 903 return self 904 905 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 906 cmd = 'human-monitor-command' 907 kwargs: Dict[str, Any] = {'command-line': command_line} 908 if use_log: 909 return self.qmp_log(cmd, **kwargs) 910 else: 911 return self.qmp(cmd, **kwargs) 912 913 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 914 """Pause drive r/w operations""" 915 if not event: 916 self.pause_drive(drive, "read_aio") 917 self.pause_drive(drive, "write_aio") 918 return 919 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 920 921 def resume_drive(self, drive: str) -> None: 922 """Resume drive r/w operations""" 923 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 924 925 def hmp_qemu_io(self, drive: str, cmd: str, 926 use_log: bool = False, qdev: bool = False) -> QMPMessage: 927 """Write to a given drive using an HMP command""" 928 d = '-d ' if qdev else '' 929 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 930 931 def flatten_qmp_object(self, obj, output=None, basestr=''): 932 if output is None: 933 output = {} 934 if isinstance(obj, list): 935 for i, item in enumerate(obj): 936 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 937 elif isinstance(obj, dict): 938 for key in obj: 939 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 940 else: 941 output[basestr[:-1]] = obj # Strip trailing '.' 942 return output 943 944 def qmp_to_opts(self, obj): 945 obj = self.flatten_qmp_object(obj) 946 output_list = [] 947 for key in obj: 948 output_list += [key + '=' + obj[key]] 949 return ','.join(output_list) 950 951 def get_qmp_events_filtered(self, wait=60.0): 952 result = [] 953 for ev in self.get_qmp_events(wait=wait): 954 result.append(filter_qmp_event(ev)) 955 return result 956 957 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 958 full_cmd = OrderedDict(( 959 ("execute", cmd), 960 ("arguments", ordered_qmp(kwargs)) 961 )) 962 log(full_cmd, filters, indent=indent) 963 result = self.qmp(cmd, **kwargs) 964 log(result, filters, indent=indent) 965 return result 966 967 # Returns None on success, and an error string on failure 968 def run_job(self, job: str, auto_finalize: bool = True, 969 auto_dismiss: bool = False, 970 pre_finalize: Optional[Callable[[], None]] = None, 971 cancel: bool = False, wait: float = 60.0, 972 filters: Iterable[Callable[[Any], Any]] = (), 973 ) -> Optional[str]: 974 """ 975 run_job moves a job from creation through to dismissal. 976 977 :param job: String. ID of recently-launched job 978 :param auto_finalize: Bool. True if the job was launched with 979 auto_finalize. Defaults to True. 980 :param auto_dismiss: Bool. True if the job was launched with 981 auto_dismiss=True. Defaults to False. 982 :param pre_finalize: Callback. A callable that takes no arguments to be 983 invoked prior to issuing job-finalize, if any. 984 :param cancel: Bool. When true, cancels the job after the pre_finalize 985 callback. 986 :param wait: Float. Timeout value specifying how long to wait for any 987 event, in seconds. Defaults to 60.0. 988 """ 989 match_device = {'data': {'device': job}} 990 match_id = {'data': {'id': job}} 991 events = [ 992 ('BLOCK_JOB_COMPLETED', match_device), 993 ('BLOCK_JOB_CANCELLED', match_device), 994 ('BLOCK_JOB_ERROR', match_device), 995 ('BLOCK_JOB_READY', match_device), 996 ('BLOCK_JOB_PENDING', match_id), 997 ('JOB_STATUS_CHANGE', match_id) 998 ] 999 error = None 1000 while True: 1001 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 1002 if ev['event'] != 'JOB_STATUS_CHANGE': 1003 log(ev, filters=filters) 1004 continue 1005 status = ev['data']['status'] 1006 if status == 'aborting': 1007 result = self.qmp('query-jobs') 1008 for j in result['return']: 1009 if j['id'] == job: 1010 error = j['error'] 1011 log('Job failed: %s' % (j['error']), filters=filters) 1012 elif status == 'ready': 1013 self.qmp_log('job-complete', id=job, filters=filters) 1014 elif status == 'pending' and not auto_finalize: 1015 if pre_finalize: 1016 pre_finalize() 1017 if cancel: 1018 self.qmp_log('job-cancel', id=job, filters=filters) 1019 else: 1020 self.qmp_log('job-finalize', id=job, filters=filters) 1021 elif status == 'concluded' and not auto_dismiss: 1022 self.qmp_log('job-dismiss', id=job, filters=filters) 1023 elif status == 'null': 1024 return error 1025 1026 # Returns None on success, and an error string on failure 1027 def blockdev_create(self, options, job_id='job0', filters=None): 1028 if filters is None: 1029 filters = [filter_qmp_testfiles] 1030 result = self.qmp_log('blockdev-create', filters=filters, 1031 job_id=job_id, options=options) 1032 1033 if 'return' in result: 1034 assert result['return'] == {} 1035 job_result = self.run_job(job_id, filters=filters) 1036 else: 1037 job_result = result['error'] 1038 1039 log("") 1040 return job_result 1041 1042 def enable_migration_events(self, name): 1043 log('Enabling migration QMP events on %s...' % name) 1044 log(self.qmp('migrate-set-capabilities', capabilities=[ 1045 { 1046 'capability': 'events', 1047 'state': True 1048 } 1049 ])) 1050 1051 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1052 while True: 1053 event = self.event_wait('MIGRATION') 1054 # We use the default timeout, and with a timeout, event_wait() 1055 # never returns None 1056 assert event 1057 1058 log(event, filters=[filter_qmp_event]) 1059 if event['data']['status'] in ('completed', 'failed'): 1060 break 1061 1062 if event['data']['status'] == 'completed': 1063 # The event may occur in finish-migrate, so wait for the expected 1064 # post-migration runstate 1065 runstate = None 1066 while runstate != expect_runstate: 1067 runstate = self.qmp('query-status')['return']['status'] 1068 return True 1069 else: 1070 return False 1071 1072 def node_info(self, node_name): 1073 nodes = self.qmp('query-named-block-nodes') 1074 for x in nodes['return']: 1075 if x['node-name'] == node_name: 1076 return x 1077 return None 1078 1079 def query_bitmaps(self): 1080 res = self.qmp("query-named-block-nodes") 1081 return {device['node-name']: device['dirty-bitmaps'] 1082 for device in res['return'] if 'dirty-bitmaps' in device} 1083 1084 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1085 """ 1086 get a specific bitmap from the object returned by query_bitmaps. 1087 :param recording: If specified, filter results by the specified value. 1088 :param bitmaps: If specified, use it instead of call query_bitmaps() 1089 """ 1090 if bitmaps is None: 1091 bitmaps = self.query_bitmaps() 1092 1093 for bitmap in bitmaps[node_name]: 1094 if bitmap.get('name', '') == bitmap_name: 1095 if recording is None or bitmap.get('recording') == recording: 1096 return bitmap 1097 return None 1098 1099 def check_bitmap_status(self, node_name, bitmap_name, fields): 1100 ret = self.get_bitmap(node_name, bitmap_name) 1101 1102 return fields.items() <= ret.items() 1103 1104 def assert_block_path(self, root, path, expected_node, graph=None): 1105 """ 1106 Check whether the node under the given path in the block graph 1107 is @expected_node. 1108 1109 @root is the node name of the node where the @path is rooted. 1110 1111 @path is a string that consists of child names separated by 1112 slashes. It must begin with a slash. 1113 1114 Examples for @root + @path: 1115 - root="qcow2-node", path="/backing/file" 1116 - root="quorum-node", path="/children.2/file" 1117 1118 Hypothetically, @path could be empty, in which case it would 1119 point to @root. However, in practice this case is not useful 1120 and hence not allowed. 1121 1122 @expected_node may be None. (All elements of the path but the 1123 leaf must still exist.) 1124 1125 @graph may be None or the result of an x-debug-query-block-graph 1126 call that has already been performed. 1127 """ 1128 if graph is None: 1129 graph = self.qmp('x-debug-query-block-graph')['return'] 1130 1131 iter_path = iter(path.split('/')) 1132 1133 # Must start with a / 1134 assert next(iter_path) == '' 1135 1136 node = next((node for node in graph['nodes'] if node['name'] == root), 1137 None) 1138 1139 # An empty @path is not allowed, so the root node must be present 1140 assert node is not None, 'Root node %s not found' % root 1141 1142 for child_name in iter_path: 1143 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1144 1145 try: 1146 node_id = next(edge['child'] for edge in graph['edges'] 1147 if (edge['parent'] == node['id'] and 1148 edge['name'] == child_name)) 1149 1150 node = next(node for node in graph['nodes'] 1151 if node['id'] == node_id) 1152 1153 except StopIteration: 1154 node = None 1155 1156 if node is None: 1157 assert expected_node is None, \ 1158 'No node found under %s (but expected %s)' % \ 1159 (path, expected_node) 1160 else: 1161 assert node['name'] == expected_node, \ 1162 'Found node %s under %s (but expected %s)' % \ 1163 (node['name'], path, expected_node) 1164 1165index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1166 1167class QMPTestCase(unittest.TestCase): 1168 '''Abstract base class for QMP test cases''' 1169 1170 def __init__(self, *args, **kwargs): 1171 super().__init__(*args, **kwargs) 1172 # Many users of this class set a VM property we rely on heavily 1173 # in the methods below. 1174 self.vm = None 1175 1176 def dictpath(self, d, path): 1177 '''Traverse a path in a nested dict''' 1178 for component in path.split('/'): 1179 m = index_re.match(component) 1180 if m: 1181 component, idx = m.groups() 1182 idx = int(idx) 1183 1184 if not isinstance(d, dict) or component not in d: 1185 self.fail(f'failed path traversal for "{path}" in "{d}"') 1186 d = d[component] 1187 1188 if m: 1189 if not isinstance(d, list): 1190 self.fail(f'path component "{component}" in "{path}" ' 1191 f'is not a list in "{d}"') 1192 try: 1193 d = d[idx] 1194 except IndexError: 1195 self.fail(f'invalid index "{idx}" in path "{path}" ' 1196 f'in "{d}"') 1197 return d 1198 1199 def assert_qmp_absent(self, d, path): 1200 try: 1201 result = self.dictpath(d, path) 1202 except AssertionError: 1203 return 1204 self.fail('path "%s" has value "%s"' % (path, str(result))) 1205 1206 def assert_qmp(self, d, path, value): 1207 '''Assert that the value for a specific path in a QMP dict 1208 matches. When given a list of values, assert that any of 1209 them matches.''' 1210 1211 result = self.dictpath(d, path) 1212 1213 # [] makes no sense as a list of valid values, so treat it as 1214 # an actual single value. 1215 if isinstance(value, list) and value != []: 1216 for v in value: 1217 if result == v: 1218 return 1219 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1220 else: 1221 self.assertEqual(result, value, 1222 '"%s" is "%s", expected "%s"' 1223 % (path, str(result), str(value))) 1224 1225 def assert_no_active_block_jobs(self): 1226 result = self.vm.qmp('query-block-jobs') 1227 self.assert_qmp(result, 'return', []) 1228 1229 def assert_has_block_node(self, node_name=None, file_name=None): 1230 """Issue a query-named-block-nodes and assert node_name and/or 1231 file_name is present in the result""" 1232 def check_equal_or_none(a, b): 1233 return a is None or b is None or a == b 1234 assert node_name or file_name 1235 result = self.vm.qmp('query-named-block-nodes') 1236 for x in result["return"]: 1237 if check_equal_or_none(x.get("node-name"), node_name) and \ 1238 check_equal_or_none(x.get("file"), file_name): 1239 return 1240 self.fail("Cannot find %s %s in result:\n%s" % 1241 (node_name, file_name, result)) 1242 1243 def assert_json_filename_equal(self, json_filename, reference): 1244 '''Asserts that the given filename is a json: filename and that its 1245 content is equal to the given reference object''' 1246 self.assertEqual(json_filename[:5], 'json:') 1247 self.assertEqual( 1248 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1249 self.vm.flatten_qmp_object(reference) 1250 ) 1251 1252 def cancel_and_wait(self, drive='drive0', force=False, 1253 resume=False, wait=60.0): 1254 '''Cancel a block job and wait for it to finish, returning the event''' 1255 self.vm.cmd('block-job-cancel', device=drive, force=force) 1256 1257 if resume: 1258 self.vm.resume_drive(drive) 1259 1260 cancelled = False 1261 result = None 1262 while not cancelled: 1263 for event in self.vm.get_qmp_events(wait=wait): 1264 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1265 event['event'] == 'BLOCK_JOB_CANCELLED': 1266 self.assert_qmp(event, 'data/device', drive) 1267 result = event 1268 cancelled = True 1269 elif event['event'] == 'JOB_STATUS_CHANGE': 1270 self.assert_qmp(event, 'data/id', drive) 1271 1272 1273 self.assert_no_active_block_jobs() 1274 return result 1275 1276 def wait_until_completed(self, drive='drive0', check_offset=True, 1277 wait=60.0, error=None): 1278 '''Wait for a block job to finish, returning the event''' 1279 while True: 1280 for event in self.vm.get_qmp_events(wait=wait): 1281 if event['event'] == 'BLOCK_JOB_COMPLETED': 1282 self.assert_qmp(event, 'data/device', drive) 1283 if error is None: 1284 self.assert_qmp_absent(event, 'data/error') 1285 if check_offset: 1286 self.assert_qmp(event, 'data/offset', 1287 event['data']['len']) 1288 else: 1289 self.assert_qmp(event, 'data/error', error) 1290 self.assert_no_active_block_jobs() 1291 return event 1292 if event['event'] == 'JOB_STATUS_CHANGE': 1293 self.assert_qmp(event, 'data/id', drive) 1294 1295 def wait_ready(self, drive='drive0'): 1296 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1297 return self.vm.events_wait([ 1298 ('BLOCK_JOB_READY', 1299 {'data': {'type': 'mirror', 'device': drive}}), 1300 ('BLOCK_JOB_READY', 1301 {'data': {'type': 'commit', 'device': drive}}) 1302 ]) 1303 1304 def wait_ready_and_cancel(self, drive='drive0'): 1305 self.wait_ready(drive=drive) 1306 event = self.cancel_and_wait(drive=drive) 1307 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1308 self.assert_qmp(event, 'data/type', 'mirror') 1309 self.assert_qmp(event, 'data/offset', event['data']['len']) 1310 1311 def complete_and_wait(self, drive='drive0', wait_ready=True, 1312 completion_error=None): 1313 '''Complete a block job and wait for it to finish''' 1314 if wait_ready: 1315 self.wait_ready(drive=drive) 1316 1317 self.vm.cmd('block-job-complete', device=drive) 1318 1319 event = self.wait_until_completed(drive=drive, error=completion_error) 1320 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1321 1322 def pause_wait(self, job_id='job0'): 1323 with Timeout(3, "Timeout waiting for job to pause"): 1324 while True: 1325 result = self.vm.qmp('query-block-jobs') 1326 found = False 1327 for job in result['return']: 1328 if job['device'] == job_id: 1329 found = True 1330 if job['paused'] and not job['busy']: 1331 return job 1332 break 1333 assert found 1334 1335 def pause_job(self, job_id='job0', wait=True): 1336 self.vm.cmd('block-job-pause', device=job_id) 1337 if wait: 1338 self.pause_wait(job_id) 1339 1340 def case_skip(self, reason): 1341 '''Skip this test case''' 1342 case_notrun(reason) 1343 self.skipTest(reason) 1344 1345 1346def notrun(reason): 1347 '''Skip this test suite''' 1348 # Each test in qemu-iotests has a number ("seq") 1349 seq = os.path.basename(sys.argv[0]) 1350 1351 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1352 as outfile: 1353 outfile.write(reason + '\n') 1354 logger.warning("%s not run: %s", seq, reason) 1355 sys.exit(0) 1356 1357def case_notrun(reason): 1358 '''Mark this test case as not having been run (without actually 1359 skipping it, that is left to the caller). See 1360 QMPTestCase.case_skip() for a variant that actually skips the 1361 current test case.''' 1362 1363 # Each test in qemu-iotests has a number ("seq") 1364 seq = os.path.basename(sys.argv[0]) 1365 1366 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1367 as outfile: 1368 outfile.write(' [case not run] ' + reason + '\n') 1369 1370def _verify_image_format(supported_fmts: Sequence[str] = (), 1371 unsupported_fmts: Sequence[str] = ()) -> None: 1372 if 'generic' in supported_fmts and \ 1373 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1374 # similar to 1375 # _supported_fmt generic 1376 # for bash tests 1377 supported_fmts = () 1378 1379 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1380 if not_sup or (imgfmt in unsupported_fmts): 1381 notrun('not suitable for this image format: %s' % imgfmt) 1382 1383 if imgfmt == 'luks': 1384 verify_working_luks() 1385 1386def _verify_protocol(supported: Sequence[str] = (), 1387 unsupported: Sequence[str] = ()) -> None: 1388 assert not (supported and unsupported) 1389 1390 if 'generic' in supported: 1391 return 1392 1393 not_sup = supported and (imgproto not in supported) 1394 if not_sup or (imgproto in unsupported): 1395 notrun('not suitable for this protocol: %s' % imgproto) 1396 1397def _verify_platform(supported: Sequence[str] = (), 1398 unsupported: Sequence[str] = ()) -> None: 1399 if any((sys.platform.startswith(x) for x in unsupported)): 1400 notrun('not suitable for this OS: %s' % sys.platform) 1401 1402 if supported: 1403 if not any((sys.platform.startswith(x) for x in supported)): 1404 notrun('not suitable for this OS: %s' % sys.platform) 1405 1406def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1407 if supported_cache_modes and (cachemode not in supported_cache_modes): 1408 notrun('not suitable for this cache mode: %s' % cachemode) 1409 1410def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1411 if supported_aio_modes and (aiomode not in supported_aio_modes): 1412 notrun('not suitable for this aio mode: %s' % aiomode) 1413 1414def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1415 usf_list = list(set(required_formats) - set(supported_formats())) 1416 if usf_list: 1417 notrun(f'formats {usf_list} are not whitelisted') 1418 1419 1420def _verify_virtio_blk() -> None: 1421 out = qemu_pipe('-M', 'none', '-device', 'help') 1422 if 'virtio-blk' not in out: 1423 notrun('Missing virtio-blk in QEMU binary') 1424 1425def verify_virtio_scsi_pci_or_ccw() -> None: 1426 out = qemu_pipe('-M', 'none', '-device', 'help') 1427 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1428 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1429 1430 1431def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1432 imgopts = os.environ.get('IMGOPTS') 1433 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1434 # but it supported only for bash tests. We don't have a concept of global 1435 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1436 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1437 unsup = list(unsupported) + ['$'] 1438 if imgopts and any(x in imgopts for x in unsup): 1439 notrun(f'not suitable for this imgopts: {imgopts}') 1440 1441 1442def supports_quorum() -> bool: 1443 return 'quorum' in qemu_img('--help').stdout 1444 1445def verify_quorum(): 1446 '''Skip test suite if quorum support is not available''' 1447 if not supports_quorum(): 1448 notrun('quorum support missing') 1449 1450def has_working_luks() -> Tuple[bool, str]: 1451 """ 1452 Check whether our LUKS driver can actually create images 1453 (this extends to LUKS encryption for qcow2). 1454 1455 If not, return the reason why. 1456 """ 1457 1458 img_file = f'{test_dir}/luks-test.luks' 1459 res = qemu_img('create', '-f', 'luks', 1460 '--object', luks_default_secret_object, 1461 '-o', luks_default_key_secret_opt, 1462 '-o', 'iter-time=10', 1463 img_file, '1G', 1464 check=False) 1465 try: 1466 os.remove(img_file) 1467 except OSError: 1468 pass 1469 1470 if res.returncode: 1471 reason = res.stdout 1472 for line in res.stdout.splitlines(): 1473 if img_file + ':' in line: 1474 reason = line.split(img_file + ':', 1)[1].strip() 1475 break 1476 1477 return (False, reason) 1478 else: 1479 return (True, '') 1480 1481def verify_working_luks(): 1482 """ 1483 Skip test suite if LUKS does not work 1484 """ 1485 (working, reason) = has_working_luks() 1486 if not working: 1487 notrun(reason) 1488 1489def supports_qcow2_zstd_compression() -> bool: 1490 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1491 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1492 img_file, '0', 1493 check=False) 1494 try: 1495 os.remove(img_file) 1496 except OSError: 1497 pass 1498 1499 if res.returncode == 1 and \ 1500 "'compression-type' does not accept value 'zstd'" in res.stdout: 1501 return False 1502 else: 1503 return True 1504 1505def verify_qcow2_zstd_compression(): 1506 if not supports_qcow2_zstd_compression(): 1507 notrun('zstd compression not supported') 1508 1509def qemu_pipe(*args: str) -> str: 1510 """ 1511 Run qemu with an option to print something and exit (e.g. a help option). 1512 1513 :return: QEMU's stdout output. 1514 """ 1515 full_args = [qemu_prog] + qemu_opts + list(args) 1516 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1517 return output 1518 1519def supported_formats(read_only=False): 1520 '''Set 'read_only' to True to check ro-whitelist 1521 Otherwise, rw-whitelist is checked''' 1522 1523 if not hasattr(supported_formats, "formats"): 1524 supported_formats.formats = {} 1525 1526 if read_only not in supported_formats.formats: 1527 format_message = qemu_pipe("-drive", "format=help") 1528 line = 1 if read_only else 0 1529 supported_formats.formats[read_only] = \ 1530 format_message.splitlines()[line].split(":")[1].split() 1531 1532 return supported_formats.formats[read_only] 1533 1534def skip_if_unsupported(required_formats=(), read_only=False): 1535 '''Skip Test Decorator 1536 Runs the test if all the required formats are whitelisted''' 1537 def skip_test_decorator(func): 1538 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1539 **kwargs: Dict[str, Any]) -> None: 1540 if callable(required_formats): 1541 fmts = required_formats(test_case) 1542 else: 1543 fmts = required_formats 1544 1545 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1546 if usf_list: 1547 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1548 test_case.case_skip(msg) 1549 else: 1550 func(test_case, *args, **kwargs) 1551 return func_wrapper 1552 return skip_test_decorator 1553 1554def skip_for_formats(formats: Sequence[str] = ()) \ 1555 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1556 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1557 '''Skip Test Decorator 1558 Skips the test for the given formats''' 1559 def skip_test_decorator(func): 1560 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1561 **kwargs: Dict[str, Any]) -> None: 1562 if imgfmt in formats: 1563 msg = f'{test_case}: Skipped for format {imgfmt}' 1564 test_case.case_skip(msg) 1565 else: 1566 func(test_case, *args, **kwargs) 1567 return func_wrapper 1568 return skip_test_decorator 1569 1570def skip_if_user_is_root(func): 1571 '''Skip Test Decorator 1572 Runs the test only without root permissions''' 1573 def func_wrapper(*args, **kwargs): 1574 if os.getuid() == 0: 1575 case_notrun('{}: cannot be run as root'.format(args[0])) 1576 return None 1577 else: 1578 return func(*args, **kwargs) 1579 return func_wrapper 1580 1581# We need to filter out the time taken from the output so that 1582# qemu-iotest can reliably diff the results against master output, 1583# and hide skipped tests from the reference output. 1584 1585class ReproducibleTestResult(unittest.TextTestResult): 1586 def addSkip(self, test, reason): 1587 # Same as TextTestResult, but print dot instead of "s" 1588 unittest.TestResult.addSkip(self, test, reason) 1589 if self.showAll: 1590 self.stream.writeln("skipped {0!r}".format(reason)) 1591 elif self.dots: 1592 self.stream.write(".") 1593 self.stream.flush() 1594 1595class ReproducibleStreamWrapper: 1596 def __init__(self, stream: TextIO): 1597 self.stream = stream 1598 1599 def __getattr__(self, attr): 1600 if attr in ('stream', '__getstate__'): 1601 raise AttributeError(attr) 1602 return getattr(self.stream, attr) 1603 1604 def write(self, arg=None): 1605 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1606 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1607 self.stream.write(arg) 1608 1609class ReproducibleTestRunner(unittest.TextTestRunner): 1610 def __init__(self, stream: Optional[TextIO] = None, 1611 resultclass: Type[unittest.TestResult] = 1612 ReproducibleTestResult, 1613 **kwargs: Any) -> None: 1614 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1615 super().__init__(stream=rstream, # type: ignore 1616 descriptions=True, 1617 resultclass=resultclass, 1618 **kwargs) 1619 1620def execute_unittest(argv: List[str], debug: bool = False) -> None: 1621 """Executes unittests within the calling module.""" 1622 1623 # Some tests have warnings, especially ResourceWarnings for unclosed 1624 # files and sockets. Ignore them for now to ensure reproducibility of 1625 # the test output. 1626 unittest.main(argv=argv, 1627 testRunner=ReproducibleTestRunner, 1628 verbosity=2 if debug else 1, 1629 warnings=None if sys.warnoptions else 'ignore') 1630 1631def execute_setup_common(supported_fmts: Sequence[str] = (), 1632 supported_platforms: Sequence[str] = (), 1633 supported_cache_modes: Sequence[str] = (), 1634 supported_aio_modes: Sequence[str] = (), 1635 unsupported_fmts: Sequence[str] = (), 1636 supported_protocols: Sequence[str] = (), 1637 unsupported_protocols: Sequence[str] = (), 1638 required_fmts: Sequence[str] = (), 1639 unsupported_imgopts: Sequence[str] = ()) -> bool: 1640 """ 1641 Perform necessary setup for either script-style or unittest-style tests. 1642 1643 :return: Bool; Whether or not debug mode has been requested via the CLI. 1644 """ 1645 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1646 1647 debug = '-d' in sys.argv 1648 if debug: 1649 sys.argv.remove('-d') 1650 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1651 1652 _verify_image_format(supported_fmts, unsupported_fmts) 1653 _verify_protocol(supported_protocols, unsupported_protocols) 1654 _verify_platform(supported=supported_platforms) 1655 _verify_cache_mode(supported_cache_modes) 1656 _verify_aio_mode(supported_aio_modes) 1657 _verify_formats(required_fmts) 1658 _verify_virtio_blk() 1659 _verify_imgopts(unsupported_imgopts) 1660 1661 return debug 1662 1663def execute_test(*args, test_function=None, **kwargs): 1664 """Run either unittest or script-style tests.""" 1665 1666 debug = execute_setup_common(*args, **kwargs) 1667 if not test_function: 1668 execute_unittest(sys.argv, debug) 1669 else: 1670 test_function() 1671 1672def activate_logging(): 1673 """Activate iotests.log() output to stdout for script-style tests.""" 1674 handler = logging.StreamHandler(stream=sys.stdout) 1675 formatter = logging.Formatter('%(message)s') 1676 handler.setFormatter(formatter) 1677 test_logger.addHandler(handler) 1678 test_logger.setLevel(logging.INFO) 1679 test_logger.propagate = False 1680 1681# This is called from script-style iotests without a single point of entry 1682def script_initialize(*args, **kwargs): 1683 """Initialize script-style tests without running any tests.""" 1684 activate_logging() 1685 execute_setup_common(*args, **kwargs) 1686 1687# This is called from script-style iotests with a single point of entry 1688def script_main(test_function, *args, **kwargs): 1689 """Run script-style tests outside of the unittest framework""" 1690 activate_logging() 1691 execute_test(*args, test_function=test_function, **kwargs) 1692 1693# This is called from unittest style iotests 1694def main(*args, **kwargs): 1695 """Run tests using the unittest framework""" 1696 execute_test(*args, **kwargs) 1697