1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76imgfmt = os.environ.get('IMGFMT', 'raw') 77imgproto = os.environ.get('IMGPROTO', 'file') 78output_dir = os.environ.get('OUTPUT_DIR', '.') 79 80try: 81 test_dir = os.environ['TEST_DIR'] 82 sock_dir = os.environ['SOCK_DIR'] 83 cachemode = os.environ['CACHEMODE'] 84 aiomode = os.environ['AIOMODE'] 85 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE'] 86except KeyError: 87 # We are using these variables as proxies to indicate that we're 88 # not being run via "check". There may be other things set up by 89 # "check" that individual test cases rely on. 90 sys.stderr.write('Please run this test via the "check" script\n') 91 sys.exit(os.EX_USAGE) 92 93socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 94 95luks_default_secret_object = 'secret,id=keysec0,data=' + \ 96 os.environ.get('IMGKEYSECRET', '') 97luks_default_key_secret_opt = 'key-secret=keysec0' 98 99 100def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 101 connect_stderr: bool = True) -> Tuple[str, int]: 102 """ 103 Run a tool and return both its output and its exit code 104 """ 105 stderr = subprocess.STDOUT if connect_stderr else None 106 subp = subprocess.Popen(args, 107 stdout=subprocess.PIPE, 108 stderr=stderr, 109 universal_newlines=True) 110 output = subp.communicate()[0] 111 if subp.returncode < 0: 112 cmd = ' '.join(args) 113 sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n') 114 return (output, subp.returncode) 115 116def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 117 """ 118 Run qemu-img and return both its output and its exit code 119 """ 120 full_args = qemu_img_args + list(args) 121 return qemu_tool_pipe_and_status('qemu-img', full_args) 122 123def qemu_img(*args: str) -> int: 124 '''Run qemu-img and return the exit code''' 125 return qemu_img_pipe_and_status(*args)[1] 126 127def ordered_qmp(qmsg, conv_keys=True): 128 # Dictionaries are not ordered prior to 3.6, therefore: 129 if isinstance(qmsg, list): 130 return [ordered_qmp(atom) for atom in qmsg] 131 if isinstance(qmsg, dict): 132 od = OrderedDict() 133 for k, v in sorted(qmsg.items()): 134 if conv_keys: 135 k = k.replace('_', '-') 136 od[k] = ordered_qmp(v, conv_keys=False) 137 return od 138 return qmsg 139 140def qemu_img_create(*args): 141 args = list(args) 142 143 # default luks support 144 if '-f' in args and args[args.index('-f') + 1] == 'luks': 145 if '-o' in args: 146 i = args.index('-o') 147 if 'key-secret' not in args[i + 1]: 148 args[i + 1].append(luks_default_key_secret_opt) 149 args.insert(i + 2, '--object') 150 args.insert(i + 3, luks_default_secret_object) 151 else: 152 args = ['-o', luks_default_key_secret_opt, 153 '--object', luks_default_secret_object] + args 154 155 args.insert(0, 'create') 156 157 return qemu_img(*args) 158 159def qemu_img_measure(*args): 160 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 161 162def qemu_img_check(*args): 163 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 164 165def qemu_img_verbose(*args): 166 '''Run qemu-img without suppressing its output and return the exit code''' 167 exitcode = subprocess.call(qemu_img_args + list(args)) 168 if exitcode < 0: 169 sys.stderr.write('qemu-img received signal %i: %s\n' 170 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 171 return exitcode 172 173def qemu_img_pipe(*args: str) -> str: 174 '''Run qemu-img and return its output''' 175 return qemu_img_pipe_and_status(*args)[0] 176 177def qemu_img_log(*args): 178 result = qemu_img_pipe(*args) 179 log(result, filters=[filter_testfiles]) 180 return result 181 182def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 183 args = ['info'] 184 if imgopts: 185 args.append('--image-opts') 186 else: 187 args += ['-f', imgfmt] 188 args += extra_args 189 args.append(filename) 190 191 output = qemu_img_pipe(*args) 192 if not filter_path: 193 filter_path = filename 194 log(filter_img_info(output, filter_path)) 195 196def qemu_io(*args): 197 '''Run qemu-io and return the stdout data''' 198 args = qemu_io_args + list(args) 199 return qemu_tool_pipe_and_status('qemu-io', args)[0] 200 201def qemu_io_log(*args): 202 result = qemu_io(*args) 203 log(result, filters=[filter_testfiles, filter_qemu_io]) 204 return result 205 206def qemu_io_silent(*args): 207 '''Run qemu-io and return the exit code, suppressing stdout''' 208 if '-f' in args or '--image-opts' in args: 209 default_args = qemu_io_args_no_fmt 210 else: 211 default_args = qemu_io_args 212 213 args = default_args + list(args) 214 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 215 if exitcode < 0: 216 sys.stderr.write('qemu-io received signal %i: %s\n' % 217 (-exitcode, ' '.join(args))) 218 return exitcode 219 220def qemu_io_silent_check(*args): 221 '''Run qemu-io and return the true if subprocess returned 0''' 222 args = qemu_io_args + list(args) 223 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 224 stderr=subprocess.STDOUT) 225 return exitcode == 0 226 227def get_virtio_scsi_device(): 228 if qemu_default_machine == 's390-ccw-virtio': 229 return 'virtio-scsi-ccw' 230 return 'virtio-scsi-pci' 231 232class QemuIoInteractive: 233 def __init__(self, *args): 234 self.args = qemu_io_args_no_fmt + list(args) 235 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 236 stdout=subprocess.PIPE, 237 stderr=subprocess.STDOUT, 238 universal_newlines=True) 239 out = self._p.stdout.read(9) 240 if out != 'qemu-io> ': 241 # Most probably qemu-io just failed to start. 242 # Let's collect the whole output and exit. 243 out += self._p.stdout.read() 244 self._p.wait(timeout=1) 245 raise ValueError(out) 246 247 def close(self): 248 self._p.communicate('q\n') 249 250 def _read_output(self): 251 pattern = 'qemu-io> ' 252 n = len(pattern) 253 pos = 0 254 s = [] 255 while pos != n: 256 c = self._p.stdout.read(1) 257 # check unexpected EOF 258 assert c != '' 259 s.append(c) 260 if c == pattern[pos]: 261 pos += 1 262 else: 263 pos = 0 264 265 return ''.join(s[:-n]) 266 267 def cmd(self, cmd): 268 # quit command is in close(), '\n' is added automatically 269 assert '\n' not in cmd 270 cmd = cmd.strip() 271 assert cmd not in ('q', 'quit') 272 self._p.stdin.write(cmd + '\n') 273 self._p.stdin.flush() 274 return self._read_output() 275 276 277def qemu_nbd(*args): 278 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 279 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 280 281def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 282 '''Run qemu-nbd in daemon mode and return both the parent's exit code 283 and its output in case of an error''' 284 full_args = qemu_nbd_args + ['--fork'] + list(args) 285 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 286 connect_stderr=False) 287 return returncode, output if returncode else '' 288 289def qemu_nbd_list_log(*args: str) -> str: 290 '''Run qemu-nbd to list remote exports''' 291 full_args = [qemu_nbd_prog, '-L'] + list(args) 292 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 293 log(output, filters=[filter_testfiles, filter_nbd_exports]) 294 return output 295 296@contextmanager 297def qemu_nbd_popen(*args): 298 '''Context manager running qemu-nbd within the context''' 299 pid_file = file_path("qemu_nbd_popen-nbd-pid-file") 300 301 assert not os.path.exists(pid_file) 302 303 cmd = list(qemu_nbd_args) 304 cmd.extend(('--persistent', '--pid-file', pid_file)) 305 cmd.extend(args) 306 307 log('Start NBD server') 308 p = subprocess.Popen(cmd) 309 try: 310 while not os.path.exists(pid_file): 311 if p.poll() is not None: 312 raise RuntimeError( 313 "qemu-nbd terminated with exit code {}: {}" 314 .format(p.returncode, ' '.join(cmd))) 315 316 time.sleep(0.01) 317 yield 318 finally: 319 if os.path.exists(pid_file): 320 os.remove(pid_file) 321 log('Kill NBD server') 322 p.kill() 323 p.wait() 324 325def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 326 '''Return True if two image files are identical''' 327 return qemu_img('compare', '-f', fmt1, 328 '-F', fmt2, img1, img2) == 0 329 330def create_image(name, size): 331 '''Create a fully-allocated raw image with sector markers''' 332 file = open(name, 'wb') 333 i = 0 334 while i < size: 335 sector = struct.pack('>l504xl', i // 512, i // 512) 336 file.write(sector) 337 i = i + 512 338 file.close() 339 340def image_size(img): 341 '''Return image's virtual size''' 342 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 343 return json.loads(r)['virtual-size'] 344 345def is_str(val): 346 return isinstance(val, str) 347 348test_dir_re = re.compile(r"%s" % test_dir) 349def filter_test_dir(msg): 350 return test_dir_re.sub("TEST_DIR", msg) 351 352win32_re = re.compile(r"\r") 353def filter_win32(msg): 354 return win32_re.sub("", msg) 355 356qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 357 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 358 r"and [0-9\/.inf]* ops\/sec\)") 359def filter_qemu_io(msg): 360 msg = filter_win32(msg) 361 return qemu_io_re.sub("X ops; XX:XX:XX.X " 362 "(XXX YYY/sec and XXX ops/sec)", msg) 363 364chown_re = re.compile(r"chown [0-9]+:[0-9]+") 365def filter_chown(msg): 366 return chown_re.sub("chown UID:GID", msg) 367 368def filter_qmp_event(event): 369 '''Filter a QMP event dict''' 370 event = dict(event) 371 if 'timestamp' in event: 372 event['timestamp']['seconds'] = 'SECS' 373 event['timestamp']['microseconds'] = 'USECS' 374 return event 375 376def filter_qmp(qmsg, filter_fn): 377 '''Given a string filter, filter a QMP object's values. 378 filter_fn takes a (key, value) pair.''' 379 # Iterate through either lists or dicts; 380 if isinstance(qmsg, list): 381 items = enumerate(qmsg) 382 else: 383 items = qmsg.items() 384 385 for k, v in items: 386 if isinstance(v, (dict, list)): 387 qmsg[k] = filter_qmp(v, filter_fn) 388 else: 389 qmsg[k] = filter_fn(k, v) 390 return qmsg 391 392def filter_testfiles(msg): 393 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 394 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 395 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 396 397def filter_qmp_testfiles(qmsg): 398 def _filter(_key, value): 399 if is_str(value): 400 return filter_testfiles(value) 401 return value 402 return filter_qmp(qmsg, _filter) 403 404def filter_virtio_scsi(output: str) -> str: 405 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 406 407def filter_qmp_virtio_scsi(qmsg): 408 def _filter(_key, value): 409 if is_str(value): 410 return filter_virtio_scsi(value) 411 return value 412 return filter_qmp(qmsg, _filter) 413 414def filter_generated_node_ids(msg): 415 return re.sub("#block[0-9]+", "NODE_NAME", msg) 416 417def filter_img_info(output, filename): 418 lines = [] 419 for line in output.split('\n'): 420 if 'disk size' in line or 'actual-size' in line: 421 continue 422 line = line.replace(filename, 'TEST_IMG') 423 line = filter_testfiles(line) 424 line = line.replace(imgfmt, 'IMGFMT') 425 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 426 line = re.sub('uuid: [-a-f0-9]+', 427 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 428 line) 429 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 430 lines.append(line) 431 return '\n'.join(lines) 432 433def filter_imgfmt(msg): 434 return msg.replace(imgfmt, 'IMGFMT') 435 436def filter_qmp_imgfmt(qmsg): 437 def _filter(_key, value): 438 if is_str(value): 439 return filter_imgfmt(value) 440 return value 441 return filter_qmp(qmsg, _filter) 442 443def filter_nbd_exports(output: str) -> str: 444 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 445 446 447Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 448 449def log(msg: Msg, 450 filters: Iterable[Callable[[Msg], Msg]] = (), 451 indent: Optional[int] = None) -> None: 452 """ 453 Logs either a string message or a JSON serializable message (like QMP). 454 If indent is provided, JSON serializable messages are pretty-printed. 455 """ 456 for flt in filters: 457 msg = flt(msg) 458 if isinstance(msg, (dict, list)): 459 # Don't sort if it's already sorted 460 do_sort = not isinstance(msg, OrderedDict) 461 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 462 else: 463 test_logger.info(msg) 464 465class Timeout: 466 def __init__(self, seconds, errmsg="Timeout"): 467 self.seconds = seconds 468 self.errmsg = errmsg 469 def __enter__(self): 470 signal.signal(signal.SIGALRM, self.timeout) 471 signal.setitimer(signal.ITIMER_REAL, self.seconds) 472 return self 473 def __exit__(self, exc_type, value, traceback): 474 signal.setitimer(signal.ITIMER_REAL, 0) 475 return False 476 def timeout(self, signum, frame): 477 raise Exception(self.errmsg) 478 479def file_pattern(name): 480 return "{0}-{1}".format(os.getpid(), name) 481 482class FilePath: 483 """ 484 Context manager generating multiple file names. The generated files are 485 removed when exiting the context. 486 487 Example usage: 488 489 with FilePath('a.img', 'b.img') as (img_a, img_b): 490 # Use img_a and img_b here... 491 492 # a.img and b.img are automatically removed here. 493 494 By default images are created in iotests.test_dir. To create sockets use 495 iotests.sock_dir: 496 497 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 498 499 For convenience, calling with one argument yields a single file instead of 500 a tuple with one item. 501 502 """ 503 def __init__(self, *names, base_dir=test_dir): 504 self.paths = [os.path.join(base_dir, file_pattern(name)) 505 for name in names] 506 507 def __enter__(self): 508 if len(self.paths) == 1: 509 return self.paths[0] 510 else: 511 return self.paths 512 513 def __exit__(self, exc_type, exc_val, exc_tb): 514 for path in self.paths: 515 try: 516 os.remove(path) 517 except OSError: 518 pass 519 return False 520 521 522def try_remove(img): 523 try: 524 os.remove(img) 525 except OSError: 526 pass 527 528def file_path_remover(): 529 for path in reversed(file_path_remover.paths): 530 try_remove(path) 531 532 533def file_path(*names, base_dir=test_dir): 534 ''' Another way to get auto-generated filename that cleans itself up. 535 536 Use is as simple as: 537 538 img_a, img_b = file_path('a.img', 'b.img') 539 sock = file_path('socket') 540 ''' 541 542 if not hasattr(file_path_remover, 'paths'): 543 file_path_remover.paths = [] 544 atexit.register(file_path_remover) 545 546 paths = [] 547 for name in names: 548 filename = file_pattern(name) 549 path = os.path.join(base_dir, filename) 550 file_path_remover.paths.append(path) 551 paths.append(path) 552 553 return paths[0] if len(paths) == 1 else paths 554 555def remote_filename(path): 556 if imgproto == 'file': 557 return path 558 elif imgproto == 'ssh': 559 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 560 else: 561 raise Exception("Protocol %s not supported" % (imgproto)) 562 563class VM(qtest.QEMUQtestMachine): 564 '''A QEMU VM''' 565 566 def __init__(self, path_suffix=''): 567 name = "qemu%s-%d" % (path_suffix, os.getpid()) 568 super().__init__(qemu_prog, qemu_opts, name=name, 569 test_dir=test_dir, 570 socket_scm_helper=socket_scm_helper, 571 sock_dir=sock_dir) 572 self._num_drives = 0 573 574 def add_object(self, opts): 575 self._args.append('-object') 576 self._args.append(opts) 577 return self 578 579 def add_device(self, opts): 580 self._args.append('-device') 581 self._args.append(opts) 582 return self 583 584 def add_drive_raw(self, opts): 585 self._args.append('-drive') 586 self._args.append(opts) 587 return self 588 589 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 590 '''Add a virtio-blk drive to the VM''' 591 options = ['if=%s' % interface, 592 'id=drive%d' % self._num_drives] 593 594 if path is not None: 595 options.append('file=%s' % path) 596 options.append('format=%s' % img_format) 597 options.append('cache=%s' % cachemode) 598 options.append('aio=%s' % aiomode) 599 600 if opts: 601 options.append(opts) 602 603 if img_format == 'luks' and 'key-secret' not in opts: 604 # default luks support 605 if luks_default_secret_object not in self._args: 606 self.add_object(luks_default_secret_object) 607 608 options.append(luks_default_key_secret_opt) 609 610 self._args.append('-drive') 611 self._args.append(','.join(options)) 612 self._num_drives += 1 613 return self 614 615 def add_blockdev(self, opts): 616 self._args.append('-blockdev') 617 if isinstance(opts, str): 618 self._args.append(opts) 619 else: 620 self._args.append(','.join(opts)) 621 return self 622 623 def add_incoming(self, addr): 624 self._args.append('-incoming') 625 self._args.append(addr) 626 return self 627 628 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 629 cmd = 'human-monitor-command' 630 kwargs: Dict[str, Any] = {'command-line': command_line} 631 if use_log: 632 return self.qmp_log(cmd, **kwargs) 633 else: 634 return self.qmp(cmd, **kwargs) 635 636 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 637 """Pause drive r/w operations""" 638 if not event: 639 self.pause_drive(drive, "read_aio") 640 self.pause_drive(drive, "write_aio") 641 return 642 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 643 644 def resume_drive(self, drive: str) -> None: 645 """Resume drive r/w operations""" 646 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 647 648 def hmp_qemu_io(self, drive: str, cmd: str, 649 use_log: bool = False) -> QMPMessage: 650 """Write to a given drive using an HMP command""" 651 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 652 653 def flatten_qmp_object(self, obj, output=None, basestr=''): 654 if output is None: 655 output = dict() 656 if isinstance(obj, list): 657 for i, item in enumerate(obj): 658 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 659 elif isinstance(obj, dict): 660 for key in obj: 661 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 662 else: 663 output[basestr[:-1]] = obj # Strip trailing '.' 664 return output 665 666 def qmp_to_opts(self, obj): 667 obj = self.flatten_qmp_object(obj) 668 output_list = list() 669 for key in obj: 670 output_list += [key + '=' + obj[key]] 671 return ','.join(output_list) 672 673 def get_qmp_events_filtered(self, wait=60.0): 674 result = [] 675 for ev in self.get_qmp_events(wait=wait): 676 result.append(filter_qmp_event(ev)) 677 return result 678 679 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 680 full_cmd = OrderedDict(( 681 ("execute", cmd), 682 ("arguments", ordered_qmp(kwargs)) 683 )) 684 log(full_cmd, filters, indent=indent) 685 result = self.qmp(cmd, **kwargs) 686 log(result, filters, indent=indent) 687 return result 688 689 # Returns None on success, and an error string on failure 690 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 691 pre_finalize=None, cancel=False, wait=60.0): 692 """ 693 run_job moves a job from creation through to dismissal. 694 695 :param job: String. ID of recently-launched job 696 :param auto_finalize: Bool. True if the job was launched with 697 auto_finalize. Defaults to True. 698 :param auto_dismiss: Bool. True if the job was launched with 699 auto_dismiss=True. Defaults to False. 700 :param pre_finalize: Callback. A callable that takes no arguments to be 701 invoked prior to issuing job-finalize, if any. 702 :param cancel: Bool. When true, cancels the job after the pre_finalize 703 callback. 704 :param wait: Float. Timeout value specifying how long to wait for any 705 event, in seconds. Defaults to 60.0. 706 """ 707 match_device = {'data': {'device': job}} 708 match_id = {'data': {'id': job}} 709 events = [ 710 ('BLOCK_JOB_COMPLETED', match_device), 711 ('BLOCK_JOB_CANCELLED', match_device), 712 ('BLOCK_JOB_ERROR', match_device), 713 ('BLOCK_JOB_READY', match_device), 714 ('BLOCK_JOB_PENDING', match_id), 715 ('JOB_STATUS_CHANGE', match_id) 716 ] 717 error = None 718 while True: 719 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 720 if ev['event'] != 'JOB_STATUS_CHANGE': 721 log(ev) 722 continue 723 status = ev['data']['status'] 724 if status == 'aborting': 725 result = self.qmp('query-jobs') 726 for j in result['return']: 727 if j['id'] == job: 728 error = j['error'] 729 log('Job failed: %s' % (j['error'])) 730 elif status == 'ready': 731 self.qmp_log('job-complete', id=job) 732 elif status == 'pending' and not auto_finalize: 733 if pre_finalize: 734 pre_finalize() 735 if cancel: 736 self.qmp_log('job-cancel', id=job) 737 else: 738 self.qmp_log('job-finalize', id=job) 739 elif status == 'concluded' and not auto_dismiss: 740 self.qmp_log('job-dismiss', id=job) 741 elif status == 'null': 742 return error 743 744 # Returns None on success, and an error string on failure 745 def blockdev_create(self, options, job_id='job0', filters=None): 746 if filters is None: 747 filters = [filter_qmp_testfiles] 748 result = self.qmp_log('blockdev-create', filters=filters, 749 job_id=job_id, options=options) 750 751 if 'return' in result: 752 assert result['return'] == {} 753 job_result = self.run_job(job_id) 754 else: 755 job_result = result['error'] 756 757 log("") 758 return job_result 759 760 def enable_migration_events(self, name): 761 log('Enabling migration QMP events on %s...' % name) 762 log(self.qmp('migrate-set-capabilities', capabilities=[ 763 { 764 'capability': 'events', 765 'state': True 766 } 767 ])) 768 769 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 770 while True: 771 event = self.event_wait('MIGRATION') 772 # We use the default timeout, and with a timeout, event_wait() 773 # never returns None 774 assert event 775 776 log(event, filters=[filter_qmp_event]) 777 if event['data']['status'] in ('completed', 'failed'): 778 break 779 780 if event['data']['status'] == 'completed': 781 # The event may occur in finish-migrate, so wait for the expected 782 # post-migration runstate 783 runstate = None 784 while runstate != expect_runstate: 785 runstate = self.qmp('query-status')['return']['status'] 786 return True 787 else: 788 return False 789 790 def node_info(self, node_name): 791 nodes = self.qmp('query-named-block-nodes') 792 for x in nodes['return']: 793 if x['node-name'] == node_name: 794 return x 795 return None 796 797 def query_bitmaps(self): 798 res = self.qmp("query-named-block-nodes") 799 return {device['node-name']: device['dirty-bitmaps'] 800 for device in res['return'] if 'dirty-bitmaps' in device} 801 802 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 803 """ 804 get a specific bitmap from the object returned by query_bitmaps. 805 :param recording: If specified, filter results by the specified value. 806 :param bitmaps: If specified, use it instead of call query_bitmaps() 807 """ 808 if bitmaps is None: 809 bitmaps = self.query_bitmaps() 810 811 for bitmap in bitmaps[node_name]: 812 if bitmap.get('name', '') == bitmap_name: 813 if recording is None or bitmap.get('recording') == recording: 814 return bitmap 815 return None 816 817 def check_bitmap_status(self, node_name, bitmap_name, fields): 818 ret = self.get_bitmap(node_name, bitmap_name) 819 820 return fields.items() <= ret.items() 821 822 def assert_block_path(self, root, path, expected_node, graph=None): 823 """ 824 Check whether the node under the given path in the block graph 825 is @expected_node. 826 827 @root is the node name of the node where the @path is rooted. 828 829 @path is a string that consists of child names separated by 830 slashes. It must begin with a slash. 831 832 Examples for @root + @path: 833 - root="qcow2-node", path="/backing/file" 834 - root="quorum-node", path="/children.2/file" 835 836 Hypothetically, @path could be empty, in which case it would 837 point to @root. However, in practice this case is not useful 838 and hence not allowed. 839 840 @expected_node may be None. (All elements of the path but the 841 leaf must still exist.) 842 843 @graph may be None or the result of an x-debug-query-block-graph 844 call that has already been performed. 845 """ 846 if graph is None: 847 graph = self.qmp('x-debug-query-block-graph')['return'] 848 849 iter_path = iter(path.split('/')) 850 851 # Must start with a / 852 assert next(iter_path) == '' 853 854 node = next((node for node in graph['nodes'] if node['name'] == root), 855 None) 856 857 # An empty @path is not allowed, so the root node must be present 858 assert node is not None, 'Root node %s not found' % root 859 860 for child_name in iter_path: 861 assert node is not None, 'Cannot follow path %s%s' % (root, path) 862 863 try: 864 node_id = next(edge['child'] for edge in graph['edges'] 865 if (edge['parent'] == node['id'] and 866 edge['name'] == child_name)) 867 868 node = next(node for node in graph['nodes'] 869 if node['id'] == node_id) 870 871 except StopIteration: 872 node = None 873 874 if node is None: 875 assert expected_node is None, \ 876 'No node found under %s (but expected %s)' % \ 877 (path, expected_node) 878 else: 879 assert node['name'] == expected_node, \ 880 'Found node %s under %s (but expected %s)' % \ 881 (node['name'], path, expected_node) 882 883index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 884 885class QMPTestCase(unittest.TestCase): 886 '''Abstract base class for QMP test cases''' 887 888 def __init__(self, *args, **kwargs): 889 super().__init__(*args, **kwargs) 890 # Many users of this class set a VM property we rely on heavily 891 # in the methods below. 892 self.vm = None 893 894 def dictpath(self, d, path): 895 '''Traverse a path in a nested dict''' 896 for component in path.split('/'): 897 m = index_re.match(component) 898 if m: 899 component, idx = m.groups() 900 idx = int(idx) 901 902 if not isinstance(d, dict) or component not in d: 903 self.fail(f'failed path traversal for "{path}" in "{d}"') 904 d = d[component] 905 906 if m: 907 if not isinstance(d, list): 908 self.fail(f'path component "{component}" in "{path}" ' 909 f'is not a list in "{d}"') 910 try: 911 d = d[idx] 912 except IndexError: 913 self.fail(f'invalid index "{idx}" in path "{path}" ' 914 f'in "{d}"') 915 return d 916 917 def assert_qmp_absent(self, d, path): 918 try: 919 result = self.dictpath(d, path) 920 except AssertionError: 921 return 922 self.fail('path "%s" has value "%s"' % (path, str(result))) 923 924 def assert_qmp(self, d, path, value): 925 '''Assert that the value for a specific path in a QMP dict 926 matches. When given a list of values, assert that any of 927 them matches.''' 928 929 result = self.dictpath(d, path) 930 931 # [] makes no sense as a list of valid values, so treat it as 932 # an actual single value. 933 if isinstance(value, list) and value != []: 934 for v in value: 935 if result == v: 936 return 937 self.fail('no match for "%s" in %s' % (str(result), str(value))) 938 else: 939 self.assertEqual(result, value, 940 '"%s" is "%s", expected "%s"' 941 % (path, str(result), str(value))) 942 943 def assert_no_active_block_jobs(self): 944 result = self.vm.qmp('query-block-jobs') 945 self.assert_qmp(result, 'return', []) 946 947 def assert_has_block_node(self, node_name=None, file_name=None): 948 """Issue a query-named-block-nodes and assert node_name and/or 949 file_name is present in the result""" 950 def check_equal_or_none(a, b): 951 return a is None or b is None or a == b 952 assert node_name or file_name 953 result = self.vm.qmp('query-named-block-nodes') 954 for x in result["return"]: 955 if check_equal_or_none(x.get("node-name"), node_name) and \ 956 check_equal_or_none(x.get("file"), file_name): 957 return 958 self.fail("Cannot find %s %s in result:\n%s" % 959 (node_name, file_name, result)) 960 961 def assert_json_filename_equal(self, json_filename, reference): 962 '''Asserts that the given filename is a json: filename and that its 963 content is equal to the given reference object''' 964 self.assertEqual(json_filename[:5], 'json:') 965 self.assertEqual( 966 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 967 self.vm.flatten_qmp_object(reference) 968 ) 969 970 def cancel_and_wait(self, drive='drive0', force=False, 971 resume=False, wait=60.0): 972 '''Cancel a block job and wait for it to finish, returning the event''' 973 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 974 self.assert_qmp(result, 'return', {}) 975 976 if resume: 977 self.vm.resume_drive(drive) 978 979 cancelled = False 980 result = None 981 while not cancelled: 982 for event in self.vm.get_qmp_events(wait=wait): 983 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 984 event['event'] == 'BLOCK_JOB_CANCELLED': 985 self.assert_qmp(event, 'data/device', drive) 986 result = event 987 cancelled = True 988 elif event['event'] == 'JOB_STATUS_CHANGE': 989 self.assert_qmp(event, 'data/id', drive) 990 991 992 self.assert_no_active_block_jobs() 993 return result 994 995 def wait_until_completed(self, drive='drive0', check_offset=True, 996 wait=60.0, error=None): 997 '''Wait for a block job to finish, returning the event''' 998 while True: 999 for event in self.vm.get_qmp_events(wait=wait): 1000 if event['event'] == 'BLOCK_JOB_COMPLETED': 1001 self.assert_qmp(event, 'data/device', drive) 1002 if error is None: 1003 self.assert_qmp_absent(event, 'data/error') 1004 if check_offset: 1005 self.assert_qmp(event, 'data/offset', 1006 event['data']['len']) 1007 else: 1008 self.assert_qmp(event, 'data/error', error) 1009 self.assert_no_active_block_jobs() 1010 return event 1011 if event['event'] == 'JOB_STATUS_CHANGE': 1012 self.assert_qmp(event, 'data/id', drive) 1013 1014 def wait_ready(self, drive='drive0'): 1015 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1016 return self.vm.events_wait([ 1017 ('BLOCK_JOB_READY', 1018 {'data': {'type': 'mirror', 'device': drive}}), 1019 ('BLOCK_JOB_READY', 1020 {'data': {'type': 'commit', 'device': drive}}) 1021 ]) 1022 1023 def wait_ready_and_cancel(self, drive='drive0'): 1024 self.wait_ready(drive=drive) 1025 event = self.cancel_and_wait(drive=drive) 1026 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1027 self.assert_qmp(event, 'data/type', 'mirror') 1028 self.assert_qmp(event, 'data/offset', event['data']['len']) 1029 1030 def complete_and_wait(self, drive='drive0', wait_ready=True, 1031 completion_error=None): 1032 '''Complete a block job and wait for it to finish''' 1033 if wait_ready: 1034 self.wait_ready(drive=drive) 1035 1036 result = self.vm.qmp('block-job-complete', device=drive) 1037 self.assert_qmp(result, 'return', {}) 1038 1039 event = self.wait_until_completed(drive=drive, error=completion_error) 1040 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1041 1042 def pause_wait(self, job_id='job0'): 1043 with Timeout(3, "Timeout waiting for job to pause"): 1044 while True: 1045 result = self.vm.qmp('query-block-jobs') 1046 found = False 1047 for job in result['return']: 1048 if job['device'] == job_id: 1049 found = True 1050 if job['paused'] and not job['busy']: 1051 return job 1052 break 1053 assert found 1054 1055 def pause_job(self, job_id='job0', wait=True): 1056 result = self.vm.qmp('block-job-pause', device=job_id) 1057 self.assert_qmp(result, 'return', {}) 1058 if wait: 1059 return self.pause_wait(job_id) 1060 return result 1061 1062 def case_skip(self, reason): 1063 '''Skip this test case''' 1064 case_notrun(reason) 1065 self.skipTest(reason) 1066 1067 1068def notrun(reason): 1069 '''Skip this test suite''' 1070 # Each test in qemu-iotests has a number ("seq") 1071 seq = os.path.basename(sys.argv[0]) 1072 1073 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1074 logger.warning("%s not run: %s", seq, reason) 1075 sys.exit(0) 1076 1077def case_notrun(reason): 1078 '''Mark this test case as not having been run (without actually 1079 skipping it, that is left to the caller). See 1080 QMPTestCase.case_skip() for a variant that actually skips the 1081 current test case.''' 1082 1083 # Each test in qemu-iotests has a number ("seq") 1084 seq = os.path.basename(sys.argv[0]) 1085 1086 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1087 ' [case not run] ' + reason + '\n') 1088 1089def _verify_image_format(supported_fmts: Sequence[str] = (), 1090 unsupported_fmts: Sequence[str] = ()) -> None: 1091 if 'generic' in supported_fmts and \ 1092 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1093 # similar to 1094 # _supported_fmt generic 1095 # for bash tests 1096 supported_fmts = () 1097 1098 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1099 if not_sup or (imgfmt in unsupported_fmts): 1100 notrun('not suitable for this image format: %s' % imgfmt) 1101 1102 if imgfmt == 'luks': 1103 verify_working_luks() 1104 1105def _verify_protocol(supported: Sequence[str] = (), 1106 unsupported: Sequence[str] = ()) -> None: 1107 assert not (supported and unsupported) 1108 1109 if 'generic' in supported: 1110 return 1111 1112 not_sup = supported and (imgproto not in supported) 1113 if not_sup or (imgproto in unsupported): 1114 notrun('not suitable for this protocol: %s' % imgproto) 1115 1116def _verify_platform(supported: Sequence[str] = (), 1117 unsupported: Sequence[str] = ()) -> None: 1118 if any((sys.platform.startswith(x) for x in unsupported)): 1119 notrun('not suitable for this OS: %s' % sys.platform) 1120 1121 if supported: 1122 if not any((sys.platform.startswith(x) for x in supported)): 1123 notrun('not suitable for this OS: %s' % sys.platform) 1124 1125def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1126 if supported_cache_modes and (cachemode not in supported_cache_modes): 1127 notrun('not suitable for this cache mode: %s' % cachemode) 1128 1129def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1130 if supported_aio_modes and (aiomode not in supported_aio_modes): 1131 notrun('not suitable for this aio mode: %s' % aiomode) 1132 1133def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1134 usf_list = list(set(required_formats) - set(supported_formats())) 1135 if usf_list: 1136 notrun(f'formats {usf_list} are not whitelisted') 1137 1138 1139def _verify_virtio_blk() -> None: 1140 out = qemu_pipe('-M', 'none', '-device', 'help') 1141 if 'virtio-blk' not in out: 1142 notrun('Missing virtio-blk in QEMU binary') 1143 1144 1145def supports_quorum(): 1146 return 'quorum' in qemu_img_pipe('--help') 1147 1148def verify_quorum(): 1149 '''Skip test suite if quorum support is not available''' 1150 if not supports_quorum(): 1151 notrun('quorum support missing') 1152 1153def has_working_luks() -> Tuple[bool, str]: 1154 """ 1155 Check whether our LUKS driver can actually create images 1156 (this extends to LUKS encryption for qcow2). 1157 1158 If not, return the reason why. 1159 """ 1160 1161 img_file = f'{test_dir}/luks-test.luks' 1162 (output, status) = \ 1163 qemu_img_pipe_and_status('create', '-f', 'luks', 1164 '--object', luks_default_secret_object, 1165 '-o', luks_default_key_secret_opt, 1166 '-o', 'iter-time=10', 1167 img_file, '1G') 1168 try: 1169 os.remove(img_file) 1170 except OSError: 1171 pass 1172 1173 if status != 0: 1174 reason = output 1175 for line in output.splitlines(): 1176 if img_file + ':' in line: 1177 reason = line.split(img_file + ':', 1)[1].strip() 1178 break 1179 1180 return (False, reason) 1181 else: 1182 return (True, '') 1183 1184def verify_working_luks(): 1185 """ 1186 Skip test suite if LUKS does not work 1187 """ 1188 (working, reason) = has_working_luks() 1189 if not working: 1190 notrun(reason) 1191 1192def qemu_pipe(*args: str) -> str: 1193 """ 1194 Run qemu with an option to print something and exit (e.g. a help option). 1195 1196 :return: QEMU's stdout output. 1197 """ 1198 full_args = [qemu_prog] + qemu_opts + list(args) 1199 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1200 return output 1201 1202def supported_formats(read_only=False): 1203 '''Set 'read_only' to True to check ro-whitelist 1204 Otherwise, rw-whitelist is checked''' 1205 1206 if not hasattr(supported_formats, "formats"): 1207 supported_formats.formats = {} 1208 1209 if read_only not in supported_formats.formats: 1210 format_message = qemu_pipe("-drive", "format=help") 1211 line = 1 if read_only else 0 1212 supported_formats.formats[read_only] = \ 1213 format_message.splitlines()[line].split(":")[1].split() 1214 1215 return supported_formats.formats[read_only] 1216 1217def skip_if_unsupported(required_formats=(), read_only=False): 1218 '''Skip Test Decorator 1219 Runs the test if all the required formats are whitelisted''' 1220 def skip_test_decorator(func): 1221 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1222 **kwargs: Dict[str, Any]) -> None: 1223 if callable(required_formats): 1224 fmts = required_formats(test_case) 1225 else: 1226 fmts = required_formats 1227 1228 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1229 if usf_list: 1230 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1231 test_case.case_skip(msg) 1232 else: 1233 func(test_case, *args, **kwargs) 1234 return func_wrapper 1235 return skip_test_decorator 1236 1237def skip_for_formats(formats: Sequence[str] = ()) \ 1238 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1239 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1240 '''Skip Test Decorator 1241 Skips the test for the given formats''' 1242 def skip_test_decorator(func): 1243 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1244 **kwargs: Dict[str, Any]) -> None: 1245 if imgfmt in formats: 1246 msg = f'{test_case}: Skipped for format {imgfmt}' 1247 test_case.case_skip(msg) 1248 else: 1249 func(test_case, *args, **kwargs) 1250 return func_wrapper 1251 return skip_test_decorator 1252 1253def skip_if_user_is_root(func): 1254 '''Skip Test Decorator 1255 Runs the test only without root permissions''' 1256 def func_wrapper(*args, **kwargs): 1257 if os.getuid() == 0: 1258 case_notrun('{}: cannot be run as root'.format(args[0])) 1259 return None 1260 else: 1261 return func(*args, **kwargs) 1262 return func_wrapper 1263 1264def execute_unittest(debug=False): 1265 """Executes unittests within the calling module.""" 1266 1267 verbosity = 2 if debug else 1 1268 1269 if debug: 1270 output = sys.stdout 1271 else: 1272 # We need to filter out the time taken from the output so that 1273 # qemu-iotest can reliably diff the results against master output. 1274 output = io.StringIO() 1275 1276 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1277 verbosity=verbosity) 1278 try: 1279 # unittest.main() will use sys.exit(); so expect a SystemExit 1280 # exception 1281 unittest.main(testRunner=runner) 1282 finally: 1283 # We need to filter out the time taken from the output so that 1284 # qemu-iotest can reliably diff the results against master output. 1285 if not debug: 1286 out = output.getvalue() 1287 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1288 1289 # Hide skipped tests from the reference output 1290 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1291 out_first_line, out_rest = out.split('\n', 1) 1292 out = out_first_line.replace('s', '.') + '\n' + out_rest 1293 1294 sys.stderr.write(out) 1295 1296def execute_setup_common(supported_fmts: Sequence[str] = (), 1297 supported_platforms: Sequence[str] = (), 1298 supported_cache_modes: Sequence[str] = (), 1299 supported_aio_modes: Sequence[str] = (), 1300 unsupported_fmts: Sequence[str] = (), 1301 supported_protocols: Sequence[str] = (), 1302 unsupported_protocols: Sequence[str] = (), 1303 required_fmts: Sequence[str] = ()) -> bool: 1304 """ 1305 Perform necessary setup for either script-style or unittest-style tests. 1306 1307 :return: Bool; Whether or not debug mode has been requested via the CLI. 1308 """ 1309 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1310 1311 debug = '-d' in sys.argv 1312 if debug: 1313 sys.argv.remove('-d') 1314 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1315 1316 _verify_image_format(supported_fmts, unsupported_fmts) 1317 _verify_protocol(supported_protocols, unsupported_protocols) 1318 _verify_platform(supported=supported_platforms) 1319 _verify_cache_mode(supported_cache_modes) 1320 _verify_aio_mode(supported_aio_modes) 1321 _verify_formats(required_fmts) 1322 _verify_virtio_blk() 1323 1324 return debug 1325 1326def execute_test(*args, test_function=None, **kwargs): 1327 """Run either unittest or script-style tests.""" 1328 1329 debug = execute_setup_common(*args, **kwargs) 1330 if not test_function: 1331 execute_unittest(debug) 1332 else: 1333 test_function() 1334 1335def activate_logging(): 1336 """Activate iotests.log() output to stdout for script-style tests.""" 1337 handler = logging.StreamHandler(stream=sys.stdout) 1338 formatter = logging.Formatter('%(message)s') 1339 handler.setFormatter(formatter) 1340 test_logger.addHandler(handler) 1341 test_logger.setLevel(logging.INFO) 1342 test_logger.propagate = False 1343 1344# This is called from script-style iotests without a single point of entry 1345def script_initialize(*args, **kwargs): 1346 """Initialize script-style tests without running any tests.""" 1347 activate_logging() 1348 execute_setup_common(*args, **kwargs) 1349 1350# This is called from script-style iotests with a single point of entry 1351def script_main(test_function, *args, **kwargs): 1352 """Run script-style tests outside of the unittest framework""" 1353 activate_logging() 1354 execute_test(*args, test_function=test_function, **kwargs) 1355 1356# This is called from unittest style iotests 1357def main(*args, **kwargs): 1358 """Run tests using the unittest framework""" 1359 execute_test(*args, **kwargs) 1360