1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp import QMPMessage 42from qemu.aqmp.legacy import QEMUMonitorProtocol 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 
def _split_env_options(name: str) -> List[str]:
    """
    Return the whitespace-separated words of environment variable @name.

    An unset, empty or blank variable yields an empty list, and runs of
    whitespace never produce empty-string arguments.  No shell-style
    quoting is understood, so a single argument cannot contain spaces.
    """
    return os.environ.get(name, '').split()


# Command lines of the tools under test: program name followed by the
# extra options exported through the environment.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += _split_env_options('QEMU_IMG_OPTIONS')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += _split_env_options('QEMU_IO_OPTIONS')

# Same program as qemu_io_args, but with the options from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += _split_env_options('QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += _split_env_options('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or empty QEMU_OPTIONS gives [] rather than [''], so no empty
# argument ends up on the QEMU command line.
qemu_opts = _split_env_options('QEMU_OPTIONS')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, this is the gdbserver command prefix.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

# Note: any non-empty value of PRINT_QEMU is truthy here.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Valgrind command prefix; stays empty unless VALGRIND_QEMU=y is set and
# NO_VALGRIND=y is not.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows valgrind to insert the process PID, which we cannot
    # know a priori (subprocess.Popen has not been invoked yet)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Utility function for temporarily changing the log level of a logger.

    This can be used to silence errors that are expected or uninteresting.

    :param logger_name: Name passed to logging.getLogger().
    :param level: Level in effect inside the context; the previous level
                  is restored on exit, even when the body raises.
    """
    _logger = logging.getLogger(logger_name)
    current_level = _logger.level
    _logger.setLevel(level)

    try:
        yield
    finally:
        _logger.setLevel(current_level)


def unarchive_sample_image(sample, fname):
    """Decompress sample image @sample (stored as <sample>.bz2 in
    sample_img_dir) into the file @fname."""
    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)


def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start @args as a subprocess with its stdout captured in text mode.

    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise it is inherited.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
                              -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used in the diagnostic written to stderr when the
                 process is terminated by a signal.
    :param args: Full command line, program included.
    :param connect_stderr: Merge the tool's stderr into the output.
    :param drop_successful_output: Return '' as output on exit code 0.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Adjacent literals instead of a backslash continuation inside
            # the string, which would embed source indentation in the text.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite the arguments of a 'qemu-img create' invocation.

    Any other subcommand (or an empty list) is returned as an unchanged
    copy.  For 'create', the -f and -o options are moved to the front,
    $IMGOPTS is prepended to the -o options when the format equals imgfmt,
    and a default secret object plus key-secret option is added for luks
    images that do not specify 'key-secret' themselves.
    """
    if not args or args[0] != 'create':
        return list(args)
    args = args[1:]

    p = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    p.add_argument('-o', action='append', default=[])
    p.add_argument('-f')
    parsed, remaining = p.parse_known_args(args)

    opts_list = parsed.o

    result = ['create']
    if parsed.f is not None:
        result += ['-f', parsed.f]

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. A test may want to
    # create additional images in other formats that don't support these
    # options. So, use IMGOPTS only for images created in imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and parsed.f == imgfmt:
        opts_list.insert(0, imgopts)

    # default luks support
    if parsed.f == 'luks' and \
            all('key-secret' not in opts for opts in opts_list):
        result += ['--object', luks_default_secret_object]
        opts_list.append(luks_default_key_secret_opt)

    for opts in opts_list:
        result += ['-o', opts]

    result += remaining

    return result


