1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol 42from qemu.utils import VerboseProcessError 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88 89try: 90 test_dir = os.environ['TEST_DIR'] 91 sock_dir = os.environ['SOCK_DIR'] 92 cachemode = os.environ['CACHEMODE'] 93 aiomode = os.environ['AIOMODE'] 94 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 95except KeyError: 96 # We are using these variables as proxies to indicate that we're 97 # not being run via "check". There may be other things set up by 98 # "check" that individual test cases rely on. 99 sys.stderr.write('Please run this test via the "check" script\n') 100 sys.exit(os.EX_USAGE) 101 102qemu_valgrind = [] 103if os.environ.get('VALGRIND_QEMU') == "y" and \ 104 os.environ.get('NO_VALGRIND') != "y": 105 valgrind_logfile = "--log-file=" + test_dir 106 # %p allows to put the valgrind process PID, since 107 # we don't know it a priori (subprocess.Popen is 108 # not yet invoked) 109 valgrind_logfile += "/%p.valgrind" 110 111 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 112 113luks_default_secret_object = 'secret,id=keysec0,data=' + \ 114 os.environ.get('IMGKEYSECRET', '') 115luks_default_key_secret_opt = 'key-secret=keysec0' 116 117sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 118 119 120@contextmanager 121def change_log_level( 122 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 123 """ 124 Utility function for temporarily changing the log level of a logger. 125 126 This can be used to silence errors that are expected or uninteresting. 127 """ 128 _logger = logging.getLogger(logger_name) 129 current_level = _logger.level 130 _logger.setLevel(level) 131 132 try: 133 yield 134 finally: 135 _logger.setLevel(current_level) 136 137 138def unarchive_sample_image(sample, fname): 139 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 140 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 141 shutil.copyfileobj(f_in, f_out) 142 143 144def qemu_tool_popen(args: Sequence[str], 145 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 146 stderr = subprocess.STDOUT if connect_stderr else None 147 # pylint: disable=consider-using-with 148 return subprocess.Popen(args, 149 stdout=subprocess.PIPE, 150 stderr=stderr, 151 universal_newlines=True) 152 153 154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 155 connect_stderr: bool = True, 156 drop_successful_output: bool = False) \ 157 -> Tuple[str, int]: 158 """ 159 Run a tool and return both its output and its exit code 160 """ 161 with qemu_tool_popen(args, connect_stderr) as subp: 162 output = subp.communicate()[0] 163 if subp.returncode < 0: 164 cmd = ' '.join(args) 165 sys.stderr.write(f'{tool} received signal \ 166 {-subp.returncode}: {cmd}\n') 167 if drop_successful_output and subp.returncode == 0: 168 output = '' 169 return (output, subp.returncode) 170 171def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 172 if not args or args[0] != 'create': 173 return list(args) 174 args = args[1:] 175 176 p = argparse.ArgumentParser(allow_abbrev=False) 177 # -o option may be specified several times 178 p.add_argument('-o', action='append', default=[]) 179 p.add_argument('-f') 180 parsed, remaining = p.parse_known_args(args) 181 182 opts_list = parsed.o 183 184 result = ['create'] 185 if parsed.f is not None: 186 result += ['-f', parsed.f] 187 188 # IMGOPTS most probably contain options specific for the selected format, 189 # like extended_l2 or compression_type for qcow2. Test may want to create 190 # additional images in other formats that doesn't support these options. 191 # So, use IMGOPTS only for images created in imgfmt format. 192 imgopts = os.environ.get('IMGOPTS') 193 if imgopts and parsed.f == imgfmt: 194 opts_list.insert(0, imgopts) 195 196 # default luks support 197 if parsed.f == 'luks' and \ 198 all('key-secret' not in opts for opts in opts_list): 199 result += ['--object', luks_default_secret_object] 200 opts_list.append(luks_default_key_secret_opt) 201 202 for opts in opts_list: 203 result += ['-o', opts] 204 205 result += remaining 206 207 return result 208 209def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True 210 ) -> 'subprocess.CompletedProcess[str]': 211 """ 212 Run qemu_img and return the status code and console output. 213 214 This function always prepends QEMU_IMG_OPTIONS and may further alter 215 the args for 'create' commands. 216 217 :param args: command-line arguments to qemu-img. 218 :param check: Enforce a return code of zero. 219 :param combine_stdio: set to False to keep stdout/stderr separated. 220 221 :raise VerboseProcessError: 222 When the return code is negative, or on any non-zero exit code 223 when 'check=True' was provided (the default). This exception has 224 'stdout', 'stderr', and 'returncode' properties that may be 225 inspected to show greater detail. If this exception is not 226 handled, the command-line, return code, and all console output 227 will be included at the bottom of the stack trace. 228 229 :return: 230 a CompletedProcess. This object has args, returncode, and stdout 231 properties. If streams are not combined, it will also have a 232 stderr property. 233 """ 234 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 235 236 subp = subprocess.run( 237 full_args, 238 stdout=subprocess.PIPE, 239 stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE, 240 universal_newlines=True, 241 check=False 242 ) 243 244 if check and subp.returncode or (subp.returncode < 0): 245 raise VerboseProcessError( 246 subp.returncode, full_args, 247 output=subp.stdout, 248 stderr=subp.stderr, 249 ) 250 251 return subp 252 253 254def ordered_qmp(qmsg, conv_keys=True): 255 # Dictionaries are not ordered prior to 3.6, therefore: 256 if isinstance(qmsg, list): 257 return [ordered_qmp(atom) for atom in qmsg] 258 if isinstance(qmsg, dict): 259 od = OrderedDict() 260 for k, v in sorted(qmsg.items()): 261 if conv_keys: 262 k = k.replace('_', '-') 263 od[k] = ordered_qmp(v, conv_keys=False) 264 return od 265 return qmsg 266 267def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]': 268 return qemu_img('create', *args) 269 270def qemu_img_json(*args: str) -> Any: 271 """ 272 Run qemu-img and return its output as deserialized JSON. 273 274 :raise CalledProcessError: 275 When qemu-img crashes, or returns a non-zero exit code without 276 producing a valid JSON document to stdout. 277 :raise JSONDecoderError: 278 When qemu-img returns 0, but failed to produce a valid JSON document. 279 280 :return: A deserialized JSON object; probably a dict[str, Any]. 281 """ 282 try: 283 res = qemu_img(*args, combine_stdio=False) 284 except subprocess.CalledProcessError as exc: 285 # Terminated due to signal. Don't bother. 286 if exc.returncode < 0: 287 raise 288 289 # Commands like 'check' can return failure (exit codes 2 and 3) 290 # to indicate command completion, but with errors found. For 291 # multi-command flexibility, ignore the exact error codes and 292 # *try* to load JSON. 293 try: 294 return json.loads(exc.stdout) 295 except json.JSONDecodeError: 296 # Nope. This thing is toast. Raise the /process/ error. 297 pass 298 raise 299 300 return json.loads(res.stdout) 301 302def qemu_img_measure(*args: str) -> Any: 303 return qemu_img_json("measure", "--output", "json", *args) 304 305def qemu_img_check(*args: str) -> Any: 306 return qemu_img_json("check", "--output", "json", *args) 307 308def qemu_img_info(*args: str) -> Any: 309 return qemu_img_json('info', "--output", "json", *args) 310 311def qemu_img_map(*args: str) -> Any: 312 return qemu_img_json('map', "--output", "json", *args) 313 314def qemu_img_log(*args: str, check: bool = True 315 ) -> 'subprocess.CompletedProcess[str]': 316 result = qemu_img(*args, check=check) 317 log(result.stdout, filters=[filter_testfiles]) 318 return result 319 320def img_info_log(filename: str, filter_path: Optional[str] = None, 321 use_image_opts: bool = False, extra_args: Sequence[str] = (), 322 check: bool = True, 323 ) -> None: 324 args = ['info'] 325 if use_image_opts: 326 args.append('--image-opts') 327 else: 328 args += ['-f', imgfmt] 329 args += extra_args 330 args.append(filename) 331 332 output = qemu_img(*args, check=check).stdout 333 if not filter_path: 334 filter_path = filename 335 log(filter_img_info(output, filter_path)) 336 337def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 338 if '-f' in args or '--image-opts' in args: 339 return qemu_io_args_no_fmt + list(args) 340 else: 341 return qemu_io_args + list(args) 342 343def qemu_io_popen(*args): 344 return qemu_tool_popen(qemu_io_wrap_args(args)) 345 346def qemu_io(*args): 347 '''Run qemu-io and return the stdout data''' 348 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0] 349 350def qemu_io_pipe_and_status(*args): 351 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args)) 352 353def qemu_io_log(*args): 354 result = qemu_io(*args) 355 log(result, filters=[filter_testfiles, filter_qemu_io]) 356 return result 357 358def qemu_io_silent(*args): 359 '''Run qemu-io and return the exit code, suppressing stdout''' 360 args = qemu_io_wrap_args(args) 361 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 362 if result.returncode < 0: 363 sys.stderr.write('qemu-io received signal %i: %s\n' % 364 (-result.returncode, ' '.join(args))) 365 return result.returncode 366 367def qemu_io_silent_check(*args): 368 '''Run qemu-io and return the true if subprocess returned 0''' 369 args = qemu_io_wrap_args(args) 370 result = subprocess.run(args, stdout=subprocess.DEVNULL, 371 stderr=subprocess.STDOUT, check=False) 372 return result.returncode == 0 373 374class QemuIoInteractive: 375 def __init__(self, *args): 376 self.args = qemu_io_wrap_args(args) 377 # We need to keep the Popen objext around, and not 378 # close it immediately. Therefore, disable the pylint check: 379 # pylint: disable=consider-using-with 380 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 381 stdout=subprocess.PIPE, 382 stderr=subprocess.STDOUT, 383 universal_newlines=True) 384 out = self._p.stdout.read(9) 385 if out != 'qemu-io> ': 386 # Most probably qemu-io just failed to start. 387 # Let's collect the whole output and exit. 388 out += self._p.stdout.read() 389 self._p.wait(timeout=1) 390 raise ValueError(out) 391 392 def close(self): 393 self._p.communicate('q\n') 394 395 def _read_output(self): 396 pattern = 'qemu-io> ' 397 n = len(pattern) 398 pos = 0 399 s = [] 400 while pos != n: 401 c = self._p.stdout.read(1) 402 # check unexpected EOF 403 assert c != '' 404 s.append(c) 405 if c == pattern[pos]: 406 pos += 1 407 else: 408 pos = 0 409 410 return ''.join(s[:-n]) 411 412 def cmd(self, cmd): 413 # quit command is in close(), '\n' is added automatically 414 assert '\n' not in cmd 415 cmd = cmd.strip() 416 assert cmd not in ('q', 'quit') 417 self._p.stdin.write(cmd + '\n') 418 self._p.stdin.flush() 419 return self._read_output() 420 421 422class QemuStorageDaemon: 423 _qmp: Optional[QEMUMonitorProtocol] = None 424 _qmpsock: Optional[str] = None 425 # Python < 3.8 would complain if this type were not a string literal 426 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 427 _p: 'Optional[subprocess.Popen[bytes]]' = None 428 429 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 430 assert '--pidfile' not in args 431 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 432 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 433 434 if qmp: 435 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 436 all_args += ['--chardev', 437 f'socket,id=qmp-sock,path={self._qmpsock}', 438 '--monitor', 'qmp-sock'] 439 440 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 441 442 # Cannot use with here, we want the subprocess to stay around 443 # pylint: disable=consider-using-with 444 self._p = subprocess.Popen(all_args) 445 if self._qmp is not None: 446 self._qmp.accept() 447 while not os.path.exists(self.pidfile): 448 if self._p.poll() is not None: 449 cmd = ' '.join(all_args) 450 raise RuntimeError( 451 'qemu-storage-daemon terminated with exit code ' + 452 f'{self._p.returncode}: {cmd}') 453 454 time.sleep(0.01) 455 456 with open(self.pidfile, encoding='utf-8') as f: 457 self._pid = int(f.read().strip()) 458 459 assert self._pid == self._p.pid 460 461 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 462 -> QMPMessage: 463 assert self._qmp is not None 464 return self._qmp.cmd(cmd, args) 465 466 def stop(self, kill_signal=15): 467 self._p.send_signal(kill_signal) 468 self._p.wait() 469 self._p = None 470 471 if self._qmp: 472 self._qmp.close() 473 474 if self._qmpsock is not None: 475 try: 476 os.remove(self._qmpsock) 477 except OSError: 478 pass 479 try: 480 os.remove(self.pidfile) 481 except OSError: 482 pass 483 484 def __del__(self): 485 if self._p is not None: 486 self.stop(kill_signal=9) 487 488 489def qemu_nbd(*args): 490 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 491 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 492 493def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 494 '''Run qemu-nbd in daemon mode and return both the parent's exit code 495 and its output in case of an error''' 496 full_args = qemu_nbd_args + ['--fork'] + list(args) 497 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 498 connect_stderr=False) 499 return returncode, output if returncode else '' 500 501def qemu_nbd_list_log(*args: str) -> str: 502 '''Run qemu-nbd to list remote exports''' 503 full_args = [qemu_nbd_prog, '-L'] + list(args) 504 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 505 log(output, filters=[filter_testfiles, filter_nbd_exports]) 506 return output 507 508@contextmanager 509def qemu_nbd_popen(*args): 510 '''Context manager running qemu-nbd within the context''' 511 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 512 513 assert not os.path.exists(pid_file) 514 515 cmd = list(qemu_nbd_args) 516 cmd.extend(('--persistent', '--pid-file', pid_file)) 517 cmd.extend(args) 518 519 log('Start NBD server') 520 with subprocess.Popen(cmd) as p: 521 try: 522 while not os.path.exists(pid_file): 523 if p.poll() is not None: 524 raise RuntimeError( 525 "qemu-nbd terminated with exit code {}: {}" 526 .format(p.returncode, ' '.join(cmd))) 527 528 time.sleep(0.01) 529 yield 530 finally: 531 if os.path.exists(pid_file): 532 os.remove(pid_file) 533 log('Kill NBD server') 534 p.kill() 535 p.wait() 536 537def compare_images(img1: str, img2: str, 538 fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool: 539 """ 540 Compare two images with QEMU_IMG; return True if they are identical. 541 542 :raise CalledProcessError: 543 when qemu-img crashes or returns a status code of anything other 544 than 0 (identical) or 1 (different). 545 """ 546 try: 547 qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2) 548 return True 549 except subprocess.CalledProcessError as exc: 550 if exc.returncode == 1: 551 return False 552 raise 553 554def create_image(name, size): 555 '''Create a fully-allocated raw image with sector markers''' 556 with open(name, 'wb') as file: 557 i = 0 558 while i < size: 559 sector = struct.pack('>l504xl', i // 512, i // 512) 560 file.write(sector) 561 i = i + 512 562 563def image_size(img: str) -> int: 564 """Return image's virtual size""" 565 value = qemu_img_info('-f', imgfmt, img)['virtual-size'] 566 if not isinstance(value, int): 567 type_name = type(value).__name__ 568 raise TypeError("Expected 'int' for 'virtual-size', " 569 f"got '{value}' of type '{type_name}'") 570 return value 571 572def is_str(val): 573 return isinstance(val, str) 574 575test_dir_re = re.compile(r"%s" % test_dir) 576def filter_test_dir(msg): 577 return test_dir_re.sub("TEST_DIR", msg) 578 579win32_re = re.compile(r"\r") 580def filter_win32(msg): 581 return win32_re.sub("", msg) 582 583qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 584 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 585 r"and [0-9\/.inf]* ops\/sec\)") 586def filter_qemu_io(msg): 587 msg = filter_win32(msg) 588 return qemu_io_re.sub("X ops; XX:XX:XX.X " 589 "(XXX YYY/sec and XXX ops/sec)", msg) 590 591chown_re = re.compile(r"chown [0-9]+:[0-9]+") 592def filter_chown(msg): 593 return chown_re.sub("chown UID:GID", msg) 594 595def filter_qmp_event(event): 596 '''Filter a QMP event dict''' 597 event = dict(event) 598 if 'timestamp' in event: 599 event['timestamp']['seconds'] = 'SECS' 600 event['timestamp']['microseconds'] = 'USECS' 601 return event 602 603def filter_qmp(qmsg, filter_fn): 604 '''Given a string filter, filter a QMP object's values. 605 filter_fn takes a (key, value) pair.''' 606 # Iterate through either lists or dicts; 607 if isinstance(qmsg, list): 608 items = enumerate(qmsg) 609 elif isinstance(qmsg, dict): 610 items = qmsg.items() 611 else: 612 return filter_fn(None, qmsg) 613 614 for k, v in items: 615 if isinstance(v, (dict, list)): 616 qmsg[k] = filter_qmp(v, filter_fn) 617 else: 618 qmsg[k] = filter_fn(k, v) 619 return qmsg 620 621def filter_testfiles(msg): 622 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 623 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 624 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 625 626def filter_qmp_testfiles(qmsg): 627 def _filter(_key, value): 628 if is_str(value): 629 return filter_testfiles(value) 630 return value 631 return filter_qmp(qmsg, _filter) 632 633def filter_virtio_scsi(output: str) -> str: 634 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 635 636def filter_qmp_virtio_scsi(qmsg): 637 def _filter(_key, value): 638 if is_str(value): 639 return filter_virtio_scsi(value) 640 return value 641 return filter_qmp(qmsg, _filter) 642 643def filter_generated_node_ids(msg): 644 return re.sub("#block[0-9]+", "NODE_NAME", msg) 645 646def filter_img_info(output, filename): 647 lines = [] 648 for line in output.split('\n'): 649 if 'disk size' in line or 'actual-size' in line: 650 continue 651 line = line.replace(filename, 'TEST_IMG') 652 line = filter_testfiles(line) 653 line = line.replace(imgfmt, 'IMGFMT') 654 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 655 line = re.sub('uuid: [-a-f0-9]+', 656 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 657 line) 658 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 659 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 660 line) 661 lines.append(line) 662 return '\n'.join(lines) 663 664def filter_imgfmt(msg): 665 return msg.replace(imgfmt, 'IMGFMT') 666 667def filter_qmp_imgfmt(qmsg): 668 def _filter(_key, value): 669 if is_str(value): 670 return filter_imgfmt(value) 671 return value 672 return filter_qmp(qmsg, _filter) 673 674def filter_nbd_exports(output: str) -> str: 675 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 676 677 678Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 679 680def log(msg: Msg, 681 filters: Iterable[Callable[[Msg], Msg]] = (), 682 indent: Optional[int] = None) -> None: 683 """ 684 Logs either a string message or a JSON serializable message (like QMP). 685 If indent is provided, JSON serializable messages are pretty-printed. 686 """ 687 for flt in filters: 688 msg = flt(msg) 689 if isinstance(msg, (dict, list)): 690 # Don't sort if it's already sorted 691 do_sort = not isinstance(msg, OrderedDict) 692 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 693 else: 694 test_logger.info(msg) 695 696class Timeout: 697 def __init__(self, seconds, errmsg="Timeout"): 698 self.seconds = seconds 699 self.errmsg = errmsg 700 def __enter__(self): 701 if qemu_gdb or qemu_valgrind: 702 return self 703 signal.signal(signal.SIGALRM, self.timeout) 704 signal.setitimer(signal.ITIMER_REAL, self.seconds) 705 return self 706 def __exit__(self, exc_type, value, traceback): 707 if qemu_gdb or qemu_valgrind: 708 return False 709 signal.setitimer(signal.ITIMER_REAL, 0) 710 return False 711 def timeout(self, signum, frame): 712 raise Exception(self.errmsg) 713 714def file_pattern(name): 715 return "{0}-{1}".format(os.getpid(), name) 716 717class FilePath: 718 """ 719 Context manager generating multiple file names. The generated files are 720 removed when exiting the context. 721 722 Example usage: 723 724 with FilePath('a.img', 'b.img') as (img_a, img_b): 725 # Use img_a and img_b here... 726 727 # a.img and b.img are automatically removed here. 728 729 By default images are created in iotests.test_dir. To create sockets use 730 iotests.sock_dir: 731 732 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 733 734 For convenience, calling with one argument yields a single file instead of 735 a tuple with one item. 736 737 """ 738 def __init__(self, *names, base_dir=test_dir): 739 self.paths = [os.path.join(base_dir, file_pattern(name)) 740 for name in names] 741 742 def __enter__(self): 743 if len(self.paths) == 1: 744 return self.paths[0] 745 else: 746 return self.paths 747 748 def __exit__(self, exc_type, exc_val, exc_tb): 749 for path in self.paths: 750 try: 751 os.remove(path) 752 except OSError: 753 pass 754 return False 755 756 757def try_remove(img): 758 try: 759 os.remove(img) 760 except OSError: 761 pass 762 763def file_path_remover(): 764 for path in reversed(file_path_remover.paths): 765 try_remove(path) 766 767 768def file_path(*names, base_dir=test_dir): 769 ''' Another way to get auto-generated filename that cleans itself up. 770 771 Use is as simple as: 772 773 img_a, img_b = file_path('a.img', 'b.img') 774 sock = file_path('socket') 775 ''' 776 777 if not hasattr(file_path_remover, 'paths'): 778 file_path_remover.paths = [] 779 atexit.register(file_path_remover) 780 781 paths = [] 782 for name in names: 783 filename = file_pattern(name) 784 path = os.path.join(base_dir, filename) 785 file_path_remover.paths.append(path) 786 paths.append(path) 787 788 return paths[0] if len(paths) == 1 else paths 789 790def remote_filename(path): 791 if imgproto == 'file': 792 return path 793 elif imgproto == 'ssh': 794 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 795 else: 796 raise Exception("Protocol %s not supported" % (imgproto)) 797 798class VM(qtest.QEMUQtestMachine): 799 '''A QEMU VM''' 800 801 def __init__(self, path_suffix=''): 802 name = "qemu%s-%d" % (path_suffix, os.getpid()) 803 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 804 if qemu_gdb and qemu_valgrind: 805 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 806 sys.exit(1) 807 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 808 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 809 name=name, 810 base_temp_dir=test_dir, 811 sock_dir=sock_dir, qmp_timer=timer) 812 self._num_drives = 0 813 814 def _post_shutdown(self) -> None: 815 super()._post_shutdown() 816 if not qemu_valgrind or not self._popen: 817 return 818 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 819 if self.exitcode() == 99: 820 with open(valgrind_filename, encoding='utf-8') as f: 821 print(f.read()) 822 else: 823 os.remove(valgrind_filename) 824 825 def _pre_launch(self) -> None: 826 super()._pre_launch() 827 if qemu_print: 828 # set QEMU binary output to stdout 829 self._close_qemu_log_file() 830 831 def add_object(self, opts): 832 self._args.append('-object') 833 self._args.append(opts) 834 return self 835 836 def add_device(self, opts): 837 self._args.append('-device') 838 self._args.append(opts) 839 return self 840 841 def add_drive_raw(self, opts): 842 self._args.append('-drive') 843 self._args.append(opts) 844 return self 845 846 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 847 '''Add a virtio-blk drive to the VM''' 848 options = ['if=%s' % interface, 849 'id=drive%d' % self._num_drives] 850 851 if path is not None: 852 options.append('file=%s' % path) 853 options.append('format=%s' % img_format) 854 options.append('cache=%s' % cachemode) 855 options.append('aio=%s' % aiomode) 856 857 if opts: 858 options.append(opts) 859 860 if img_format == 'luks' and 'key-secret' not in opts: 861 # default luks support 862 if luks_default_secret_object not in self._args: 863 self.add_object(luks_default_secret_object) 864 865 options.append(luks_default_key_secret_opt) 866 867 self._args.append('-drive') 868 self._args.append(','.join(options)) 869 self._num_drives += 1 870 return self 871 872 def add_blockdev(self, opts): 873 self._args.append('-blockdev') 874 if isinstance(opts, str): 875 self._args.append(opts) 876 else: 877 self._args.append(','.join(opts)) 878 return self 879 880 def add_incoming(self, addr): 881 self._args.append('-incoming') 882 self._args.append(addr) 883 return self 884 885 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 886 cmd = 'human-monitor-command' 887 kwargs: Dict[str, Any] = {'command-line': command_line} 888 if use_log: 889 return self.qmp_log(cmd, **kwargs) 890 else: 891 return self.qmp(cmd, **kwargs) 892 893 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 894 """Pause drive r/w operations""" 895 if not event: 896 self.pause_drive(drive, "read_aio") 897 self.pause_drive(drive, "write_aio") 898 return 899 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 900 901 def resume_drive(self, drive: str) -> None: 902 """Resume drive r/w operations""" 903 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 904 905 def hmp_qemu_io(self, drive: str, cmd: str, 906 use_log: bool = False, qdev: bool = False) -> QMPMessage: 907 """Write to a given drive using an HMP command""" 908 d = '-d ' if qdev else '' 909 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 910 911 def flatten_qmp_object(self, obj, output=None, basestr=''): 912 if output is None: 913 output = {} 914 if isinstance(obj, list): 915 for i, item in enumerate(obj): 916 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 917 elif isinstance(obj, dict): 918 for key in obj: 919 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 920 else: 921 output[basestr[:-1]] = obj # Strip trailing '.' 922 return output 923 924 def qmp_to_opts(self, obj): 925 obj = self.flatten_qmp_object(obj) 926 output_list = [] 927 for key in obj: 928 output_list += [key + '=' + obj[key]] 929 return ','.join(output_list) 930 931 def get_qmp_events_filtered(self, wait=60.0): 932 result = [] 933 for ev in self.get_qmp_events(wait=wait): 934 result.append(filter_qmp_event(ev)) 935 return result 936 937 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 938 full_cmd = OrderedDict(( 939 ("execute", cmd), 940 ("arguments", ordered_qmp(kwargs)) 941 )) 942 log(full_cmd, filters, indent=indent) 943 result = self.qmp(cmd, **kwargs) 944 log(result, filters, indent=indent) 945 return result 946 947 # Returns None on success, and an error string on failure 948 def run_job(self, job: str, auto_finalize: bool = True, 949 auto_dismiss: bool = False, 950 pre_finalize: Optional[Callable[[], None]] = None, 951 cancel: bool = False, wait: float = 60.0, 952 filters: Iterable[Callable[[Any], Any]] = (), 953 ) -> Optional[str]: 954 """ 955 run_job moves a job from creation through to dismissal. 956 957 :param job: String. ID of recently-launched job 958 :param auto_finalize: Bool. True if the job was launched with 959 auto_finalize. Defaults to True. 960 :param auto_dismiss: Bool. True if the job was launched with 961 auto_dismiss=True. Defaults to False. 962 :param pre_finalize: Callback. A callable that takes no arguments to be 963 invoked prior to issuing job-finalize, if any. 964 :param cancel: Bool. When true, cancels the job after the pre_finalize 965 callback. 966 :param wait: Float. Timeout value specifying how long to wait for any 967 event, in seconds. Defaults to 60.0. 968 """ 969 match_device = {'data': {'device': job}} 970 match_id = {'data': {'id': job}} 971 events = [ 972 ('BLOCK_JOB_COMPLETED', match_device), 973 ('BLOCK_JOB_CANCELLED', match_device), 974 ('BLOCK_JOB_ERROR', match_device), 975 ('BLOCK_JOB_READY', match_device), 976 ('BLOCK_JOB_PENDING', match_id), 977 ('JOB_STATUS_CHANGE', match_id) 978 ] 979 error = None 980 while True: 981 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 982 if ev['event'] != 'JOB_STATUS_CHANGE': 983 log(ev, filters=filters) 984 continue 985 status = ev['data']['status'] 986 if status == 'aborting': 987 result = self.qmp('query-jobs') 988 for j in result['return']: 989 if j['id'] == job: 990 error = j['error'] 991 log('Job failed: %s' % (j['error']), filters=filters) 992 elif status == 'ready': 993 self.qmp_log('job-complete', id=job, filters=filters) 994 elif status == 'pending' and not auto_finalize: 995 if pre_finalize: 996 pre_finalize() 997 if cancel: 998 self.qmp_log('job-cancel', id=job, filters=filters) 999 else: 1000 self.qmp_log('job-finalize', id=job, filters=filters) 1001 elif status == 'concluded' and not auto_dismiss: 1002 self.qmp_log('job-dismiss', id=job, filters=filters) 1003 elif status == 'null': 1004 return error 1005 1006 # Returns None on success, and an error string on failure 1007 def blockdev_create(self, options, job_id='job0', filters=None): 1008 if filters is None: 1009 filters = [filter_qmp_testfiles] 1010 result = self.qmp_log('blockdev-create', filters=filters, 1011 job_id=job_id, options=options) 1012 1013 if 'return' in result: 1014 assert result['return'] == {} 1015 job_result = self.run_job(job_id, filters=filters) 1016 else: 1017 job_result = result['error'] 1018 1019 log("") 1020 return job_result 1021 1022 def enable_migration_events(self, name): 1023 log('Enabling migration QMP events on %s...' % name) 1024 log(self.qmp('migrate-set-capabilities', capabilities=[ 1025 { 1026 'capability': 'events', 1027 'state': True 1028 } 1029 ])) 1030 1031 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 1032 while True: 1033 event = self.event_wait('MIGRATION') 1034 # We use the default timeout, and with a timeout, event_wait() 1035 # never returns None 1036 assert event 1037 1038 log(event, filters=[filter_qmp_event]) 1039 if event['data']['status'] in ('completed', 'failed'): 1040 break 1041 1042 if event['data']['status'] == 'completed': 1043 # The event may occur in finish-migrate, so wait for the expected 1044 # post-migration runstate 1045 runstate = None 1046 while runstate != expect_runstate: 1047 runstate = self.qmp('query-status')['return']['status'] 1048 return True 1049 else: 1050 return False 1051 1052 def node_info(self, node_name): 1053 nodes = self.qmp('query-named-block-nodes') 1054 for x in nodes['return']: 1055 if x['node-name'] == node_name: 1056 return x 1057 return None 1058 1059 def query_bitmaps(self): 1060 res = self.qmp("query-named-block-nodes") 1061 return {device['node-name']: device['dirty-bitmaps'] 1062 for device in res['return'] if 'dirty-bitmaps' in device} 1063 1064 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 1065 """ 1066 get a specific bitmap from the object returned by query_bitmaps. 1067 :param recording: If specified, filter results by the specified value. 1068 :param bitmaps: If specified, use it instead of call query_bitmaps() 1069 """ 1070 if bitmaps is None: 1071 bitmaps = self.query_bitmaps() 1072 1073 for bitmap in bitmaps[node_name]: 1074 if bitmap.get('name', '') == bitmap_name: 1075 if recording is None or bitmap.get('recording') == recording: 1076 return bitmap 1077 return None 1078 1079 def check_bitmap_status(self, node_name, bitmap_name, fields): 1080 ret = self.get_bitmap(node_name, bitmap_name) 1081 1082 return fields.items() <= ret.items() 1083 1084 def assert_block_path(self, root, path, expected_node, graph=None): 1085 """ 1086 Check whether the node under the given path in the block graph 1087 is @expected_node. 1088 1089 @root is the node name of the node where the @path is rooted. 1090 1091 @path is a string that consists of child names separated by 1092 slashes. It must begin with a slash. 1093 1094 Examples for @root + @path: 1095 - root="qcow2-node", path="/backing/file" 1096 - root="quorum-node", path="/children.2/file" 1097 1098 Hypothetically, @path could be empty, in which case it would 1099 point to @root. However, in practice this case is not useful 1100 and hence not allowed. 1101 1102 @expected_node may be None. (All elements of the path but the 1103 leaf must still exist.) 1104 1105 @graph may be None or the result of an x-debug-query-block-graph 1106 call that has already been performed. 1107 """ 1108 if graph is None: 1109 graph = self.qmp('x-debug-query-block-graph')['return'] 1110 1111 iter_path = iter(path.split('/')) 1112 1113 # Must start with a / 1114 assert next(iter_path) == '' 1115 1116 node = next((node for node in graph['nodes'] if node['name'] == root), 1117 None) 1118 1119 # An empty @path is not allowed, so the root node must be present 1120 assert node is not None, 'Root node %s not found' % root 1121 1122 for child_name in iter_path: 1123 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1124 1125 try: 1126 node_id = next(edge['child'] for edge in graph['edges'] 1127 if (edge['parent'] == node['id'] and 1128 edge['name'] == child_name)) 1129 1130 node = next(node for node in graph['nodes'] 1131 if node['id'] == node_id) 1132 1133 except StopIteration: 1134 node = None 1135 1136 if node is None: 1137 assert expected_node is None, \ 1138 'No node found under %s (but expected %s)' % \ 1139 (path, expected_node) 1140 else: 1141 assert node['name'] == expected_node, \ 1142 'Found node %s under %s (but expected %s)' % \ 1143 (node['name'], path, expected_node) 1144 1145index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1146 1147class QMPTestCase(unittest.TestCase): 1148 '''Abstract base class for QMP test cases''' 1149 1150 def __init__(self, *args, **kwargs): 1151 super().__init__(*args, **kwargs) 1152 # Many users of this class set a VM property we rely on heavily 1153 # in the methods below. 1154 self.vm = None 1155 1156 def dictpath(self, d, path): 1157 '''Traverse a path in a nested dict''' 1158 for component in path.split('/'): 1159 m = index_re.match(component) 1160 if m: 1161 component, idx = m.groups() 1162 idx = int(idx) 1163 1164 if not isinstance(d, dict) or component not in d: 1165 self.fail(f'failed path traversal for "{path}" in "{d}"') 1166 d = d[component] 1167 1168 if m: 1169 if not isinstance(d, list): 1170 self.fail(f'path component "{component}" in "{path}" ' 1171 f'is not a list in "{d}"') 1172 try: 1173 d = d[idx] 1174 except IndexError: 1175 self.fail(f'invalid index "{idx}" in path "{path}" ' 1176 f'in "{d}"') 1177 return d 1178 1179 def assert_qmp_absent(self, d, path): 1180 try: 1181 result = self.dictpath(d, path) 1182 except AssertionError: 1183 return 1184 self.fail('path "%s" has value "%s"' % (path, str(result))) 1185 1186 def assert_qmp(self, d, path, value): 1187 '''Assert that the value for a specific path in a QMP dict 1188 matches. When given a list of values, assert that any of 1189 them matches.''' 1190 1191 result = self.dictpath(d, path) 1192 1193 # [] makes no sense as a list of valid values, so treat it as 1194 # an actual single value. 1195 if isinstance(value, list) and value != []: 1196 for v in value: 1197 if result == v: 1198 return 1199 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1200 else: 1201 self.assertEqual(result, value, 1202 '"%s" is "%s", expected "%s"' 1203 % (path, str(result), str(value))) 1204 1205 def assert_no_active_block_jobs(self): 1206 result = self.vm.qmp('query-block-jobs') 1207 self.assert_qmp(result, 'return', []) 1208 1209 def assert_has_block_node(self, node_name=None, file_name=None): 1210 """Issue a query-named-block-nodes and assert node_name and/or 1211 file_name is present in the result""" 1212 def check_equal_or_none(a, b): 1213 return a is None or b is None or a == b 1214 assert node_name or file_name 1215 result = self.vm.qmp('query-named-block-nodes') 1216 for x in result["return"]: 1217 if check_equal_or_none(x.get("node-name"), node_name) and \ 1218 check_equal_or_none(x.get("file"), file_name): 1219 return 1220 self.fail("Cannot find %s %s in result:\n%s" % 1221 (node_name, file_name, result)) 1222 1223 def assert_json_filename_equal(self, json_filename, reference): 1224 '''Asserts that the given filename is a json: filename and that its 1225 content is equal to the given reference object''' 1226 self.assertEqual(json_filename[:5], 'json:') 1227 self.assertEqual( 1228 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1229 self.vm.flatten_qmp_object(reference) 1230 ) 1231 1232 def cancel_and_wait(self, drive='drive0', force=False, 1233 resume=False, wait=60.0): 1234 '''Cancel a block job and wait for it to finish, returning the event''' 1235 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1236 self.assert_qmp(result, 'return', {}) 1237 1238 if resume: 1239 self.vm.resume_drive(drive) 1240 1241 cancelled = False 1242 result = None 1243 while not cancelled: 1244 for event in self.vm.get_qmp_events(wait=wait): 1245 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1246 event['event'] == 'BLOCK_JOB_CANCELLED': 1247 self.assert_qmp(event, 'data/device', drive) 1248 result = event 1249 cancelled = True 1250 elif event['event'] == 'JOB_STATUS_CHANGE': 1251 self.assert_qmp(event, 'data/id', drive) 1252 1253 1254 self.assert_no_active_block_jobs() 1255 return result 1256 1257 def wait_until_completed(self, drive='drive0', check_offset=True, 1258 wait=60.0, error=None): 1259 '''Wait for a block job to finish, returning the event''' 1260 while True: 1261 for event in self.vm.get_qmp_events(wait=wait): 1262 if event['event'] == 'BLOCK_JOB_COMPLETED': 1263 self.assert_qmp(event, 'data/device', drive) 1264 if error is None: 1265 self.assert_qmp_absent(event, 'data/error') 1266 if check_offset: 1267 self.assert_qmp(event, 'data/offset', 1268 event['data']['len']) 1269 else: 1270 self.assert_qmp(event, 'data/error', error) 1271 self.assert_no_active_block_jobs() 1272 return event 1273 if event['event'] == 'JOB_STATUS_CHANGE': 1274 self.assert_qmp(event, 'data/id', drive) 1275 1276 def wait_ready(self, drive='drive0'): 1277 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1278 return self.vm.events_wait([ 1279 ('BLOCK_JOB_READY', 1280 {'data': {'type': 'mirror', 'device': drive}}), 1281 ('BLOCK_JOB_READY', 1282 {'data': {'type': 'commit', 'device': drive}}) 1283 ]) 1284 1285 def wait_ready_and_cancel(self, drive='drive0'): 1286 self.wait_ready(drive=drive) 1287 event = self.cancel_and_wait(drive=drive) 1288 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1289 self.assert_qmp(event, 'data/type', 'mirror') 1290 self.assert_qmp(event, 'data/offset', event['data']['len']) 1291 1292 def complete_and_wait(self, drive='drive0', wait_ready=True, 1293 completion_error=None): 1294 '''Complete a block job and wait for it to finish''' 1295 if wait_ready: 1296 self.wait_ready(drive=drive) 1297 1298 result = self.vm.qmp('block-job-complete', device=drive) 1299 self.assert_qmp(result, 'return', {}) 1300 1301 event = self.wait_until_completed(drive=drive, error=completion_error) 1302 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1303 1304 def pause_wait(self, job_id='job0'): 1305 with Timeout(3, "Timeout waiting for job to pause"): 1306 while True: 1307 result = self.vm.qmp('query-block-jobs') 1308 found = False 1309 for job in result['return']: 1310 if job['device'] == job_id: 1311 found = True 1312 if job['paused'] and not job['busy']: 1313 return job 1314 break 1315 assert found 1316 1317 def pause_job(self, job_id='job0', wait=True): 1318 result = self.vm.qmp('block-job-pause', device=job_id) 1319 self.assert_qmp(result, 'return', {}) 1320 if wait: 1321 return self.pause_wait(job_id) 1322 return result 1323 1324 def case_skip(self, reason): 1325 '''Skip this test case''' 1326 case_notrun(reason) 1327 self.skipTest(reason) 1328 1329 1330def notrun(reason): 1331 '''Skip this test suite''' 1332 # Each test in qemu-iotests has a number ("seq") 1333 seq = os.path.basename(sys.argv[0]) 1334 1335 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1336 as outfile: 1337 outfile.write(reason + '\n') 1338 logger.warning("%s not run: %s", seq, reason) 1339 sys.exit(0) 1340 1341def case_notrun(reason): 1342 '''Mark this test case as not having been run (without actually 1343 skipping it, that is left to the caller). See 1344 QMPTestCase.case_skip() for a variant that actually skips the 1345 current test case.''' 1346 1347 # Each test in qemu-iotests has a number ("seq") 1348 seq = os.path.basename(sys.argv[0]) 1349 1350 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1351 as outfile: 1352 outfile.write(' [case not run] ' + reason + '\n') 1353 1354def _verify_image_format(supported_fmts: Sequence[str] = (), 1355 unsupported_fmts: Sequence[str] = ()) -> None: 1356 if 'generic' in supported_fmts and \ 1357 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1358 # similar to 1359 # _supported_fmt generic 1360 # for bash tests 1361 supported_fmts = () 1362 1363 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1364 if not_sup or (imgfmt in unsupported_fmts): 1365 notrun('not suitable for this image format: %s' % imgfmt) 1366 1367 if imgfmt == 'luks': 1368 verify_working_luks() 1369 1370def _verify_protocol(supported: Sequence[str] = (), 1371 unsupported: Sequence[str] = ()) -> None: 1372 assert not (supported and unsupported) 1373 1374 if 'generic' in supported: 1375 return 1376 1377 not_sup = supported and (imgproto not in supported) 1378 if not_sup or (imgproto in unsupported): 1379 notrun('not suitable for this protocol: %s' % imgproto) 1380 1381def _verify_platform(supported: Sequence[str] = (), 1382 unsupported: Sequence[str] = ()) -> None: 1383 if any((sys.platform.startswith(x) for x in unsupported)): 1384 notrun('not suitable for this OS: %s' % sys.platform) 1385 1386 if supported: 1387 if not any((sys.platform.startswith(x) for x in supported)): 1388 notrun('not suitable for this OS: %s' % sys.platform) 1389 1390def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1391 if supported_cache_modes and (cachemode not in supported_cache_modes): 1392 notrun('not suitable for this cache mode: %s' % cachemode) 1393 1394def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1395 if supported_aio_modes and (aiomode not in supported_aio_modes): 1396 notrun('not suitable for this aio mode: %s' % aiomode) 1397 1398def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1399 usf_list = list(set(required_formats) - set(supported_formats())) 1400 if usf_list: 1401 notrun(f'formats {usf_list} are not whitelisted') 1402 1403 1404def _verify_virtio_blk() -> None: 1405 out = qemu_pipe('-M', 'none', '-device', 'help') 1406 if 'virtio-blk' not in out: 1407 notrun('Missing virtio-blk in QEMU binary') 1408 1409def _verify_virtio_scsi_pci_or_ccw() -> None: 1410 out = qemu_pipe('-M', 'none', '-device', 'help') 1411 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1412 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1413 1414 1415def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1416 imgopts = os.environ.get('IMGOPTS') 1417 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1418 # but it supported only for bash tests. We don't have a concept of global 1419 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1420 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1421 unsup = list(unsupported) + ['$'] 1422 if imgopts and any(x in imgopts for x in unsup): 1423 notrun(f'not suitable for this imgopts: {imgopts}') 1424 1425 1426def supports_quorum() -> bool: 1427 return 'quorum' in qemu_img('--help').stdout 1428 1429def verify_quorum(): 1430 '''Skip test suite if quorum support is not available''' 1431 if not supports_quorum(): 1432 notrun('quorum support missing') 1433 1434def has_working_luks() -> Tuple[bool, str]: 1435 """ 1436 Check whether our LUKS driver can actually create images 1437 (this extends to LUKS encryption for qcow2). 1438 1439 If not, return the reason why. 1440 """ 1441 1442 img_file = f'{test_dir}/luks-test.luks' 1443 res = qemu_img('create', '-f', 'luks', 1444 '--object', luks_default_secret_object, 1445 '-o', luks_default_key_secret_opt, 1446 '-o', 'iter-time=10', 1447 img_file, '1G', 1448 check=False) 1449 try: 1450 os.remove(img_file) 1451 except OSError: 1452 pass 1453 1454 if res.returncode: 1455 reason = res.stdout 1456 for line in res.stdout.splitlines(): 1457 if img_file + ':' in line: 1458 reason = line.split(img_file + ':', 1)[1].strip() 1459 break 1460 1461 return (False, reason) 1462 else: 1463 return (True, '') 1464 1465def verify_working_luks(): 1466 """ 1467 Skip test suite if LUKS does not work 1468 """ 1469 (working, reason) = has_working_luks() 1470 if not working: 1471 notrun(reason) 1472 1473def supports_qcow2_zstd_compression() -> bool: 1474 img_file = f'{test_dir}/qcow2-zstd-test.qcow2' 1475 res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd', 1476 img_file, '0', 1477 check=False) 1478 try: 1479 os.remove(img_file) 1480 except OSError: 1481 pass 1482 1483 if res.returncode == 1 and \ 1484 "'compression-type' does not accept value 'zstd'" in res.stdout: 1485 return False 1486 else: 1487 return True 1488 1489def verify_qcow2_zstd_compression(): 1490 if not supports_qcow2_zstd_compression(): 1491 notrun('zstd compression not supported') 1492 1493def qemu_pipe(*args: str) -> str: 1494 """ 1495 Run qemu with an option to print something and exit (e.g. a help option). 1496 1497 :return: QEMU's stdout output. 1498 """ 1499 full_args = [qemu_prog] + qemu_opts + list(args) 1500 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1501 return output 1502 1503def supported_formats(read_only=False): 1504 '''Set 'read_only' to True to check ro-whitelist 1505 Otherwise, rw-whitelist is checked''' 1506 1507 if not hasattr(supported_formats, "formats"): 1508 supported_formats.formats = {} 1509 1510 if read_only not in supported_formats.formats: 1511 format_message = qemu_pipe("-drive", "format=help") 1512 line = 1 if read_only else 0 1513 supported_formats.formats[read_only] = \ 1514 format_message.splitlines()[line].split(":")[1].split() 1515 1516 return supported_formats.formats[read_only] 1517 1518def skip_if_unsupported(required_formats=(), read_only=False): 1519 '''Skip Test Decorator 1520 Runs the test if all the required formats are whitelisted''' 1521 def skip_test_decorator(func): 1522 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1523 **kwargs: Dict[str, Any]) -> None: 1524 if callable(required_formats): 1525 fmts = required_formats(test_case) 1526 else: 1527 fmts = required_formats 1528 1529 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1530 if usf_list: 1531 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1532 test_case.case_skip(msg) 1533 else: 1534 func(test_case, *args, **kwargs) 1535 return func_wrapper 1536 return skip_test_decorator 1537 1538def skip_for_formats(formats: Sequence[str] = ()) \ 1539 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1540 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1541 '''Skip Test Decorator 1542 Skips the test for the given formats''' 1543 def skip_test_decorator(func): 1544 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1545 **kwargs: Dict[str, Any]) -> None: 1546 if imgfmt in formats: 1547 msg = f'{test_case}: Skipped for format {imgfmt}' 1548 test_case.case_skip(msg) 1549 else: 1550 func(test_case, *args, **kwargs) 1551 return func_wrapper 1552 return skip_test_decorator 1553 1554def skip_if_user_is_root(func): 1555 '''Skip Test Decorator 1556 Runs the test only without root permissions''' 1557 def func_wrapper(*args, **kwargs): 1558 if os.getuid() == 0: 1559 case_notrun('{}: cannot be run as root'.format(args[0])) 1560 return None 1561 else: 1562 return func(*args, **kwargs) 1563 return func_wrapper 1564 1565# We need to filter out the time taken from the output so that 1566# qemu-iotest can reliably diff the results against master output, 1567# and hide skipped tests from the reference output. 1568 1569class ReproducibleTestResult(unittest.TextTestResult): 1570 def addSkip(self, test, reason): 1571 # Same as TextTestResult, but print dot instead of "s" 1572 unittest.TestResult.addSkip(self, test, reason) 1573 if self.showAll: 1574 self.stream.writeln("skipped {0!r}".format(reason)) 1575 elif self.dots: 1576 self.stream.write(".") 1577 self.stream.flush() 1578 1579class ReproducibleStreamWrapper: 1580 def __init__(self, stream: TextIO): 1581 self.stream = stream 1582 1583 def __getattr__(self, attr): 1584 if attr in ('stream', '__getstate__'): 1585 raise AttributeError(attr) 1586 return getattr(self.stream, attr) 1587 1588 def write(self, arg=None): 1589 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1590 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1591 self.stream.write(arg) 1592 1593class ReproducibleTestRunner(unittest.TextTestRunner): 1594 def __init__(self, stream: Optional[TextIO] = None, 1595 resultclass: Type[unittest.TestResult] = 1596 ReproducibleTestResult, 1597 **kwargs: Any) -> None: 1598 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1599 super().__init__(stream=rstream, # type: ignore 1600 descriptions=True, 1601 resultclass=resultclass, 1602 **kwargs) 1603 1604def execute_unittest(argv: List[str], debug: bool = False) -> None: 1605 """Executes unittests within the calling module.""" 1606 1607 # Some tests have warnings, especially ResourceWarnings for unclosed 1608 # files and sockets. Ignore them for now to ensure reproducibility of 1609 # the test output. 1610 unittest.main(argv=argv, 1611 testRunner=ReproducibleTestRunner, 1612 verbosity=2 if debug else 1, 1613 warnings=None if sys.warnoptions else 'ignore') 1614 1615def execute_setup_common(supported_fmts: Sequence[str] = (), 1616 supported_platforms: Sequence[str] = (), 1617 supported_cache_modes: Sequence[str] = (), 1618 supported_aio_modes: Sequence[str] = (), 1619 unsupported_fmts: Sequence[str] = (), 1620 supported_protocols: Sequence[str] = (), 1621 unsupported_protocols: Sequence[str] = (), 1622 required_fmts: Sequence[str] = (), 1623 unsupported_imgopts: Sequence[str] = ()) -> bool: 1624 """ 1625 Perform necessary setup for either script-style or unittest-style tests. 1626 1627 :return: Bool; Whether or not debug mode has been requested via the CLI. 1628 """ 1629 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1630 1631 debug = '-d' in sys.argv 1632 if debug: 1633 sys.argv.remove('-d') 1634 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1635 1636 _verify_image_format(supported_fmts, unsupported_fmts) 1637 _verify_protocol(supported_protocols, unsupported_protocols) 1638 _verify_platform(supported=supported_platforms) 1639 _verify_cache_mode(supported_cache_modes) 1640 _verify_aio_mode(supported_aio_modes) 1641 _verify_formats(required_fmts) 1642 _verify_virtio_blk() 1643 _verify_imgopts(unsupported_imgopts) 1644 1645 return debug 1646 1647def execute_test(*args, test_function=None, **kwargs): 1648 """Run either unittest or script-style tests.""" 1649 1650 debug = execute_setup_common(*args, **kwargs) 1651 if not test_function: 1652 execute_unittest(sys.argv, debug) 1653 else: 1654 test_function() 1655 1656def activate_logging(): 1657 """Activate iotests.log() output to stdout for script-style tests.""" 1658 handler = logging.StreamHandler(stream=sys.stdout) 1659 formatter = logging.Formatter('%(message)s') 1660 handler.setFormatter(formatter) 1661 test_logger.addHandler(handler) 1662 test_logger.setLevel(logging.INFO) 1663 test_logger.propagate = False 1664 1665# This is called from script-style iotests without a single point of entry 1666def script_initialize(*args, **kwargs): 1667 """Initialize script-style tests without running any tests.""" 1668 activate_logging() 1669 execute_setup_common(*args, **kwargs) 1670 1671# This is called from script-style iotests with a single point of entry 1672def script_main(test_function, *args, **kwargs): 1673 """Run script-style tests outside of the unittest framework""" 1674 activate_logging() 1675 execute_test(*args, test_function=test_function, **kwargs) 1676 1677# This is called from unittest style iotests 1678def main(*args, **kwargs): 1679 """Run tests using the unittest framework""" 1680 execute_test(*args, **kwargs) 1681