1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76imgfmt = os.environ.get('IMGFMT', 'raw') 77imgproto = os.environ.get('IMGPROTO', 'file') 78test_dir = os.environ.get('TEST_DIR') 79sock_dir = os.environ.get('SOCK_DIR') 80output_dir = os.environ.get('OUTPUT_DIR', '.') 81cachemode = os.environ.get('CACHEMODE') 82aiomode = os.environ.get('AIOMODE') 83qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 84 85socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 86 87luks_default_secret_object = 'secret,id=keysec0,data=' + \ 88 os.environ.get('IMGKEYSECRET', '') 89luks_default_key_secret_opt = 'key-secret=keysec0' 90 91 92def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 93 connect_stderr: bool = True) -> Tuple[str, int]: 94 """ 95 Run a tool and return both its output and its exit code 96 """ 97 stderr = subprocess.STDOUT if connect_stderr else None 98 subp = subprocess.Popen(args, 99 stdout=subprocess.PIPE, 100 stderr=stderr, 101 universal_newlines=True) 102 output = subp.communicate()[0] 103 if subp.returncode < 0: 104 sys.stderr.write('%s received signal %i: %s\n' 105 % (tool, -subp.returncode, 106 ' '.join(qemu_img_args + list(args)))) 107 return (output, subp.returncode) 108 109def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 110 """ 111 Run qemu-img and return both its output and its exit code 112 """ 113 full_args = qemu_img_args + list(args) 114 return qemu_tool_pipe_and_status('qemu-img', full_args) 115 116def qemu_img(*args: str) -> int: 117 '''Run qemu-img and return the exit code''' 118 return qemu_img_pipe_and_status(*args)[1] 119 120def ordered_qmp(qmsg, conv_keys=True): 121 # Dictionaries are not ordered prior to 3.6, therefore: 122 if isinstance(qmsg, list): 123 return [ordered_qmp(atom) for atom in qmsg] 124 if isinstance(qmsg, dict): 125 od = OrderedDict() 126 for k, v in sorted(qmsg.items()): 127 if conv_keys: 128 k = k.replace('_', '-') 129 od[k] = ordered_qmp(v, conv_keys=False) 130 return od 131 return qmsg 132 133def qemu_img_create(*args): 134 args = list(args) 135 136 # default luks support 137 if '-f' in args and args[args.index('-f') + 1] == 'luks': 138 if '-o' in args: 139 i = args.index('-o') 140 if 'key-secret' not in args[i + 1]: 141 args[i + 1].append(luks_default_key_secret_opt) 142 args.insert(i + 2, '--object') 143 args.insert(i + 3, luks_default_secret_object) 144 else: 145 args = ['-o', luks_default_key_secret_opt, 146 '--object', luks_default_secret_object] + args 147 148 args.insert(0, 'create') 149 150 return qemu_img(*args) 151 152def qemu_img_measure(*args): 153 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 154 155def qemu_img_check(*args): 156 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 157 158def qemu_img_verbose(*args): 159 '''Run qemu-img without suppressing its output and return the exit code''' 160 exitcode = subprocess.call(qemu_img_args + list(args)) 161 if exitcode < 0: 162 sys.stderr.write('qemu-img received signal %i: %s\n' 163 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 164 return exitcode 165 166def qemu_img_pipe(*args: str) -> str: 167 '''Run qemu-img and return its output''' 168 return qemu_img_pipe_and_status(*args)[0] 169 170def qemu_img_log(*args): 171 result = qemu_img_pipe(*args) 172 log(result, filters=[filter_testfiles]) 173 return result 174 175def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 176 args = ['info'] 177 if imgopts: 178 args.append('--image-opts') 179 else: 180 args += ['-f', imgfmt] 181 args += extra_args 182 args.append(filename) 183 184 output = qemu_img_pipe(*args) 185 if not filter_path: 186 filter_path = filename 187 log(filter_img_info(output, filter_path)) 188 189def qemu_io(*args): 190 '''Run qemu-io and return the stdout data''' 191 args = qemu_io_args + list(args) 192 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 193 stderr=subprocess.STDOUT, 194 universal_newlines=True) 195 output = subp.communicate()[0] 196 if subp.returncode < 0: 197 sys.stderr.write('qemu-io received signal %i: %s\n' 198 % (-subp.returncode, ' '.join(args))) 199 return output 200 201def qemu_io_log(*args): 202 result = qemu_io(*args) 203 log(result, filters=[filter_testfiles, filter_qemu_io]) 204 return result 205 206def qemu_io_silent(*args): 207 '''Run qemu-io and return the exit code, suppressing stdout''' 208 args = qemu_io_args + list(args) 209 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 210 if exitcode < 0: 211 sys.stderr.write('qemu-io received signal %i: %s\n' % 212 (-exitcode, ' '.join(args))) 213 return exitcode 214 215def qemu_io_silent_check(*args): 216 '''Run qemu-io and return the true if subprocess returned 0''' 217 args = qemu_io_args + list(args) 218 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 219 stderr=subprocess.STDOUT) 220 return exitcode == 0 221 222def get_virtio_scsi_device(): 223 if qemu_default_machine == 's390-ccw-virtio': 224 return 'virtio-scsi-ccw' 225 return 'virtio-scsi-pci' 226 227class QemuIoInteractive: 228 def __init__(self, *args): 229 self.args = qemu_io_args_no_fmt + list(args) 230 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 231 stdout=subprocess.PIPE, 232 stderr=subprocess.STDOUT, 233 universal_newlines=True) 234 out = self._p.stdout.read(9) 235 if out != 'qemu-io> ': 236 # Most probably qemu-io just failed to start. 237 # Let's collect the whole output and exit. 238 out += self._p.stdout.read() 239 self._p.wait(timeout=1) 240 raise ValueError(out) 241 242 def close(self): 243 self._p.communicate('q\n') 244 245 def _read_output(self): 246 pattern = 'qemu-io> ' 247 n = len(pattern) 248 pos = 0 249 s = [] 250 while pos != n: 251 c = self._p.stdout.read(1) 252 # check unexpected EOF 253 assert c != '' 254 s.append(c) 255 if c == pattern[pos]: 256 pos += 1 257 else: 258 pos = 0 259 260 return ''.join(s[:-n]) 261 262 def cmd(self, cmd): 263 # quit command is in close(), '\n' is added automatically 264 assert '\n' not in cmd 265 cmd = cmd.strip() 266 assert cmd not in ('q', 'quit') 267 self._p.stdin.write(cmd + '\n') 268 self._p.stdin.flush() 269 return self._read_output() 270 271 272def qemu_nbd(*args): 273 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 274 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 275 276def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 277 '''Run qemu-nbd in daemon mode and return both the parent's exit code 278 and its output in case of an error''' 279 full_args = qemu_nbd_args + ['--fork'] + list(args) 280 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 281 connect_stderr=False) 282 return returncode, output if returncode else '' 283 284def qemu_nbd_list_log(*args: str) -> str: 285 '''Run qemu-nbd to list remote exports''' 286 full_args = [qemu_nbd_prog, '-L'] + list(args) 287 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 288 log(output, filters=[filter_testfiles, filter_nbd_exports]) 289 return output 290 291@contextmanager 292def qemu_nbd_popen(*args): 293 '''Context manager running qemu-nbd within the context''' 294 pid_file = file_path("pid") 295 296 cmd = list(qemu_nbd_args) 297 cmd.extend(('--persistent', '--pid-file', pid_file)) 298 cmd.extend(args) 299 300 log('Start NBD server') 301 p = subprocess.Popen(cmd) 302 try: 303 while not os.path.exists(pid_file): 304 if p.poll() is not None: 305 raise RuntimeError( 306 "qemu-nbd terminated with exit code {}: {}" 307 .format(p.returncode, ' '.join(cmd))) 308 309 time.sleep(0.01) 310 yield 311 finally: 312 log('Kill NBD server') 313 p.kill() 314 p.wait() 315 316def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 317 '''Return True if two image files are identical''' 318 return qemu_img('compare', '-f', fmt1, 319 '-F', fmt2, img1, img2) == 0 320 321def create_image(name, size): 322 '''Create a fully-allocated raw image with sector markers''' 323 file = open(name, 'wb') 324 i = 0 325 while i < size: 326 sector = struct.pack('>l504xl', i // 512, i // 512) 327 file.write(sector) 328 i = i + 512 329 file.close() 330 331def image_size(img): 332 '''Return image's virtual size''' 333 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 334 return json.loads(r)['virtual-size'] 335 336def is_str(val): 337 return isinstance(val, str) 338 339test_dir_re = re.compile(r"%s" % test_dir) 340def filter_test_dir(msg): 341 return test_dir_re.sub("TEST_DIR", msg) 342 343win32_re = re.compile(r"\r") 344def filter_win32(msg): 345 return win32_re.sub("", msg) 346 347qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 348 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 349 r"and [0-9\/.inf]* ops\/sec\)") 350def filter_qemu_io(msg): 351 msg = filter_win32(msg) 352 return qemu_io_re.sub("X ops; XX:XX:XX.X " 353 "(XXX YYY/sec and XXX ops/sec)", msg) 354 355chown_re = re.compile(r"chown [0-9]+:[0-9]+") 356def filter_chown(msg): 357 return chown_re.sub("chown UID:GID", msg) 358 359def filter_qmp_event(event): 360 '''Filter a QMP event dict''' 361 event = dict(event) 362 if 'timestamp' in event: 363 event['timestamp']['seconds'] = 'SECS' 364 event['timestamp']['microseconds'] = 'USECS' 365 return event 366 367def filter_qmp(qmsg, filter_fn): 368 '''Given a string filter, filter a QMP object's values. 369 filter_fn takes a (key, value) pair.''' 370 # Iterate through either lists or dicts; 371 if isinstance(qmsg, list): 372 items = enumerate(qmsg) 373 else: 374 items = qmsg.items() 375 376 for k, v in items: 377 if isinstance(v, (dict, list)): 378 qmsg[k] = filter_qmp(v, filter_fn) 379 else: 380 qmsg[k] = filter_fn(k, v) 381 return qmsg 382 383def filter_testfiles(msg): 384 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 385 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 386 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 387 388def filter_qmp_testfiles(qmsg): 389 def _filter(_key, value): 390 if is_str(value): 391 return filter_testfiles(value) 392 return value 393 return filter_qmp(qmsg, _filter) 394 395def filter_virtio_scsi(output: str) -> str: 396 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 397 398def filter_qmp_virtio_scsi(qmsg): 399 def _filter(_key, value): 400 if is_str(value): 401 return filter_virtio_scsi(value) 402 return value 403 return filter_qmp(qmsg, _filter) 404 405def filter_generated_node_ids(msg): 406 return re.sub("#block[0-9]+", "NODE_NAME", msg) 407 408def filter_img_info(output, filename): 409 lines = [] 410 for line in output.split('\n'): 411 if 'disk size' in line or 'actual-size' in line: 412 continue 413 line = line.replace(filename, 'TEST_IMG') 414 line = filter_testfiles(line) 415 line = line.replace(imgfmt, 'IMGFMT') 416 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 417 line = re.sub('uuid: [-a-f0-9]+', 418 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 419 line) 420 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 421 lines.append(line) 422 return '\n'.join(lines) 423 424def filter_imgfmt(msg): 425 return msg.replace(imgfmt, 'IMGFMT') 426 427def filter_qmp_imgfmt(qmsg): 428 def _filter(_key, value): 429 if is_str(value): 430 return filter_imgfmt(value) 431 return value 432 return filter_qmp(qmsg, _filter) 433 434def filter_nbd_exports(output: str) -> str: 435 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 436 437 438Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 439 440def log(msg: Msg, 441 filters: Iterable[Callable[[Msg], Msg]] = (), 442 indent: Optional[int] = None) -> None: 443 """ 444 Logs either a string message or a JSON serializable message (like QMP). 445 If indent is provided, JSON serializable messages are pretty-printed. 446 """ 447 for flt in filters: 448 msg = flt(msg) 449 if isinstance(msg, (dict, list)): 450 # Don't sort if it's already sorted 451 do_sort = not isinstance(msg, OrderedDict) 452 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 453 else: 454 test_logger.info(msg) 455 456class Timeout: 457 def __init__(self, seconds, errmsg="Timeout"): 458 self.seconds = seconds 459 self.errmsg = errmsg 460 def __enter__(self): 461 signal.signal(signal.SIGALRM, self.timeout) 462 signal.setitimer(signal.ITIMER_REAL, self.seconds) 463 return self 464 def __exit__(self, exc_type, value, traceback): 465 signal.setitimer(signal.ITIMER_REAL, 0) 466 return False 467 def timeout(self, signum, frame): 468 raise Exception(self.errmsg) 469 470def file_pattern(name): 471 return "{0}-{1}".format(os.getpid(), name) 472 473class FilePath: 474 """ 475 Context manager generating multiple file names. The generated files are 476 removed when exiting the context. 477 478 Example usage: 479 480 with FilePath('a.img', 'b.img') as (img_a, img_b): 481 # Use img_a and img_b here... 482 483 # a.img and b.img are automatically removed here. 484 485 By default images are created in iotests.test_dir. To create sockets use 486 iotests.sock_dir: 487 488 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 489 490 For convenience, calling with one argument yields a single file instead of 491 a tuple with one item. 492 493 """ 494 def __init__(self, *names, base_dir=test_dir): 495 self.paths = [os.path.join(base_dir, file_pattern(name)) 496 for name in names] 497 498 def __enter__(self): 499 if len(self.paths) == 1: 500 return self.paths[0] 501 else: 502 return self.paths 503 504 def __exit__(self, exc_type, exc_val, exc_tb): 505 for path in self.paths: 506 try: 507 os.remove(path) 508 except OSError: 509 pass 510 return False 511 512 513def file_path_remover(): 514 for path in reversed(file_path_remover.paths): 515 try: 516 os.remove(path) 517 except OSError: 518 pass 519 520 521def file_path(*names, base_dir=test_dir): 522 ''' Another way to get auto-generated filename that cleans itself up. 523 524 Use is as simple as: 525 526 img_a, img_b = file_path('a.img', 'b.img') 527 sock = file_path('socket') 528 ''' 529 530 if not hasattr(file_path_remover, 'paths'): 531 file_path_remover.paths = [] 532 atexit.register(file_path_remover) 533 534 paths = [] 535 for name in names: 536 filename = file_pattern(name) 537 path = os.path.join(base_dir, filename) 538 file_path_remover.paths.append(path) 539 paths.append(path) 540 541 return paths[0] if len(paths) == 1 else paths 542 543def remote_filename(path): 544 if imgproto == 'file': 545 return path 546 elif imgproto == 'ssh': 547 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 548 else: 549 raise Exception("Protocol %s not supported" % (imgproto)) 550 551class VM(qtest.QEMUQtestMachine): 552 '''A QEMU VM''' 553 554 def __init__(self, path_suffix=''): 555 name = "qemu%s-%d" % (path_suffix, os.getpid()) 556 super().__init__(qemu_prog, qemu_opts, name=name, 557 test_dir=test_dir, 558 socket_scm_helper=socket_scm_helper, 559 sock_dir=sock_dir) 560 self._num_drives = 0 561 562 def add_object(self, opts): 563 self._args.append('-object') 564 self._args.append(opts) 565 return self 566 567 def add_device(self, opts): 568 self._args.append('-device') 569 self._args.append(opts) 570 return self 571 572 def add_drive_raw(self, opts): 573 self._args.append('-drive') 574 self._args.append(opts) 575 return self 576 577 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 578 '''Add a virtio-blk drive to the VM''' 579 options = ['if=%s' % interface, 580 'id=drive%d' % self._num_drives] 581 582 if path is not None: 583 options.append('file=%s' % path) 584 options.append('format=%s' % img_format) 585 options.append('cache=%s' % cachemode) 586 options.append('aio=%s' % aiomode) 587 588 if opts: 589 options.append(opts) 590 591 if img_format == 'luks' and 'key-secret' not in opts: 592 # default luks support 593 if luks_default_secret_object not in self._args: 594 self.add_object(luks_default_secret_object) 595 596 options.append(luks_default_key_secret_opt) 597 598 self._args.append('-drive') 599 self._args.append(','.join(options)) 600 self._num_drives += 1 601 return self 602 603 def add_blockdev(self, opts): 604 self._args.append('-blockdev') 605 if isinstance(opts, str): 606 self._args.append(opts) 607 else: 608 self._args.append(','.join(opts)) 609 return self 610 611 def add_incoming(self, addr): 612 self._args.append('-incoming') 613 self._args.append(addr) 614 return self 615 616 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 617 cmd = 'human-monitor-command' 618 kwargs: Dict[str, Any] = {'command-line': command_line} 619 if use_log: 620 return self.qmp_log(cmd, **kwargs) 621 else: 622 return self.qmp(cmd, **kwargs) 623 624 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 625 """Pause drive r/w operations""" 626 if not event: 627 self.pause_drive(drive, "read_aio") 628 self.pause_drive(drive, "write_aio") 629 return 630 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 631 632 def resume_drive(self, drive: str) -> None: 633 """Resume drive r/w operations""" 634 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 635 636 def hmp_qemu_io(self, drive: str, cmd: str, 637 use_log: bool = False) -> QMPMessage: 638 """Write to a given drive using an HMP command""" 639 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 640 641 def flatten_qmp_object(self, obj, output=None, basestr=''): 642 if output is None: 643 output = dict() 644 if isinstance(obj, list): 645 for i, item in enumerate(obj): 646 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 647 elif isinstance(obj, dict): 648 for key in obj: 649 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 650 else: 651 output[basestr[:-1]] = obj # Strip trailing '.' 652 return output 653 654 def qmp_to_opts(self, obj): 655 obj = self.flatten_qmp_object(obj) 656 output_list = list() 657 for key in obj: 658 output_list += [key + '=' + obj[key]] 659 return ','.join(output_list) 660 661 def get_qmp_events_filtered(self, wait=60.0): 662 result = [] 663 for ev in self.get_qmp_events(wait=wait): 664 result.append(filter_qmp_event(ev)) 665 return result 666 667 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 668 full_cmd = OrderedDict(( 669 ("execute", cmd), 670 ("arguments", ordered_qmp(kwargs)) 671 )) 672 log(full_cmd, filters, indent=indent) 673 result = self.qmp(cmd, **kwargs) 674 log(result, filters, indent=indent) 675 return result 676 677 # Returns None on success, and an error string on failure 678 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 679 pre_finalize=None, cancel=False, wait=60.0): 680 """ 681 run_job moves a job from creation through to dismissal. 682 683 :param job: String. ID of recently-launched job 684 :param auto_finalize: Bool. True if the job was launched with 685 auto_finalize. Defaults to True. 686 :param auto_dismiss: Bool. True if the job was launched with 687 auto_dismiss=True. Defaults to False. 688 :param pre_finalize: Callback. A callable that takes no arguments to be 689 invoked prior to issuing job-finalize, if any. 690 :param cancel: Bool. When true, cancels the job after the pre_finalize 691 callback. 692 :param wait: Float. Timeout value specifying how long to wait for any 693 event, in seconds. Defaults to 60.0. 694 """ 695 match_device = {'data': {'device': job}} 696 match_id = {'data': {'id': job}} 697 events = [ 698 ('BLOCK_JOB_COMPLETED', match_device), 699 ('BLOCK_JOB_CANCELLED', match_device), 700 ('BLOCK_JOB_ERROR', match_device), 701 ('BLOCK_JOB_READY', match_device), 702 ('BLOCK_JOB_PENDING', match_id), 703 ('JOB_STATUS_CHANGE', match_id) 704 ] 705 error = None 706 while True: 707 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 708 if ev['event'] != 'JOB_STATUS_CHANGE': 709 log(ev) 710 continue 711 status = ev['data']['status'] 712 if status == 'aborting': 713 result = self.qmp('query-jobs') 714 for j in result['return']: 715 if j['id'] == job: 716 error = j['error'] 717 log('Job failed: %s' % (j['error'])) 718 elif status == 'ready': 719 self.qmp_log('job-complete', id=job) 720 elif status == 'pending' and not auto_finalize: 721 if pre_finalize: 722 pre_finalize() 723 if cancel: 724 self.qmp_log('job-cancel', id=job) 725 else: 726 self.qmp_log('job-finalize', id=job) 727 elif status == 'concluded' and not auto_dismiss: 728 self.qmp_log('job-dismiss', id=job) 729 elif status == 'null': 730 return error 731 732 # Returns None on success, and an error string on failure 733 def blockdev_create(self, options, job_id='job0', filters=None): 734 if filters is None: 735 filters = [filter_qmp_testfiles] 736 result = self.qmp_log('blockdev-create', filters=filters, 737 job_id=job_id, options=options) 738 739 if 'return' in result: 740 assert result['return'] == {} 741 job_result = self.run_job(job_id) 742 else: 743 job_result = result['error'] 744 745 log("") 746 return job_result 747 748 def enable_migration_events(self, name): 749 log('Enabling migration QMP events on %s...' % name) 750 log(self.qmp('migrate-set-capabilities', capabilities=[ 751 { 752 'capability': 'events', 753 'state': True 754 } 755 ])) 756 757 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 758 while True: 759 event = self.event_wait('MIGRATION') 760 # We use the default timeout, and with a timeout, event_wait() 761 # never returns None 762 assert event 763 764 log(event, filters=[filter_qmp_event]) 765 if event['data']['status'] in ('completed', 'failed'): 766 break 767 768 if event['data']['status'] == 'completed': 769 # The event may occur in finish-migrate, so wait for the expected 770 # post-migration runstate 771 runstate = None 772 while runstate != expect_runstate: 773 runstate = self.qmp('query-status')['return']['status'] 774 return True 775 else: 776 return False 777 778 def node_info(self, node_name): 779 nodes = self.qmp('query-named-block-nodes') 780 for x in nodes['return']: 781 if x['node-name'] == node_name: 782 return x 783 return None 784 785 def query_bitmaps(self): 786 res = self.qmp("query-named-block-nodes") 787 return {device['node-name']: device['dirty-bitmaps'] 788 for device in res['return'] if 'dirty-bitmaps' in device} 789 790 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 791 """ 792 get a specific bitmap from the object returned by query_bitmaps. 793 :param recording: If specified, filter results by the specified value. 794 :param bitmaps: If specified, use it instead of call query_bitmaps() 795 """ 796 if bitmaps is None: 797 bitmaps = self.query_bitmaps() 798 799 for bitmap in bitmaps[node_name]: 800 if bitmap.get('name', '') == bitmap_name: 801 if recording is None or bitmap.get('recording') == recording: 802 return bitmap 803 return None 804 805 def check_bitmap_status(self, node_name, bitmap_name, fields): 806 ret = self.get_bitmap(node_name, bitmap_name) 807 808 return fields.items() <= ret.items() 809 810 def assert_block_path(self, root, path, expected_node, graph=None): 811 """ 812 Check whether the node under the given path in the block graph 813 is @expected_node. 814 815 @root is the node name of the node where the @path is rooted. 816 817 @path is a string that consists of child names separated by 818 slashes. It must begin with a slash. 819 820 Examples for @root + @path: 821 - root="qcow2-node", path="/backing/file" 822 - root="quorum-node", path="/children.2/file" 823 824 Hypothetically, @path could be empty, in which case it would 825 point to @root. However, in practice this case is not useful 826 and hence not allowed. 827 828 @expected_node may be None. (All elements of the path but the 829 leaf must still exist.) 830 831 @graph may be None or the result of an x-debug-query-block-graph 832 call that has already been performed. 833 """ 834 if graph is None: 835 graph = self.qmp('x-debug-query-block-graph')['return'] 836 837 iter_path = iter(path.split('/')) 838 839 # Must start with a / 840 assert next(iter_path) == '' 841 842 node = next((node for node in graph['nodes'] if node['name'] == root), 843 None) 844 845 # An empty @path is not allowed, so the root node must be present 846 assert node is not None, 'Root node %s not found' % root 847 848 for child_name in iter_path: 849 assert node is not None, 'Cannot follow path %s%s' % (root, path) 850 851 try: 852 node_id = next(edge['child'] for edge in graph['edges'] 853 if (edge['parent'] == node['id'] and 854 edge['name'] == child_name)) 855 856 node = next(node for node in graph['nodes'] 857 if node['id'] == node_id) 858 859 except StopIteration: 860 node = None 861 862 if node is None: 863 assert expected_node is None, \ 864 'No node found under %s (but expected %s)' % \ 865 (path, expected_node) 866 else: 867 assert node['name'] == expected_node, \ 868 'Found node %s under %s (but expected %s)' % \ 869 (node['name'], path, expected_node) 870 871index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 872 873class QMPTestCase(unittest.TestCase): 874 '''Abstract base class for QMP test cases''' 875 876 def __init__(self, *args, **kwargs): 877 super().__init__(*args, **kwargs) 878 # Many users of this class set a VM property we rely on heavily 879 # in the methods below. 880 self.vm = None 881 882 def dictpath(self, d, path): 883 '''Traverse a path in a nested dict''' 884 for component in path.split('/'): 885 m = index_re.match(component) 886 if m: 887 component, idx = m.groups() 888 idx = int(idx) 889 890 if not isinstance(d, dict) or component not in d: 891 self.fail(f'failed path traversal for "{path}" in "{d}"') 892 d = d[component] 893 894 if m: 895 if not isinstance(d, list): 896 self.fail(f'path component "{component}" in "{path}" ' 897 f'is not a list in "{d}"') 898 try: 899 d = d[idx] 900 except IndexError: 901 self.fail(f'invalid index "{idx}" in path "{path}" ' 902 f'in "{d}"') 903 return d 904 905 def assert_qmp_absent(self, d, path): 906 try: 907 result = self.dictpath(d, path) 908 except AssertionError: 909 return 910 self.fail('path "%s" has value "%s"' % (path, str(result))) 911 912 def assert_qmp(self, d, path, value): 913 '''Assert that the value for a specific path in a QMP dict 914 matches. When given a list of values, assert that any of 915 them matches.''' 916 917 result = self.dictpath(d, path) 918 919 # [] makes no sense as a list of valid values, so treat it as 920 # an actual single value. 921 if isinstance(value, list) and value != []: 922 for v in value: 923 if result == v: 924 return 925 self.fail('no match for "%s" in %s' % (str(result), str(value))) 926 else: 927 self.assertEqual(result, value, 928 '"%s" is "%s", expected "%s"' 929 % (path, str(result), str(value))) 930 931 def assert_no_active_block_jobs(self): 932 result = self.vm.qmp('query-block-jobs') 933 self.assert_qmp(result, 'return', []) 934 935 def assert_has_block_node(self, node_name=None, file_name=None): 936 """Issue a query-named-block-nodes and assert node_name and/or 937 file_name is present in the result""" 938 def check_equal_or_none(a, b): 939 return a is None or b is None or a == b 940 assert node_name or file_name 941 result = self.vm.qmp('query-named-block-nodes') 942 for x in result["return"]: 943 if check_equal_or_none(x.get("node-name"), node_name) and \ 944 check_equal_or_none(x.get("file"), file_name): 945 return 946 self.fail("Cannot find %s %s in result:\n%s" % 947 (node_name, file_name, result)) 948 949 def assert_json_filename_equal(self, json_filename, reference): 950 '''Asserts that the given filename is a json: filename and that its 951 content is equal to the given reference object''' 952 self.assertEqual(json_filename[:5], 'json:') 953 self.assertEqual( 954 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 955 self.vm.flatten_qmp_object(reference) 956 ) 957 958 def cancel_and_wait(self, drive='drive0', force=False, 959 resume=False, wait=60.0): 960 '''Cancel a block job and wait for it to finish, returning the event''' 961 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 962 self.assert_qmp(result, 'return', {}) 963 964 if resume: 965 self.vm.resume_drive(drive) 966 967 cancelled = False 968 result = None 969 while not cancelled: 970 for event in self.vm.get_qmp_events(wait=wait): 971 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 972 event['event'] == 'BLOCK_JOB_CANCELLED': 973 self.assert_qmp(event, 'data/device', drive) 974 result = event 975 cancelled = True 976 elif event['event'] == 'JOB_STATUS_CHANGE': 977 self.assert_qmp(event, 'data/id', drive) 978 979 980 self.assert_no_active_block_jobs() 981 return result 982 983 def wait_until_completed(self, drive='drive0', check_offset=True, 984 wait=60.0, error=None): 985 '''Wait for a block job to finish, returning the event''' 986 while True: 987 for event in self.vm.get_qmp_events(wait=wait): 988 if event['event'] == 'BLOCK_JOB_COMPLETED': 989 self.assert_qmp(event, 'data/device', drive) 990 if error is None: 991 self.assert_qmp_absent(event, 'data/error') 992 if check_offset: 993 self.assert_qmp(event, 'data/offset', 994 event['data']['len']) 995 else: 996 self.assert_qmp(event, 'data/error', error) 997 self.assert_no_active_block_jobs() 998 return event 999 if event['event'] == 'JOB_STATUS_CHANGE': 1000 self.assert_qmp(event, 'data/id', drive) 1001 1002 def wait_ready(self, drive='drive0'): 1003 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1004 return self.vm.events_wait([ 1005 ('BLOCK_JOB_READY', 1006 {'data': {'type': 'mirror', 'device': drive}}), 1007 ('BLOCK_JOB_READY', 1008 {'data': {'type': 'commit', 'device': drive}}) 1009 ]) 1010 1011 def wait_ready_and_cancel(self, drive='drive0'): 1012 self.wait_ready(drive=drive) 1013 event = self.cancel_and_wait(drive=drive) 1014 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1015 self.assert_qmp(event, 'data/type', 'mirror') 1016 self.assert_qmp(event, 'data/offset', event['data']['len']) 1017 1018 def complete_and_wait(self, drive='drive0', wait_ready=True, 1019 completion_error=None): 1020 '''Complete a block job and wait for it to finish''' 1021 if wait_ready: 1022 self.wait_ready(drive=drive) 1023 1024 result = self.vm.qmp('block-job-complete', device=drive) 1025 self.assert_qmp(result, 'return', {}) 1026 1027 event = self.wait_until_completed(drive=drive, error=completion_error) 1028 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1029 1030 def pause_wait(self, job_id='job0'): 1031 with Timeout(3, "Timeout waiting for job to pause"): 1032 while True: 1033 result = self.vm.qmp('query-block-jobs') 1034 found = False 1035 for job in result['return']: 1036 if job['device'] == job_id: 1037 found = True 1038 if job['paused'] and not job['busy']: 1039 return job 1040 break 1041 assert found 1042 1043 def pause_job(self, job_id='job0', wait=True): 1044 result = self.vm.qmp('block-job-pause', device=job_id) 1045 self.assert_qmp(result, 'return', {}) 1046 if wait: 1047 return self.pause_wait(job_id) 1048 return result 1049 1050 def case_skip(self, reason): 1051 '''Skip this test case''' 1052 case_notrun(reason) 1053 self.skipTest(reason) 1054 1055 1056def notrun(reason): 1057 '''Skip this test suite''' 1058 # Each test in qemu-iotests has a number ("seq") 1059 seq = os.path.basename(sys.argv[0]) 1060 1061 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1062 logger.warning("%s not run: %s", seq, reason) 1063 sys.exit(0) 1064 1065def case_notrun(reason): 1066 '''Mark this test case as not having been run (without actually 1067 skipping it, that is left to the caller). See 1068 QMPTestCase.case_skip() for a variant that actually skips the 1069 current test case.''' 1070 1071 # Each test in qemu-iotests has a number ("seq") 1072 seq = os.path.basename(sys.argv[0]) 1073 1074 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1075 ' [case not run] ' + reason + '\n') 1076 1077def _verify_image_format(supported_fmts: Sequence[str] = (), 1078 unsupported_fmts: Sequence[str] = ()) -> None: 1079 if 'generic' in supported_fmts and \ 1080 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1081 # similar to 1082 # _supported_fmt generic 1083 # for bash tests 1084 supported_fmts = () 1085 1086 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1087 if not_sup or (imgfmt in unsupported_fmts): 1088 notrun('not suitable for this image format: %s' % imgfmt) 1089 1090 if imgfmt == 'luks': 1091 verify_working_luks() 1092 1093def _verify_protocol(supported: Sequence[str] = (), 1094 unsupported: Sequence[str] = ()) -> None: 1095 assert not (supported and unsupported) 1096 1097 if 'generic' in supported: 1098 return 1099 1100 not_sup = supported and (imgproto not in supported) 1101 if not_sup or (imgproto in unsupported): 1102 notrun('not suitable for this protocol: %s' % imgproto) 1103 1104def _verify_platform(supported: Sequence[str] = (), 1105 unsupported: Sequence[str] = ()) -> None: 1106 if any((sys.platform.startswith(x) for x in unsupported)): 1107 notrun('not suitable for this OS: %s' % sys.platform) 1108 1109 if supported: 1110 if not any((sys.platform.startswith(x) for x in supported)): 1111 notrun('not suitable for this OS: %s' % sys.platform) 1112 1113def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1114 if supported_cache_modes and (cachemode not in supported_cache_modes): 1115 notrun('not suitable for this cache mode: %s' % cachemode) 1116 1117def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1118 if supported_aio_modes and (aiomode not in supported_aio_modes): 1119 notrun('not suitable for this aio mode: %s' % aiomode) 1120 1121def supports_quorum(): 1122 return 'quorum' in qemu_img_pipe('--help') 1123 1124def verify_quorum(): 1125 '''Skip test suite if quorum support is not available''' 1126 if not supports_quorum(): 1127 notrun('quorum support missing') 1128 1129def has_working_luks() -> Tuple[bool, str]: 1130 """ 1131 Check whether our LUKS driver can actually create images 1132 (this extends to LUKS encryption for qcow2). 1133 1134 If not, return the reason why. 1135 """ 1136 1137 img_file = f'{test_dir}/luks-test.luks' 1138 (output, status) = \ 1139 qemu_img_pipe_and_status('create', '-f', 'luks', 1140 '--object', luks_default_secret_object, 1141 '-o', luks_default_key_secret_opt, 1142 '-o', 'iter-time=10', 1143 img_file, '1G') 1144 try: 1145 os.remove(img_file) 1146 except OSError: 1147 pass 1148 1149 if status != 0: 1150 reason = output 1151 for line in output.splitlines(): 1152 if img_file + ':' in line: 1153 reason = line.split(img_file + ':', 1)[1].strip() 1154 break 1155 1156 return (False, reason) 1157 else: 1158 return (True, '') 1159 1160def verify_working_luks(): 1161 """ 1162 Skip test suite if LUKS does not work 1163 """ 1164 (working, reason) = has_working_luks() 1165 if not working: 1166 notrun(reason) 1167 1168def qemu_pipe(*args: str) -> str: 1169 """ 1170 Run qemu with an option to print something and exit (e.g. a help option). 1171 1172 :return: QEMU's stdout output. 1173 """ 1174 full_args = [qemu_prog] + qemu_opts + list(args) 1175 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1176 return output 1177 1178def supported_formats(read_only=False): 1179 '''Set 'read_only' to True to check ro-whitelist 1180 Otherwise, rw-whitelist is checked''' 1181 1182 if not hasattr(supported_formats, "formats"): 1183 supported_formats.formats = {} 1184 1185 if read_only not in supported_formats.formats: 1186 format_message = qemu_pipe("-drive", "format=help") 1187 line = 1 if read_only else 0 1188 supported_formats.formats[read_only] = \ 1189 format_message.splitlines()[line].split(":")[1].split() 1190 1191 return supported_formats.formats[read_only] 1192 1193def skip_if_unsupported(required_formats=(), read_only=False): 1194 '''Skip Test Decorator 1195 Runs the test if all the required formats are whitelisted''' 1196 def skip_test_decorator(func): 1197 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1198 **kwargs: Dict[str, Any]) -> None: 1199 if callable(required_formats): 1200 fmts = required_formats(test_case) 1201 else: 1202 fmts = required_formats 1203 1204 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1205 if usf_list: 1206 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1207 test_case.case_skip(msg) 1208 else: 1209 func(test_case, *args, **kwargs) 1210 return func_wrapper 1211 return skip_test_decorator 1212 1213def skip_for_formats(formats: Sequence[str] = ()) \ 1214 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1215 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1216 '''Skip Test Decorator 1217 Skips the test for the given formats''' 1218 def skip_test_decorator(func): 1219 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1220 **kwargs: Dict[str, Any]) -> None: 1221 if imgfmt in formats: 1222 msg = f'{test_case}: Skipped for format {imgfmt}' 1223 test_case.case_skip(msg) 1224 else: 1225 func(test_case, *args, **kwargs) 1226 return func_wrapper 1227 return skip_test_decorator 1228 1229def skip_if_user_is_root(func): 1230 '''Skip Test Decorator 1231 Runs the test only without root permissions''' 1232 def func_wrapper(*args, **kwargs): 1233 if os.getuid() == 0: 1234 case_notrun('{}: cannot be run as root'.format(args[0])) 1235 return None 1236 else: 1237 return func(*args, **kwargs) 1238 return func_wrapper 1239 1240def execute_unittest(debug=False): 1241 """Executes unittests within the calling module.""" 1242 1243 verbosity = 2 if debug else 1 1244 1245 if debug: 1246 output = sys.stdout 1247 else: 1248 # We need to filter out the time taken from the output so that 1249 # qemu-iotest can reliably diff the results against master output. 1250 output = io.StringIO() 1251 1252 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1253 verbosity=verbosity) 1254 try: 1255 # unittest.main() will use sys.exit(); so expect a SystemExit 1256 # exception 1257 unittest.main(testRunner=runner) 1258 finally: 1259 # We need to filter out the time taken from the output so that 1260 # qemu-iotest can reliably diff the results against master output. 1261 if not debug: 1262 out = output.getvalue() 1263 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1264 1265 # Hide skipped tests from the reference output 1266 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1267 out_first_line, out_rest = out.split('\n', 1) 1268 out = out_first_line.replace('s', '.') + '\n' + out_rest 1269 1270 sys.stderr.write(out) 1271 1272def execute_setup_common(supported_fmts: Sequence[str] = (), 1273 supported_platforms: Sequence[str] = (), 1274 supported_cache_modes: Sequence[str] = (), 1275 supported_aio_modes: Sequence[str] = (), 1276 unsupported_fmts: Sequence[str] = (), 1277 supported_protocols: Sequence[str] = (), 1278 unsupported_protocols: Sequence[str] = ()) -> bool: 1279 """ 1280 Perform necessary setup for either script-style or unittest-style tests. 1281 1282 :return: Bool; Whether or not debug mode has been requested via the CLI. 1283 """ 1284 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1285 1286 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1287 # indicate that we're not being run via "check". There may be 1288 # other things set up by "check" that individual test cases rely 1289 # on. 1290 if test_dir is None or qemu_default_machine is None: 1291 sys.stderr.write('Please run this test via the "check" script\n') 1292 sys.exit(os.EX_USAGE) 1293 1294 debug = '-d' in sys.argv 1295 if debug: 1296 sys.argv.remove('-d') 1297 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1298 1299 _verify_image_format(supported_fmts, unsupported_fmts) 1300 _verify_protocol(supported_protocols, unsupported_protocols) 1301 _verify_platform(supported=supported_platforms) 1302 _verify_cache_mode(supported_cache_modes) 1303 _verify_aio_mode(supported_aio_modes) 1304 1305 return debug 1306 1307def execute_test(*args, test_function=None, **kwargs): 1308 """Run either unittest or script-style tests.""" 1309 1310 debug = execute_setup_common(*args, **kwargs) 1311 if not test_function: 1312 execute_unittest(debug) 1313 else: 1314 test_function() 1315 1316def activate_logging(): 1317 """Activate iotests.log() output to stdout for script-style tests.""" 1318 handler = logging.StreamHandler(stream=sys.stdout) 1319 formatter = logging.Formatter('%(message)s') 1320 handler.setFormatter(formatter) 1321 test_logger.addHandler(handler) 1322 test_logger.setLevel(logging.INFO) 1323 test_logger.propagate = False 1324 1325# This is called from script-style iotests without a single point of entry 1326def script_initialize(*args, **kwargs): 1327 """Initialize script-style tests without running any tests.""" 1328 activate_logging() 1329 execute_setup_common(*args, **kwargs) 1330 1331# This is called from script-style iotests with a single point of entry 1332def script_main(test_function, *args, **kwargs): 1333 """Run script-style tests outside of the unittest framework""" 1334 activate_logging() 1335 execute_test(*args, test_function=test_function, **kwargs) 1336 1337# This is called from unittest style iotests 1338def main(*args, **kwargs): 1339 """Run tests using the unittest framework""" 1340 execute_test(*args, **kwargs) 1341