def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code

    The output of a successful 'create' is dropped.
    """
    is_create = bool(args and args[0] == 'create')
    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool_pipe_and_status('qemu-img', full_args,
                                     drop_successful_output=is_create)


def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    return qemu_img_pipe_and_status(*args)[1]


def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of @qmsg in which every dict is an OrderedDict sorted
    by key.

    With @conv_keys, '_' in the keys of the dict being processed becomes
    '-'.  Values of a dict are processed with conv_keys=False.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        # NOTE(review): list elements are processed with the default
        # conv_keys=True even when this call was made with
        # conv_keys=False, so keys of dicts nested in lists are still
        # converted.  Kept as is; confirm whether that is intended.
        return [ordered_qmp(atom) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg


def qemu_img_create(*args):
    '''Run "qemu-img create" and return the exit code'''
    return qemu_img('create', *args)


def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))


def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    return json.loads(qemu_img_pipe("check", "--output", "json", *args))


def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    return qemu_img_pipe_and_status(*args)[0]
def qemu_img_log(*args):
    '''Run qemu-img, log its output filtered through filter_testfiles,
    and return the unfiltered output'''
    result = qemu_img_pipe(*args)
    log(result, filters=[filter_testfiles])
    return result


def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''Log the filtered output of "qemu-img info" for @filename.

    @filter_path is the string replaced by the image placeholder in the
    output; it defaults to @filename.  With @use_image_opts the file name
    is passed with --image-opts instead of "-f imgfmt".'''
    args = ['info']
    if use_image_opts:
        args.append('--image-opts')
    else:
        args += ['-f', imgfmt]
    args += extra_args
    args.append(filename)

    output = qemu_img_pipe(*args)
    if not filter_path:
        filter_path = filename
    log(filter_img_info(output, filter_path))


def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''Prefix @args with the qemu-io command line.

    qemu_io_args_no_fmt is used when the caller selects the format itself
    (-f or --image-opts), qemu_io_args otherwise.'''
    if '-f' in args or '--image-opts' in args:
        return qemu_io_args_no_fmt + list(args)
    else:
        return qemu_io_args + list(args)


def qemu_io_popen(*args):
    '''Start qemu-io and return the Popen object'''
    return qemu_tool_popen(qemu_io_wrap_args(args))


def qemu_io(*args):
    '''Run qemu-io and return its output (stderr is merged into stdout)'''
    return qemu_io_pipe_and_status(*args)[0]


def qemu_io_pipe_and_status(*args):
    '''Run qemu-io and return both its output and its exit code'''
    return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))


def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output, and return the unfiltered
    output'''
    result = qemu_io(*args)
    log(result, filters=[filter_testfiles, filter_qemu_io])
    return result


def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    args = qemu_io_wrap_args(args)
    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
    if result.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-result.returncode, ' '.join(args)))
    return result.returncode


def qemu_io_silent_check(*args):
    '''Run qemu-io with stdout and stderr suppressed and return True if
    the subprocess returned 0'''
    args = qemu_io_wrap_args(args)
    result = subprocess.run(args, stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT, check=False)
    return result.returncode == 0
class QemuIoInteractive:
    '''Drive an interactive qemu-io process through its stdin/stdout'''

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''Start qemu-io with @args and wait for its first prompt.

        Raises ValueError carrying the whole process output if anything
        other than the prompt is printed first.'''
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to exit'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text before it.

        Asserts that EOF is not reached before a prompt is seen.'''
        pattern = self._PROMPT
        n = len(pattern)
        chars = []
        tail = ''
        # Compare the last n characters read against the prompt.  A
        # position counter that is reset to 0 on mismatch would miss a
        # prompt directly preceded by one of its own prefixes
        # (e.g. "qqemu-io> ").
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''Send one command and return its output without the prompt'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
class QemuStorageDaemon:
    '''A running qemu-storage-daemon process, optionally with a QMP
    connection'''

    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        '''Start the daemon with @args and wait for its pidfile.

        :param instance_id: Used in the pidfile and QMP socket names.
        :param qmp: Add a QMP monitor on a socket in sock_dir and accept
                    the daemon's connection on it.
        :raise RuntimeError: The daemon exited before creating its
                             pidfile.
        '''
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        '''Run QMP command @cmd with @args; requires qmp=True at
        construction'''
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        '''Send @kill_signal, wait for the daemon to exit, and remove its
        QMP socket and pidfile.  Does nothing if already stopped.'''
        if self._p is None:
            # Already stopped (or never started): nothing to signal
            return

        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        # Last resort: kill a daemon that was never stopped explicitly
        if self._p is not None:
            self.stop(kill_signal=9)


def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))


def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    full_args = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    return returncode, output if returncode else ''


def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports, log the filtered output, and
    return the unfiltered output'''
    full_args = [qemu_nbd_prog, '-L'] + list(args)
    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # The appearance of the pid file is the signal that the
            # server has started
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()


def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    return qemu_img('compare', '-f', fmt1,
                    '-F', fmt2, img1, img2) == 0


def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector starts and ends with its own sector number as a
    big-endian 32-bit integer.  Whole sectors are written, so a @size that
    is not a multiple of 512 is rounded up.'''
    with open(name, 'wb') as file:
        for offset in range(0, size, 512):
            sector_num = offset // 512
            file.write(struct.pack('>l504xl', sector_num, sector_num))


def image_size(img):
    '''Return image's virtual size'''
    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    return json.loads(r)['virtual-size']


def is_str(val):
    '''Return True if @val is a str'''
    return isinstance(val, str)


# The directory is a literal path, not a pattern: escape it so that regex
# metacharacters in TEST_DIR (e.g. '+' or '.') match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of test_dir in @msg with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)


win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove all carriage returns from @msg'''
    return win32_re.sub("", msg)
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace qemu-io's ops/timing statistics
    with fixed placeholders'''
    msg = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", msg)


chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" with "chown UID:GID"'''
    return chown_re.sub("chown UID:GID", msg)


def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of @event whose timestamp fields are replaced by the
    placeholders SECS and USECS.  @event itself is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: build a new timestamp dict
        # instead of writing into the one the caller still references.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event


def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    @qmsg (a dict or list) is modified in place, recursing into nested
    dicts and lists, and is also returned.  For list elements the key
    passed to filter_fn is the index.'''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        items = enumerate(qmsg)
    else:
        items = qmsg.items()

    for k, v in items:
        if isinstance(v, (dict, list)):
            qmsg[k] = filter_qmp(v, filter_fn)
        else:
            qmsg[k] = filter_fn(k, v)
    return qmsg


def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
    prefixes with TEST_DIR/PID- and SOCK_DIR/PID-'''
    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')


def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_testfiles(value)
        return value
    return filter_qmp(qmsg, _filter)


def filter_virtio_scsi(output: str) -> str:
    '''Replace virtio-scsi-ccw and virtio-scsi-pci with virtio-scsi'''
    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)


def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_virtio_scsi(value)
        return value
    return filter_qmp(qmsg, _filter)


def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names with NODE_NAME'''
    return re.sub("#block[0-9]+", "NODE_NAME", msg)
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible.

    Lines containing 'disk size' or 'actual-size' are dropped.  In the
    remaining lines @filename becomes TEST_IMG, test file prefixes are
    filtered, imgfmt becomes IMGFMT, and the values of iters, uuid, cid
    and compression type are replaced by placeholders.'''
    lines = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
                      line)
        lines.append(line)
    return '\n'.join(lines)


def filter_imgfmt(msg):
    '''Replace every occurrence of imgfmt in @msg with IMGFMT'''
    return msg.replace(imgfmt, 'IMGFMT')


def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_imgfmt(value)
        return value
    return filter_qmp(qmsg, _filter)


def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes of an export listing with XXX'''
    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)


# Message types accepted by log() and by the filters passed to it
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    :param filters: Callables applied to @msg, in order, before logging.
    """
    for flt in filters:
        msg = flt(msg)
    if isinstance(msg, (dict, list)):
        # Don't sort if it's already sorted
        do_sort = not isinstance(msg, OrderedDict)
        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
    else:
        test_logger.info(msg)


