1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209 210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True 211 ) -> 'subprocess.CompletedProcess[str]': 212 """ 213 Run a qemu tool and return its status code and console output. 214 215 :param args: full command line to run. 216 :param check: Enforce a return code of zero. 217 :param combine_stdio: set to False to keep stdout/stderr separated. 218 219 :raise VerboseProcessError: 220 When the return code is negative, or on any non-zero exit code 221 when 'check=True' was provided (the default). This exception has 222 'stdout', 'stderr', and 'returncode' properties that may be 223 inspected to show greater detail. If this exception is not 224 handled, the command-line, return code, and all console output 225 will be included at the bottom of the stack trace. 226 227 :return: 228 a CompletedProcess. This object has args, returncode, and stdout 229 properties. If streams are not combined, it will also have a 230 stderr property. 231 """ 232 subp = subprocess.run( 233 args, 234 stdout=subprocess.PIPE, 235 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 236 universal_newlines=True, 237 check=False 238 ) 239 240 if check and subp.returncode or (subp.returncode < 0): 241 raise VerboseProcessError( 242 subp.returncode, args, 243 output=subp.stdout, 244 stderr=subp.stderr, 245 ) 246 247 return subp 248 249 250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 251 ) -> 'subprocess.CompletedProcess[str]': 252 """ 253 Run QEMU_IMG_PROG and return its status code and console output. 254 255 This function always prepends QEMU_IMG_OPTIONS and may further alter 256 the args for 'create' commands. 257 258 See `qemu_tool()` for greater detail. 259 """ 260 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 261 return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio) 262 263 264def ordered_qmp(qmsg, conv_keys=True): 265 # Dictionaries are not ordered prior to 3.6, therefore: 266 if isinstance(qmsg, list): 267 return [ordered_qmp(atom) for atom in qmsg] 268 if isinstance(qmsg, dict): 269 od = OrderedDict() 270 for k, v in sorted(qmsg.items()): 271 if conv_keys: 272 k = k.replace('_', '-') 273 od[k] = ordered_qmp(v, conv_keys=False) 274 return od 275 return qmsg 276 277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 278 return qemu_img('create', *args) 279 280def qemu_img_json(*args: str) -> Any: 281 """ 282 Run qemu-img and return its output as deserialized JSON. 283 284 :raise CalledProcessError: 285 When qemu-img crashes, or returns a non-zero exit code without 286 producing a valid JSON document to stdout. 287 :raise JSONDecoderError: 288 When qemu-img returns 0, but failed to produce a valid JSON document. 289 290 :return: A deserialized JSON object; probably a dict[str, Any]. 291 """ 292 try: 293 res = qemu_img(*args, combine_stdio=False) 294 except subprocess.CalledProcessError as exc: 295 # Terminated due to signal. Don't bother. 296 if exc.returncode < 0: 297 raise 298 299 # Commands like 'check' can return failure (exit codes 2 and 3) 300 # to indicate command completion, but with errors found. For 301 # multi-command flexibility, ignore the exact error codes and 302 # *try* to load JSON. 303 try: 304 return json.loads(exc.stdout) 305 except json.JSONDecodeError: 306 # Nope. This thing is toast. Raise the /process/ error. 307 pass 308 raise 309 310 return json.loads(res.stdout) 311 312def qemu_img_measure(*args: str) -> Any: 313 return qemu_img_json("measure", "--output", "json", *args) 314 315def qemu_img_check(*args: str) -> Any: 316 return qemu_img_json("check", "--output", "json", *args) 317 318def qemu_img_info(*args: str) -> Any: 319 return qemu_img_json('info', "--output", "json", *args) 320 321def qemu_img_map(*args: str) -> Any: 322 return qemu_img_json('map', "--output", "json", *args) 323 324def qemu_img_log(*args: str, check: bool = True 325 ) -> 'subprocess.CompletedProcess[str]': 326 result = qemu_img(*args, check=check) 327 log(result.stdout, filters=[filter_testfiles]) 328 return result 329 330def img_info_log(filename: str, filter_path: Optional[str] = None, 331 use_image_opts: bool = False, extra_args: Sequence[str] = (), 332 check: bool = True, 333 ) -> None: 334 args = ['info'] 335 if use_image_opts: 336 args.append('--image-opts') 337 else: 338 args += ['-f', imgfmt] 339 args += extra_args 340 args.append(filename) 341 342 output = qemu_img(*args, check=check).stdout 343 if not filter_path: 344 filter_path = filename 345 log(filter_img_info(output, filter_path)) 346 347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 348 if '-f' in args or '--image-opts' in args: 349 return qemu_io_args_no_fmt + list(args) 350 else: 351 return qemu_io_args + list(args) 352 353def qemu_io_popen(*args): 354 return qemu_tool_popen(qemu_io_wrap_args(args)) 355 356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True 357 ) -> 'subprocess.CompletedProcess[str]': 358 """ 359 Run QEMU_IO_PROG and return the status code and console output. 360 361 This function always prepends either QEMU_IO_OPTIONS or 362 QEMU_IO_OPTIONS_NO_FMT. 363 """ 364 return qemu_tool(*qemu_io_wrap_args(args), 365 check=check, combine_stdio=combine_stdio) 366 367def qemu_io_log(*args: str, check: bool = True 368 ) -> 'subprocess.CompletedProcess[str]': 369 result = qemu_io(*args, check=check) 370 log(result.stdout, filters=[filter_testfiles, filter_qemu_io]) 371 return result 372 373class QemuIoInteractive: 374 def __init__(self, *args): 375 self.args = qemu_io_wrap_args(args) 376 # We need to keep the Popen objext around, and not 377 # close it immediately. Therefore, disable the pylint check: 378 # pylint: disable=consider-using-with 379 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 380 stdout=subprocess.PIPE, 381 stderr=subprocess.STDOUT, 382 universal_newlines=True) 383 out = self._p.stdout.read(9) 384 if out != 'qemu-io> ': 385 # Most probably qemu-io just failed to start. 386 # Let's collect the whole output and exit. 387 out += self._p.stdout.read() 388 self._p.wait(timeout=1) 389 raise ValueError(out) 390 391 def close(self): 392 self._p.communicate('q\n') 393 394 def _read_output(self): 395 pattern = 'qemu-io> ' 396 n = len(pattern) 397 pos = 0 398 s = [] 399 while pos != n: 400 c = self._p.stdout.read(1) 401 # check unexpected EOF 402 assert c != '' 403 s.append(c) 404 if c == pattern[pos]: 405 pos += 1 406 else: 407 pos = 0 408 409 return ''.join(s[:-n]) 410 411 def cmd(self, cmd): 412 # quit command is in close(), '\n' is added automatically 413 assert '\n' not in cmd 414 cmd = cmd.strip() 415 assert cmd not in ('q', 'quit') 416 self._p.stdin.write(cmd + '\n') 417 self._p.stdin.flush() 418 return self._read_output() 419 420 421class QemuStorageDaemon: 422 _qmp: Optional[QEMUMonitorProtocol] = None 423 _qmpsock: Optional[str] = None 424 # Python < 3.8 would complain if this type were not a string literal 425 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 426 _p: 'Optional[subprocess.Popen[bytes]]' = None 427 428 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 429 assert '--pidfile' not in args 430 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 431 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 432 433 if qmp: 434 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 435 all_args += ['--chardev', 436 f'socket,id=qmp-sock,path={self._qmpsock}', 437 '--monitor', 'qmp-sock'] 438 439 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 440 441 # Cannot use with here, we want the subprocess to stay around 442 # pylint: disable=consider-using-with 443 self._p = subprocess.Popen(all_args) 444 if self._qmp is not None: 445 self._qmp.accept() 446 while not os.path.exists(self.pidfile): 447 if self._p.poll() is not None: 448 cmd = ' '.join(all_args) 449 raise RuntimeError( 450 'qemu-storage-daemon terminated with exit code ' + 451 f'{self._p.returncode}: {cmd}') 452 453 time.sleep(0.01) 454 455 with open(self.pidfile, encoding='utf-8') as f: 456 self._pid = int(f.read().strip()) 457 458 assert self._pid == self._p.pid 459 460 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 461 -> QMPMessage: 462 assert self._qmp is not None 463 return self._qmp.cmd(cmd, args) 464 465 def stop(self, kill_signal=15): 466 self._p.send_signal(kill_signal) 467 self._p.wait() 468 self._p = None 469 470 if self._qmp: 471 self._qmp.close() 472 473 if self._qmpsock is not None: 474 try: 475 os.remove(self._qmpsock) 476 except OSError: 477 pass 478 try: 479 os.remove(self.pidfile) 480 except OSError: 481 pass 482 483 def __del__(self): 484 if self._p is not None: 485 self.stop(kill_signal=9) 486 487 488def qemu_nbd(*args): 489 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 490 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 491 492def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 493 '''Run qemu-nbd in daemon mode and return both the parent's exit code 494 and its output in case of an error''' 495 full_args = qemu_nbd_args + ['--fork'] + list(args) 496 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 497 connect_stderr=False) 498 return returncode, output if returncode else '' 499 500def qemu_nbd_list_log(*args: str) -> str: 501 '''Run qemu-nbd to list remote exports''' 502 full_args = [qemu_nbd_prog, '-L'] + list(args) 503 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 504 log(output, filters=[filter_testfiles, filter_nbd_exports]) 505 return output 506 507@contextmanager 508def qemu_nbd_popen(*args): 509 '''Context manager running qemu-nbd within the context''' 510 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 511 512 assert not os.path.exists(pid_file) 513 514 cmd = list(qemu_nbd_args) 515 cmd.extend(('--persistent', '--pid-file', pid_file)) 516 cmd.extend(args) 517 518 log('Start NBD server') 519 with subprocess.Popen(cmd) as p: 520 try: 521 while not os.path.exists(pid_file): 522 if p.poll() is not None: 523 raise RuntimeError( 524 "qemu-nbd terminated with exit code {}: {}" 525 .format(p.returncode, ' '.join(cmd))) 526 527 time.sleep(0.01) 528 yield 529 finally: 530 if os.path.exists(pid_file): 531 os.remove(pid_file) 532 log('Kill NBD server') 533 p.kill() 534 p.wait() 535 536def compare_images(img1: str, img2: str, 537 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 538 """ 539 Compare two images with QEMU_IMG; return True if they are identical. 540 541 :raise CalledProcessError: 542 when qemu-img crashes or returns a status code of anything other 543 than 0 (identical) or 1 (different). 544 """ 545 try: 546 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 547 return True 548 except subprocess.CalledProcessError as exc: 549 if exc.returncode == 1: 550 return False 551 raise 552 553def create_image(name, size): 554 '''Create a fully-allocated raw image with sector markers''' 555 with open(name, 'wb') as file: 556 i = 0 557 while i < size: 558 sector = struct.pack('>l504xl', i // 512, i // 512) 559 file.write(sector) 560 i = i + 512 561 562def image_size(img: str) -> int: 563 """Return image's virtual size""" 564 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 565 if not isinstance(value, int): 566 type_name = type(value).__name__ 567 raise TypeError("Expected 'int' for 'virtual-size', " 568 f"got '{value}' of type '{type_name}'") 569 return value 570 571def is_str(val): 572 return isinstance(val, str) 573 574test_dir_re = re.compile(r"%s" % test_dir) 575def filter_test_dir(msg): 576 return test_dir_re.sub("TEST_DIR", msg) 577 578win32_re = re.compile(r"\r") 579def filter_win32(msg): 580 return win32_re.sub("", msg) 581 582qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 583 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 584 r"and [0-9\/.inf]* ops\/sec\)") 585def filter_qemu_io(msg): 586 msg = filter_win32(msg) 587 return qemu_io_re.sub("X ops; XX:XX:XX.X " 588 "(XXX YYY/sec and XXX ops/sec)", msg) 589 590chown_re = re.compile(r"chown [0-9]+:[0-9]+") 591def filter_chown(msg): 592 return chown_re.sub("chown UID:GID", msg) 593 594def filter_qmp_event(event): 595 '''Filter a QMP event dict''' 596 event = dict(event) 597 if 'timestamp' in event: 598 event['timestamp']['seconds'] = 'SECS' 599 event['timestamp']['microseconds'] = 'USECS' 600 return event 601 602def filter_qmp(qmsg, filter_fn): 603 '''Given a string filter, filter a QMP object's values. 604 filter_fn takes a (key, value) pair.''' 605 # Iterate through either lists or dicts; 606 if isinstance(qmsg, list): 607 items = enumerate(qmsg) 608 elif isinstance(qmsg, dict): 609 items = qmsg.items() 610 else: 611 return filter_fn(None, qmsg) 612 613 for k, v in items: 614 if isinstance(v, (dict, list)): 615 qmsg[k] = filter_qmp(v, filter_fn) 616 else: 617 qmsg[k] = filter_fn(k, v) 618 return qmsg 619 620def filter_testfiles(msg): 621 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 622 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 623 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 624 625def filter_qmp_testfiles(qmsg): 626 def _filter(_key, value): 627 if is_str(value): 628 return filter_testfiles(value) 629 return value 630 return filter_qmp(qmsg, _filter) 631 632def filter_virtio_scsi(output: str) -> str: 633 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 634 635def filter_qmp_virtio_scsi(qmsg): 636 def _filter(_key, value): 637 if is_str(value): 638 return filter_virtio_scsi(value) 639 return value 640 return filter_qmp(qmsg, _filter) 641 642def filter_generated_node_ids(msg): 643 return re.sub("#block[0-9]+", "NODE_NAME", msg) 644 645def filter_img_info(output, filename): 646 lines = [] 647 for line in output.split('\n'): 648 if 'disk size' in line or 'actual-size' in line: 649 continue 650 line = line.replace(filename, 'TEST_IMG') 651 line = filter_testfiles(line) 652 line = line.replace(imgfmt, 'IMGFMT') 653 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 654 line = re.sub('uuid: [-a-f0-9]+', 655 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 656 line) 657 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 658 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 659 line) 660 lines.append(line) 661 return '\n'.join(lines) 662 663def filter_imgfmt(msg): 664 return msg.replace(imgfmt, 'IMGFMT') 665 666def filter_qmp_imgfmt(qmsg): 667 def _filter(_key, value): 668 if is_str(value): 669 return filter_imgfmt(value) 670 return value 671 return filter_qmp(qmsg, _filter) 672 673def filter_nbd_exports(output: str) -> str: 674 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 675 676 677Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 678 679def log(msg: Msg, 680 filters: Iterable[Callable[[Msg], Msg]] = (), 681 indent: Optional[int] = None) -> None: 682 """ 683 Logs either a string message or a JSON serializable message (like QMP). 684 If indent is provided, JSON serializable messages are pretty-printed. 685 """ 686 for flt in filters: 687 msg = flt(msg) 688 if isinstance(msg, (dict, list)): 689 # Don't sort if it's already sorted 690 do_sort = not isinstance(msg, OrderedDict) 691 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 692 else: 693 test_logger.info(msg) 694 695class Timeout: 696 def __init__(self, seconds, errmsg="Timeout"): 697 self.seconds = seconds 698 self.errmsg = errmsg 699 def __enter__(self): 700 if qemu_gdb or qemu_valgrind: 701 return self 702 signal.signal(signal.SIGALRM, self.timeout) 703 signal.setitimer(signal.ITIMER_REAL, self.seconds) 704 return self 705 def __exit__(self, exc_type, value, traceback): 706 if qemu_gdb or qemu_valgrind: 707 return False 708 signal.setitimer(signal.ITIMER_REAL, 0) 709 return False 710 def timeout(self, signum, frame): 711 raise Exception(self.errmsg) 712 713def file_pattern(name): 714 return "{0}-{1}".format(os.getpid(), name) 715 716class FilePath: 717 """ 718 Context manager generating multiple file names. The generated files are 719 removed when exiting the context. 720 721 Example usage: 722 723 with FilePath('a.img', 'b.img') as (img_a, img_b): 724 # Use img_a and img_b here... 725 726 # a.img and b.img are automatically removed here. 727 728 By default images are created in iotests.test_dir. To create sockets use 729 iotests.sock_dir: 730 731 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 732 733 For convenience, calling with one argument yields a single file instead of 734 a tuple with one item. 735 736 """ 737 def __init__(self, *names, base_dir=test_dir): 738 self.paths = [os.path.join(base_dir, file_pattern(name)) 739 for name in names] 740 741 def __enter__(self): 742 if len(self.paths) == 1: 743 return self.paths[0] 744 else: 745 return self.paths 746 747 def __exit__(self, exc_type, exc_val, exc_tb): 748 for path in self.paths: 749 try: 750 os.remove(path) 751 except OSError: 752 pass 753 return False 754 755 756def try_remove(img): 757 try: 758 os.remove(img) 759 except OSError: 760 pass 761 762def file_path_remover(): 763 for path in reversed(file_path_remover.paths): 764 try_remove(path) 765 766 767def file_path(*names, base_dir=test_dir): 768 ''' Another way to get auto-generated filename that cleans itself up. 769 770 Use is as simple as: 771 772 img_a, img_b = file_path('a.img', 'b.img') 773 sock = file_path('socket') 774 ''' 775 776 if not hasattr(file_path_remover, 'paths'): 777 file_path_remover.paths = [] 778 atexit.register(file_path_remover) 779 780 paths = [] 781 for name in names: 782 filename = file_pattern(name) 783 path = os.path.join(base_dir, filename) 784 file_path_remover.paths.append(path) 785 paths.append(path) 786 787 return paths[0] if len(paths) == 1 else paths 788 789def remote_filename(path): 790 if imgproto == 'file': 791 return path 792 elif imgproto == 'ssh': 793 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 794 else: 795 raise Exception("Protocol %s not supported" % (imgproto)) 796 797class VM(qtest.QEMUQtestMachine): 798 '''A QEMU VM''' 799 800 def __init__(self, path_suffix=''): 801 name = "qemu%s-%d" % (path_suffix, os.getpid()) 802 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 803 if qemu_gdb and qemu_valgrind: 804 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 805 sys.exit(1) 806 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 807 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 808 name=name, 809 base_temp_dir=test_dir, 810 sock_dir=sock_dir, qmp_timer=timer) 811 self._num_drives = 0 812 813 def _post_shutdown(self) -> None: 814 super()._post_shutdown() 815 if not qemu_valgrind or not self._popen: 816 return 817 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 818 if self.exitcode() == 99: 819 with open(valgrind_filename, encoding='utf-8') as f: 820 print(f.read()) 821 else: 822 os.remove(valgrind_filename) 823 824 def _pre_launch(self) -> None: 825 super()._pre_launch() 826 if qemu_print: 827 # set QEMU binary output to stdout 828 self._close_qemu_log_file() 829 830 def add_object(self, opts): 831 self._args.append('-object') 832 self._args.append(opts) 833 return self 834 835 def add_device(self, opts): 836 self._args.append('-device') 837 self._args.append(opts) 838 return self 839 840 def add_drive_raw(self, opts): 841 self._args.append('-drive') 842 self._args.append(opts) 843 return self 844 845 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 846 '''Add a virtio-blk drive to the VM''' 847 options = ['if=%s' % interface, 848 'id=drive%d' % self._num_drives] 849 850 if path is not None: 851 options.append('file=%s' % path) 852 options.append('format=%s' % img_format) 853 options.append('cache=%s' % cachemode) 854 options.append('aio=%s' % aiomode) 855 856 if opts: 857 options.append(opts) 858 859 if img_format == 'luks' and 'key-secret' not in opts: 860 # default luks support 861 if luks_default_secret_object not in self._args: 862 self.add_object(luks_default_secret_object) 863 864 options.append(luks_default_key_secret_opt) 865 866 self._args.append('-drive') 867 self._args.append(','.join(options)) 868 self._num_drives += 1 869 return self 870 871 def add_blockdev(self, opts): 872 self._args.append('-blockdev') 873 if isinstance(opts, str): 874 self._args.append(opts) 875 else: 876 self._args.append(','.join(opts)) 877 return self 878 879 def add_incoming(self, addr): 880 self._args.append('-incoming') 881 self._args.append(addr) 882 return self 883 884 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 885 cmd = 'human-monitor-command' 886 kwargs: Dict[str, Any] = {'command-line': command_line} 887 if use_log: 888 return self.qmp_log(cmd, **kwargs) 889 else: 890 return self.qmp(cmd, **kwargs) 891 892 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 893 """Pause drive r/w operations""" 894 if not event: 895 self.pause_drive(drive, "read_aio") 896 self.pause_drive(drive, "write_aio") 897 return 898 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 899 900 def resume_drive(self, drive: str) -> None: 901 """Resume drive r/w operations""" 902 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 903 904 def hmp_qemu_io(self, drive: str, cmd: str, 905 use_log: bool = False, qdev: bool = False) -> QMPMessage: 906 """Write to a given drive using an HMP command""" 907 d = '-d ' if qdev else '' 908 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 909 910 def flatten_qmp_object(self, obj, output=None, basestr=''): 911 if output is None: 912 output = {} 913 if isinstance(obj, list): 914 for i, item in enumerate(obj): 915 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 916 elif isinstance(obj, dict): 917 for key in obj: 918 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 919 else: 920 output[basestr[:-1]] = obj # Strip trailing '.' 921 return output 922 923 def qmp_to_opts(self, obj): 924 obj = self.flatten_qmp_object(obj) 925 output_list = [] 926 for key in obj: 927 output_list += [key + '=' + obj[key]] 928 return ','.join(output_list) 929 930 def get_qmp_events_filtered(self, wait=60.0): 931 result = [] 932 for ev in self.get_qmp_events(wait=wait): 933 result.append(filter_qmp_event(ev)) 934 return result 935 936 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 937 full_cmd = OrderedDict(( 938 ("execute", cmd), 939 ("arguments", ordered_qmp(kwargs)) 940 )) 941 log(full_cmd, filters, indent=indent) 942 result = self.qmp(cmd, **kwargs) 943 log(result, filters, indent=indent) 944 return result 945 946 # Returns None on success, and an error string on failure 947 def run_job(self, job: str, auto_finalize: bool = True, 948 auto_dismiss: bool = False, 949 pre_finalize: Optional[Callable[[], None]] = None, 950 cancel: bool = False, wait: float = 60.0, 951 filters: Iterable[Callable[[Any], Any]] = (), 952 ) -> Optional[str]: 953 """ 954 run_job moves a job from creation through to dismissal. 955 956 :param job: String. ID of recently-launched job 957 :param auto_finalize: Bool. True if the job was launched with 958 auto_finalize. Defaults to True. 959 :param auto_dismiss: Bool. True if the job was launched with 960 auto_dismiss=True. Defaults to False. 961 :param pre_finalize: Callback. A callable that takes no arguments to be 962 invoked prior to issuing job-finalize, if any. 963 :param cancel: Bool. When true, cancels the job after the pre_finalize 964 callback. 965 :param wait: Float. Timeout value specifying how long to wait for any 966 event, in seconds. Defaults to 60.0. 967 """ 968 match_device = {'data': {'device': job}} 969 match_id = {'data': {'id': job}} 970 events = [ 971 ('BLOCK_JOB_COMPLETED', match_device), 972 ('BLOCK_JOB_CANCELLED', match_device), 973 ('BLOCK_JOB_ERROR', match_device), 974 ('BLOCK_JOB_READY', match_device), 975 ('BLOCK_JOB_PENDING', match_id), 976 ('JOB_STATUS_CHANGE', match_id) 977 ] 978 error = None 979 while True: 980 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 981 if ev['event'] != 'JOB_STATUS_CHANGE': 982 log(ev, filters=filters) 983 continue 984 status = ev['data']['status'] 985 if status == 'aborting': 986 result = self.qmp('query-jobs') 987 for j in result['return']: 988 if j['id'] == job: 989 error = j['error'] 990 log('Job failed: %s' % (j['error']), filters=filters) 991 elif status == 'ready': 992 self.qmp_log('job-complete', id=job, filters=filters) 993 elif status == 'pending' and not auto_finalize: 994 if pre_finalize: 995 pre_finalize() 996 if cancel: 997 self.qmp_log('job-cancel', id=job, filters=filters) 998 else: 999 self.qmp_log('job-finalize', id=job, filters=filters) 1000 elif status == 'concluded' and not auto_dismiss: 1001 self.qmp_log('job-dismiss', id=job, filters=filters) 1002 elif status == 'null': 1003 return error 1004 1005 # Returns None on success, and an error string on failure 1006 def blockdev_create(self, options, job_id='job0', filters=None): 1007 if filters is None: 1008 filters = [filter_qmp_testfiles] 1009 result = self.qmp_log('blockdev-create', filters=filters, 1010 job_id=job_id, options=options) 1011 1012 if 'return' in result: 1013 assert result['return'] == {} 1014 job_result = self.run_job(job_id, filters=filters) 1015 else: 1016 job_result = result['error'] 1017 1018 log("") 1019 return job_result 1020 1021 def enable_migration_events(self, name): 1022 log('Enabling migration QMP events on %s...' % name) 1023 log(self.qmp('migrate-set-capabilities', capabilities=[ 1024 { 1025 'capability': 'events', 1026 'state': True 1027 } 1028 ])) 1029 1030 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1031 while True: 1032 event = self.event_wait('MIGRATION') 1033 # We use the default timeout, and with a timeout, event_wait() 1034 # never returns None 1035 assert event 1036 1037 log(event, filters=[filter_qmp_event]) 1038 if event['data']['status'] in ('completed', 'failed'): 1039 break 1040 1041 if event['data']['status'] == 'completed': 1042 # The event may occur in finish-migrate, so wait for the expected 1043 # post-migration runstate 1044 runstate = None 1045 while runstate != expect_runstate: 1046 runstate = self.qmp('query-status')['return']['status'] 1047 return True 1048 else: 1049 return False 1050 1051 def node_info(self, node_name): 1052 nodes = self.qmp('query-named-block-nodes') 1053 for x in nodes['return']: 1054 if x['node-name'] == node_name: 1055 return x 1056 return None 1057 1058 def query_bitmaps(self): 1059 res = self.qmp("query-named-block-nodes") 1060 return {device['node-name']: device['dirty-bitmaps'] 1061 for device in res['return'] if 'dirty-bitmaps' in device} 1062 1063 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1064 """ 1065 get a specific bitmap from the object returned by query_bitmaps. 1066 :param recording: If specified, filter results by the specified value. 1067 :param bitmaps: If specified, use it instead of call query_bitmaps() 1068 """ 1069 if bitmaps is None: 1070 bitmaps = self.query_bitmaps() 1071 1072 for bitmap in bitmaps[node_name]: 1073 if bitmap.get('name', '') == bitmap_name: 1074 if recording is None or bitmap.get('recording') == recording: 1075 return bitmap 1076 return None 1077 1078 def check_bitmap_status(self, node_name, bitmap_name, fields): 1079 ret = self.get_bitmap(node_name, bitmap_name) 1080 1081 return fields.items() <= ret.items() 1082 1083 def assert_block_path(self, root, path, expected_node, graph=None): 1084 """ 1085 Check whether the node under the given path in the block graph 1086 is @expected_node. 1087 1088 @root is the node name of the node where the @path is rooted. 1089 1090 @path is a string that consists of child names separated by 1091 slashes. It must begin with a slash. 1092 1093 Examples for @root + @path: 1094 - root="qcow2-node", path="/backing/file" 1095 - root="quorum-node", path="/children.2/file" 1096 1097 Hypothetically, @path could be empty, in which case it would 1098 point to @root. However, in practice this case is not useful 1099 and hence not allowed. 1100 1101 @expected_node may be None. (All elements of the path but the 1102 leaf must still exist.) 1103 1104 @graph may be None or the result of an x-debug-query-block-graph 1105 call that has already been performed. 1106 """ 1107 if graph is None: 1108 graph = self.qmp('x-debug-query-block-graph')['return'] 1109 1110 iter_path = iter(path.split('/')) 1111 1112 # Must start with a / 1113 assert next(iter_path) == '' 1114 1115 node = next((node for node in graph['nodes'] if node['name'] == root), 1116 None) 1117 1118 # An empty @path is not allowed, so the root node must be present 1119 assert node is not None, 'Root node %s not found' % root 1120 1121 for child_name in iter_path: 1122 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1123 1124 try: 1125 node_id = next(edge['child'] for edge in graph['edges'] 1126 if (edge['parent'] == node['id'] and 1127 edge['name'] == child_name)) 1128 1129 node = next(node for node in graph['nodes'] 1130 if node['id'] == node_id) 1131 1132 except StopIteration: 1133 node = None 1134 1135 if node is None: 1136 assert expected_node is None, \ 1137 'No node found under %s (but expected %s)' % \ 1138 (path, expected_node) 1139 else: 1140 assert node['name'] == expected_node, \ 1141 'Found node %s under %s (but expected %s)' % \ 1142 (node['name'], path, expected_node) 1143 1144index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1145 1146class QMPTestCase(unittest.TestCase): 1147 '''Abstract base class for QMP test cases''' 1148 1149 def __init__(self, *args, **kwargs): 1150 super().__init__(*args, **kwargs) 1151 # Many users of this class set a VM property we rely on heavily 1152 # in the methods below. 1153 self.vm = None 1154 1155 def dictpath(self, d, path): 1156 '''Traverse a path in a nested dict''' 1157 for component in path.split('/'): 1158 m = index_re.match(component) 1159 if m: 1160 component, idx = m.groups() 1161 idx = int(idx) 1162 1163 if not isinstance(d, dict) or component not in d: 1164 self.fail(f'failed path traversal for "{path}" in "{d}"') 1165 d = d[component] 1166 1167 if m: 1168 if not isinstance(d, list): 1169 self.fail(f'path component "{component}" in "{path}" ' 1170 f'is not a list in "{d}"') 1171 try: 1172 d = d[idx] 1173 except IndexError: 1174 self.fail(f'invalid index "{idx}" in path "{path}" ' 1175 f'in "{d}"') 1176 return d 1177 1178 def assert_qmp_absent(self, d, path): 1179 try: 1180 result = self.dictpath(d, path) 1181 except AssertionError: 1182 return 1183 self.fail('path "%s" has value "%s"' % (path, str(result))) 1184 1185 def assert_qmp(self, d, path, value): 1186 '''Assert that the value for a specific path in a QMP dict 1187 matches. When given a list of values, assert that any of 1188 them matches.''' 1189 1190 result = self.dictpath(d, path) 1191 1192 # [] makes no sense as a list of valid values, so treat it as 1193 # an actual single value. 1194 if isinstance(value, list) and value != []: 1195 for v in value: 1196 if result == v: 1197 return 1198 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1199 else: 1200 self.assertEqual(result, value, 1201 '"%s" is "%s", expected "%s"' 1202 % (path, str(result), str(value))) 1203 1204 def assert_no_active_block_jobs(self): 1205 result = self.vm.qmp('query-block-jobs') 1206 self.assert_qmp(result, 'return', []) 1207 1208 def assert_has_block_node(self, node_name=None, file_name=None): 1209 """Issue a query-named-block-nodes and assert node_name and/or 1210 file_name is present in the result""" 1211 def check_equal_or_none(a, b): 1212 return a is None or b is None or a == b 1213 assert node_name or file_name 1214 result = self.vm.qmp('query-named-block-nodes') 1215 for x in result["return"]: 1216 if check_equal_or_none(x.get("node-name"), node_name) and \ 1217 check_equal_or_none(x.get("file"), file_name): 1218 return 1219 self.fail("Cannot find %s %s in result:\n%s" % 1220 (node_name, file_name, result)) 1221 1222 def assert_json_filename_equal(self, json_filename, reference): 1223 '''Asserts that the given filename is a json: filename and that its 1224 content is equal to the given reference object''' 1225 self.assertEqual(json_filename[:5], 'json:') 1226 self.assertEqual( 1227 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1228 self.vm.flatten_qmp_object(reference) 1229 ) 1230 1231 def cancel_and_wait(self, drive='drive0', force=False, 1232 resume=False, wait=60.0): 1233 '''Cancel a block job and wait for it to finish, returning the event''' 1234 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1235 self.assert_qmp(result, 'return', {}) 1236 1237 if resume: 1238 self.vm.resume_drive(drive) 1239 1240 cancelled = False 1241 result = None 1242 while not cancelled: 1243 for event in self.vm.get_qmp_events(wait=wait): 1244 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1245 event['event'] == 'BLOCK_JOB_CANCELLED': 1246 self.assert_qmp(event, 'data/device', drive) 1247 result = event 1248 cancelled = True 1249 elif event['event'] == 'JOB_STATUS_CHANGE': 1250 self.assert_qmp(event, 'data/id', drive) 1251 1252 1253 self.assert_no_active_block_jobs() 1254 return result 1255 1256 def wait_until_completed(self, drive='drive0', check_offset=True, 1257 wait=60.0, error=None): 1258 '''Wait for a block job to finish, returning the event''' 1259 while True: 1260 for event in self.vm.get_qmp_events(wait=wait): 1261 if event['event'] == 'BLOCK_JOB_COMPLETED': 1262 self.assert_qmp(event, 'data/device', drive) 1263 if error is None: 1264 self.assert_qmp_absent(event, 'data/error') 1265 if check_offset: 1266 self.assert_qmp(event, 'data/offset', 1267 event['data']['len']) 1268 else: 1269 self.assert_qmp(event, 'data/error', error) 1270 self.assert_no_active_block_jobs() 1271 return event 1272 if event['event'] == 'JOB_STATUS_CHANGE': 1273 self.assert_qmp(event, 'data/id', drive) 1274 1275 def wait_ready(self, drive='drive0'): 1276 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1277 return self.vm.events_wait([ 1278 ('BLOCK_JOB_READY', 1279 {'data': {'type': 'mirror', 'device': drive}}), 1280 ('BLOCK_JOB_READY', 1281 {'data': {'type': 'commit', 'device': drive}}) 1282 ]) 1283 1284 def wait_ready_and_cancel(self, drive='drive0'): 1285 self.wait_ready(drive=drive) 1286 event = self.cancel_and_wait(drive=drive) 1287 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1288 self.assert_qmp(event, 'data/type', 'mirror') 1289 self.assert_qmp(event, 'data/offset', event['data']['len']) 1290 1291 def complete_and_wait(self, drive='drive0', wait_ready=True, 1292 completion_error=None): 1293 '''Complete a block job and wait for it to finish''' 1294 if wait_ready: 1295 self.wait_ready(drive=drive) 1296 1297 result = self.vm.qmp('block-job-complete', device=drive) 1298 self.assert_qmp(result, 'return', {}) 1299 1300 event = self.wait_until_completed(drive=drive, error=completion_error) 1301 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1302 1303 def pause_wait(self, job_id='job0'): 1304 with Timeout(3, "Timeout waiting for job to pause"): 1305 while True: 1306 result = self.vm.qmp('query-block-jobs') 1307 found = False 1308 for job in result['return']: 1309 if job['device'] == job_id: 1310 found = True 1311 if job['paused'] and not job['busy']: 1312 return job 1313 break 1314 assert found 1315 1316 def pause_job(self, job_id='job0', wait=True): 1317 result = self.vm.qmp('block-job-pause', device=job_id) 1318 self.assert_qmp(result, 'return', {}) 1319 if wait: 1320 return self.pause_wait(job_id) 1321 return result 1322 1323 def case_skip(self, reason): 1324 '''Skip this test case''' 1325 case_notrun(reason) 1326 self.skipTest(reason) 1327 1328 1329def notrun(reason): 1330 '''Skip this test suite''' 1331 # Each test in qemu-iotests has a number ("seq") 1332 seq = os.path.basename(sys.argv[0]) 1333 1334 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1335 as outfile: 1336 outfile.write(reason + '\n') 1337 logger.warning("%s not run: %s", seq, reason) 1338 sys.exit(0) 1339 1340def case_notrun(reason): 1341 '''Mark this test case as not having been run (without actually 1342 skipping it, that is left to the caller). See 1343 QMPTestCase.case_skip() for a variant that actually skips the 1344 current test case.''' 1345 1346 # Each test in qemu-iotests has a number ("seq") 1347 seq = os.path.basename(sys.argv[0]) 1348 1349 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1350 as outfile: 1351 outfile.write(' [case not run] ' + reason + '\n') 1352 1353def _verify_image_format(supported_fmts: Sequence[str] = (), 1354 unsupported_fmts: Sequence[str] = ()) -> None: 1355 if 'generic' in supported_fmts and \ 1356 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1357 # similar to 1358 # _supported_fmt generic 1359 # for bash tests 1360 supported_fmts = () 1361 1362 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1363 if not_sup or (imgfmt in unsupported_fmts): 1364 notrun('not suitable for this image format: %s' % imgfmt) 1365 1366 if imgfmt == 'luks': 1367 verify_working_luks() 1368 1369def _verify_protocol(supported: Sequence[str] = (), 1370 unsupported: Sequence[str] = ()) -> None: 1371 assert not (supported and unsupported) 1372 1373 if 'generic' in supported: 1374 return 1375 1376 not_sup = supported and (imgproto not in supported) 1377 if not_sup or (imgproto in unsupported): 1378 notrun('not suitable for this protocol: %s' % imgproto) 1379 1380def _verify_platform(supported: Sequence[str] = (), 1381 unsupported: Sequence[str] = ()) -> None: 1382 if any((sys.platform.startswith(x) for x in unsupported)): 1383 notrun('not suitable for this OS: %s' % sys.platform) 1384 1385 if supported: 1386 if not any((sys.platform.startswith(x) for x in supported)): 1387 notrun('not suitable for this OS: %s' % sys.platform) 1388 1389def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1390 if supported_cache_modes and (cachemode not in supported_cache_modes): 1391 notrun('not suitable for this cache mode: %s' % cachemode) 1392 1393def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1394 if supported_aio_modes and (aiomode not in supported_aio_modes): 1395 notrun('not suitable for this aio mode: %s' % aiomode) 1396 1397def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1398 usf_list = list(set(required_formats) - set(supported_formats())) 1399 if usf_list: 1400 notrun(f'formats {usf_list} are not whitelisted') 1401 1402 1403def _verify_virtio_blk() -> None: 1404 out = qemu_pipe('-M', 'none', '-device', 'help') 1405 if 'virtio-blk' not in out: 1406 notrun('Missing virtio-blk in QEMU binary') 1407 1408def _verify_virtio_scsi_pci_or_ccw() -> None: 1409 out = qemu_pipe('-M', 'none', '-device', 'help') 1410 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1411 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1412 1413 1414def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1415 imgopts = os.environ.get('IMGOPTS') 1416 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1417 # but it supported only for bash tests. We don't have a concept of global 1418 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1419 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1420 unsup = list(unsupported) + ['$'] 1421 if imgopts and any(x in imgopts for x in unsup): 1422 notrun(f'not suitable for this imgopts: {imgopts}') 1423 1424 1425def supports_quorum() -> bool: 1426 return 'quorum' in qemu_img('--help').stdout 1427 1428def verify_quorum(): 1429 '''Skip test suite if quorum support is not available''' 1430 if not supports_quorum(): 1431 notrun('quorum support missing') 1432 1433def has_working_luks() -> Tuple[bool, str]: 1434 """ 1435 Check whether our LUKS driver can actually create images 1436 (this extends to LUKS encryption for qcow2). 1437 1438 If not, return the reason why. 1439 """ 1440 1441 img_file = f'{test_dir}/luks-test.luks' 1442 res = qemu_img('create', '-f', 'luks', 1443 '--object', luks_default_secret_object, 1444 '-o', luks_default_key_secret_opt, 1445 '-o', 'iter-time=10', 1446 img_file, '1G', 1447 check=False) 1448 try: 1449 os.remove(img_file) 1450 except OSError: 1451 pass 1452 1453 if res.returncode: 1454 reason = res.stdout 1455 for line in res.stdout.splitlines(): 1456 if img_file + ':' in line: 1457 reason = line.split(img_file + ':', 1)[1].strip() 1458 break 1459 1460 return (False, reason) 1461 else: 1462 return (True, '') 1463 1464def verify_working_luks(): 1465 """ 1466 Skip test suite if LUKS does not work 1467 """ 1468 (working, reason) = has_working_luks() 1469 if not working: 1470 notrun(reason) 1471 1472def supports_qcow2_zstd_compression() -> bool: 1473 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1474 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1475 img_file, '0', 1476 check=False) 1477 try: 1478 os.remove(img_file) 1479 except OSError: 1480 pass 1481 1482 if res.returncode == 1 and \ 1483 "'compression-type' does not accept value 'zstd'" in res.stdout: 1484 return False 1485 else: 1486 return True 1487 1488def verify_qcow2_zstd_compression(): 1489 if not supports_qcow2_zstd_compression(): 1490 notrun('zstd compression not supported') 1491 1492def qemu_pipe(*args: str) -> str: 1493 """ 1494 Run qemu with an option to print something and exit (e.g. a help option). 1495 1496 :return: QEMU's stdout output. 1497 """ 1498 full_args = [qemu_prog] + qemu_opts + list(args) 1499 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1500 return output 1501 1502def supported_formats(read_only=False): 1503 '''Set 'read_only' to True to check ro-whitelist 1504 Otherwise, rw-whitelist is checked''' 1505 1506 if not hasattr(supported_formats, "formats"): 1507 supported_formats.formats = {} 1508 1509 if read_only not in supported_formats.formats: 1510 format_message = qemu_pipe("-drive", "format=help") 1511 line = 1 if read_only else 0 1512 supported_formats.formats[read_only] = \ 1513 format_message.splitlines()[line].split(":")[1].split() 1514 1515 return supported_formats.formats[read_only] 1516 1517def skip_if_unsupported(required_formats=(), read_only=False): 1518 '''Skip Test Decorator 1519 Runs the test if all the required formats are whitelisted''' 1520 def skip_test_decorator(func): 1521 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1522 **kwargs: Dict[str, Any]) -> None: 1523 if callable(required_formats): 1524 fmts = required_formats(test_case) 1525 else: 1526 fmts = required_formats 1527 1528 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1529 if usf_list: 1530 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1531 test_case.case_skip(msg) 1532 else: 1533 func(test_case, *args, **kwargs) 1534 return func_wrapper 1535 return skip_test_decorator 1536 1537def skip_for_formats(formats: Sequence[str] = ()) \ 1538 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1539 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1540 '''Skip Test Decorator 1541 Skips the test for the given formats''' 1542 def skip_test_decorator(func): 1543 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1544 **kwargs: Dict[str, Any]) -> None: 1545 if imgfmt in formats: 1546 msg = f'{test_case}: Skipped for format {imgfmt}' 1547 test_case.case_skip(msg) 1548 else: 1549 func(test_case, *args, **kwargs) 1550 return func_wrapper 1551 return skip_test_decorator 1552 1553def skip_if_user_is_root(func): 1554 '''Skip Test Decorator 1555 Runs the test only without root permissions''' 1556 def func_wrapper(*args, **kwargs): 1557 if os.getuid() == 0: 1558 case_notrun('{}: cannot be run as root'.format(args[0])) 1559 return None 1560 else: 1561 return func(*args, **kwargs) 1562 return func_wrapper 1563 1564# We need to filter out the time taken from the output so that 1565# qemu-iotest can reliably diff the results against master output, 1566# and hide skipped tests from the reference output. 1567 1568class ReproducibleTestResult(unittest.TextTestResult): 1569 def addSkip(self, test, reason): 1570 # Same as TextTestResult, but print dot instead of "s" 1571 unittest.TestResult.addSkip(self, test, reason) 1572 if self.showAll: 1573 self.stream.writeln("skipped {0!r}".format(reason)) 1574 elif self.dots: 1575 self.stream.write(".") 1576 self.stream.flush() 1577 1578class ReproducibleStreamWrapper: 1579 def __init__(self, stream: TextIO): 1580 self.stream = stream 1581 1582 def __getattr__(self, attr): 1583 if attr in ('stream', '__getstate__'): 1584 raise AttributeError(attr) 1585 return getattr(self.stream, attr) 1586 1587 def write(self, arg=None): 1588 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1589 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1590 self.stream.write(arg) 1591 1592class ReproducibleTestRunner(unittest.TextTestRunner): 1593 def __init__(self, stream: Optional[TextIO] = None, 1594 resultclass: Type[unittest.TestResult] = 1595 ReproducibleTestResult, 1596 **kwargs: Any) -> None: 1597 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1598 super().__init__(stream=rstream, # type: ignore 1599 descriptions=True, 1600 resultclass=resultclass, 1601 **kwargs) 1602 1603def execute_unittest(argv: List[str], debug: bool = False) -> None: 1604 """Executes unittests within the calling module.""" 1605 1606 # Some tests have warnings, especially ResourceWarnings for unclosed 1607 # files and sockets. Ignore them for now to ensure reproducibility of 1608 # the test output. 1609 unittest.main(argv=argv, 1610 testRunner=ReproducibleTestRunner, 1611 verbosity=2 if debug else 1, 1612 warnings=None if sys.warnoptions else 'ignore') 1613 1614def execute_setup_common(supported_fmts: Sequence[str] = (), 1615 supported_platforms: Sequence[str] = (), 1616 supported_cache_modes: Sequence[str] = (), 1617 supported_aio_modes: Sequence[str] = (), 1618 unsupported_fmts: Sequence[str] = (), 1619 supported_protocols: Sequence[str] = (), 1620 unsupported_protocols: Sequence[str] = (), 1621 required_fmts: Sequence[str] = (), 1622 unsupported_imgopts: Sequence[str] = ()) -> bool: 1623 """ 1624 Perform necessary setup for either script-style or unittest-style tests. 1625 1626 :return: Bool; Whether or not debug mode has been requested via the CLI. 1627 """ 1628 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1629 1630 debug = '-d' in sys.argv 1631 if debug: 1632 sys.argv.remove('-d') 1633 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1634 1635 _verify_image_format(supported_fmts, unsupported_fmts) 1636 _verify_protocol(supported_protocols, unsupported_protocols) 1637 _verify_platform(supported=supported_platforms) 1638 _verify_cache_mode(supported_cache_modes) 1639 _verify_aio_mode(supported_aio_modes) 1640 _verify_formats(required_fmts) 1641 _verify_virtio_blk() 1642 _verify_imgopts(unsupported_imgopts) 1643 1644 return debug 1645 1646def execute_test(*args, test_function=None, **kwargs): 1647 """Run either unittest or script-style tests.""" 1648 1649 debug = execute_setup_common(*args, **kwargs) 1650 if not test_function: 1651 execute_unittest(sys.argv, debug) 1652 else: 1653 test_function() 1654 1655def activate_logging(): 1656 """Activate iotests.log() output to stdout for script-style tests.""" 1657 handler = logging.StreamHandler(stream=sys.stdout) 1658 formatter = logging.Formatter('%(message)s') 1659 handler.setFormatter(formatter) 1660 test_logger.addHandler(handler) 1661 test_logger.setLevel(logging.INFO) 1662 test_logger.propagate = False 1663 1664# This is called from script-style iotests without a single point of entry 1665def script_initialize(*args, **kwargs): 1666 """Initialize script-style tests without running any tests.""" 1667 activate_logging() 1668 execute_setup_common(*args, **kwargs) 1669 1670# This is called from script-style iotests with a single point of entry 1671def script_main(test_function, *args, **kwargs): 1672 """Run script-style tests outside of the unittest framework""" 1673 activate_logging() 1674 execute_test(*args, test_function=test_function, **kwargs) 1675 1676# This is called from unittest style iotests 1677def main(*args, **kwargs): 1678 """Run tests using the unittest framework""" 1679 execute_test(*args, **kwargs) 1680