1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 
# Base command lines for the tools under test. Each one can be overridden
# through the environment; the *_OPTIONS values are split on whitespace.
# split() without an argument collapses runs of whitespace, so repeated
# spaces (or an unset/empty variable) never produce empty arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += os.environ['QEMU_IO_OPTIONS_NO_FMT'].split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name used in the diagnostic printed when the process is
                 killed by a signal.
    :param args: Full command line, including the program itself.
    :param connect_stderr: When true, stderr is merged into the captured
                           output; otherwise stderr is left untouched.
    :return: Tuple of (captured stdout text, return code).
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    subp = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)
    output = subp.communicate()[0]
    # A negative return code means the child was terminated by a signal
    if subp.returncode < 0:
        cmd = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n')
    return (output, subp.returncode)

def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code
    """
    full_args = qemu_img_args + list(args)
    return qemu_tool_pipe_and_status('qemu-img', full_args)

def qemu_img(*args: str) -> int:
    '''Run qemu-img and return the exit code'''
    return qemu_img_pipe_and_status(*args)[1]

def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object in which every dict is an OrderedDict
    sorted by key.

    With conv_keys set, '_' is replaced by '-' in the keys of the dict
    being converted.  Values of a dict are converted with conv_keys=False,
    while elements of a list are converted with the default
    (conv_keys=True).
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(atom) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg

def qemu_img_create(*args):
    """
    Run 'qemu-img create' with the given arguments and return its exit code.

    When the arguments select '-f luks' and no key secret is given, the
    default secret object and key-secret option are added.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is an option string, so the
                # default key secret is appended to it with a comma.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)

def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the decoded JSON'''
    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))

def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the decoded JSON'''
    return json.loads(qemu_img_pipe("check", "--output", "json", *args))

def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    exitcode = subprocess.call(qemu_img_args + list(args))
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
    return exitcode

def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output'''
    return qemu_img_pipe_and_status(*args)[0]

def qemu_img_log(*args):
    '''Run qemu-img, log its output (test file names filtered), return it'''
    result = qemu_img_pipe(*args)
    log(result, filters=[filter_testfiles])
    return result

def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of 'qemu-img info' for an image.

    :param filename: Image file name (or image options, with imgopts).
    :param filter_path: String replaced by TEST_IMG in the output;
                        defaults to filename.
    :param imgopts: Pass --image-opts instead of '-f <imgfmt>'.
    :param extra_args: Additional arguments placed before the file name.
    """
    args = ['info']
    if imgopts:
        args.append('--image-opts')
    else:
        args += ['-f', imgfmt]
    args += extra_args
    args.append(filename)

    output = qemu_img_pipe(*args)
    if not filter_path:
        filter_path = filename
    log(filter_img_info(output, filter_path))

def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    args = qemu_io_args + list(args)
    return qemu_tool_pipe_and_status('qemu-io', args)[0]

def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    result = qemu_io(*args)
    log(result, filters=[filter_testfiles, filter_qemu_io])
    return result

def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    # The caller already states the format, so do not add the default one
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # DEVNULL avoids leaking an open /dev/null file object on every call
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode

def qemu_io_silent_check(*args):
    '''Run qemu-io and return the true if subprocess returned 0'''
    args = qemu_io_args + list(args)
    # stderr follows stdout, so both streams are discarded
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0

def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine'''
    if qemu_default_machine == 's390-ccw-virtio':
        return 'virtio-scsi-ccw'
    return 'virtio-scsi-pci'

class QemuIoInteractive:
    """
    A qemu-io process driven through its interactive prompt.

    The constructor starts qemu-io and waits for the first prompt; cmd()
    sends one command and returns everything printed before the next
    prompt; close() sends 'q' and waits for the process.
    """
    def __init__(self, *args):
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for qemu-io to terminate'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text preceding it'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            elif c == pattern[0]:
                # The mismatching character may itself start the prompt
                # (e.g. "qqemu-io> ").  The first prompt character occurs
                # nowhere else in the prompt, so 1 is the only possible
                # partial match here.
                pos = 1
            else:
                pos = 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run a single qemu-io command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()


def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))

def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    full_args = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    return returncode, output if returncode else ''

def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    full_args = [qemu_nbd_prog, '-L'] + list(args)
    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output