class Timeout:
    """
    Context manager raising Exception(@errmsg) if its body runs for more
    than @seconds.

    Implemented with SIGALRM and the real-time interval timer.  It does
    nothing when QEMU is run under gdb or valgrind.
    """
    # SIGALRM handler that was installed before __enter__
    _prev_handler = None

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if qemu_gdb or qemu_valgrind:
            return self
        self._prev_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if qemu_gdb or qemu_valgrind:
            return False
        signal.setitimer(signal.ITIMER_REAL, 0)
        # Put back whatever handler was active before we took over
        # SIGALRM.  signal.signal() returns None for a handler that was
        # not installed from Python, and that cannot be re-installed.
        if self._prev_handler is not None:
            signal.signal(signal.SIGALRM, self._prev_handler)
            self._prev_handler = None
        return False

    def timeout(self, signum, frame):
        """SIGALRM handler: abort the body of the with statement"""
        raise Exception(self.errmsg)


def file_pattern(name):
    """Return @name prefixed with this process's PID and a dash"""
    return "{0}-{1}".format(os.getpid(), name)
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        if len(self.paths) == 1:
            return self.paths[0]
        else:
            return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: a file that was never created is not an error
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False


def try_remove(img):
    '''Remove file @img, ignoring any OSError'''
    try:
        os.remove(img)
    except OSError:
        pass


