1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import argparse 20import atexit 21import bz2 22from collections import OrderedDict 23import faulthandler 24import json 25import logging 26import os 27import re 28import shutil 29import signal 30import struct 31import subprocess 32import sys 33import time 34from typing import (Any, Callable, Dict, Iterable, Iterator, 35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar) 36import unittest 37 38from contextlib import contextmanager 39 40from qemu.machine import qtest 41from qemu.qmp import QMPMessage 42from qemu.aqmp.legacy import QEMUMonitorProtocol 43 44# Use this logger for logging messages directly from the iotests module 45logger = logging.getLogger('qemu.iotests') 46logger.addHandler(logging.NullHandler()) 47 48# Use this logger for messages that ought to be used for diff output. 49test_logger = logging.getLogger('qemu.iotests.diff_io') 50 51 52faulthandler.enable() 53 54# This will not work if arguments contain spaces but is necessary if we 55# want to support the override options that ./check supports. 56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 57if os.environ.get('QEMU_IMG_OPTIONS'): 58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 59 60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 61if os.environ.get('QEMU_IO_OPTIONS'): 62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 63 64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 66 qemu_io_args_no_fmt += \ 67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 68 69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 70qemu_nbd_args = [qemu_nbd_prog] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon') 78 79gdb_qemu_env = os.environ.get('GDB_OPTIONS') 80qemu_gdb = [] 81if gdb_qemu_env: 82 qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ') 83 84qemu_print = os.environ.get('PRINT_QEMU', False) 85 86imgfmt = os.environ.get('IMGFMT', 'raw') 87imgproto = os.environ.get('IMGPROTO', 'file') 88output_dir = os.environ.get('OUTPUT_DIR', '.') 89 90try: 91 test_dir = os.environ['TEST_DIR'] 92 sock_dir = os.environ['SOCK_DIR'] 93 cachemode = os.environ['CACHEMODE'] 94 aiomode = os.environ['AIOMODE'] 95 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 96except KeyError: 97 # We are using these variables as proxies to indicate that we're 98 # not being run via "check". There may be other things set up by 99 # "check" that individual test cases rely on. 100 sys.stderr.write('Please run this test via the "check" script\n') 101 sys.exit(os.EX_USAGE) 102 103qemu_valgrind = [] 104if os.environ.get('VALGRIND_QEMU') == "y" and \ 105 os.environ.get('NO_VALGRIND') != "y": 106 valgrind_logfile = "--log-file=" + test_dir 107 # %p allows to put the valgrind process PID, since 108 # we don't know it a priori (subprocess.Popen is 109 # not yet invoked) 110 valgrind_logfile += "/%p.valgrind" 111 112 qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99'] 113 114luks_default_secret_object = 'secret,id=keysec0,data=' + \ 115 os.environ.get('IMGKEYSECRET', '') 116luks_default_key_secret_opt = 'key-secret=keysec0' 117 118sample_img_dir = os.environ['SAMPLE_IMG_DIR'] 119 120 121@contextmanager 122def change_log_level( 123 logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]: 124 """ 125 Utility function for temporarily changing the log level of a logger. 126 127 This can be used to silence errors that are expected or uninteresting. 128 """ 129 _logger = logging.getLogger(logger_name) 130 current_level = _logger.level 131 _logger.setLevel(level) 132 133 try: 134 yield 135 finally: 136 _logger.setLevel(current_level) 137 138 139def unarchive_sample_image(sample, fname): 140 sample_fname = os.path.join(sample_img_dir, sample + '.bz2') 141 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out: 142 shutil.copyfileobj(f_in, f_out) 143 144 145def qemu_tool_popen(args: Sequence[str], 146 connect_stderr: bool = True) -> 'subprocess.Popen[str]': 147 stderr = subprocess.STDOUT if connect_stderr else None 148 # pylint: disable=consider-using-with 149 return subprocess.Popen(args, 150 stdout=subprocess.PIPE, 151 stderr=stderr, 152 universal_newlines=True) 153 154 155def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 156 connect_stderr: bool = True, 157 drop_successful_output: bool = False) \ 158 -> Tuple[str, int]: 159 """ 160 Run a tool and return both its output and its exit code 161 """ 162 with qemu_tool_popen(args, connect_stderr) as subp: 163 output = subp.communicate()[0] 164 if subp.returncode < 0: 165 cmd = ' '.join(args) 166 sys.stderr.write(f'{tool} received signal \ 167 {-subp.returncode}: {cmd}\n') 168 if drop_successful_output and subp.returncode == 0: 169 output = '' 170 return (output, subp.returncode) 171 172def qemu_img_create_prepare_args(args: List[str]) -> List[str]: 173 if not args or args[0] != 'create': 174 return list(args) 175 args = args[1:] 176 177 p = argparse.ArgumentParser(allow_abbrev=False) 178 # -o option may be specified several times 179 p.add_argument('-o', action='append', default=[]) 180 p.add_argument('-f') 181 parsed, remaining = p.parse_known_args(args) 182 183 opts_list = parsed.o 184 185 result = ['create'] 186 if parsed.f is not None: 187 result += ['-f', parsed.f] 188 189 # IMGOPTS most probably contain options specific for the selected format, 190 # like extended_l2 or compression_type for qcow2. Test may want to create 191 # additional images in other formats that doesn't support these options. 192 # So, use IMGOPTS only for images created in imgfmt format. 193 imgopts = os.environ.get('IMGOPTS') 194 if imgopts and parsed.f == imgfmt: 195 opts_list.insert(0, imgopts) 196 197 # default luks support 198 if parsed.f == 'luks' and \ 199 all('key-secret' not in opts for opts in opts_list): 200 result += ['--object', luks_default_secret_object] 201 opts_list.append(luks_default_key_secret_opt) 202 203 for opts in opts_list: 204 result += ['-o', opts] 205 206 result += remaining 207 208 return result 209 210def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 211 """ 212 Run qemu-img and return both its output and its exit code 213 """ 214 is_create = bool(args and args[0] == 'create') 215 full_args = qemu_img_args + qemu_img_create_prepare_args(list(args)) 216 return qemu_tool_pipe_and_status('qemu-img', full_args, 217 drop_successful_output=is_create) 218 219def qemu_img(*args: str) -> int: 220 '''Run qemu-img and return the exit code''' 221 return qemu_img_pipe_and_status(*args)[1] 222 223def ordered_qmp(qmsg, conv_keys=True): 224 # Dictionaries are not ordered prior to 3.6, therefore: 225 if isinstance(qmsg, list): 226 return [ordered_qmp(atom) for atom in qmsg] 227 if isinstance(qmsg, dict): 228 od = OrderedDict() 229 for k, v in sorted(qmsg.items()): 230 if conv_keys: 231 k = k.replace('_', '-') 232 od[k] = ordered_qmp(v, conv_keys=False) 233 return od 234 return qmsg 235 236def qemu_img_create(*args): 237 return qemu_img('create', *args) 238 239def qemu_img_measure(*args): 240 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 241 242def qemu_img_check(*args): 243 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 244 245def qemu_img_pipe(*args: str) -> str: 246 '''Run qemu-img and return its output''' 247 return qemu_img_pipe_and_status(*args)[0] 248 249def qemu_img_log(*args): 250 result = qemu_img_pipe(*args) 251 log(result, filters=[filter_testfiles]) 252 return result 253 254def img_info_log(filename, filter_path=None, use_image_opts=False, 255 extra_args=()): 256 args = ['info'] 257 if use_image_opts: 258 args.append('--image-opts') 259 else: 260 args += ['-f', imgfmt] 261 args += extra_args 262 args.append(filename) 263 264 output = qemu_img_pipe(*args) 265 if not filter_path: 266 filter_path = filename 267 log(filter_img_info(output, filter_path)) 268 269def qemu_io_wrap_args(args: Sequence[str]) -> List[str]: 270 if '-f' in args or '--image-opts' in args: 271 return qemu_io_args_no_fmt + list(args) 272 else: 273 return qemu_io_args + list(args) 274 275def qemu_io_popen(*args): 276 return qemu_tool_popen(qemu_io_wrap_args(args)) 277 278def qemu_io(*args): 279 '''Run qemu-io and return the stdout data''' 280 return qemu_tool_pipe_and_status('qemu-io', qemu_io_wrap_args(args))[0] 281 282def qemu_io_log(*args): 283 result = qemu_io(*args) 284 log(result, filters=[filter_testfiles, filter_qemu_io]) 285 return result 286 287def qemu_io_silent(*args): 288 '''Run qemu-io and return the exit code, suppressing stdout''' 289 args = qemu_io_wrap_args(args) 290 result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False) 291 if result.returncode < 0: 292 sys.stderr.write('qemu-io received signal %i: %s\n' % 293 (-result.returncode, ' '.join(args))) 294 return result.returncode 295 296def qemu_io_silent_check(*args): 297 '''Run qemu-io and return the true if subprocess returned 0''' 298 args = qemu_io_wrap_args(args) 299 result = subprocess.run(args, stdout=subprocess.DEVNULL, 300 stderr=subprocess.STDOUT, check=False) 301 return result.returncode == 0 302 303class QemuIoInteractive: 304 def __init__(self, *args): 305 self.args = qemu_io_wrap_args(args) 306 # We need to keep the Popen objext around, and not 307 # close it immediately. Therefore, disable the pylint check: 308 # pylint: disable=consider-using-with 309 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 310 stdout=subprocess.PIPE, 311 stderr=subprocess.STDOUT, 312 universal_newlines=True) 313 out = self._p.stdout.read(9) 314 if out != 'qemu-io> ': 315 # Most probably qemu-io just failed to start. 316 # Let's collect the whole output and exit. 317 out += self._p.stdout.read() 318 self._p.wait(timeout=1) 319 raise ValueError(out) 320 321 def close(self): 322 self._p.communicate('q\n') 323 324 def _read_output(self): 325 pattern = 'qemu-io> ' 326 n = len(pattern) 327 pos = 0 328 s = [] 329 while pos != n: 330 c = self._p.stdout.read(1) 331 # check unexpected EOF 332 assert c != '' 333 s.append(c) 334 if c == pattern[pos]: 335 pos += 1 336 else: 337 pos = 0 338 339 return ''.join(s[:-n]) 340 341 def cmd(self, cmd): 342 # quit command is in close(), '\n' is added automatically 343 assert '\n' not in cmd 344 cmd = cmd.strip() 345 assert cmd not in ('q', 'quit') 346 self._p.stdin.write(cmd + '\n') 347 self._p.stdin.flush() 348 return self._read_output() 349 350 351class QemuStorageDaemon: 352 _qmp: Optional[QEMUMonitorProtocol] = None 353 _qmpsock: Optional[str] = None 354 # Python < 3.8 would complain if this type were not a string literal 355 # (importing `annotations` from `__future__` would work; but not on <= 3.6) 356 _p: 'Optional[subprocess.Popen[bytes]]' = None 357 358 def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False): 359 assert '--pidfile' not in args 360 self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid') 361 all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile] 362 363 if qmp: 364 self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock') 365 all_args += ['--chardev', 366 f'socket,id=qmp-sock,path={self._qmpsock}', 367 '--monitor', 'qmp-sock'] 368 369 self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True) 370 371 # Cannot use with here, we want the subprocess to stay around 372 # pylint: disable=consider-using-with 373 self._p = subprocess.Popen(all_args) 374 if self._qmp is not None: 375 self._qmp.accept() 376 while not os.path.exists(self.pidfile): 377 if self._p.poll() is not None: 378 cmd = ' '.join(all_args) 379 raise RuntimeError( 380 'qemu-storage-daemon terminated with exit code ' + 381 f'{self._p.returncode}: {cmd}') 382 383 time.sleep(0.01) 384 385 with open(self.pidfile, encoding='utf-8') as f: 386 self._pid = int(f.read().strip()) 387 388 assert self._pid == self._p.pid 389 390 def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \ 391 -> QMPMessage: 392 assert self._qmp is not None 393 return self._qmp.cmd(cmd, args) 394 395 def stop(self, kill_signal=15): 396 self._p.send_signal(kill_signal) 397 self._p.wait() 398 self._p = None 399 400 if self._qmp: 401 self._qmp.close() 402 403 if self._qmpsock is not None: 404 try: 405 os.remove(self._qmpsock) 406 except OSError: 407 pass 408 try: 409 os.remove(self.pidfile) 410 except OSError: 411 pass 412 413 def __del__(self): 414 if self._p is not None: 415 self.stop(kill_signal=9) 416 417 418def qemu_nbd(*args): 419 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 420 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 421 422def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 423 '''Run qemu-nbd in daemon mode and return both the parent's exit code 424 and its output in case of an error''' 425 full_args = qemu_nbd_args + ['--fork'] + list(args) 426 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 427 connect_stderr=False) 428 return returncode, output if returncode else '' 429 430def qemu_nbd_list_log(*args: str) -> str: 431 '''Run qemu-nbd to list remote exports''' 432 full_args = [qemu_nbd_prog, '-L'] + list(args) 433 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 434 log(output, filters=[filter_testfiles, filter_nbd_exports]) 435 return output 436 437@contextmanager 438def qemu_nbd_popen(*args): 439 '''Context manager running qemu-nbd within the context''' 440 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 441 442 assert not os.path.exists(pid_file) 443 444 cmd = list(qemu_nbd_args) 445 cmd.extend(('--persistent', '--pid-file', pid_file)) 446 cmd.extend(args) 447 448 log('Start NBD server') 449 with subprocess.Popen(cmd) as p: 450 try: 451 while not os.path.exists(pid_file): 452 if p.poll() is not None: 453 raise RuntimeError( 454 "qemu-nbd terminated with exit code {}: {}" 455 .format(p.returncode, ' '.join(cmd))) 456 457 time.sleep(0.01) 458 yield 459 finally: 460 if os.path.exists(pid_file): 461 os.remove(pid_file) 462 log('Kill NBD server') 463 p.kill() 464 p.wait() 465 466def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 467 '''Return True if two image files are identical''' 468 return qemu_img('compare', '-f', fmt1, 469 '-F', fmt2, img1, img2) == 0 470 471def create_image(name, size): 472 '''Create a fully-allocated raw image with sector markers''' 473 with open(name, 'wb') as file: 474 i = 0 475 while i < size: 476 sector = struct.pack('>l504xl', i // 512, i // 512) 477 file.write(sector) 478 i = i + 512 479 480def image_size(img): 481 '''Return image's virtual size''' 482 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 483 return json.loads(r)['virtual-size'] 484 485def is_str(val): 486 return isinstance(val, str) 487 488test_dir_re = re.compile(r"%s" % test_dir) 489def filter_test_dir(msg): 490 return test_dir_re.sub("TEST_DIR", msg) 491 492win32_re = re.compile(r"\r") 493def filter_win32(msg): 494 return win32_re.sub("", msg) 495 496qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 497 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 498 r"and [0-9\/.inf]* ops\/sec\)") 499def filter_qemu_io(msg): 500 msg = filter_win32(msg) 501 return qemu_io_re.sub("X ops; XX:XX:XX.X " 502 "(XXX YYY/sec and XXX ops/sec)", msg) 503 504chown_re = re.compile(r"chown [0-9]+:[0-9]+") 505def filter_chown(msg): 506 return chown_re.sub("chown UID:GID", msg) 507 508def filter_qmp_event(event): 509 '''Filter a QMP event dict''' 510 event = dict(event) 511 if 'timestamp' in event: 512 event['timestamp']['seconds'] = 'SECS' 513 event['timestamp']['microseconds'] = 'USECS' 514 return event 515 516def filter_qmp(qmsg, filter_fn): 517 '''Given a string filter, filter a QMP object's values. 518 filter_fn takes a (key, value) pair.''' 519 # Iterate through either lists or dicts; 520 if isinstance(qmsg, list): 521 items = enumerate(qmsg) 522 else: 523 items = qmsg.items() 524 525 for k, v in items: 526 if isinstance(v, (dict, list)): 527 qmsg[k] = filter_qmp(v, filter_fn) 528 else: 529 qmsg[k] = filter_fn(k, v) 530 return qmsg 531 532def filter_testfiles(msg): 533 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 534 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 535 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 536 537def filter_qmp_testfiles(qmsg): 538 def _filter(_key, value): 539 if is_str(value): 540 return filter_testfiles(value) 541 return value 542 return filter_qmp(qmsg, _filter) 543 544def filter_virtio_scsi(output: str) -> str: 545 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 546 547def filter_qmp_virtio_scsi(qmsg): 548 def _filter(_key, value): 549 if is_str(value): 550 return filter_virtio_scsi(value) 551 return value 552 return filter_qmp(qmsg, _filter) 553 554def filter_generated_node_ids(msg): 555 return re.sub("#block[0-9]+", "NODE_NAME", msg) 556 557def filter_img_info(output, filename): 558 lines = [] 559 for line in output.split('\n'): 560 if 'disk size' in line or 'actual-size' in line: 561 continue 562 line = line.replace(filename, 'TEST_IMG') 563 line = filter_testfiles(line) 564 line = line.replace(imgfmt, 'IMGFMT') 565 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 566 line = re.sub('uuid: [-a-f0-9]+', 567 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 568 line) 569 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 570 line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE', 571 line) 572 lines.append(line) 573 return '\n'.join(lines) 574 575def filter_imgfmt(msg): 576 return msg.replace(imgfmt, 'IMGFMT') 577 578def filter_qmp_imgfmt(qmsg): 579 def _filter(_key, value): 580 if is_str(value): 581 return filter_imgfmt(value) 582 return value 583 return filter_qmp(qmsg, _filter) 584 585def filter_nbd_exports(output: str) -> str: 586 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 587 588 589Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 590 591def log(msg: Msg, 592 filters: Iterable[Callable[[Msg], Msg]] = (), 593 indent: Optional[int] = None) -> None: 594 """ 595 Logs either a string message or a JSON serializable message (like QMP). 596 If indent is provided, JSON serializable messages are pretty-printed. 597 """ 598 for flt in filters: 599 msg = flt(msg) 600 if isinstance(msg, (dict, list)): 601 # Don't sort if it's already sorted 602 do_sort = not isinstance(msg, OrderedDict) 603 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 604 else: 605 test_logger.info(msg) 606 607class Timeout: 608 def __init__(self, seconds, errmsg="Timeout"): 609 self.seconds = seconds 610 self.errmsg = errmsg 611 def __enter__(self): 612 if qemu_gdb or qemu_valgrind: 613 return self 614 signal.signal(signal.SIGALRM, self.timeout) 615 signal.setitimer(signal.ITIMER_REAL, self.seconds) 616 return self 617 def __exit__(self, exc_type, value, traceback): 618 if qemu_gdb or qemu_valgrind: 619 return False 620 signal.setitimer(signal.ITIMER_REAL, 0) 621 return False 622 def timeout(self, signum, frame): 623 raise Exception(self.errmsg) 624 625def file_pattern(name): 626 return "{0}-{1}".format(os.getpid(), name) 627 628class FilePath: 629 """ 630 Context manager generating multiple file names. The generated files are 631 removed when exiting the context. 632 633 Example usage: 634 635 with FilePath('a.img', 'b.img') as (img_a, img_b): 636 # Use img_a and img_b here... 637 638 # a.img and b.img are automatically removed here. 639 640 By default images are created in iotests.test_dir. To create sockets use 641 iotests.sock_dir: 642 643 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 644 645 For convenience, calling with one argument yields a single file instead of 646 a tuple with one item. 647 648 """ 649 def __init__(self, *names, base_dir=test_dir): 650 self.paths = [os.path.join(base_dir, file_pattern(name)) 651 for name in names] 652 653 def __enter__(self): 654 if len(self.paths) == 1: 655 return self.paths[0] 656 else: 657 return self.paths 658 659 def __exit__(self, exc_type, exc_val, exc_tb): 660 for path in self.paths: 661 try: 662 os.remove(path) 663 except OSError: 664 pass 665 return False 666 667 668def try_remove(img): 669 try: 670 os.remove(img) 671 except OSError: 672 pass 673 674def file_path_remover(): 675 for path in reversed(file_path_remover.paths): 676 try_remove(path) 677 678 679def file_path(*names, base_dir=test_dir): 680 ''' Another way to get auto-generated filename that cleans itself up. 681 682 Use is as simple as: 683 684 img_a, img_b = file_path('a.img', 'b.img') 685 sock = file_path('socket') 686 ''' 687 688 if not hasattr(file_path_remover, 'paths'): 689 file_path_remover.paths = [] 690 atexit.register(file_path_remover) 691 692 paths = [] 693 for name in names: 694 filename = file_pattern(name) 695 path = os.path.join(base_dir, filename) 696 file_path_remover.paths.append(path) 697 paths.append(path) 698 699 return paths[0] if len(paths) == 1 else paths 700 701def remote_filename(path): 702 if imgproto == 'file': 703 return path 704 elif imgproto == 'ssh': 705 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 706 else: 707 raise Exception("Protocol %s not supported" % (imgproto)) 708 709class VM(qtest.QEMUQtestMachine): 710 '''A QEMU VM''' 711 712 def __init__(self, path_suffix=''): 713 name = "qemu%s-%d" % (path_suffix, os.getpid()) 714 timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None 715 if qemu_gdb and qemu_valgrind: 716 sys.stderr.write('gdb and valgrind are mutually exclusive\n') 717 sys.exit(1) 718 wrapper = qemu_gdb if qemu_gdb else qemu_valgrind 719 super().__init__(qemu_prog, qemu_opts, wrapper=wrapper, 720 name=name, 721 base_temp_dir=test_dir, 722 sock_dir=sock_dir, qmp_timer=timer) 723 self._num_drives = 0 724 725 def _post_shutdown(self) -> None: 726 super()._post_shutdown() 727 if not qemu_valgrind or not self._popen: 728 return 729 valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind" 730 if self.exitcode() == 99: 731 with open(valgrind_filename, encoding='utf-8') as f: 732 print(f.read()) 733 else: 734 os.remove(valgrind_filename) 735 736 def _pre_launch(self) -> None: 737 super()._pre_launch() 738 if qemu_print: 739 # set QEMU binary output to stdout 740 self._close_qemu_log_file() 741 742 def add_object(self, opts): 743 self._args.append('-object') 744 self._args.append(opts) 745 return self 746 747 def add_device(self, opts): 748 self._args.append('-device') 749 self._args.append(opts) 750 return self 751 752 def add_drive_raw(self, opts): 753 self._args.append('-drive') 754 self._args.append(opts) 755 return self 756 757 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 758 '''Add a virtio-blk drive to the VM''' 759 options = ['if=%s' % interface, 760 'id=drive%d' % self._num_drives] 761 762 if path is not None: 763 options.append('file=%s' % path) 764 options.append('format=%s' % img_format) 765 options.append('cache=%s' % cachemode) 766 options.append('aio=%s' % aiomode) 767 768 if opts: 769 options.append(opts) 770 771 if img_format == 'luks' and 'key-secret' not in opts: 772 # default luks support 773 if luks_default_secret_object not in self._args: 774 self.add_object(luks_default_secret_object) 775 776 options.append(luks_default_key_secret_opt) 777 778 self._args.append('-drive') 779 self._args.append(','.join(options)) 780 self._num_drives += 1 781 return self 782 783 def add_blockdev(self, opts): 784 self._args.append('-blockdev') 785 if isinstance(opts, str): 786 self._args.append(opts) 787 else: 788 self._args.append(','.join(opts)) 789 return self 790 791 def add_incoming(self, addr): 792 self._args.append('-incoming') 793 self._args.append(addr) 794 return self 795 796 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 797 cmd = 'human-monitor-command' 798 kwargs: Dict[str, Any] = {'command-line': command_line} 799 if use_log: 800 return self.qmp_log(cmd, **kwargs) 801 else: 802 return self.qmp(cmd, **kwargs) 803 804 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 805 """Pause drive r/w operations""" 806 if not event: 807 self.pause_drive(drive, "read_aio") 808 self.pause_drive(drive, "write_aio") 809 return 810 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 811 812 def resume_drive(self, drive: str) -> None: 813 """Resume drive r/w operations""" 814 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 815 816 def hmp_qemu_io(self, drive: str, cmd: str, 817 use_log: bool = False, qdev: bool = False) -> QMPMessage: 818 """Write to a given drive using an HMP command""" 819 d = '-d ' if qdev else '' 820 return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log) 821 822 def flatten_qmp_object(self, obj, output=None, basestr=''): 823 if output is None: 824 output = {} 825 if isinstance(obj, list): 826 for i, item in enumerate(obj): 827 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 828 elif isinstance(obj, dict): 829 for key in obj: 830 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 831 else: 832 output[basestr[:-1]] = obj # Strip trailing '.' 833 return output 834 835 def qmp_to_opts(self, obj): 836 obj = self.flatten_qmp_object(obj) 837 output_list = [] 838 for key in obj: 839 output_list += [key + '=' + obj[key]] 840 return ','.join(output_list) 841 842 def get_qmp_events_filtered(self, wait=60.0): 843 result = [] 844 for ev in self.get_qmp_events(wait=wait): 845 result.append(filter_qmp_event(ev)) 846 return result 847 848 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 849 full_cmd = OrderedDict(( 850 ("execute", cmd), 851 ("arguments", ordered_qmp(kwargs)) 852 )) 853 log(full_cmd, filters, indent=indent) 854 result = self.qmp(cmd, **kwargs) 855 log(result, filters, indent=indent) 856 return result 857 858 # Returns None on success, and an error string on failure 859 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 860 pre_finalize=None, cancel=False, wait=60.0): 861 """ 862 run_job moves a job from creation through to dismissal. 863 864 :param job: String. ID of recently-launched job 865 :param auto_finalize: Bool. True if the job was launched with 866 auto_finalize. Defaults to True. 867 :param auto_dismiss: Bool. True if the job was launched with 868 auto_dismiss=True. Defaults to False. 869 :param pre_finalize: Callback. A callable that takes no arguments to be 870 invoked prior to issuing job-finalize, if any. 871 :param cancel: Bool. When true, cancels the job after the pre_finalize 872 callback. 873 :param wait: Float. Timeout value specifying how long to wait for any 874 event, in seconds. Defaults to 60.0. 875 """ 876 match_device = {'data': {'device': job}} 877 match_id = {'data': {'id': job}} 878 events = [ 879 ('BLOCK_JOB_COMPLETED', match_device), 880 ('BLOCK_JOB_CANCELLED', match_device), 881 ('BLOCK_JOB_ERROR', match_device), 882 ('BLOCK_JOB_READY', match_device), 883 ('BLOCK_JOB_PENDING', match_id), 884 ('JOB_STATUS_CHANGE', match_id) 885 ] 886 error = None 887 while True: 888 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 889 if ev['event'] != 'JOB_STATUS_CHANGE': 890 log(ev) 891 continue 892 status = ev['data']['status'] 893 if status == 'aborting': 894 result = self.qmp('query-jobs') 895 for j in result['return']: 896 if j['id'] == job: 897 error = j['error'] 898 log('Job failed: %s' % (j['error'])) 899 elif status == 'ready': 900 self.qmp_log('job-complete', id=job) 901 elif status == 'pending' and not auto_finalize: 902 if pre_finalize: 903 pre_finalize() 904 if cancel: 905 self.qmp_log('job-cancel', id=job) 906 else: 907 self.qmp_log('job-finalize', id=job) 908 elif status == 'concluded' and not auto_dismiss: 909 self.qmp_log('job-dismiss', id=job) 910 elif status == 'null': 911 return error 912 913 # Returns None on success, and an error string on failure 914 def blockdev_create(self, options, job_id='job0', filters=None): 915 if filters is None: 916 filters = [filter_qmp_testfiles] 917 result = self.qmp_log('blockdev-create', filters=filters, 918 job_id=job_id, options=options) 919 920 if 'return' in result: 921 assert result['return'] == {} 922 job_result = self.run_job(job_id) 923 else: 924 job_result = result['error'] 925 926 log("") 927 return job_result 928 929 def enable_migration_events(self, name): 930 log('Enabling migration QMP events on %s...' % name) 931 log(self.qmp('migrate-set-capabilities', capabilities=[ 932 { 933 'capability': 'events', 934 'state': True 935 } 936 ])) 937 938 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 939 while True: 940 event = self.event_wait('MIGRATION') 941 # We use the default timeout, and with a timeout, event_wait() 942 # never returns None 943 assert event 944 945 log(event, filters=[filter_qmp_event]) 946 if event['data']['status'] in ('completed', 'failed'): 947 break 948 949 if event['data']['status'] == 'completed': 950 # The event may occur in finish-migrate, so wait for the expected 951 # post-migration runstate 952 runstate = None 953 while runstate != expect_runstate: 954 runstate = self.qmp('query-status')['return']['status'] 955 return True 956 else: 957 return False 958 959 def node_info(self, node_name): 960 nodes = self.qmp('query-named-block-nodes') 961 for x in nodes['return']: 962 if x['node-name'] == node_name: 963 return x 964 return None 965 966 def query_bitmaps(self): 967 res = self.qmp("query-named-block-nodes") 968 return {device['node-name']: device['dirty-bitmaps'] 969 for device in res['return'] if 'dirty-bitmaps' in device} 970 971 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 972 """ 973 get a specific bitmap from the object returned by query_bitmaps. 974 :param recording: If specified, filter results by the specified value. 975 :param bitmaps: If specified, use it instead of call query_bitmaps() 976 """ 977 if bitmaps is None: 978 bitmaps = self.query_bitmaps() 979 980 for bitmap in bitmaps[node_name]: 981 if bitmap.get('name', '') == bitmap_name: 982 if recording is None or bitmap.get('recording') == recording: 983 return bitmap 984 return None 985 986 def check_bitmap_status(self, node_name, bitmap_name, fields): 987 ret = self.get_bitmap(node_name, bitmap_name) 988 989 return fields.items() <= ret.items() 990 991 def assert_block_path(self, root, path, expected_node, graph=None): 992 """ 993 Check whether the node under the given path in the block graph 994 is @expected_node. 995 996 @root is the node name of the node where the @path is rooted. 997 998 @path is a string that consists of child names separated by 999 slashes. It must begin with a slash. 1000 1001 Examples for @root + @path: 1002 - root="qcow2-node", path="/backing/file" 1003 - root="quorum-node", path="/children.2/file" 1004 1005 Hypothetically, @path could be empty, in which case it would 1006 point to @root. However, in practice this case is not useful 1007 and hence not allowed. 1008 1009 @expected_node may be None. (All elements of the path but the 1010 leaf must still exist.) 1011 1012 @graph may be None or the result of an x-debug-query-block-graph 1013 call that has already been performed. 1014 """ 1015 if graph is None: 1016 graph = self.qmp('x-debug-query-block-graph')['return'] 1017 1018 iter_path = iter(path.split('/')) 1019 1020 # Must start with a / 1021 assert next(iter_path) == '' 1022 1023 node = next((node for node in graph['nodes'] if node['name'] == root), 1024 None) 1025 1026 # An empty @path is not allowed, so the root node must be present 1027 assert node is not None, 'Root node %s not found' % root 1028 1029 for child_name in iter_path: 1030 assert node is not None, 'Cannot follow path %s%s' % (root, path) 1031 1032 try: 1033 node_id = next(edge['child'] for edge in graph['edges'] 1034 if (edge['parent'] == node['id'] and 1035 edge['name'] == child_name)) 1036 1037 node = next(node for node in graph['nodes'] 1038 if node['id'] == node_id) 1039 1040 except StopIteration: 1041 node = None 1042 1043 if node is None: 1044 assert expected_node is None, \ 1045 'No node found under %s (but expected %s)' % \ 1046 (path, expected_node) 1047 else: 1048 assert node['name'] == expected_node, \ 1049 'Found node %s under %s (but expected %s)' % \ 1050 (node['name'], path, expected_node) 1051 1052index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 1053 1054class QMPTestCase(unittest.TestCase): 1055 '''Abstract base class for QMP test cases''' 1056 1057 def __init__(self, *args, **kwargs): 1058 super().__init__(*args, **kwargs) 1059 # Many users of this class set a VM property we rely on heavily 1060 # in the methods below. 1061 self.vm = None 1062 1063 def dictpath(self, d, path): 1064 '''Traverse a path in a nested dict''' 1065 for component in path.split('/'): 1066 m = index_re.match(component) 1067 if m: 1068 component, idx = m.groups() 1069 idx = int(idx) 1070 1071 if not isinstance(d, dict) or component not in d: 1072 self.fail(f'failed path traversal for "{path}" in "{d}"') 1073 d = d[component] 1074 1075 if m: 1076 if not isinstance(d, list): 1077 self.fail(f'path component "{component}" in "{path}" ' 1078 f'is not a list in "{d}"') 1079 try: 1080 d = d[idx] 1081 except IndexError: 1082 self.fail(f'invalid index "{idx}" in path "{path}" ' 1083 f'in "{d}"') 1084 return d 1085 1086 def assert_qmp_absent(self, d, path): 1087 try: 1088 result = self.dictpath(d, path) 1089 except AssertionError: 1090 return 1091 self.fail('path "%s" has value "%s"' % (path, str(result))) 1092 1093 def assert_qmp(self, d, path, value): 1094 '''Assert that the value for a specific path in a QMP dict 1095 matches. When given a list of values, assert that any of 1096 them matches.''' 1097 1098 result = self.dictpath(d, path) 1099 1100 # [] makes no sense as a list of valid values, so treat it as 1101 # an actual single value. 1102 if isinstance(value, list) and value != []: 1103 for v in value: 1104 if result == v: 1105 return 1106 self.fail('no match for "%s" in %s' % (str(result), str(value))) 1107 else: 1108 self.assertEqual(result, value, 1109 '"%s" is "%s", expected "%s"' 1110 % (path, str(result), str(value))) 1111 1112 def assert_no_active_block_jobs(self): 1113 result = self.vm.qmp('query-block-jobs') 1114 self.assert_qmp(result, 'return', []) 1115 1116 def assert_has_block_node(self, node_name=None, file_name=None): 1117 """Issue a query-named-block-nodes and assert node_name and/or 1118 file_name is present in the result""" 1119 def check_equal_or_none(a, b): 1120 return a is None or b is None or a == b 1121 assert node_name or file_name 1122 result = self.vm.qmp('query-named-block-nodes') 1123 for x in result["return"]: 1124 if check_equal_or_none(x.get("node-name"), node_name) and \ 1125 check_equal_or_none(x.get("file"), file_name): 1126 return 1127 self.fail("Cannot find %s %s in result:\n%s" % 1128 (node_name, file_name, result)) 1129 1130 def assert_json_filename_equal(self, json_filename, reference): 1131 '''Asserts that the given filename is a json: filename and that its 1132 content is equal to the given reference object''' 1133 self.assertEqual(json_filename[:5], 'json:') 1134 self.assertEqual( 1135 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 1136 self.vm.flatten_qmp_object(reference) 1137 ) 1138 1139 def cancel_and_wait(self, drive='drive0', force=False, 1140 resume=False, wait=60.0): 1141 '''Cancel a block job and wait for it to finish, returning the event''' 1142 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 1143 self.assert_qmp(result, 'return', {}) 1144 1145 if resume: 1146 self.vm.resume_drive(drive) 1147 1148 cancelled = False 1149 result = None 1150 while not cancelled: 1151 for event in self.vm.get_qmp_events(wait=wait): 1152 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 1153 event['event'] == 'BLOCK_JOB_CANCELLED': 1154 self.assert_qmp(event, 'data/device', drive) 1155 result = event 1156 cancelled = True 1157 elif event['event'] == 'JOB_STATUS_CHANGE': 1158 self.assert_qmp(event, 'data/id', drive) 1159 1160 1161 self.assert_no_active_block_jobs() 1162 return result 1163 1164 def wait_until_completed(self, drive='drive0', check_offset=True, 1165 wait=60.0, error=None): 1166 '''Wait for a block job to finish, returning the event''' 1167 while True: 1168 for event in self.vm.get_qmp_events(wait=wait): 1169 if event['event'] == 'BLOCK_JOB_COMPLETED': 1170 self.assert_qmp(event, 'data/device', drive) 1171 if error is None: 1172 self.assert_qmp_absent(event, 'data/error') 1173 if check_offset: 1174 self.assert_qmp(event, 'data/offset', 1175 event['data']['len']) 1176 else: 1177 self.assert_qmp(event, 'data/error', error) 1178 self.assert_no_active_block_jobs() 1179 return event 1180 if event['event'] == 'JOB_STATUS_CHANGE': 1181 self.assert_qmp(event, 'data/id', drive) 1182 1183 def wait_ready(self, drive='drive0'): 1184 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1185 return self.vm.events_wait([ 1186 ('BLOCK_JOB_READY', 1187 {'data': {'type': 'mirror', 'device': drive}}), 1188 ('BLOCK_JOB_READY', 1189 {'data': {'type': 'commit', 'device': drive}}) 1190 ]) 1191 1192 def wait_ready_and_cancel(self, drive='drive0'): 1193 self.wait_ready(drive=drive) 1194 event = self.cancel_and_wait(drive=drive) 1195 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1196 self.assert_qmp(event, 'data/type', 'mirror') 1197 self.assert_qmp(event, 'data/offset', event['data']['len']) 1198 1199 def complete_and_wait(self, drive='drive0', wait_ready=True, 1200 completion_error=None): 1201 '''Complete a block job and wait for it to finish''' 1202 if wait_ready: 1203 self.wait_ready(drive=drive) 1204 1205 result = self.vm.qmp('block-job-complete', device=drive) 1206 self.assert_qmp(result, 'return', {}) 1207 1208 event = self.wait_until_completed(drive=drive, error=completion_error) 1209 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1210 1211 def pause_wait(self, job_id='job0'): 1212 with Timeout(3, "Timeout waiting for job to pause"): 1213 while True: 1214 result = self.vm.qmp('query-block-jobs') 1215 found = False 1216 for job in result['return']: 1217 if job['device'] == job_id: 1218 found = True 1219 if job['paused'] and not job['busy']: 1220 return job 1221 break 1222 assert found 1223 1224 def pause_job(self, job_id='job0', wait=True): 1225 result = self.vm.qmp('block-job-pause', device=job_id) 1226 self.assert_qmp(result, 'return', {}) 1227 if wait: 1228 return self.pause_wait(job_id) 1229 return result 1230 1231 def case_skip(self, reason): 1232 '''Skip this test case''' 1233 case_notrun(reason) 1234 self.skipTest(reason) 1235 1236 1237def notrun(reason): 1238 '''Skip this test suite''' 1239 # Each test in qemu-iotests has a number ("seq") 1240 seq = os.path.basename(sys.argv[0]) 1241 1242 with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \ 1243 as outfile: 1244 outfile.write(reason + '\n') 1245 logger.warning("%s not run: %s", seq, reason) 1246 sys.exit(0) 1247 1248def case_notrun(reason): 1249 '''Mark this test case as not having been run (without actually 1250 skipping it, that is left to the caller). See 1251 QMPTestCase.case_skip() for a variant that actually skips the 1252 current test case.''' 1253 1254 # Each test in qemu-iotests has a number ("seq") 1255 seq = os.path.basename(sys.argv[0]) 1256 1257 with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \ 1258 as outfile: 1259 outfile.write(' [case not run] ' + reason + '\n') 1260 1261def _verify_image_format(supported_fmts: Sequence[str] = (), 1262 unsupported_fmts: Sequence[str] = ()) -> None: 1263 if 'generic' in supported_fmts and \ 1264 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1265 # similar to 1266 # _supported_fmt generic 1267 # for bash tests 1268 supported_fmts = () 1269 1270 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1271 if not_sup or (imgfmt in unsupported_fmts): 1272 notrun('not suitable for this image format: %s' % imgfmt) 1273 1274 if imgfmt == 'luks': 1275 verify_working_luks() 1276 1277def _verify_protocol(supported: Sequence[str] = (), 1278 unsupported: Sequence[str] = ()) -> None: 1279 assert not (supported and unsupported) 1280 1281 if 'generic' in supported: 1282 return 1283 1284 not_sup = supported and (imgproto not in supported) 1285 if not_sup or (imgproto in unsupported): 1286 notrun('not suitable for this protocol: %s' % imgproto) 1287 1288def _verify_platform(supported: Sequence[str] = (), 1289 unsupported: Sequence[str] = ()) -> None: 1290 if any((sys.platform.startswith(x) for x in unsupported)): 1291 notrun('not suitable for this OS: %s' % sys.platform) 1292 1293 if supported: 1294 if not any((sys.platform.startswith(x) for x in supported)): 1295 notrun('not suitable for this OS: %s' % sys.platform) 1296 1297def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1298 if supported_cache_modes and (cachemode not in supported_cache_modes): 1299 notrun('not suitable for this cache mode: %s' % cachemode) 1300 1301def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1302 if supported_aio_modes and (aiomode not in supported_aio_modes): 1303 notrun('not suitable for this aio mode: %s' % aiomode) 1304 1305def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1306 usf_list = list(set(required_formats) - set(supported_formats())) 1307 if usf_list: 1308 notrun(f'formats {usf_list} are not whitelisted') 1309 1310 1311def _verify_virtio_blk() -> None: 1312 out = qemu_pipe('-M', 'none', '-device', 'help') 1313 if 'virtio-blk' not in out: 1314 notrun('Missing virtio-blk in QEMU binary') 1315 1316def _verify_virtio_scsi_pci_or_ccw() -> None: 1317 out = qemu_pipe('-M', 'none', '-device', 'help') 1318 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out: 1319 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary') 1320 1321 1322def _verify_imgopts(unsupported: Sequence[str] = ()) -> None: 1323 imgopts = os.environ.get('IMGOPTS') 1324 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file" 1325 # but it supported only for bash tests. We don't have a concept of global 1326 # TEST_IMG in iotests.py, not saying about somehow parsing $variables. 1327 # So, for simplicity let's just not support any IMGOPTS with '$' inside. 1328 unsup = list(unsupported) + ['$'] 1329 if imgopts and any(x in imgopts for x in unsup): 1330 notrun(f'not suitable for this imgopts: {imgopts}') 1331 1332 1333def supports_quorum(): 1334 return 'quorum' in qemu_img_pipe('--help') 1335 1336def verify_quorum(): 1337 '''Skip test suite if quorum support is not available''' 1338 if not supports_quorum(): 1339 notrun('quorum support missing') 1340 1341def has_working_luks() -> Tuple[bool, str]: 1342 """ 1343 Check whether our LUKS driver can actually create images 1344 (this extends to LUKS encryption for qcow2). 1345 1346 If not, return the reason why. 1347 """ 1348 1349 img_file = f'{test_dir}/luks-test.luks' 1350 (output, status) = \ 1351 qemu_img_pipe_and_status('create', '-f', 'luks', 1352 '--object', luks_default_secret_object, 1353 '-o', luks_default_key_secret_opt, 1354 '-o', 'iter-time=10', 1355 img_file, '1G') 1356 try: 1357 os.remove(img_file) 1358 except OSError: 1359 pass 1360 1361 if status != 0: 1362 reason = output 1363 for line in output.splitlines(): 1364 if img_file + ':' in line: 1365 reason = line.split(img_file + ':', 1)[1].strip() 1366 break 1367 1368 return (False, reason) 1369 else: 1370 return (True, '') 1371 1372def verify_working_luks(): 1373 """ 1374 Skip test suite if LUKS does not work 1375 """ 1376 (working, reason) = has_working_luks() 1377 if not working: 1378 notrun(reason) 1379 1380def qemu_pipe(*args: str) -> str: 1381 """ 1382 Run qemu with an option to print something and exit (e.g. a help option). 1383 1384 :return: QEMU's stdout output. 1385 """ 1386 full_args = [qemu_prog] + qemu_opts + list(args) 1387 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1388 return output 1389 1390def supported_formats(read_only=False): 1391 '''Set 'read_only' to True to check ro-whitelist 1392 Otherwise, rw-whitelist is checked''' 1393 1394 if not hasattr(supported_formats, "formats"): 1395 supported_formats.formats = {} 1396 1397 if read_only not in supported_formats.formats: 1398 format_message = qemu_pipe("-drive", "format=help") 1399 line = 1 if read_only else 0 1400 supported_formats.formats[read_only] = \ 1401 format_message.splitlines()[line].split(":")[1].split() 1402 1403 return supported_formats.formats[read_only] 1404 1405def skip_if_unsupported(required_formats=(), read_only=False): 1406 '''Skip Test Decorator 1407 Runs the test if all the required formats are whitelisted''' 1408 def skip_test_decorator(func): 1409 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1410 **kwargs: Dict[str, Any]) -> None: 1411 if callable(required_formats): 1412 fmts = required_formats(test_case) 1413 else: 1414 fmts = required_formats 1415 1416 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1417 if usf_list: 1418 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1419 test_case.case_skip(msg) 1420 else: 1421 func(test_case, *args, **kwargs) 1422 return func_wrapper 1423 return skip_test_decorator 1424 1425def skip_for_formats(formats: Sequence[str] = ()) \ 1426 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1427 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1428 '''Skip Test Decorator 1429 Skips the test for the given formats''' 1430 def skip_test_decorator(func): 1431 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1432 **kwargs: Dict[str, Any]) -> None: 1433 if imgfmt in formats: 1434 msg = f'{test_case}: Skipped for format {imgfmt}' 1435 test_case.case_skip(msg) 1436 else: 1437 func(test_case, *args, **kwargs) 1438 return func_wrapper 1439 return skip_test_decorator 1440 1441def skip_if_user_is_root(func): 1442 '''Skip Test Decorator 1443 Runs the test only without root permissions''' 1444 def func_wrapper(*args, **kwargs): 1445 if os.getuid() == 0: 1446 case_notrun('{}: cannot be run as root'.format(args[0])) 1447 return None 1448 else: 1449 return func(*args, **kwargs) 1450 return func_wrapper 1451 1452# We need to filter out the time taken from the output so that 1453# qemu-iotest can reliably diff the results against master output, 1454# and hide skipped tests from the reference output. 1455 1456class ReproducibleTestResult(unittest.TextTestResult): 1457 def addSkip(self, test, reason): 1458 # Same as TextTestResult, but print dot instead of "s" 1459 unittest.TestResult.addSkip(self, test, reason) 1460 if self.showAll: 1461 self.stream.writeln("skipped {0!r}".format(reason)) 1462 elif self.dots: 1463 self.stream.write(".") 1464 self.stream.flush() 1465 1466class ReproducibleStreamWrapper: 1467 def __init__(self, stream: TextIO): 1468 self.stream = stream 1469 1470 def __getattr__(self, attr): 1471 if attr in ('stream', '__getstate__'): 1472 raise AttributeError(attr) 1473 return getattr(self.stream, attr) 1474 1475 def write(self, arg=None): 1476 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg) 1477 arg = re.sub(r' \(skipped=\d+\)', r'', arg) 1478 self.stream.write(arg) 1479 1480class ReproducibleTestRunner(unittest.TextTestRunner): 1481 def __init__(self, stream: Optional[TextIO] = None, 1482 resultclass: Type[unittest.TestResult] = 1483 ReproducibleTestResult, 1484 **kwargs: Any) -> None: 1485 rstream = ReproducibleStreamWrapper(stream or sys.stdout) 1486 super().__init__(stream=rstream, # type: ignore 1487 descriptions=True, 1488 resultclass=resultclass, 1489 **kwargs) 1490 1491def execute_unittest(argv: List[str], debug: bool = False) -> None: 1492 """Executes unittests within the calling module.""" 1493 1494 # Some tests have warnings, especially ResourceWarnings for unclosed 1495 # files and sockets. Ignore them for now to ensure reproducibility of 1496 # the test output. 1497 unittest.main(argv=argv, 1498 testRunner=ReproducibleTestRunner, 1499 verbosity=2 if debug else 1, 1500 warnings=None if sys.warnoptions else 'ignore') 1501 1502def execute_setup_common(supported_fmts: Sequence[str] = (), 1503 supported_platforms: Sequence[str] = (), 1504 supported_cache_modes: Sequence[str] = (), 1505 supported_aio_modes: Sequence[str] = (), 1506 unsupported_fmts: Sequence[str] = (), 1507 supported_protocols: Sequence[str] = (), 1508 unsupported_protocols: Sequence[str] = (), 1509 required_fmts: Sequence[str] = (), 1510 unsupported_imgopts: Sequence[str] = ()) -> bool: 1511 """ 1512 Perform necessary setup for either script-style or unittest-style tests. 1513 1514 :return: Bool; Whether or not debug mode has been requested via the CLI. 1515 """ 1516 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1517 1518 debug = '-d' in sys.argv 1519 if debug: 1520 sys.argv.remove('-d') 1521 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1522 1523 _verify_image_format(supported_fmts, unsupported_fmts) 1524 _verify_protocol(supported_protocols, unsupported_protocols) 1525 _verify_platform(supported=supported_platforms) 1526 _verify_cache_mode(supported_cache_modes) 1527 _verify_aio_mode(supported_aio_modes) 1528 _verify_formats(required_fmts) 1529 _verify_virtio_blk() 1530 _verify_imgopts(unsupported_imgopts) 1531 1532 return debug 1533 1534def execute_test(*args, test_function=None, **kwargs): 1535 """Run either unittest or script-style tests.""" 1536 1537 debug = execute_setup_common(*args, **kwargs) 1538 if not test_function: 1539 execute_unittest(sys.argv, debug) 1540 else: 1541 test_function() 1542 1543def activate_logging(): 1544 """Activate iotests.log() output to stdout for script-style tests.""" 1545 handler = logging.StreamHandler(stream=sys.stdout) 1546 formatter = logging.Formatter('%(message)s') 1547 handler.setFormatter(formatter) 1548 test_logger.addHandler(handler) 1549 test_logger.setLevel(logging.INFO) 1550 test_logger.propagate = False 1551 1552# This is called from script-style iotests without a single point of entry 1553def script_initialize(*args, **kwargs): 1554 """Initialize script-style tests without running any tests.""" 1555 activate_logging() 1556 execute_setup_common(*args, **kwargs) 1557 1558# This is called from script-style iotests with a single point of entry 1559def script_main(test_function, *args, **kwargs): 1560 """Run script-style tests outside of the unittest framework""" 1561 activate_logging() 1562 execute_test(*args, test_function=test_function, **kwargs) 1563 1564# This is called from unittest style iotests 1565def main(*args, **kwargs): 1566 """Run tests using the unittest framework""" 1567 execute_test(*args, **kwargs) 1568