@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("pid")

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    p = subprocess.Popen(cmd)
    try:
        # The server is considered started once its pid file shows up
        while not os.path.exists(pid_file):
            if p.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(p.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        p.kill()
        p.wait()

def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if two image files are identical'''
    return qemu_img('compare', '-f', fmt1,
                    '-F', fmt2, img1, img2) == 0

def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers'''
    # 'with' closes the file even when a write fails
    with open(name, 'wb') as img:
        i = 0
        while i < size:
            # Each 512-byte sector carries its own index at both ends
            sector = struct.pack('>l504xl', i // 512, i // 512)
            img.write(sector)
            i = i + 512

def image_size(img):
    '''Return image's virtual size'''
    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    return json.loads(r)['virtual-size']

def is_str(val):
    '''Return True if val is a str'''
    return isinstance(val, str)

# The directory name is matched literally; escaping keeps regex
# metacharacters in the path (such as '.' or '+') from being interpreted.
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace the test directory path by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)

win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove carriage returns'''
    return win32_re.sub("", msg)

qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace the qemu-io operation statistics by fixed placeholders'''
    msg = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", msg)

chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" by "chown UID:GID"'''
    return chown_re.sub("chown UID:GID", msg)

def filter_qmp_event(event):
    '''Filter a QMP event dict'''
    event = dict(event)
    if 'timestamp' in event:
        # dict() above is only a shallow copy: copy the nested dict as
        # well so that the caller's event is left unmodified.
        event['timestamp'] = dict(event['timestamp'])
        event['timestamp']['seconds'] = 'SECS'
        event['timestamp']['microseconds'] = 'USECS'
    return event

def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    The object is modified in place and also returned.'''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        items = enumerate(qmsg)
    else:
        items = qmsg.items()

    for k, v in items:
        if isinstance(v, (dict, list)):
            qmsg[k] = filter_qmp(v, filter_fn)
        else:
            qmsg[k] = filter_fn(k, v)
    return qmsg

def filter_testfiles(msg):
    '''Replace the per-process file name prefixes in test_dir and sock_dir
    by TEST_DIR/PID- and SOCK_DIR/PID-'''
    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')

def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_testfiles(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw/-pci suffix from virtio-scsi device names'''
    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)

def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_virtio_scsi(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names by NODE_NAME'''
    return re.sub("#block[0-9]+", "NODE_NAME", msg)

def filter_img_info(output, filename):
    """
    Filter 'qemu-img info' output: drop the size lines and replace the
    file name, image format and the iters/uuid/cid values by placeholders.
    """
    lines = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
        lines.append(line)
    return '\n'.join(lines)

def filter_imgfmt(msg):
    '''Replace the image format name by IMGFMT'''
    return msg.replace(imgfmt, 'IMGFMT')

def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        if is_str(value):
            return filter_imgfmt(value)
        return value
    return filter_qmp(qmsg, _filter)

def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes by XXX'''
    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)


Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.
    """
    for flt in filters:
        msg = flt(msg)
    if isinstance(msg, (dict, list)):
        # Don't sort if it's already sorted
        do_sort = not isinstance(msg, OrderedDict)
        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
    else:
        test_logger.info(msg)

class Timeout:
    """
    Context manager raising Exception(errmsg) from a SIGALRM handler when
    the body takes longer than the given number of seconds.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
    def __enter__(self):
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the body are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False
    def timeout(self, signum, frame):
        raise Exception(self.errmsg)

def file_pattern(name):
    '''Return name prefixed with the PID of this process'''
    return "{0}-{1}".format(os.getpid(), name)

class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        if len(self.paths) == 1:
            return self.paths[0]
        else:
            return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Best effort: the file may never have been created
            try:
                os.remove(path)
            except OSError:
                pass
        return False


def file_path_remover():
    '''atexit handler removing the files registered by file_path()'''
    for path in reversed(file_path_remover.paths):
        # Best effort: the file may never have been created
        try:
            os.remove(path)
        except OSError:
            pass


def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    # The list of paths lives on the remover function; the first call
    # creates it and registers the remover to run at interpreter exit.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = []
    for name in names:
        filename = file_pattern(name)
        path = os.path.join(base_dir, filename)
        file_path_remover.paths.append(path)
        paths.append(path)

    return paths[0] if len(paths) == 1 else paths