def file_path_remover():
    '''atexit hook removing the files registered by file_path(), most
    recently registered first'''
    for path in reversed(file_path_remover.paths):
        try_remove(path)


def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # The list of paths lives on the remover function itself; the first
    # call creates it and registers the atexit hook.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = []
    for name in names:
        filename = file_pattern(name)
        path = os.path.join(base_dir, filename)
        file_path_remover.paths.append(path)
        paths.append(path)

    return paths[0] if len(paths) == 1 else paths


def remote_filename(path):
    '''Return @path as a file name for the protocol under test (imgproto)

    Raises Exception for protocols other than file and ssh.'''
    if imgproto == 'file':
        return path
    elif imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    else:
        raise Exception("Protocol %s not supported" % (imgproto))


class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout when running under gdb or valgrind
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        # 99 is the --error-exitcode passed to valgrind: print its log in
        # that case, otherwise discard it
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Append "-object @opts" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device @opts" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive @opts" to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM

        The drive id is "drive<N>", N counting the drives added through
        this method.  When @path is None no file, format, cache or aio
        options are generated.  luks drives without a 'key-secret' in
        @opts get the default secret.  Returns self.'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev @opts"; @opts is a string or an iterable of
        strings that are joined with commas.  Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming @addr" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command, via qmp_log()
        when @use_log is set'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts and lists into one dict with dotted keys

        For example {'a': {'b': [1, 2]}} becomes {'a.b.0': 1, 'a.b.1': 2}.
        @output and @basestr carry the state of the recursion.'''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    @staticmethod
    def _opt_value(value):
        '''Render one flattened leaf value for a key=value option string'''
        # bool is checked first: str(True) would give 'True'
        if isinstance(value, bool):
            return 'on' if value else 'off'
        return str(value)

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,..." option string,
        using the dotted keys produced by flatten_qmp_object().

        Non-string leaves are converted to text (booleans as on/off).'''
        flat = self.flatten_qmp_object(obj)
        return ','.join(f'{key}={self._opt_value(value)}'
                        for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result
        with @filters applied; returns the unfiltered result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Block job events are only logged; the state machine below is
            # driven by JOB_STATUS_CHANGE alone
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Run blockdev-create with @options as job @job_id and wait for
        the job through run_job().

        If the command itself fails, its QMP error is returned instead.'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the migration "events" capability and log the result;
        @name only appears in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Wait for a MIGRATION event with status completed or failed,
        logging every MIGRATION event seen.

        On completion, additionally poll query-status until the runstate
        equals @expect_runstate (no polling when it is None) and return
        True; return False if the migration failed.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for @node_name, or
        None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists;
        nodes without a 'dirty-bitmaps' entry are left out'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap's dict, or None if there is no matching bitmap.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # A node without any bitmap has no entry in @bitmaps at all
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if bitmap @bitmap_name on @node_name exists and
        contains all key/value pairs of @fields'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)

index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
1063 self.vm = None 1064 1065 def dictpath(self, d, path): 1066 '''Traverse a path in a nested dict''' 1067 for component in path.split('/'): 1068 m = index_re.match(component) 1069 if m: 1070 component, idx = m.groups() 1071 idx = int(idx) 1072 1073 if not isinstance(d, dict) or component not in d: 1074 self.fail(f'failed path traversal for "{path}" in "{d}"') 1075 d = d[component] 1076 1077 if m: 1078 if not isinstance(d, list): 1079 self.fail(f'path component "{component}" in "{path}" ' 1080 f'is not a list in "{d}"') 1081 try: 1082 d = d[idx] 1083 except IndexError: 1084 self.fail(f'invalid index "{idx}" in path "{path}" ' 1085 f'in "{d}"') 1086 return d 1087 1088 def assert_qmp_absent(self, d, path): 1089 try: 1090 result = self.dictpath(d, path) 1091 except AssertionError: 1092 return 1093 self.fail('path "%s" has value "%s"' % (path, str(result))) 1094 1095 def assert_qmp(self, d, path, value): 1096 '''Assert that the value for a specific path in a QMP dict 1097 matches. When given a list of values, assert that any of 1098 them matches.''' 1099 1100 result = self.dictpath(d, path) 1101 1102 # [] makes no sense as a list of valid values, so treat it as 1103 # an actual single value. 
1104 if isinstance(value, list) and value != []: 1105 for v in value: 1106 if result == v: 1107 return 1108 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1109 else: 1110 self.assertEqual(result, value, 1111 '"%s" is "%s", expected "%s"' 1112 % (path, str(result), str(value))) 1113 1114 def assert_no_active_block_jobs(self): 1115 result = self.vm.qmp('query-block-jobs') 1116 self.assert_qmp(result, 'return', []) 1117 1118 def assert_has_block_node(self, node_name=None, file_name=None): 1119 """Issue a query-named-block-nodes and assert node_name and/or 1120 file_name is present in the result""" 1121 def check_equal_or_none(a, b): 1122 return a is None or b is None or a == b 1123 assert node_name or file_name 1124 result = self.vm.qmp('query-named-block-nodes') 1125 for x in result["return"]: 1126 if check_equal_or_none(x.get("node-name"), node_name) and \ 1127 check_equal_or_none(x.get("file"), file_name): 1128 return 1129 self.fail("Cannot find %s %s in result:\n%s" % 1130 (node_name, file_name, result)) 1131 1132 def assert_json_filename_equal(self, json_filename, reference): 1133 '''Asserts that the given filename is a json: filename and that its 1134 content is equal to the given reference object''' 1135 self.assertEqual(json_filename[:5], 'json:') 1136 self.assertEqual( 1137 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1138 self.vm.flatten_qmp_object(reference) 1139 ) 1140 1141 def cancel_and_wait(self, drive='drive0', force=False, 1142 resume=False, wait=60.0): 1143 '''Cancel a block job and wait for it to finish, returning the event''' 1144 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1145 self.assert_qmp(result, 'return', {}) 1146 1147 if resume: 1148 self.vm.resume_drive(drive) 1149 1150 cancelled = False 1151 result = None 1152 while not cancelled: 1153 for event in self.vm.get_qmp_events(wait=wait): 1154 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1155 event['event'] == 'BLOCK_JOB_CANCELLED': 
1156 self.assert_qmp(event, 'data/device', drive) 1157 result = event 1158 cancelled = True 1159 elif event['event'] == 'JOB_STATUS_CHANGE': 1160 self.assert_qmp(event, 'data/id', drive) 1161 1162 1163 self.assert_no_active_block_jobs() 1164 return result 1165 1166 def wait_until_completed(self, drive='drive0', check_offset=True, 1167 wait=60.0, error=None): 1168 '''Wait for a block job to finish, returning the event''' 1169 while True: 1170 for event in self.vm.get_qmp_events(wait=wait): 1171 if event['event'] == 'BLOCK_JOB_COMPLETED': 1172 self.assert_qmp(event, 'data/device', drive) 1173 if error is None: 1174 self.assert_qmp_absent(event, 'data/error') 1175 if check_offset: 1176 self.assert_qmp(event, 'data/offset', 1177 event['data']['len']) 1178 else: 1179 self.assert_qmp(event, 'data/error', error) 1180 self.assert_no_active_block_jobs() 1181 return event 1182 if event['event'] == 'JOB_STATUS_CHANGE': 1183 self.assert_qmp(event, 'data/id', drive) 1184 1185 def wait_ready(self, drive='drive0'): 1186 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1187 return self.vm.events_wait([ 1188 ('BLOCK_JOB_READY', 1189 {'data': {'type': 'mirror', 'device': drive}}), 1190 ('BLOCK_JOB_READY', 1191 {'data': {'type': 'commit', 'device': drive}}) 1192 ]) 1193 1194 def wait_ready_and_cancel(self, drive='drive0'): 1195 self.wait_ready(drive=drive) 1196 event = self.cancel_and_wait(drive=drive) 1197 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1198 self.assert_qmp(event, 'data/type', 'mirror') 1199 self.assert_qmp(event, 'data/offset', event['data']['len']) 1200 1201 def complete_and_wait(self, drive='drive0', wait_ready=True, 1202 completion_error=None): 1203 '''Complete a block job and wait for it to finish''' 1204 if wait_ready: 1205 self.wait_ready(drive=drive) 1206 1207 result = self.vm.qmp('block-job-complete', device=drive) 1208 self.assert_qmp(result, 'return', {}) 1209 1210 event = self.wait_until_completed(drive=drive, 
error=completion_error) 1211 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1212 1213 def pause_wait(self, job_id='job0'): 1214 with Timeout(3, "Timeout waiting for job to pause"): 1215 while True: 1216 result = self.vm.qmp('query-block-jobs') 1217 found = False 1218 for job in result['return']: 1219 if job['device'] == job_id: 1220 found = True 1221 if job['paused'] and not job['busy']: 1222 return job 1223 break 1224 assert found 1225 1226 def pause_job(self, job_id='job0', wait=True): 1227 result = self.vm.qmp('block-job-pause', device=job_id) 1228 self.assert_qmp(result, 'return', {}) 1229 if wait: 1230 return self.pause_wait(job_id) 1231 return result 1232 1233 def case_skip(self, reason): 1234 '''Skip this test case''' 1235 case_notrun(reason) 1236 self.skipTest(reason) 1237 1238 1239def notrun(reason): 1240 '''Skip this test suite''' 1241 # Each test in qemu-iotests has a number ("seq") 1242 seq = os.path.basename(sys.argv[0]) 1243 1244 with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \ 1245 as outfile: 1246 outfile.write(reason + '\n') 1247 logger.warning("%s not run: %s", seq, reason) 1248 sys.exit(0) 1249 1250def case_notrun(reason): 1251 '''Mark this test case as not having been run (without actually 1252 skipping it, that is left to the caller). 
See 1253 QMPTestCase.case_skip() for a variant that actually skips the 1254 current test case.''' 1255 1256 # Each test in qemu-iotests has a number ("seq") 1257 seq = os.path.basename(sys.argv[0]) 1258 1259 with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \ 1260 as outfile: 1261 outfile.write(' [case not run] ' + reason + '\n') 1262 1263def _verify_image_format(supported_fmts: Sequence[str] = (), 1264 unsupported_fmts: Sequence[str] = ()) -> None: 1265 if 'generic' in supported_fmts and \ 1266 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1267 # similar to 1268 # _supported_fmt generic 1269 # for bash tests 1270 supported_fmts = () 1271 1272 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1273 if not_sup or (imgfmt in unsupported_fmts): 1274 notrun('not suitable for this image format: %s' % imgfmt) 1275 1276 if imgfmt == 'luks': 1277 verify_working_luks() 1278 1279def _verify_protocol(supported: Sequence[str] = (), 1280 unsupported: Sequence[str] = ()) -> None: 1281 assert not (supported and unsupported) 1282 1283 if 'generic' in supported: 1284 return 1285 1286 not_sup = supported and (imgproto not in supported) 1287 if not_sup or (imgproto in unsupported): 1288 notrun('not suitable for this protocol: %s' % imgproto) 1289 1290def _verify_platform(supported: Sequence[str] = (), 1291 unsupported: Sequence[str] = ()) -> None: 1292 if any((sys.platform.startswith(x) for x in unsupported)): 1293 notrun('not suitable for this OS: %s' % sys.platform) 1294 1295 if supported: 1296 if not any((sys.platform.startswith(x) for x in supported)): 1297 notrun('not suitable for this OS: %s' % sys.platform) 1298 1299def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1300 if supported_cache_modes and (cachemode not in supported_cache_modes): 1301 notrun('not suitable for this cache mode: %s' % cachemode) 1302 1303def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1304 if supported_aio_modes and 
(aiomode not in supported_aio_modes): 1305 notrun('not suitable for this aio mode: %s' % aiomode) 1306 1307def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1308 usf_list = list(set(required_formats) - set(supported_formats())) 1309 if usf_list: 1310 notrun(f'formats {usf_list} are not whitelisted') 1311 1312 1313def _verify_virtio_blk() -> None: 1314 out = qemu_pipe('-M', 'none', '-device', 'help') 1315 if 'virtio-blk' not in out: 1316 notrun('Missing virtio-blk in QEMU binary') 1317 1318def _verify_virtio_scsi_pci_or_ccw() -> None: 1319 out = qemu_pipe('-M', 'none', '-device', 'help') 1320 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1321 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1322 1323 1324def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1325 imgopts = os.environ.get('IMGOPTS') 1326 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1327 # but it supported only for bash tests. We don't have a concept of global 1328 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1329 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1330 unsup = list(unsupported) + ['$'] 1331 if imgopts and any(x in imgopts for x in unsup): 1332 notrun(f'not suitable for this imgopts: {imgopts}') 1333 1334 1335def supports_quorum(): 1336 return 'quorum' in qemu_img_pipe('--help') 1337 1338def verify_quorum(): 1339 '''Skip test suite if quorum support is not available''' 1340 if not supports_quorum(): 1341 notrun('quorum support missing') 1342 1343def has_working_luks() -> Tuple[bool, str]: 1344 """ 1345 Check whether our LUKS driver can actually create images 1346 (this extends to LUKS encryption for qcow2). 1347 1348 If not, return the reason why. 
1349 """ 1350 1351 img_file = f'{test_dir}/luks-test.luks' 1352 (output, status) = \ 1353 qemu_img_pipe_and_status('create', '-f', 'luks', 1354 '--object', luks_default_secret_object, 1355 '-o', luks_default_key_secret_opt, 1356 '-o', 'iter-time=10', 1357 img_file, '1G') 1358 try: 1359 os.remove(img_file) 1360 except OSError: 1361 pass 1362 1363 if status != 0: 1364 reason = output 1365 for line in output.splitlines(): 1366 if img_file + ':' in line: 1367 reason = line.split(img_file + ':', 1)[1].strip() 1368 break 1369 1370 return (False, reason) 1371 else: 1372 return (True, '') 1373 1374def verify_working_luks(): 1375 """ 1376 Skip test suite if LUKS does not work 1377 """ 1378 (working, reason) = has_working_luks() 1379 if not working: 1380 notrun(reason) 1381 1382def qemu_pipe(*args: str) -> str: 1383 """ 1384 Run qemu with an option to print something and exit (e.g. a help option). 1385 1386 :return: QEMU's stdout output. 1387 """ 1388 full_args = [qemu_prog] + qemu_opts + list(args) 1389 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1390 return output 1391 1392def supported_formats(read_only=False): 1393 '''Set 'read_only' to True to check ro-whitelist 1394 Otherwise, rw-whitelist is checked''' 1395 1396 if not hasattr(supported_formats, "formats"): 1397 supported_formats.formats = {} 1398 1399 if read_only not in supported_formats.formats: 1400 format_message = qemu_pipe("-drive", "format=help") 1401 line = 1 if read_only else 0 1402 supported_formats.formats[read_only] = \ 1403 format_message.splitlines()[line].split(":")[1].split() 1404 1405 return supported_formats.formats[read_only] 1406 1407def skip_if_unsupported(required_formats=(), read_only=False): 1408 '''Skip Test Decorator 1409 Runs the test if all the required formats are whitelisted''' 1410 def skip_test_decorator(func): 1411 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1412 **kwargs: Dict[str, Any]) -> None: 1413 if callable(required_formats): 1414 fmts = 
required_formats(test_case) 1415 else: 1416 fmts = required_formats 1417 1418 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1419 if usf_list: 1420 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1421 test_case.case_skip(msg) 1422 else: 1423 func(test_case, *args, **kwargs) 1424 return func_wrapper 1425 return skip_test_decorator 1426 1427def skip_for_formats(formats: Sequence[str] = ()) \ 1428 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1429 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1430 '''Skip Test Decorator 1431 Skips the test for the given formats''' 1432 def skip_test_decorator(func): 1433 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1434 **kwargs: Dict[str, Any]) -> None: 1435 if imgfmt in formats: 1436 msg = f'{test_case}: Skipped for format {imgfmt}' 1437 test_case.case_skip(msg) 1438 else: 1439 func(test_case, *args, **kwargs) 1440 return func_wrapper 1441 return skip_test_decorator 1442 1443def skip_if_user_is_root(func): 1444 '''Skip Test Decorator 1445 Runs the test only without root permissions''' 1446 def func_wrapper(*args, **kwargs): 1447 if os.getuid() == 0: 1448 case_notrun('{}: cannot be run as root'.format(args[0])) 1449 return None 1450 else: 1451 return func(*args, **kwargs) 1452 return func_wrapper 1453 1454# We need to filter out the time taken from the output so that 1455# qemu-iotest can reliably diff the results against master output, 1456# and hide skipped tests from the reference output. 

class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports skipped tests like passed ones."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
        elif self.dots:
            self.stream.write(".")
            self.stream.flush()

class ReproducibleStreamWrapper:
    """
    Stream wrapper that strips run-dependent parts of unittest's output.

    write() removes the elapsed time from the "Ran N tests in X.XXXs"
    summary and drops " (skipped=N)".  Every other attribute is
    forwarded to the wrapped stream.
    """

    # Compiled once; write() is called for every piece of runner output
    _ran_re = re.compile(r'Ran (\d+) tests? in [\d.]+s')
    _skipped_re = re.compile(r' \(skipped=\d+\)')

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only called when normal lookup fails.  Refusing 'stream' here
        # prevents infinite recursion while self.stream is not set.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """
        Write the filtered @arg to the wrapped stream.

        :param arg: Text to write.  None (the default) writes nothing.
        :return: Whatever the wrapped stream's write() returns, or 0 if
                 @arg is None.
        """
        if arg is None:
            # re.sub() would raise TypeError on None
            return 0
        arg = self._ran_re.sub(r'Ran \1 tests', arg)
        arg = self._skipped_re.sub(r'', arg)
        return self.stream.write(arg)

class ReproducibleTestRunner(unittest.TextTestRunner):
    """
    TextTestRunner that writes through a ReproducibleStreamWrapper and
    uses ReproducibleTestResult by default.
    """

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Without a stream, output goes to stdout
        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)

def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument list handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=2 if debug else 1,
                  warnings=None if sys.warnoptions else 'ignore')

def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes a '-d' argument from sys.argv (if present), configures the
    logging level accordingly, and runs the _verify_*() checks with the
    given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    debug = '-d' in sys.argv
    if debug:
        sys.argv.remove('-d')
    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug

def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    @args and @kwargs are passed on to execute_setup_common().  If
    @test_function is given, it is called; otherwise the unittests of
    the calling module are run.
    """

    debug = execute_setup_common(*args, **kwargs)
    if not test_function:
        execute_unittest(sys.argv, debug)
    else:
        test_function()

def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    handler = logging.StreamHandler(stream=sys.stdout)
    formatter = logging.Formatter('%(message)s')
    handler.setFormatter(formatter)
    test_logger.addHandler(handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False

# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    activate_logging()
    execute_setup_common(*args, **kwargs)

# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)

# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    execute_test(*args, **kwargs)