1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76imgfmt = os.environ.get('IMGFMT', 'raw') 77imgproto = os.environ.get('IMGPROTO', 'file') 78output_dir = os.environ.get('OUTPUT_DIR', '.') 79 80try: 81 test_dir = os.environ['TEST_DIR'] 82 sock_dir = os.environ['SOCK_DIR'] 83 cachemode = os.environ['CACHEMODE'] 84 aiomode = os.environ['AIOMODE'] 85 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 86except KeyError: 87 # We are using these variables as proxies to indicate that we're 88 # not being run via "check". There may be other things set up by 89 # "check" that individual test cases rely on. 90 sys.stderr.write('Please run this test via the "check" script\n') 91 sys.exit(os.EX_USAGE) 92 93socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 94 95luks_default_secret_object = 'secret,id=keysec0,data=' + \ 96 os.environ.get('IMGKEYSECRET', '') 97luks_default_key_secret_opt = 'key-secret=keysec0' 98 99 100def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 101 connect_stderr: bool = True) -> Tuple[str, int]: 102 """ 103 Run a tool and return both its output and its exit code 104 """ 105 stderr = subprocess.STDOUT if connect_stderr else None 106 subp = subprocess.Popen(args, 107 stdout=subprocess.PIPE, 108 stderr=stderr, 109 universal_newlines=True) 110 output = subp.communicate()[0] 111 if subp.returncode < 0: 112 cmd = ' '.join(args) 113 sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n') 114 return (output, subp.returncode) 115 116def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 117 """ 118 Run qemu-img and return both its output and its exit code 119 """ 120 full_args = qemu_img_args + list(args) 121 return qemu_tool_pipe_and_status('qemu-img', full_args) 122 123def qemu_img(*args: str) -> int: 124 '''Run qemu-img and return the exit code''' 125 return qemu_img_pipe_and_status(*args)[1] 126 127def ordered_qmp(qmsg, conv_keys=True): 128 # Dictionaries are not ordered prior to 3.6, therefore: 129 if isinstance(qmsg, list): 130 return [ordered_qmp(atom) for atom in qmsg] 131 if isinstance(qmsg, dict): 132 od = OrderedDict() 133 for k, v in sorted(qmsg.items()): 134 if conv_keys: 135 k = k.replace('_', '-') 136 od[k] = ordered_qmp(v, conv_keys=False) 137 return od 138 return qmsg 139 140def qemu_img_create(*args): 141 args = list(args) 142 143 # default luks support 144 if '-f' in args and args[args.index('-f') + 1] == 'luks': 145 if '-o' in args: 146 i = args.index('-o') 147 if 'key-secret' not in args[i + 1]: 148 args[i + 1].append(luks_default_key_secret_opt) 149 args.insert(i + 2, '--object') 150 args.insert(i + 3, luks_default_secret_object) 151 else: 152 args = ['-o', luks_default_key_secret_opt, 153 '--object', luks_default_secret_object] + args 154 155 args.insert(0, 'create') 156 157 return qemu_img(*args) 158 159def qemu_img_measure(*args): 160 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 161 162def qemu_img_check(*args): 163 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 164 165def qemu_img_verbose(*args): 166 '''Run qemu-img without suppressing its output and return the exit code''' 167 exitcode = subprocess.call(qemu_img_args + list(args)) 168 if exitcode < 0: 169 sys.stderr.write('qemu-img received signal %i: %s\n' 170 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 171 return exitcode 172 173def qemu_img_pipe(*args: str) -> str: 174 '''Run qemu-img and return its output''' 175 return qemu_img_pipe_and_status(*args)[0] 176 177def qemu_img_log(*args): 178 result = qemu_img_pipe(*args) 179 log(result, filters=[filter_testfiles]) 180 return result 181 182def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 183 args = ['info'] 184 if imgopts: 185 args.append('--image-opts') 186 else: 187 args += ['-f', imgfmt] 188 args += extra_args 189 args.append(filename) 190 191 output = qemu_img_pipe(*args) 192 if not filter_path: 193 filter_path = filename 194 log(filter_img_info(output, filter_path)) 195 196def qemu_io(*args): 197 '''Run qemu-io and return the stdout data''' 198 args = qemu_io_args + list(args) 199 return qemu_tool_pipe_and_status('qemu-io', args)[0] 200 201def qemu_io_log(*args): 202 result = qemu_io(*args) 203 log(result, filters=[filter_testfiles, filter_qemu_io]) 204 return result 205 206def qemu_io_silent(*args): 207 '''Run qemu-io and return the exit code, suppressing stdout''' 208 if '-f' in args or '--image-opts' in args: 209 default_args = qemu_io_args_no_fmt 210 else: 211 default_args = qemu_io_args 212 213 args = default_args + list(args) 214 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 215 if exitcode < 0: 216 sys.stderr.write('qemu-io received signal %i: %s\n' % 217 (-exitcode, ' '.join(args))) 218 return exitcode 219 220def qemu_io_silent_check(*args): 221 '''Run qemu-io and return the true if subprocess returned 0''' 222 args = qemu_io_args + list(args) 223 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 224 stderr=subprocess.STDOUT) 225 return exitcode == 0 226 227def get_virtio_scsi_device(): 228 if qemu_default_machine == 's390-ccw-virtio': 229 return 'virtio-scsi-ccw' 230 return 'virtio-scsi-pci' 231 232class QemuIoInteractive: 233 def __init__(self, *args): 234 self.args = qemu_io_args_no_fmt + list(args) 235 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 236 stdout=subprocess.PIPE, 237 stderr=subprocess.STDOUT, 238 universal_newlines=True) 239 out = self._p.stdout.read(9) 240 if out != 'qemu-io> ': 241 # Most probably qemu-io just failed to start. 242 # Let's collect the whole output and exit. 243 out += self._p.stdout.read() 244 self._p.wait(timeout=1) 245 raise ValueError(out) 246 247 def close(self): 248 self._p.communicate('q\n') 249 250 def _read_output(self): 251 pattern = 'qemu-io> ' 252 n = len(pattern) 253 pos = 0 254 s = [] 255 while pos != n: 256 c = self._p.stdout.read(1) 257 # check unexpected EOF 258 assert c != '' 259 s.append(c) 260 if c == pattern[pos]: 261 pos += 1 262 else: 263 pos = 0 264 265 return ''.join(s[:-n]) 266 267 def cmd(self, cmd): 268 # quit command is in close(), '\n' is added automatically 269 assert '\n' not in cmd 270 cmd = cmd.strip() 271 assert cmd not in ('q', 'quit') 272 self._p.stdin.write(cmd + '\n') 273 self._p.stdin.flush() 274 return self._read_output() 275 276 277def qemu_nbd(*args): 278 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 279 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 280 281def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 282 '''Run qemu-nbd in daemon mode and return both the parent's exit code 283 and its output in case of an error''' 284 full_args = qemu_nbd_args + ['--fork'] + list(args) 285 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 286 connect_stderr=False) 287 return returncode, output if returncode else '' 288 289def qemu_nbd_list_log(*args: str) -> str: 290 '''Run qemu-nbd to list remote exports''' 291 full_args = [qemu_nbd_prog, '-L'] + list(args) 292 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 293 log(output, filters=[filter_testfiles, filter_nbd_exports]) 294 return output 295 296@contextmanager 297def qemu_nbd_popen(*args): 298 '''Context manager running qemu-nbd within the context''' 299 pid_file = file_path("pid") 300 301 cmd = list(qemu_nbd_args) 302 cmd.extend(('--persistent', '--pid-file', pid_file)) 303 cmd.extend(args) 304 305 log('Start NBD server') 306 p = subprocess.Popen(cmd) 307 try: 308 while not os.path.exists(pid_file): 309 if p.poll() is not None: 310 raise RuntimeError( 311 "qemu-nbd terminated with exit code {}: {}" 312 .format(p.returncode, ' '.join(cmd))) 313 314 time.sleep(0.01) 315 yield 316 finally: 317 log('Kill NBD server') 318 p.kill() 319 p.wait() 320 321def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 322 '''Return True if two image files are identical''' 323 return qemu_img('compare', '-f', fmt1, 324 '-F', fmt2, img1, img2) == 0 325 326def create_image(name, size): 327 '''Create a fully-allocated raw image with sector markers''' 328 file = open(name, 'wb') 329 i = 0 330 while i < size: 331 sector = struct.pack('>l504xl', i // 512, i // 512) 332 file.write(sector) 333 i = i + 512 334 file.close() 335 336def image_size(img): 337 '''Return image's virtual size''' 338 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 339 return json.loads(r)['virtual-size'] 340 341def is_str(val): 342 return isinstance(val, str) 343 344test_dir_re = re.compile(r"%s" % test_dir) 345def filter_test_dir(msg): 346 return test_dir_re.sub("TEST_DIR", msg) 347 348win32_re = re.compile(r"\r") 349def filter_win32(msg): 350 return win32_re.sub("", msg) 351 352qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 353 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 354 r"and [0-9\/.inf]* ops\/sec\)") 355def filter_qemu_io(msg): 356 msg = filter_win32(msg) 357 return qemu_io_re.sub("X ops; XX:XX:XX.X " 358 "(XXX YYY/sec and XXX ops/sec)", msg) 359 360chown_re = re.compile(r"chown [0-9]+:[0-9]+") 361def filter_chown(msg): 362 return chown_re.sub("chown UID:GID", msg) 363 364def filter_qmp_event(event): 365 '''Filter a QMP event dict''' 366 event = dict(event) 367 if 'timestamp' in event: 368 event['timestamp']['seconds'] = 'SECS' 369 event['timestamp']['microseconds'] = 'USECS' 370 return event 371 372def filter_qmp(qmsg, filter_fn): 373 '''Given a string filter, filter a QMP object's values. 374 filter_fn takes a (key, value) pair.''' 375 # Iterate through either lists or dicts; 376 if isinstance(qmsg, list): 377 items = enumerate(qmsg) 378 else: 379 items = qmsg.items() 380 381 for k, v in items: 382 if isinstance(v, (dict, list)): 383 qmsg[k] = filter_qmp(v, filter_fn) 384 else: 385 qmsg[k] = filter_fn(k, v) 386 return qmsg 387 388def filter_testfiles(msg): 389 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 390 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 391 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 392 393def filter_qmp_testfiles(qmsg): 394 def _filter(_key, value): 395 if is_str(value): 396 return filter_testfiles(value) 397 return value 398 return filter_qmp(qmsg, _filter) 399 400def filter_virtio_scsi(output: str) -> str: 401 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 402 403def filter_qmp_virtio_scsi(qmsg): 404 def _filter(_key, value): 405 if is_str(value): 406 return filter_virtio_scsi(value) 407 return value 408 return filter_qmp(qmsg, _filter) 409 410def filter_generated_node_ids(msg): 411 return re.sub("#block[0-9]+", "NODE_NAME", msg) 412 413def filter_img_info(output, filename): 414 lines = [] 415 for line in output.split('\n'): 416 if 'disk size' in line or 'actual-size' in line: 417 continue 418 line = line.replace(filename, 'TEST_IMG') 419 line = filter_testfiles(line) 420 line = line.replace(imgfmt, 'IMGFMT') 421 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 422 line = re.sub('uuid: [-a-f0-9]+', 423 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 424 line) 425 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 426 lines.append(line) 427 return '\n'.join(lines) 428 429def filter_imgfmt(msg): 430 return msg.replace(imgfmt, 'IMGFMT') 431 432def filter_qmp_imgfmt(qmsg): 433 def _filter(_key, value): 434 if is_str(value): 435 return filter_imgfmt(value) 436 return value 437 return filter_qmp(qmsg, _filter) 438 439def filter_nbd_exports(output: str) -> str: 440 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 441 442 443Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 444 445def log(msg: Msg, 446 filters: Iterable[Callable[[Msg], Msg]] = (), 447 indent: Optional[int] = None) -> None: 448 """ 449 Logs either a string message or a JSON serializable message (like QMP). 450 If indent is provided, JSON serializable messages are pretty-printed. 451 """ 452 for flt in filters: 453 msg = flt(msg) 454 if isinstance(msg, (dict, list)): 455 # Don't sort if it's already sorted 456 do_sort = not isinstance(msg, OrderedDict) 457 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 458 else: 459 test_logger.info(msg) 460 461class Timeout: 462 def __init__(self, seconds, errmsg="Timeout"): 463 self.seconds = seconds 464 self.errmsg = errmsg 465 def __enter__(self): 466 signal.signal(signal.SIGALRM, self.timeout) 467 signal.setitimer(signal.ITIMER_REAL, self.seconds) 468 return self 469 def __exit__(self, exc_type, value, traceback): 470 signal.setitimer(signal.ITIMER_REAL, 0) 471 return False 472 def timeout(self, signum, frame): 473 raise Exception(self.errmsg) 474 475def file_pattern(name): 476 return "{0}-{1}".format(os.getpid(), name) 477 478class FilePath: 479 """ 480 Context manager generating multiple file names. The generated files are 481 removed when exiting the context. 482 483 Example usage: 484 485 with FilePath('a.img', 'b.img') as (img_a, img_b): 486 # Use img_a and img_b here... 487 488 # a.img and b.img are automatically removed here. 489 490 By default images are created in iotests.test_dir. To create sockets use 491 iotests.sock_dir: 492 493 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 494 495 For convenience, calling with one argument yields a single file instead of 496 a tuple with one item. 497 498 """ 499 def __init__(self, *names, base_dir=test_dir): 500 self.paths = [os.path.join(base_dir, file_pattern(name)) 501 for name in names] 502 503 def __enter__(self): 504 if len(self.paths) == 1: 505 return self.paths[0] 506 else: 507 return self.paths 508 509 def __exit__(self, exc_type, exc_val, exc_tb): 510 for path in self.paths: 511 try: 512 os.remove(path) 513 except OSError: 514 pass 515 return False 516 517 518def try_remove(img): 519 try: 520 os.remove(img) 521 except OSError: 522 pass 523 524def file_path_remover(): 525 for path in reversed(file_path_remover.paths): 526 try_remove(path) 527 528 529def file_path(*names, base_dir=test_dir): 530 ''' Another way to get auto-generated filename that cleans itself up. 531 532 Use is as simple as: 533 534 img_a, img_b = file_path('a.img', 'b.img') 535 sock = file_path('socket') 536 ''' 537 538 if not hasattr(file_path_remover, 'paths'): 539 file_path_remover.paths = [] 540 atexit.register(file_path_remover) 541 542 paths = [] 543 for name in names: 544 filename = file_pattern(name) 545 path = os.path.join(base_dir, filename) 546 file_path_remover.paths.append(path) 547 paths.append(path) 548 549 return paths[0] if len(paths) == 1 else paths 550 551def remote_filename(path): 552 if imgproto == 'file': 553 return path 554 elif imgproto == 'ssh': 555 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 556 else: 557 raise Exception("Protocol %s not supported" % (imgproto)) 558 559class VM(qtest.QEMUQtestMachine): 560 '''A QEMU VM''' 561 562 def __init__(self, path_suffix=''): 563 name = "qemu%s-%d" % (path_suffix, os.getpid()) 564 super().__init__(qemu_prog, qemu_opts, name=name, 565 test_dir=test_dir, 566 socket_scm_helper=socket_scm_helper, 567 sock_dir=sock_dir) 568 self._num_drives = 0 569 570 def add_object(self, opts): 571 self._args.append('-object') 572 self._args.append(opts) 573 return self 574 575 def add_device(self, opts): 576 self._args.append('-device') 577 self._args.append(opts) 578 return self 579 580 def add_drive_raw(self, opts): 581 self._args.append('-drive') 582 self._args.append(opts) 583 return self 584 585 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 586 '''Add a virtio-blk drive to the VM''' 587 options = ['if=%s' % interface, 588 'id=drive%d' % self._num_drives] 589 590 if path is not None: 591 options.append('file=%s' % path) 592 options.append('format=%s' % img_format) 593 options.append('cache=%s' % cachemode) 594 options.append('aio=%s' % aiomode) 595 596 if opts: 597 options.append(opts) 598 599 if img_format == 'luks' and 'key-secret' not in opts: 600 # default luks support 601 if luks_default_secret_object not in self._args: 602 self.add_object(luks_default_secret_object) 603 604 options.append(luks_default_key_secret_opt) 605 606 self._args.append('-drive') 607 self._args.append(','.join(options)) 608 self._num_drives += 1 609 return self 610 611 def add_blockdev(self, opts): 612 self._args.append('-blockdev') 613 if isinstance(opts, str): 614 self._args.append(opts) 615 else: 616 self._args.append(','.join(opts)) 617 return self 618 619 def add_incoming(self, addr): 620 self._args.append('-incoming') 621 self._args.append(addr) 622 return self 623 624 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 625 cmd = 'human-monitor-command' 626 kwargs: Dict[str, Any] = {'command-line': command_line} 627 if use_log: 628 return self.qmp_log(cmd, **kwargs) 629 else: 630 return self.qmp(cmd, **kwargs) 631 632 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 633 """Pause drive r/w operations""" 634 if not event: 635 self.pause_drive(drive, "read_aio") 636 self.pause_drive(drive, "write_aio") 637 return 638 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 639 640 def resume_drive(self, drive: str) -> None: 641 """Resume drive r/w operations""" 642 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 643 644 def hmp_qemu_io(self, drive: str, cmd: str, 645 use_log: bool = False) -> QMPMessage: 646 """Write to a given drive using an HMP command""" 647 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 648 649 def flatten_qmp_object(self, obj, output=None, basestr=''): 650 if output is None: 651 output = dict() 652 if isinstance(obj, list): 653 for i, item in enumerate(obj): 654 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 655 elif isinstance(obj, dict): 656 for key in obj: 657 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 658 else: 659 output[basestr[:-1]] = obj # Strip trailing '.' 660 return output 661 662 def qmp_to_opts(self, obj): 663 obj = self.flatten_qmp_object(obj) 664 output_list = list() 665 for key in obj: 666 output_list += [key + '=' + obj[key]] 667 return ','.join(output_list) 668 669 def get_qmp_events_filtered(self, wait=60.0): 670 result = [] 671 for ev in self.get_qmp_events(wait=wait): 672 result.append(filter_qmp_event(ev)) 673 return result 674 675 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 676 full_cmd = OrderedDict(( 677 ("execute", cmd), 678 ("arguments", ordered_qmp(kwargs)) 679 )) 680 log(full_cmd, filters, indent=indent) 681 result = self.qmp(cmd, **kwargs) 682 log(result, filters, indent=indent) 683 return result 684 685 # Returns None on success, and an error string on failure 686 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 687 pre_finalize=None, cancel=False, wait=60.0): 688 """ 689 run_job moves a job from creation through to dismissal. 690 691 :param job: String. ID of recently-launched job 692 :param auto_finalize: Bool. True if the job was launched with 693 auto_finalize. Defaults to True. 694 :param auto_dismiss: Bool. True if the job was launched with 695 auto_dismiss=True. Defaults to False. 696 :param pre_finalize: Callback. A callable that takes no arguments to be 697 invoked prior to issuing job-finalize, if any. 698 :param cancel: Bool. When true, cancels the job after the pre_finalize 699 callback. 700 :param wait: Float. Timeout value specifying how long to wait for any 701 event, in seconds. Defaults to 60.0. 702 """ 703 match_device = {'data': {'device': job}} 704 match_id = {'data': {'id': job}} 705 events = [ 706 ('BLOCK_JOB_COMPLETED', match_device), 707 ('BLOCK_JOB_CANCELLED', match_device), 708 ('BLOCK_JOB_ERROR', match_device), 709 ('BLOCK_JOB_READY', match_device), 710 ('BLOCK_JOB_PENDING', match_id), 711 ('JOB_STATUS_CHANGE', match_id) 712 ] 713 error = None 714 while True: 715 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 716 if ev['event'] != 'JOB_STATUS_CHANGE': 717 log(ev) 718 continue 719 status = ev['data']['status'] 720 if status == 'aborting': 721 result = self.qmp('query-jobs') 722 for j in result['return']: 723 if j['id'] == job: 724 error = j['error'] 725 log('Job failed: %s' % (j['error'])) 726 elif status == 'ready': 727 self.qmp_log('job-complete', id=job) 728 elif status == 'pending' and not auto_finalize: 729 if pre_finalize: 730 pre_finalize() 731 if cancel: 732 self.qmp_log('job-cancel', id=job) 733 else: 734 self.qmp_log('job-finalize', id=job) 735 elif status == 'concluded' and not auto_dismiss: 736 self.qmp_log('job-dismiss', id=job) 737 elif status == 'null': 738 return error 739 740 # Returns None on success, and an error string on failure 741 def blockdev_create(self, options, job_id='job0', filters=None): 742 if filters is None: 743 filters = [filter_qmp_testfiles] 744 result = self.qmp_log('blockdev-create', filters=filters, 745 job_id=job_id, options=options) 746 747 if 'return' in result: 748 assert result['return'] == {} 749 job_result = self.run_job(job_id) 750 else: 751 job_result = result['error'] 752 753 log("") 754 return job_result 755 756 def enable_migration_events(self, name): 757 log('Enabling migration QMP events on %s...' % name) 758 log(self.qmp('migrate-set-capabilities', capabilities=[ 759 { 760 'capability': 'events', 761 'state': True 762 } 763 ])) 764 765 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 766 while True: 767 event = self.event_wait('MIGRATION') 768 # We use the default timeout, and with a timeout, event_wait() 769 # never returns None 770 assert event 771 772 log(event, filters=[filter_qmp_event]) 773 if event['data']['status'] in ('completed', 'failed'): 774 break 775 776 if event['data']['status'] == 'completed': 777 # The event may occur in finish-migrate, so wait for the expected 778 # post-migration runstate 779 runstate = None 780 while runstate != expect_runstate: 781 runstate = self.qmp('query-status')['return']['status'] 782 return True 783 else: 784 return False 785 786 def node_info(self, node_name): 787 nodes = self.qmp('query-named-block-nodes') 788 for x in nodes['return']: 789 if x['node-name'] == node_name: 790 return x 791 return None 792 793 def query_bitmaps(self): 794 res = self.qmp("query-named-block-nodes") 795 return {device['node-name']: device['dirty-bitmaps'] 796 for device in res['return'] if 'dirty-bitmaps' in device} 797 798 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 799 """ 800 get a specific bitmap from the object returned by query_bitmaps. 801 :param recording: If specified, filter results by the specified value. 802 :param bitmaps: If specified, use it instead of call query_bitmaps() 803 """ 804 if bitmaps is None: 805 bitmaps = self.query_bitmaps() 806 807 for bitmap in bitmaps[node_name]: 808 if bitmap.get('name', '') == bitmap_name: 809 if recording is None or bitmap.get('recording') == recording: 810 return bitmap 811 return None 812 813 def check_bitmap_status(self, node_name, bitmap_name, fields): 814 ret = self.get_bitmap(node_name, bitmap_name) 815 816 return fields.items() <= ret.items() 817 818 def assert_block_path(self, root, path, expected_node, graph=None): 819 """ 820 Check whether the node under the given path in the block graph 821 is @expected_node. 822 823 @root is the node name of the node where the @path is rooted. 824 825 @path is a string that consists of child names separated by 826 slashes. It must begin with a slash. 827 828 Examples for @root + @path: 829 - root="qcow2-node", path="/backing/file" 830 - root="quorum-node", path="/children.2/file" 831 832 Hypothetically, @path could be empty, in which case it would 833 point to @root. However, in practice this case is not useful 834 and hence not allowed. 835 836 @expected_node may be None. (All elements of the path but the 837 leaf must still exist.) 838 839 @graph may be None or the result of an x-debug-query-block-graph 840 call that has already been performed. 841 """ 842 if graph is None: 843 graph = self.qmp('x-debug-query-block-graph')['return'] 844 845 iter_path = iter(path.split('/')) 846 847 # Must start with a / 848 assert next(iter_path) == '' 849 850 node = next((node for node in graph['nodes'] if node['name'] == root), 851 None) 852 853 # An empty @path is not allowed, so the root node must be present 854 assert node is not None, 'Root node %s not found' % root 855 856 for child_name in iter_path: 857 assert node is not None, 'Cannot follow path %s%s' % (root, path) 858 859 try: 860 node_id = next(edge['child'] for edge in graph['edges'] 861 if (edge['parent'] == node['id'] and 862 edge['name'] == child_name)) 863 864 node = next(node for node in graph['nodes'] 865 if node['id'] == node_id) 866 867 except StopIteration: 868 node = None 869 870 if node is None: 871 assert expected_node is None, \ 872 'No node found under %s (but expected %s)' % \ 873 (path, expected_node) 874 else: 875 assert node['name'] == expected_node, \ 876 'Found node %s under %s (but expected %s)' % \ 877 (node['name'], path, expected_node) 878 879index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 880 881class QMPTestCase(unittest.TestCase): 882 '''Abstract base class for QMP test cases''' 883 884 def __init__(self, *args, **kwargs): 885 super().__init__(*args, **kwargs) 886 # Many users of this class set a VM property we rely on heavily 887 # in the methods below. 888 self.vm = None 889 890 def dictpath(self, d, path): 891 '''Traverse a path in a nested dict''' 892 for component in path.split('/'): 893 m = index_re.match(component) 894 if m: 895 component, idx = m.groups() 896 idx = int(idx) 897 898 if not isinstance(d, dict) or component not in d: 899 self.fail(f'failed path traversal for "{path}" in "{d}"') 900 d = d[component] 901 902 if m: 903 if not isinstance(d, list): 904 self.fail(f'path component "{component}" in "{path}" ' 905 f'is not a list in "{d}"') 906 try: 907 d = d[idx] 908 except IndexError: 909 self.fail(f'invalid index "{idx}" in path "{path}" ' 910 f'in "{d}"') 911 return d 912 913 def assert_qmp_absent(self, d, path): 914 try: 915 result = self.dictpath(d, path) 916 except AssertionError: 917 return 918 self.fail('path "%s" has value "%s"' % (path, str(result))) 919 920 def assert_qmp(self, d, path, value): 921 '''Assert that the value for a specific path in a QMP dict 922 matches. When given a list of values, assert that any of 923 them matches.''' 924 925 result = self.dictpath(d, path) 926 927 # [] makes no sense as a list of valid values, so treat it as 928 # an actual single value. 929 if isinstance(value, list) and value != []: 930 for v in value: 931 if result == v: 932 return 933 self.fail('no match for "%s" in %s' % (str(result), str(value))) 934 else: 935 self.assertEqual(result, value, 936 '"%s" is "%s", expected "%s"' 937 % (path, str(result), str(value))) 938 939 def assert_no_active_block_jobs(self): 940 result = self.vm.qmp('query-block-jobs') 941 self.assert_qmp(result, 'return', []) 942 943 def assert_has_block_node(self, node_name=None, file_name=None): 944 """Issue a query-named-block-nodes and assert node_name and/or 945 file_name is present in the result""" 946 def check_equal_or_none(a, b): 947 return a is None or b is None or a == b 948 assert node_name or file_name 949 result = self.vm.qmp('query-named-block-nodes') 950 for x in result["return"]: 951 if check_equal_or_none(x.get("node-name"), node_name) and \ 952 check_equal_or_none(x.get("file"), file_name): 953 return 954 self.fail("Cannot find %s %s in result:\n%s" % 955 (node_name, file_name, result)) 956 957 def assert_json_filename_equal(self, json_filename, reference): 958 '''Asserts that the given filename is a json: filename and that its 959 content is equal to the given reference object''' 960 self.assertEqual(json_filename[:5], 'json:') 961 self.assertEqual( 962 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 963 self.vm.flatten_qmp_object(reference) 964 ) 965 966 def cancel_and_wait(self, drive='drive0', force=False, 967 resume=False, wait=60.0): 968 '''Cancel a block job and wait for it to finish, returning the event''' 969 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 970 self.assert_qmp(result, 'return', {}) 971 972 if resume: 973 self.vm.resume_drive(drive) 974 975 cancelled = False 976 result = None 977 while not cancelled: 978 for event in self.vm.get_qmp_events(wait=wait): 979 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 980 event['event'] == 'BLOCK_JOB_CANCELLED': 981 self.assert_qmp(event, 'data/device', drive) 982 result = event 983 cancelled = True 984 elif event['event'] == 'JOB_STATUS_CHANGE': 985 self.assert_qmp(event, 'data/id', drive) 986 987 988 self.assert_no_active_block_jobs() 989 return result 990 991 def wait_until_completed(self, drive='drive0', check_offset=True, 992 wait=60.0, error=None): 993 '''Wait for a block job to finish, returning the event''' 994 while True: 995 for event in self.vm.get_qmp_events(wait=wait): 996 if event['event'] == 'BLOCK_JOB_COMPLETED': 997 self.assert_qmp(event, 'data/device', drive) 998 if error is None: 999 self.assert_qmp_absent(event, 'data/error') 1000 if check_offset: 1001 self.assert_qmp(event, 'data/offset', 1002 event['data']['len']) 1003 else: 1004 self.assert_qmp(event, 'data/error', error) 1005 self.assert_no_active_block_jobs() 1006 return event 1007 if event['event'] == 'JOB_STATUS_CHANGE': 1008 self.assert_qmp(event, 'data/id', drive) 1009 1010 def wait_ready(self, drive='drive0'): 1011 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1012 return self.vm.events_wait([ 1013 ('BLOCK_JOB_READY', 1014 {'data': {'type': 'mirror', 'device': drive}}), 1015 ('BLOCK_JOB_READY', 1016 {'data': {'type': 'commit', 'device': drive}}) 1017 ]) 1018 1019 def wait_ready_and_cancel(self, drive='drive0'): 1020 self.wait_ready(drive=drive) 1021 event = self.cancel_and_wait(drive=drive) 1022 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1023 self.assert_qmp(event, 'data/type', 'mirror') 1024 self.assert_qmp(event, 'data/offset', event['data']['len']) 1025 1026 def complete_and_wait(self, drive='drive0', wait_ready=True, 1027 completion_error=None): 1028 '''Complete a block job and wait for it to finish''' 1029 if wait_ready: 1030 self.wait_ready(drive=drive) 1031 1032 result = self.vm.qmp('block-job-complete', device=drive) 1033 self.assert_qmp(result, 'return', {}) 1034 1035 event = self.wait_until_completed(drive=drive, error=completion_error) 1036 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1037 1038 def pause_wait(self, job_id='job0'): 1039 with Timeout(3, "Timeout waiting for job to pause"): 1040 while True: 1041 result = self.vm.qmp('query-block-jobs') 1042 found = False 1043 for job in result['return']: 1044 if job['device'] == job_id: 1045 found = True 1046 if job['paused'] and not job['busy']: 1047 return job 1048 break 1049 assert found 1050 1051 def pause_job(self, job_id='job0', wait=True): 1052 result = self.vm.qmp('block-job-pause', device=job_id) 1053 self.assert_qmp(result, 'return', {}) 1054 if wait: 1055 return self.pause_wait(job_id) 1056 return result 1057 1058 def case_skip(self, reason): 1059 '''Skip this test case''' 1060 case_notrun(reason) 1061 self.skipTest(reason) 1062 1063 1064def notrun(reason): 1065 '''Skip this test suite''' 1066 # Each test in qemu-iotests has a number ("seq") 1067 seq = os.path.basename(sys.argv[0]) 1068 1069 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1070 logger.warning("%s not run: %s", seq, reason) 1071 sys.exit(0) 1072 1073def case_notrun(reason): 1074 '''Mark this test case as not having been run (without actually 1075 skipping it, that is left to the caller). See 1076 QMPTestCase.case_skip() for a variant that actually skips the 1077 current test case.''' 1078 1079 # Each test in qemu-iotests has a number ("seq") 1080 seq = os.path.basename(sys.argv[0]) 1081 1082 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1083 ' [case not run] ' + reason + '\n') 1084 1085def _verify_image_format(supported_fmts: Sequence[str] = (), 1086 unsupported_fmts: Sequence[str] = ()) -> None: 1087 if 'generic' in supported_fmts and \ 1088 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1089 # similar to 1090 # _supported_fmt generic 1091 # for bash tests 1092 supported_fmts = () 1093 1094 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1095 if not_sup or (imgfmt in unsupported_fmts): 1096 notrun('not suitable for this image format: %s' % imgfmt) 1097 1098 if imgfmt == 'luks': 1099 verify_working_luks() 1100 1101def _verify_protocol(supported: Sequence[str] = (), 1102 unsupported: Sequence[str] = ()) -> None: 1103 assert not (supported and unsupported) 1104 1105 if 'generic' in supported: 1106 return 1107 1108 not_sup = supported and (imgproto not in supported) 1109 if not_sup or (imgproto in unsupported): 1110 notrun('not suitable for this protocol: %s' % imgproto) 1111 1112def _verify_platform(supported: Sequence[str] = (), 1113 unsupported: Sequence[str] = ()) -> None: 1114 if any((sys.platform.startswith(x) for x in unsupported)): 1115 notrun('not suitable for this OS: %s' % sys.platform) 1116 1117 if supported: 1118 if not any((sys.platform.startswith(x) for x in supported)): 1119 notrun('not suitable for this OS: %s' % sys.platform) 1120 1121def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1122 if supported_cache_modes and (cachemode not in supported_cache_modes): 1123 notrun('not suitable for this cache mode: %s' % cachemode) 1124 1125def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1126 if supported_aio_modes and (aiomode not in supported_aio_modes): 1127 notrun('not suitable for this aio mode: %s' % aiomode) 1128 1129def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1130 usf_list = list(set(required_formats) - set(supported_formats())) 1131 if usf_list: 1132 notrun(f'formats {usf_list} are not whitelisted') 1133 1134 1135def _verify_virtio_blk() -> None: 1136 out = qemu_pipe('-M', 'none', '-device', 'help') 1137 if 'virtio-blk' not in out: 1138 notrun('Missing virtio-blk in QEMU binary') 1139 1140 1141def supports_quorum(): 1142 return 'quorum' in qemu_img_pipe('--help') 1143 1144def verify_quorum(): 1145 '''Skip test suite if quorum support is not available''' 1146 if not supports_quorum(): 1147 notrun('quorum support missing') 1148 1149def has_working_luks() -> Tuple[bool, str]: 1150 """ 1151 Check whether our LUKS driver can actually create images 1152 (this extends to LUKS encryption for qcow2). 1153 1154 If not, return the reason why. 1155 """ 1156 1157 img_file = f'{test_dir}/luks-test.luks' 1158 (output, status) = \ 1159 qemu_img_pipe_and_status('create', '-f', 'luks', 1160 '--object', luks_default_secret_object, 1161 '-o', luks_default_key_secret_opt, 1162 '-o', 'iter-time=10', 1163 img_file, '1G') 1164 try: 1165 os.remove(img_file) 1166 except OSError: 1167 pass 1168 1169 if status != 0: 1170 reason = output 1171 for line in output.splitlines(): 1172 if img_file + ':' in line: 1173 reason = line.split(img_file + ':', 1)[1].strip() 1174 break 1175 1176 return (False, reason) 1177 else: 1178 return (True, '') 1179 1180def verify_working_luks(): 1181 """ 1182 Skip test suite if LUKS does not work 1183 """ 1184 (working, reason) = has_working_luks() 1185 if not working: 1186 notrun(reason) 1187 1188def qemu_pipe(*args: str) -> str: 1189 """ 1190 Run qemu with an option to print something and exit (e.g. a help option). 1191 1192 :return: QEMU's stdout output. 1193 """ 1194 full_args = [qemu_prog] + qemu_opts + list(args) 1195 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1196 return output 1197 1198def supported_formats(read_only=False): 1199 '''Set 'read_only' to True to check ro-whitelist 1200 Otherwise, rw-whitelist is checked''' 1201 1202 if not hasattr(supported_formats, "formats"): 1203 supported_formats.formats = {} 1204 1205 if read_only not in supported_formats.formats: 1206 format_message = qemu_pipe("-drive", "format=help") 1207 line = 1 if read_only else 0 1208 supported_formats.formats[read_only] = \ 1209 format_message.splitlines()[line].split(":")[1].split() 1210 1211 return supported_formats.formats[read_only] 1212 1213def skip_if_unsupported(required_formats=(), read_only=False): 1214 '''Skip Test Decorator 1215 Runs the test if all the required formats are whitelisted''' 1216 def skip_test_decorator(func): 1217 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1218 **kwargs: Dict[str, Any]) -> None: 1219 if callable(required_formats): 1220 fmts = required_formats(test_case) 1221 else: 1222 fmts = required_formats 1223 1224 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1225 if usf_list: 1226 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1227 test_case.case_skip(msg) 1228 else: 1229 func(test_case, *args, **kwargs) 1230 return func_wrapper 1231 return skip_test_decorator 1232 1233def skip_for_formats(formats: Sequence[str] = ()) \ 1234 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1235 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1236 '''Skip Test Decorator 1237 Skips the test for the given formats''' 1238 def skip_test_decorator(func): 1239 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1240 **kwargs: Dict[str, Any]) -> None: 1241 if imgfmt in formats: 1242 msg = f'{test_case}: Skipped for format {imgfmt}' 1243 test_case.case_skip(msg) 1244 else: 1245 func(test_case, *args, **kwargs) 1246 return func_wrapper 1247 return skip_test_decorator 1248 1249def skip_if_user_is_root(func): 1250 '''Skip Test Decorator 1251 Runs the test only without root permissions''' 1252 def func_wrapper(*args, **kwargs): 1253 if os.getuid() == 0: 1254 case_notrun('{}: cannot be run as root'.format(args[0])) 1255 return None 1256 else: 1257 return func(*args, **kwargs) 1258 return func_wrapper 1259 1260def execute_unittest(debug=False): 1261 """Executes unittests within the calling module.""" 1262 1263 verbosity = 2 if debug else 1 1264 1265 if debug: 1266 output = sys.stdout 1267 else: 1268 # We need to filter out the time taken from the output so that 1269 # qemu-iotest can reliably diff the results against master output. 1270 output = io.StringIO() 1271 1272 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1273 verbosity=verbosity) 1274 try: 1275 # unittest.main() will use sys.exit(); so expect a SystemExit 1276 # exception 1277 unittest.main(testRunner=runner) 1278 finally: 1279 # We need to filter out the time taken from the output so that 1280 # qemu-iotest can reliably diff the results against master output. 1281 if not debug: 1282 out = output.getvalue() 1283 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1284 1285 # Hide skipped tests from the reference output 1286 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1287 out_first_line, out_rest = out.split('\n', 1) 1288 out = out_first_line.replace('s', '.') + '\n' + out_rest 1289 1290 sys.stderr.write(out) 1291 1292def execute_setup_common(supported_fmts: Sequence[str] = (), 1293 supported_platforms: Sequence[str] = (), 1294 supported_cache_modes: Sequence[str] = (), 1295 supported_aio_modes: Sequence[str] = (), 1296 unsupported_fmts: Sequence[str] = (), 1297 supported_protocols: Sequence[str] = (), 1298 unsupported_protocols: Sequence[str] = (), 1299 required_fmts: Sequence[str] = ()) -> bool: 1300 """ 1301 Perform necessary setup for either script-style or unittest-style tests. 1302 1303 :return: Bool; Whether or not debug mode has been requested via the CLI. 1304 """ 1305 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1306 1307 debug = '-d' in sys.argv 1308 if debug: 1309 sys.argv.remove('-d') 1310 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1311 1312 _verify_image_format(supported_fmts, unsupported_fmts) 1313 _verify_protocol(supported_protocols, unsupported_protocols) 1314 _verify_platform(supported=supported_platforms) 1315 _verify_cache_mode(supported_cache_modes) 1316 _verify_aio_mode(supported_aio_modes) 1317 _verify_formats(required_fmts) 1318 _verify_virtio_blk() 1319 1320 return debug 1321 1322def execute_test(*args, test_function=None, **kwargs): 1323 """Run either unittest or script-style tests.""" 1324 1325 debug = execute_setup_common(*args, **kwargs) 1326 if not test_function: 1327 execute_unittest(debug) 1328 else: 1329 test_function() 1330 1331def activate_logging(): 1332 """Activate iotests.log() output to stdout for script-style tests.""" 1333 handler = logging.StreamHandler(stream=sys.stdout) 1334 formatter = logging.Formatter('%(message)s') 1335 handler.setFormatter(formatter) 1336 test_logger.addHandler(handler) 1337 test_logger.setLevel(logging.INFO) 1338 test_logger.propagate = False 1339 1340# This is called from script-style iotests without a single point of entry 1341def script_initialize(*args, **kwargs): 1342 """Initialize script-style tests without running any tests.""" 1343 activate_logging() 1344 execute_setup_common(*args, **kwargs) 1345 1346# This is called from script-style iotests with a single point of entry 1347def script_main(test_function, *args, **kwargs): 1348 """Run script-style tests outside of the unittest framework""" 1349 activate_logging() 1350 execute_test(*args, test_function=test_function, **kwargs) 1351 1352# This is called from unittest style iotests 1353def main(*args, **kwargs): 1354 """Run tests using the unittest framework""" 1355 execute_test(*args, **kwargs) 1356