def remote_filename(path):
    '''Return the file name to use for path with the current protocol'''
    if imgproto == 'file':
        return path
    elif imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    else:
        raise Exception("Protocol %s not supported" % (imgproto))

class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Counter used to generate the drive IDs in add_drive()
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>"; opts is a string or an iterable of
        strings that is joined with commas.  Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Run an HMP command through human-monitor-command, optionally
        logging the command and its result'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            # Without an explicit event, break on both reads and writes
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested dicts/lists into a single dict whose keys are the
        dot-separated paths of the leaves (list indices become path
        components).  The result is accumulated in and returned as output.
        """
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object to a comma-separated "key=value" string
        using the flattened keys; the leaf values must be strings'''
        obj = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in obj.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev)
                for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Log a QMP command, run it, log its result and return the result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only status changes drive the state machine below; all other
            # events are just logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create and run the resulting job to completion'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability, logging the command
        result; name is only used in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait for a MIGRATION event with status 'completed' or 'failed',
        logging every MIGRATION event seen on the way.

        On completion, also poll query-status until the VM reports
        expect_runstate (skipped when expect_runstate is None).

        :return: True if the migration completed, False if it failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty-bitmaps lists,
        for all nodes that report dirty bitmaps'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap dict, or None if there is none.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and contains all the key/value
        pairs given in fields'''
        ret = self.get_bitmap(node_name, bitmap_name)
        # A bitmap that does not exist cannot match any field
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)

# Matches a path component with a list index, e.g. "name[2]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that path cannot be traversed in the QMP dict d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a failed traversal through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)


        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that it
        ended as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertIn(event['data']['type'], ['mirror', 'commit'])
def pause_wait(self, job_id='job0'): 1028 with Timeout(3, "Timeout waiting for job to pause"): 1029 while True: 1030 result = self.vm.qmp('query-block-jobs') 1031 found = False 1032 for job in result['return']: 1033 if job['device'] == job_id: 1034 found = True 1035 if job['paused'] and not job['busy']: 1036 return job 1037 break 1038 assert found 1039 1040 def pause_job(self, job_id='job0', wait=True): 1041 result = self.vm.qmp('block-job-pause', device=job_id) 1042 self.assert_qmp(result, 'return', {}) 1043 if wait: 1044 return self.pause_wait(job_id) 1045 return result 1046 1047 def case_skip(self, reason): 1048 '''Skip this test case''' 1049 case_notrun(reason) 1050 self.skipTest(reason) 1051 1052 1053def notrun(reason): 1054 '''Skip this test suite''' 1055 # Each test in qemu-iotests has a number ("seq") 1056 seq = os.path.basename(sys.argv[0]) 1057 1058 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1059 logger.warning("%s not run: %s", seq, reason) 1060 sys.exit(0) 1061 1062def case_notrun(reason): 1063 '''Mark this test case as not having been run (without actually 1064 skipping it, that is left to the caller). 
See 1065 QMPTestCase.case_skip() for a variant that actually skips the 1066 current test case.''' 1067 1068 # Each test in qemu-iotests has a number ("seq") 1069 seq = os.path.basename(sys.argv[0]) 1070 1071 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1072 ' [case not run] ' + reason + '\n') 1073 1074def _verify_image_format(supported_fmts: Sequence[str] = (), 1075 unsupported_fmts: Sequence[str] = ()) -> None: 1076 if 'generic' in supported_fmts and \ 1077 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1078 # similar to 1079 # _supported_fmt generic 1080 # for bash tests 1081 supported_fmts = () 1082 1083 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1084 if not_sup or (imgfmt in unsupported_fmts): 1085 notrun('not suitable for this image format: %s' % imgfmt) 1086 1087 if imgfmt == 'luks': 1088 verify_working_luks() 1089 1090def _verify_protocol(supported: Sequence[str] = (), 1091 unsupported: Sequence[str] = ()) -> None: 1092 assert not (supported and unsupported) 1093 1094 if 'generic' in supported: 1095 return 1096 1097 not_sup = supported and (imgproto not in supported) 1098 if not_sup or (imgproto in unsupported): 1099 notrun('not suitable for this protocol: %s' % imgproto) 1100 1101def _verify_platform(supported: Sequence[str] = (), 1102 unsupported: Sequence[str] = ()) -> None: 1103 if any((sys.platform.startswith(x) for x in unsupported)): 1104 notrun('not suitable for this OS: %s' % sys.platform) 1105 1106 if supported: 1107 if not any((sys.platform.startswith(x) for x in supported)): 1108 notrun('not suitable for this OS: %s' % sys.platform) 1109 1110def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1111 if supported_cache_modes and (cachemode not in supported_cache_modes): 1112 notrun('not suitable for this cache mode: %s' % cachemode) 1113 1114def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1115 if supported_aio_modes and (aiomode not in supported_aio_modes): 1116 
notrun('not suitable for this aio mode: %s' % aiomode) 1117 1118def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1119 usf_list = list(set(required_formats) - set(supported_formats())) 1120 if usf_list: 1121 notrun(f'formats {usf_list} are not whitelisted') 1122 1123def supports_quorum(): 1124 return 'quorum' in qemu_img_pipe('--help') 1125 1126def verify_quorum(): 1127 '''Skip test suite if quorum support is not available''' 1128 if not supports_quorum(): 1129 notrun('quorum support missing') 1130 1131def has_working_luks() -> Tuple[bool, str]: 1132 """ 1133 Check whether our LUKS driver can actually create images 1134 (this extends to LUKS encryption for qcow2). 1135 1136 If not, return the reason why. 1137 """ 1138 1139 img_file = f'{test_dir}/luks-test.luks' 1140 (output, status) = \ 1141 qemu_img_pipe_and_status('create', '-f', 'luks', 1142 '--object', luks_default_secret_object, 1143 '-o', luks_default_key_secret_opt, 1144 '-o', 'iter-time=10', 1145 img_file, '1G') 1146 try: 1147 os.remove(img_file) 1148 except OSError: 1149 pass 1150 1151 if status != 0: 1152 reason = output 1153 for line in output.splitlines(): 1154 if img_file + ':' in line: 1155 reason = line.split(img_file + ':', 1)[1].strip() 1156 break 1157 1158 return (False, reason) 1159 else: 1160 return (True, '') 1161 1162def verify_working_luks(): 1163 """ 1164 Skip test suite if LUKS does not work 1165 """ 1166 (working, reason) = has_working_luks() 1167 if not working: 1168 notrun(reason) 1169 1170def qemu_pipe(*args: str) -> str: 1171 """ 1172 Run qemu with an option to print something and exit (e.g. a help option). 1173 1174 :return: QEMU's stdout output. 
1175 """ 1176 full_args = [qemu_prog] + qemu_opts + list(args) 1177 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1178 return output 1179 1180def supported_formats(read_only=False): 1181 '''Set 'read_only' to True to check ro-whitelist 1182 Otherwise, rw-whitelist is checked''' 1183 1184 if not hasattr(supported_formats, "formats"): 1185 supported_formats.formats = {} 1186 1187 if read_only not in supported_formats.formats: 1188 format_message = qemu_pipe("-drive", "format=help") 1189 line = 1 if read_only else 0 1190 supported_formats.formats[read_only] = \ 1191 format_message.splitlines()[line].split(":")[1].split() 1192 1193 return supported_formats.formats[read_only] 1194 1195def skip_if_unsupported(required_formats=(), read_only=False): 1196 '''Skip Test Decorator 1197 Runs the test if all the required formats are whitelisted''' 1198 def skip_test_decorator(func): 1199 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1200 **kwargs: Dict[str, Any]) -> None: 1201 if callable(required_formats): 1202 fmts = required_formats(test_case) 1203 else: 1204 fmts = required_formats 1205 1206 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1207 if usf_list: 1208 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1209 test_case.case_skip(msg) 1210 else: 1211 func(test_case, *args, **kwargs) 1212 return func_wrapper 1213 return skip_test_decorator 1214 1215def skip_for_formats(formats: Sequence[str] = ()) \ 1216 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1217 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1218 '''Skip Test Decorator 1219 Skips the test for the given formats''' 1220 def skip_test_decorator(func): 1221 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1222 **kwargs: Dict[str, Any]) -> None: 1223 if imgfmt in formats: 1224 msg = f'{test_case}: Skipped for format {imgfmt}' 1225 test_case.case_skip(msg) 1226 else: 1227 func(test_case, *args, **kwargs) 1228 return 
func_wrapper 1229 return skip_test_decorator 1230 1231def skip_if_user_is_root(func): 1232 '''Skip Test Decorator 1233 Runs the test only without root permissions''' 1234 def func_wrapper(*args, **kwargs): 1235 if os.getuid() == 0: 1236 case_notrun('{}: cannot be run as root'.format(args[0])) 1237 return None 1238 else: 1239 return func(*args, **kwargs) 1240 return func_wrapper 1241 1242def execute_unittest(debug=False): 1243 """Executes unittests within the calling module.""" 1244 1245 verbosity = 2 if debug else 1 1246 1247 if debug: 1248 output = sys.stdout 1249 else: 1250 # We need to filter out the time taken from the output so that 1251 # qemu-iotest can reliably diff the results against master output. 1252 output = io.StringIO() 1253 1254 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1255 verbosity=verbosity) 1256 try: 1257 # unittest.main() will use sys.exit(); so expect a SystemExit 1258 # exception 1259 unittest.main(testRunner=runner) 1260 finally: 1261 # We need to filter out the time taken from the output so that 1262 # qemu-iotest can reliably diff the results against master output. 1263 if not debug: 1264 out = output.getvalue() 1265 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1266 1267 # Hide skipped tests from the reference output 1268 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1269 out_first_line, out_rest = out.split('\n', 1) 1270 out = out_first_line.replace('s', '.') + '\n' + out_rest 1271 1272 sys.stderr.write(out) 1273 1274def execute_setup_common(supported_fmts: Sequence[str] = (), 1275 supported_platforms: Sequence[str] = (), 1276 supported_cache_modes: Sequence[str] = (), 1277 supported_aio_modes: Sequence[str] = (), 1278 unsupported_fmts: Sequence[str] = (), 1279 supported_protocols: Sequence[str] = (), 1280 unsupported_protocols: Sequence[str] = (), 1281 required_fmts: Sequence[str] = ()) -> bool: 1282 """ 1283 Perform necessary setup for either script-style or unittest-style tests. 
1284 1285 :return: Bool; Whether or not debug mode has been requested via the CLI. 1286 """ 1287 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1288 1289 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1290 # indicate that we're not being run via "check". There may be 1291 # other things set up by "check" that individual test cases rely 1292 # on. 1293 if test_dir is None or qemu_default_machine is None: 1294 sys.stderr.write('Please run this test via the "check" script\n') 1295 sys.exit(os.EX_USAGE) 1296 1297 debug = '-d' in sys.argv 1298 if debug: 1299 sys.argv.remove('-d') 1300 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1301 1302 _verify_image_format(supported_fmts, unsupported_fmts) 1303 _verify_protocol(supported_protocols, unsupported_protocols) 1304 _verify_platform(supported=supported_platforms) 1305 _verify_cache_mode(supported_cache_modes) 1306 _verify_aio_mode(supported_aio_modes) 1307 _verify_formats(required_fmts) 1308 1309 return debug 1310 1311def execute_test(*args, test_function=None, **kwargs): 1312 """Run either unittest or script-style tests.""" 1313 1314 debug = execute_setup_common(*args, **kwargs) 1315 if not test_function: 1316 execute_unittest(debug) 1317 else: 1318 test_function() 1319 1320def activate_logging(): 1321 """Activate iotests.log() output to stdout for script-style tests.""" 1322 handler = logging.StreamHandler(stream=sys.stdout) 1323 formatter = logging.Formatter('%(message)s') 1324 handler.setFormatter(formatter) 1325 test_logger.addHandler(handler) 1326 test_logger.setLevel(logging.INFO) 1327 test_logger.propagate = False 1328 1329# This is called from script-style iotests without a single point of entry 1330def script_initialize(*args, **kwargs): 1331 """Initialize script-style tests without running any tests.""" 1332 activate_logging() 1333 execute_setup_common(*args, **kwargs) 1334 1335# This is called from script-style iotests with a single point of 
entry 1336def script_main(test_function, *args, **kwargs): 1337 """Run script-style tests outside of the unittest framework""" 1338 activate_logging() 1339 execute_test(*args, test_function=test_function, **kwargs) 1340 1341# This is called from unittest style iotests 1342def main(*args, **kwargs): 1343 """Run tests using the unittest framework""" 1344 execute_test(*args, **kwargs) 1345