1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43assert sys.version_info >= (3, 6) 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79test_dir = os.environ.get('TEST_DIR') 80sock_dir = os.environ.get('SOCK_DIR') 81output_dir = os.environ.get('OUTPUT_DIR', '.') 82cachemode = os.environ.get('CACHEMODE') 83aiomode = os.environ.get('AIOMODE') 84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 85 86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 87 88luks_default_secret_object = 'secret,id=keysec0,data=' + \ 89 os.environ.get('IMGKEYSECRET', '') 90luks_default_key_secret_opt = 'key-secret=keysec0' 91 92 93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 94 """ 95 Run qemu-img and return both its output and its exit code 96 """ 97 subp = subprocess.Popen(qemu_img_args + list(args), 98 stdout=subprocess.PIPE, 99 stderr=subprocess.STDOUT, 100 universal_newlines=True) 101 output = subp.communicate()[0] 102 if subp.returncode < 0: 103 sys.stderr.write('qemu-img received signal %i: %s\n' 104 % (-subp.returncode, 105 ' '.join(qemu_img_args + list(args)))) 106 return (output, subp.returncode) 107 108def qemu_img(*args: str) -> int: 109 '''Run qemu-img and return the exit code''' 110 return qemu_img_pipe_and_status(*args)[1] 111 112def ordered_qmp(qmsg, conv_keys=True): 113 # Dictionaries are not ordered prior to 3.6, therefore: 114 if isinstance(qmsg, list): 115 return [ordered_qmp(atom) for atom in qmsg] 116 if isinstance(qmsg, dict): 117 od = OrderedDict() 118 for k, v in sorted(qmsg.items()): 119 if conv_keys: 120 k = k.replace('_', '-') 121 od[k] = ordered_qmp(v, conv_keys=False) 122 return od 123 return qmsg 124 125def qemu_img_create(*args): 126 args = list(args) 127 128 # default luks support 129 if '-f' in args and args[args.index('-f') + 1] == 'luks': 130 if '-o' in args: 131 i = args.index('-o') 132 if 'key-secret' not in args[i + 1]: 133 args[i + 1].append(luks_default_key_secret_opt) 134 args.insert(i + 2, '--object') 135 args.insert(i + 3, luks_default_secret_object) 136 else: 137 args = ['-o', luks_default_key_secret_opt, 138 '--object', luks_default_secret_object] + args 139 140 args.insert(0, 'create') 141 142 return qemu_img(*args) 143 144def qemu_img_measure(*args): 145 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 146 147def qemu_img_check(*args): 148 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 149 150def qemu_img_verbose(*args): 151 '''Run qemu-img without suppressing its output and return the exit code''' 152 exitcode = subprocess.call(qemu_img_args + list(args)) 153 if exitcode < 0: 154 sys.stderr.write('qemu-img received signal %i: %s\n' 155 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 156 return exitcode 157 158def qemu_img_pipe(*args: str) -> str: 159 '''Run qemu-img and return its output''' 160 return qemu_img_pipe_and_status(*args)[0] 161 162def qemu_img_log(*args): 163 result = qemu_img_pipe(*args) 164 log(result, filters=[filter_testfiles]) 165 return result 166 167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 168 args = ['info'] 169 if imgopts: 170 args.append('--image-opts') 171 else: 172 args += ['-f', imgfmt] 173 args += extra_args 174 args.append(filename) 175 176 output = qemu_img_pipe(*args) 177 if not filter_path: 178 filter_path = filename 179 log(filter_img_info(output, filter_path)) 180 181def qemu_io(*args): 182 '''Run qemu-io and return the stdout data''' 183 args = qemu_io_args + list(args) 184 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 185 stderr=subprocess.STDOUT, 186 universal_newlines=True) 187 output = subp.communicate()[0] 188 if subp.returncode < 0: 189 sys.stderr.write('qemu-io received signal %i: %s\n' 190 % (-subp.returncode, ' '.join(args))) 191 return output 192 193def qemu_io_log(*args): 194 result = qemu_io(*args) 195 log(result, filters=[filter_testfiles, filter_qemu_io]) 196 return result 197 198def qemu_io_silent(*args): 199 '''Run qemu-io and return the exit code, suppressing stdout''' 200 args = qemu_io_args + list(args) 201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 202 if exitcode < 0: 203 sys.stderr.write('qemu-io received signal %i: %s\n' % 204 (-exitcode, ' '.join(args))) 205 return exitcode 206 207def qemu_io_silent_check(*args): 208 '''Run qemu-io and return the true if subprocess returned 0''' 209 args = qemu_io_args + list(args) 210 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 211 stderr=subprocess.STDOUT) 212 return exitcode == 0 213 214def get_virtio_scsi_device(): 215 if qemu_default_machine == 's390-ccw-virtio': 216 return 'virtio-scsi-ccw' 217 return 'virtio-scsi-pci' 218 219class QemuIoInteractive: 220 def __init__(self, *args): 221 self.args = qemu_io_args_no_fmt + list(args) 222 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 223 stdout=subprocess.PIPE, 224 stderr=subprocess.STDOUT, 225 universal_newlines=True) 226 out = self._p.stdout.read(9) 227 if out != 'qemu-io> ': 228 # Most probably qemu-io just failed to start. 229 # Let's collect the whole output and exit. 230 out += self._p.stdout.read() 231 self._p.wait(timeout=1) 232 raise ValueError(out) 233 234 def close(self): 235 self._p.communicate('q\n') 236 237 def _read_output(self): 238 pattern = 'qemu-io> ' 239 n = len(pattern) 240 pos = 0 241 s = [] 242 while pos != n: 243 c = self._p.stdout.read(1) 244 # check unexpected EOF 245 assert c != '' 246 s.append(c) 247 if c == pattern[pos]: 248 pos += 1 249 else: 250 pos = 0 251 252 return ''.join(s[:-n]) 253 254 def cmd(self, cmd): 255 # quit command is in close(), '\n' is added automatically 256 assert '\n' not in cmd 257 cmd = cmd.strip() 258 assert cmd not in ('q', 'quit') 259 self._p.stdin.write(cmd + '\n') 260 self._p.stdin.flush() 261 return self._read_output() 262 263 264def qemu_nbd(*args): 265 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 266 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 267 268def qemu_nbd_early_pipe(*args): 269 '''Run qemu-nbd in daemon mode and return both the parent's exit code 270 and its output in case of an error''' 271 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 272 stdout=subprocess.PIPE, 273 universal_newlines=True) 274 output = subp.communicate()[0] 275 if subp.returncode < 0: 276 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 277 (-subp.returncode, 278 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 279 280 return subp.returncode, output if subp.returncode else '' 281 282@contextmanager 283def qemu_nbd_popen(*args): 284 '''Context manager running qemu-nbd within the context''' 285 pid_file = file_path("pid") 286 287 cmd = list(qemu_nbd_args) 288 cmd.extend(('--persistent', '--pid-file', pid_file)) 289 cmd.extend(args) 290 291 log('Start NBD server') 292 p = subprocess.Popen(cmd) 293 try: 294 while not os.path.exists(pid_file): 295 if p.poll() is not None: 296 raise RuntimeError( 297 "qemu-nbd terminated with exit code {}: {}" 298 .format(p.returncode, ' '.join(cmd))) 299 300 time.sleep(0.01) 301 yield 302 finally: 303 log('Kill NBD server') 304 p.kill() 305 p.wait() 306 307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 308 '''Return True if two image files are identical''' 309 return qemu_img('compare', '-f', fmt1, 310 '-F', fmt2, img1, img2) == 0 311 312def create_image(name, size): 313 '''Create a fully-allocated raw image with sector markers''' 314 file = open(name, 'wb') 315 i = 0 316 while i < size: 317 sector = struct.pack('>l504xl', i // 512, i // 512) 318 file.write(sector) 319 i = i + 512 320 file.close() 321 322def image_size(img): 323 '''Return image's virtual size''' 324 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 325 return json.loads(r)['virtual-size'] 326 327def is_str(val): 328 return isinstance(val, str) 329 330test_dir_re = re.compile(r"%s" % test_dir) 331def filter_test_dir(msg): 332 return test_dir_re.sub("TEST_DIR", msg) 333 334win32_re = re.compile(r"\r") 335def filter_win32(msg): 336 return win32_re.sub("", msg) 337 338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 339 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 340 r"and [0-9\/.inf]* ops\/sec\)") 341def filter_qemu_io(msg): 342 msg = filter_win32(msg) 343 return qemu_io_re.sub("X ops; XX:XX:XX.X " 344 "(XXX YYY/sec and XXX ops/sec)", msg) 345 346chown_re = re.compile(r"chown [0-9]+:[0-9]+") 347def filter_chown(msg): 348 return chown_re.sub("chown UID:GID", msg) 349 350def filter_qmp_event(event): 351 '''Filter a QMP event dict''' 352 event = dict(event) 353 if 'timestamp' in event: 354 event['timestamp']['seconds'] = 'SECS' 355 event['timestamp']['microseconds'] = 'USECS' 356 return event 357 358def filter_qmp(qmsg, filter_fn): 359 '''Given a string filter, filter a QMP object's values. 360 filter_fn takes a (key, value) pair.''' 361 # Iterate through either lists or dicts; 362 if isinstance(qmsg, list): 363 items = enumerate(qmsg) 364 else: 365 items = qmsg.items() 366 367 for k, v in items: 368 if isinstance(v, (dict, list)): 369 qmsg[k] = filter_qmp(v, filter_fn) 370 else: 371 qmsg[k] = filter_fn(k, v) 372 return qmsg 373 374def filter_testfiles(msg): 375 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 376 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 377 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 378 379def filter_qmp_testfiles(qmsg): 380 def _filter(_key, value): 381 if is_str(value): 382 return filter_testfiles(value) 383 return value 384 return filter_qmp(qmsg, _filter) 385 386def filter_generated_node_ids(msg): 387 return re.sub("#block[0-9]+", "NODE_NAME", msg) 388 389def filter_img_info(output, filename): 390 lines = [] 391 for line in output.split('\n'): 392 if 'disk size' in line or 'actual-size' in line: 393 continue 394 line = line.replace(filename, 'TEST_IMG') 395 line = filter_testfiles(line) 396 line = line.replace(imgfmt, 'IMGFMT') 397 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 398 line = re.sub('uuid: [-a-f0-9]+', 399 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 400 line) 401 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 402 lines.append(line) 403 return '\n'.join(lines) 404 405def filter_imgfmt(msg): 406 return msg.replace(imgfmt, 'IMGFMT') 407 408def filter_qmp_imgfmt(qmsg): 409 def _filter(_key, value): 410 if is_str(value): 411 return filter_imgfmt(value) 412 return value 413 return filter_qmp(qmsg, _filter) 414 415 416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 417 418def log(msg: Msg, 419 filters: Iterable[Callable[[Msg], Msg]] = (), 420 indent: Optional[int] = None) -> None: 421 """ 422 Logs either a string message or a JSON serializable message (like QMP). 423 If indent is provided, JSON serializable messages are pretty-printed. 424 """ 425 for flt in filters: 426 msg = flt(msg) 427 if isinstance(msg, (dict, list)): 428 # Don't sort if it's already sorted 429 do_sort = not isinstance(msg, OrderedDict) 430 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 431 else: 432 test_logger.info(msg) 433 434class Timeout: 435 def __init__(self, seconds, errmsg="Timeout"): 436 self.seconds = seconds 437 self.errmsg = errmsg 438 def __enter__(self): 439 signal.signal(signal.SIGALRM, self.timeout) 440 signal.setitimer(signal.ITIMER_REAL, self.seconds) 441 return self 442 def __exit__(self, exc_type, value, traceback): 443 signal.setitimer(signal.ITIMER_REAL, 0) 444 return False 445 def timeout(self, signum, frame): 446 raise Exception(self.errmsg) 447 448def file_pattern(name): 449 return "{0}-{1}".format(os.getpid(), name) 450 451class FilePaths: 452 """ 453 FilePaths is an auto-generated filename that cleans itself up. 454 455 Use this context manager to generate filenames and ensure that the file 456 gets deleted:: 457 458 with FilePaths(['test.img']) as img_path: 459 qemu_img('create', img_path, '1G') 460 # migration_sock_path is automatically deleted 461 """ 462 def __init__(self, names, base_dir=test_dir): 463 self.paths = [] 464 for name in names: 465 self.paths.append(os.path.join(base_dir, file_pattern(name))) 466 467 def __enter__(self): 468 return self.paths 469 470 def __exit__(self, exc_type, exc_val, exc_tb): 471 try: 472 for path in self.paths: 473 os.remove(path) 474 except OSError: 475 pass 476 return False 477 478class FilePath(FilePaths): 479 """ 480 FilePath is a specialization of FilePaths that takes a single filename. 481 """ 482 def __init__(self, name, base_dir=test_dir): 483 super(FilePath, self).__init__([name], base_dir) 484 485 def __enter__(self): 486 return self.paths[0] 487 488def file_path_remover(): 489 for path in reversed(file_path_remover.paths): 490 try: 491 os.remove(path) 492 except OSError: 493 pass 494 495 496def file_path(*names, base_dir=test_dir): 497 ''' Another way to get auto-generated filename that cleans itself up. 498 499 Use is as simple as: 500 501 img_a, img_b = file_path('a.img', 'b.img') 502 sock = file_path('socket') 503 ''' 504 505 if not hasattr(file_path_remover, 'paths'): 506 file_path_remover.paths = [] 507 atexit.register(file_path_remover) 508 509 paths = [] 510 for name in names: 511 filename = file_pattern(name) 512 path = os.path.join(base_dir, filename) 513 file_path_remover.paths.append(path) 514 paths.append(path) 515 516 return paths[0] if len(paths) == 1 else paths 517 518def remote_filename(path): 519 if imgproto == 'file': 520 return path 521 elif imgproto == 'ssh': 522 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 523 else: 524 raise Exception("Protocol %s not supported" % (imgproto)) 525 526class VM(qtest.QEMUQtestMachine): 527 '''A QEMU VM''' 528 529 def __init__(self, path_suffix=''): 530 name = "qemu%s-%d" % (path_suffix, os.getpid()) 531 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 532 test_dir=test_dir, 533 socket_scm_helper=socket_scm_helper, 534 sock_dir=sock_dir) 535 self._num_drives = 0 536 537 def add_object(self, opts): 538 self._args.append('-object') 539 self._args.append(opts) 540 return self 541 542 def add_device(self, opts): 543 self._args.append('-device') 544 self._args.append(opts) 545 return self 546 547 def add_drive_raw(self, opts): 548 self._args.append('-drive') 549 self._args.append(opts) 550 return self 551 552 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 553 '''Add a virtio-blk drive to the VM''' 554 options = ['if=%s' % interface, 555 'id=drive%d' % self._num_drives] 556 557 if path is not None: 558 options.append('file=%s' % path) 559 options.append('format=%s' % img_format) 560 options.append('cache=%s' % cachemode) 561 options.append('aio=%s' % aiomode) 562 563 if opts: 564 options.append(opts) 565 566 if img_format == 'luks' and 'key-secret' not in opts: 567 # default luks support 568 if luks_default_secret_object not in self._args: 569 self.add_object(luks_default_secret_object) 570 571 options.append(luks_default_key_secret_opt) 572 573 self._args.append('-drive') 574 self._args.append(','.join(options)) 575 self._num_drives += 1 576 return self 577 578 def add_blockdev(self, opts): 579 self._args.append('-blockdev') 580 if isinstance(opts, str): 581 self._args.append(opts) 582 else: 583 self._args.append(','.join(opts)) 584 return self 585 586 def add_incoming(self, addr): 587 self._args.append('-incoming') 588 self._args.append(addr) 589 return self 590 591 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 592 cmd = 'human-monitor-command' 593 kwargs = {'command-line': command_line} 594 if use_log: 595 return self.qmp_log(cmd, **kwargs) 596 else: 597 return self.qmp(cmd, **kwargs) 598 599 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 600 """Pause drive r/w operations""" 601 if not event: 602 self.pause_drive(drive, "read_aio") 603 self.pause_drive(drive, "write_aio") 604 return 605 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 606 607 def resume_drive(self, drive: str) -> None: 608 """Resume drive r/w operations""" 609 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 610 611 def hmp_qemu_io(self, drive: str, cmd: str, 612 use_log: bool = False) -> QMPMessage: 613 """Write to a given drive using an HMP command""" 614 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 615 616 def flatten_qmp_object(self, obj, output=None, basestr=''): 617 if output is None: 618 output = dict() 619 if isinstance(obj, list): 620 for i, item in enumerate(obj): 621 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 622 elif isinstance(obj, dict): 623 for key in obj: 624 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 625 else: 626 output[basestr[:-1]] = obj # Strip trailing '.' 627 return output 628 629 def qmp_to_opts(self, obj): 630 obj = self.flatten_qmp_object(obj) 631 output_list = list() 632 for key in obj: 633 output_list += [key + '=' + obj[key]] 634 return ','.join(output_list) 635 636 def get_qmp_events_filtered(self, wait=60.0): 637 result = [] 638 for ev in self.get_qmp_events(wait=wait): 639 result.append(filter_qmp_event(ev)) 640 return result 641 642 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 643 full_cmd = OrderedDict(( 644 ("execute", cmd), 645 ("arguments", ordered_qmp(kwargs)) 646 )) 647 log(full_cmd, filters, indent=indent) 648 result = self.qmp(cmd, **kwargs) 649 log(result, filters, indent=indent) 650 return result 651 652 # Returns None on success, and an error string on failure 653 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 654 pre_finalize=None, cancel=False, wait=60.0): 655 """ 656 run_job moves a job from creation through to dismissal. 657 658 :param job: String. ID of recently-launched job 659 :param auto_finalize: Bool. True if the job was launched with 660 auto_finalize. Defaults to True. 661 :param auto_dismiss: Bool. True if the job was launched with 662 auto_dismiss=True. Defaults to False. 663 :param pre_finalize: Callback. A callable that takes no arguments to be 664 invoked prior to issuing job-finalize, if any. 665 :param cancel: Bool. When true, cancels the job after the pre_finalize 666 callback. 667 :param wait: Float. Timeout value specifying how long to wait for any 668 event, in seconds. Defaults to 60.0. 669 """ 670 match_device = {'data': {'device': job}} 671 match_id = {'data': {'id': job}} 672 events = [ 673 ('BLOCK_JOB_COMPLETED', match_device), 674 ('BLOCK_JOB_CANCELLED', match_device), 675 ('BLOCK_JOB_ERROR', match_device), 676 ('BLOCK_JOB_READY', match_device), 677 ('BLOCK_JOB_PENDING', match_id), 678 ('JOB_STATUS_CHANGE', match_id) 679 ] 680 error = None 681 while True: 682 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 683 if ev['event'] != 'JOB_STATUS_CHANGE': 684 log(ev) 685 continue 686 status = ev['data']['status'] 687 if status == 'aborting': 688 result = self.qmp('query-jobs') 689 for j in result['return']: 690 if j['id'] == job: 691 error = j['error'] 692 log('Job failed: %s' % (j['error'])) 693 elif status == 'ready': 694 self.qmp_log('job-complete', id=job) 695 elif status == 'pending' and not auto_finalize: 696 if pre_finalize: 697 pre_finalize() 698 if cancel: 699 self.qmp_log('job-cancel', id=job) 700 else: 701 self.qmp_log('job-finalize', id=job) 702 elif status == 'concluded' and not auto_dismiss: 703 self.qmp_log('job-dismiss', id=job) 704 elif status == 'null': 705 return error 706 707 # Returns None on success, and an error string on failure 708 def blockdev_create(self, options, job_id='job0', filters=None): 709 if filters is None: 710 filters = [filter_qmp_testfiles] 711 result = self.qmp_log('blockdev-create', filters=filters, 712 job_id=job_id, options=options) 713 714 if 'return' in result: 715 assert result['return'] == {} 716 job_result = self.run_job(job_id) 717 else: 718 job_result = result['error'] 719 720 log("") 721 return job_result 722 723 def enable_migration_events(self, name): 724 log('Enabling migration QMP events on %s...' % name) 725 log(self.qmp('migrate-set-capabilities', capabilities=[ 726 { 727 'capability': 'events', 728 'state': True 729 } 730 ])) 731 732 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 733 while True: 734 event = self.event_wait('MIGRATION') 735 log(event, filters=[filter_qmp_event]) 736 if event['data']['status'] in ('completed', 'failed'): 737 break 738 739 if event['data']['status'] == 'completed': 740 # The event may occur in finish-migrate, so wait for the expected 741 # post-migration runstate 742 runstate = None 743 while runstate != expect_runstate: 744 runstate = self.qmp('query-status')['return']['status'] 745 return True 746 else: 747 return False 748 749 def node_info(self, node_name): 750 nodes = self.qmp('query-named-block-nodes') 751 for x in nodes['return']: 752 if x['node-name'] == node_name: 753 return x 754 return None 755 756 def query_bitmaps(self): 757 res = self.qmp("query-named-block-nodes") 758 return {device['node-name']: device['dirty-bitmaps'] 759 for device in res['return'] if 'dirty-bitmaps' in device} 760 761 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 762 """ 763 get a specific bitmap from the object returned by query_bitmaps. 764 :param recording: If specified, filter results by the specified value. 765 :param bitmaps: If specified, use it instead of call query_bitmaps() 766 """ 767 if bitmaps is None: 768 bitmaps = self.query_bitmaps() 769 770 for bitmap in bitmaps[node_name]: 771 if bitmap.get('name', '') == bitmap_name: 772 if recording is None or bitmap.get('recording') == recording: 773 return bitmap 774 return None 775 776 def check_bitmap_status(self, node_name, bitmap_name, fields): 777 ret = self.get_bitmap(node_name, bitmap_name) 778 779 return fields.items() <= ret.items() 780 781 def assert_block_path(self, root, path, expected_node, graph=None): 782 """ 783 Check whether the node under the given path in the block graph 784 is @expected_node. 785 786 @root is the node name of the node where the @path is rooted. 787 788 @path is a string that consists of child names separated by 789 slashes. It must begin with a slash. 790 791 Examples for @root + @path: 792 - root="qcow2-node", path="/backing/file" 793 - root="quorum-node", path="/children.2/file" 794 795 Hypothetically, @path could be empty, in which case it would 796 point to @root. However, in practice this case is not useful 797 and hence not allowed. 798 799 @expected_node may be None. (All elements of the path but the 800 leaf must still exist.) 801 802 @graph may be None or the result of an x-debug-query-block-graph 803 call that has already been performed. 804 """ 805 if graph is None: 806 graph = self.qmp('x-debug-query-block-graph')['return'] 807 808 iter_path = iter(path.split('/')) 809 810 # Must start with a / 811 assert next(iter_path) == '' 812 813 node = next((node for node in graph['nodes'] if node['name'] == root), 814 None) 815 816 # An empty @path is not allowed, so the root node must be present 817 assert node is not None, 'Root node %s not found' % root 818 819 for child_name in iter_path: 820 assert node is not None, 'Cannot follow path %s%s' % (root, path) 821 822 try: 823 node_id = next(edge['child'] for edge in graph['edges'] 824 if (edge['parent'] == node['id'] and 825 edge['name'] == child_name)) 826 827 node = next(node for node in graph['nodes'] 828 if node['id'] == node_id) 829 830 except StopIteration: 831 node = None 832 833 if node is None: 834 assert expected_node is None, \ 835 'No node found under %s (but expected %s)' % \ 836 (path, expected_node) 837 else: 838 assert node['name'] == expected_node, \ 839 'Found node %s under %s (but expected %s)' % \ 840 (node['name'], path, expected_node) 841 842index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 843 844class QMPTestCase(unittest.TestCase): 845 '''Abstract base class for QMP test cases''' 846 847 def __init__(self, *args, **kwargs): 848 super().__init__(*args, **kwargs) 849 # Many users of this class set a VM property we rely on heavily 850 # in the methods below. 851 self.vm = None 852 853 def dictpath(self, d, path): 854 '''Traverse a path in a nested dict''' 855 for component in path.split('/'): 856 m = index_re.match(component) 857 if m: 858 component, idx = m.groups() 859 idx = int(idx) 860 861 if not isinstance(d, dict) or component not in d: 862 self.fail(f'failed path traversal for "{path}" in "{d}"') 863 d = d[component] 864 865 if m: 866 if not isinstance(d, list): 867 self.fail(f'path component "{component}" in "{path}" ' 868 f'is not a list in "{d}"') 869 try: 870 d = d[idx] 871 except IndexError: 872 self.fail(f'invalid index "{idx}" in path "{path}" ' 873 f'in "{d}"') 874 return d 875 876 def assert_qmp_absent(self, d, path): 877 try: 878 result = self.dictpath(d, path) 879 except AssertionError: 880 return 881 self.fail('path "%s" has value "%s"' % (path, str(result))) 882 883 def assert_qmp(self, d, path, value): 884 '''Assert that the value for a specific path in a QMP dict 885 matches. When given a list of values, assert that any of 886 them matches.''' 887 888 result = self.dictpath(d, path) 889 890 # [] makes no sense as a list of valid values, so treat it as 891 # an actual single value. 892 if isinstance(value, list) and value != []: 893 for v in value: 894 if result == v: 895 return 896 self.fail('no match for "%s" in %s' % (str(result), str(value))) 897 else: 898 self.assertEqual(result, value, 899 '"%s" is "%s", expected "%s"' 900 % (path, str(result), str(value))) 901 902 def assert_no_active_block_jobs(self): 903 result = self.vm.qmp('query-block-jobs') 904 self.assert_qmp(result, 'return', []) 905 906 def assert_has_block_node(self, node_name=None, file_name=None): 907 """Issue a query-named-block-nodes and assert node_name and/or 908 file_name is present in the result""" 909 def check_equal_or_none(a, b): 910 return a is None or b is None or a == b 911 assert node_name or file_name 912 result = self.vm.qmp('query-named-block-nodes') 913 for x in result["return"]: 914 if check_equal_or_none(x.get("node-name"), node_name) and \ 915 check_equal_or_none(x.get("file"), file_name): 916 return 917 self.fail("Cannot find %s %s in result:\n%s" % 918 (node_name, file_name, result)) 919 920 def assert_json_filename_equal(self, json_filename, reference): 921 '''Asserts that the given filename is a json: filename and that its 922 content is equal to the given reference object''' 923 self.assertEqual(json_filename[:5], 'json:') 924 self.assertEqual( 925 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 926 self.vm.flatten_qmp_object(reference) 927 ) 928 929 def cancel_and_wait(self, drive='drive0', force=False, 930 resume=False, wait=60.0): 931 '''Cancel a block job and wait for it to finish, returning the event''' 932 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 933 self.assert_qmp(result, 'return', {}) 934 935 if resume: 936 self.vm.resume_drive(drive) 937 938 cancelled = False 939 result = None 940 while not cancelled: 941 for event in self.vm.get_qmp_events(wait=wait): 942 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 943 event['event'] == 'BLOCK_JOB_CANCELLED': 944 self.assert_qmp(event, 'data/device', drive) 945 result = event 946 cancelled = True 947 elif event['event'] == 'JOB_STATUS_CHANGE': 948 self.assert_qmp(event, 'data/id', drive) 949 950 951 self.assert_no_active_block_jobs() 952 return result 953 954 def wait_until_completed(self, drive='drive0', check_offset=True, 955 wait=60.0, error=None): 956 '''Wait for a block job to finish, returning the event''' 957 while True: 958 for event in self.vm.get_qmp_events(wait=wait): 959 if event['event'] == 'BLOCK_JOB_COMPLETED': 960 self.assert_qmp(event, 'data/device', drive) 961 if error is None: 962 self.assert_qmp_absent(event, 'data/error') 963 if check_offset: 964 self.assert_qmp(event, 'data/offset', 965 event['data']['len']) 966 else: 967 self.assert_qmp(event, 'data/error', error) 968 self.assert_no_active_block_jobs() 969 return event 970 if event['event'] == 'JOB_STATUS_CHANGE': 971 self.assert_qmp(event, 'data/id', drive) 972 973 def wait_ready(self, drive='drive0'): 974 """Wait until a BLOCK_JOB_READY event, and return the event.""" 975 return self.vm.events_wait([ 976 ('BLOCK_JOB_READY', 977 {'data': {'type': 'mirror', 'device': drive}}), 978 ('BLOCK_JOB_READY', 979 {'data': {'type': 'commit', 'device': drive}}) 980 ]) 981 982 def wait_ready_and_cancel(self, drive='drive0'): 983 self.wait_ready(drive=drive) 984 event = self.cancel_and_wait(drive=drive) 985 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 986 self.assert_qmp(event, 'data/type', 'mirror') 987 self.assert_qmp(event, 'data/offset', event['data']['len']) 988 989 def complete_and_wait(self, drive='drive0', wait_ready=True, 990 completion_error=None): 991 '''Complete a block job and wait for it to finish''' 992 if wait_ready: 993 self.wait_ready(drive=drive) 994 995 result = self.vm.qmp('block-job-complete', device=drive) 996 self.assert_qmp(result, 'return', {}) 997 998 event = self.wait_until_completed(drive=drive, error=completion_error) 999 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1000 1001 def pause_wait(self, job_id='job0'): 1002 with Timeout(3, "Timeout waiting for job to pause"): 1003 while True: 1004 result = self.vm.qmp('query-block-jobs') 1005 found = False 1006 for job in result['return']: 1007 if job['device'] == job_id: 1008 found = True 1009 if job['paused'] and not job['busy']: 1010 return job 1011 break 1012 assert found 1013 1014 def pause_job(self, job_id='job0', wait=True): 1015 result = self.vm.qmp('block-job-pause', device=job_id) 1016 self.assert_qmp(result, 'return', {}) 1017 if wait: 1018 return self.pause_wait(job_id) 1019 return result 1020 1021 def case_skip(self, reason): 1022 '''Skip this test case''' 1023 case_notrun(reason) 1024 self.skipTest(reason) 1025 1026 1027def notrun(reason): 1028 '''Skip this test suite''' 1029 # Each test in qemu-iotests has a number ("seq") 1030 seq = os.path.basename(sys.argv[0]) 1031 1032 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1033 logger.warning("%s not run: %s", seq, reason) 1034 sys.exit(0) 1035 1036def case_notrun(reason): 1037 '''Mark this test case as not having been run (without actually 1038 skipping it, that is left to the caller). See 1039 QMPTestCase.case_skip() for a variant that actually skips the 1040 current test case.''' 1041 1042 # Each test in qemu-iotests has a number ("seq") 1043 seq = os.path.basename(sys.argv[0]) 1044 1045 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1046 ' [case not run] ' + reason + '\n') 1047 1048def _verify_image_format(supported_fmts: Sequence[str] = (), 1049 unsupported_fmts: Sequence[str] = ()) -> None: 1050 assert not (supported_fmts and unsupported_fmts) 1051 1052 if 'generic' in supported_fmts and \ 1053 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1054 # similar to 1055 # _supported_fmt generic 1056 # for bash tests 1057 if imgfmt == 'luks': 1058 verify_working_luks() 1059 return 1060 1061 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1062 if not_sup or (imgfmt in unsupported_fmts): 1063 notrun('not suitable for this image format: %s' % imgfmt) 1064 1065 if imgfmt == 'luks': 1066 verify_working_luks() 1067 1068def _verify_protocol(supported: Sequence[str] = (), 1069 unsupported: Sequence[str] = ()) -> None: 1070 assert not (supported and unsupported) 1071 1072 if 'generic' in supported: 1073 return 1074 1075 not_sup = supported and (imgproto not in supported) 1076 if not_sup or (imgproto in unsupported): 1077 notrun('not suitable for this protocol: %s' % imgproto) 1078 1079def _verify_platform(supported: Sequence[str] = (), 1080 unsupported: Sequence[str] = ()) -> None: 1081 if any((sys.platform.startswith(x) for x in unsupported)): 1082 notrun('not suitable for this OS: %s' % sys.platform) 1083 1084 if supported: 1085 if not any((sys.platform.startswith(x) for x in supported)): 1086 notrun('not suitable for this OS: %s' % sys.platform) 1087 1088def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1089 if supported_cache_modes and (cachemode not in supported_cache_modes): 1090 notrun('not suitable for this cache mode: %s' % cachemode) 1091 1092def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1093 if supported_aio_modes and (aiomode not in supported_aio_modes): 1094 notrun('not suitable for this aio mode: %s' % aiomode) 1095 1096def supports_quorum(): 1097 return 'quorum' in qemu_img_pipe('--help') 1098 1099def verify_quorum(): 1100 '''Skip test suite if quorum support is not available''' 1101 if not supports_quorum(): 1102 notrun('quorum support missing') 1103 1104def has_working_luks() -> Tuple[bool, str]: 1105 """ 1106 Check whether our LUKS driver can actually create images 1107 (this extends to LUKS encryption for qcow2). 1108 1109 If not, return the reason why. 1110 """ 1111 1112 img_file = f'{test_dir}/luks-test.luks' 1113 (output, status) = \ 1114 qemu_img_pipe_and_status('create', '-f', 'luks', 1115 '--object', luks_default_secret_object, 1116 '-o', luks_default_key_secret_opt, 1117 '-o', 'iter-time=10', 1118 img_file, '1G') 1119 try: 1120 os.remove(img_file) 1121 except OSError: 1122 pass 1123 1124 if status != 0: 1125 reason = output 1126 for line in output.splitlines(): 1127 if img_file + ':' in line: 1128 reason = line.split(img_file + ':', 1)[1].strip() 1129 break 1130 1131 return (False, reason) 1132 else: 1133 return (True, '') 1134 1135def verify_working_luks(): 1136 """ 1137 Skip test suite if LUKS does not work 1138 """ 1139 (working, reason) = has_working_luks() 1140 if not working: 1141 notrun(reason) 1142 1143def qemu_pipe(*args): 1144 """ 1145 Run qemu with an option to print something and exit (e.g. a help option). 1146 1147 :return: QEMU's stdout output. 1148 """ 1149 args = [qemu_prog] + qemu_opts + list(args) 1150 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 1151 stderr=subprocess.STDOUT, 1152 universal_newlines=True) 1153 output = subp.communicate()[0] 1154 if subp.returncode < 0: 1155 sys.stderr.write('qemu received signal %i: %s\n' % 1156 (-subp.returncode, ' '.join(args))) 1157 return output 1158 1159def supported_formats(read_only=False): 1160 '''Set 'read_only' to True to check ro-whitelist 1161 Otherwise, rw-whitelist is checked''' 1162 1163 if not hasattr(supported_formats, "formats"): 1164 supported_formats.formats = {} 1165 1166 if read_only not in supported_formats.formats: 1167 format_message = qemu_pipe("-drive", "format=help") 1168 line = 1 if read_only else 0 1169 supported_formats.formats[read_only] = \ 1170 format_message.splitlines()[line].split(":")[1].split() 1171 1172 return supported_formats.formats[read_only] 1173 1174def skip_if_unsupported(required_formats=(), read_only=False): 1175 '''Skip Test Decorator 1176 Runs the test if all the required formats are whitelisted''' 1177 def skip_test_decorator(func): 1178 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1179 **kwargs: Dict[str, Any]) -> None: 1180 if callable(required_formats): 1181 fmts = required_formats(test_case) 1182 else: 1183 fmts = required_formats 1184 1185 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1186 if usf_list: 1187 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1188 test_case.case_skip(msg) 1189 else: 1190 func(test_case, *args, **kwargs) 1191 return func_wrapper 1192 return skip_test_decorator 1193 1194def skip_for_formats(formats: Sequence[str] = ()) \ 1195 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1196 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1197 '''Skip Test Decorator 1198 Skips the test for the given formats''' 1199 def skip_test_decorator(func): 1200 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1201 **kwargs: Dict[str, Any]) -> None: 1202 if imgfmt in formats: 1203 msg = f'{test_case}: Skipped for format {imgfmt}' 1204 test_case.case_skip(msg) 1205 else: 1206 func(test_case, *args, **kwargs) 1207 return func_wrapper 1208 return skip_test_decorator 1209 1210def skip_if_user_is_root(func): 1211 '''Skip Test Decorator 1212 Runs the test only without root permissions''' 1213 def func_wrapper(*args, **kwargs): 1214 if os.getuid() == 0: 1215 case_notrun('{}: cannot be run as root'.format(args[0])) 1216 return None 1217 else: 1218 return func(*args, **kwargs) 1219 return func_wrapper 1220 1221def execute_unittest(debug=False): 1222 """Executes unittests within the calling module.""" 1223 1224 verbosity = 2 if debug else 1 1225 1226 if debug: 1227 output = sys.stdout 1228 else: 1229 # We need to filter out the time taken from the output so that 1230 # qemu-iotest can reliably diff the results against master output. 1231 output = io.StringIO() 1232 1233 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1234 verbosity=verbosity) 1235 try: 1236 # unittest.main() will use sys.exit(); so expect a SystemExit 1237 # exception 1238 unittest.main(testRunner=runner) 1239 finally: 1240 # We need to filter out the time taken from the output so that 1241 # qemu-iotest can reliably diff the results against master output. 1242 if not debug: 1243 out = output.getvalue() 1244 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1245 1246 # Hide skipped tests from the reference output 1247 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1248 out_first_line, out_rest = out.split('\n', 1) 1249 out = out_first_line.replace('s', '.') + '\n' + out_rest 1250 1251 sys.stderr.write(out) 1252 1253def execute_setup_common(supported_fmts: Sequence[str] = (), 1254 supported_platforms: Sequence[str] = (), 1255 supported_cache_modes: Sequence[str] = (), 1256 supported_aio_modes: Sequence[str] = (), 1257 unsupported_fmts: Sequence[str] = (), 1258 supported_protocols: Sequence[str] = (), 1259 unsupported_protocols: Sequence[str] = ()) -> bool: 1260 """ 1261 Perform necessary setup for either script-style or unittest-style tests. 1262 1263 :return: Bool; Whether or not debug mode has been requested via the CLI. 1264 """ 1265 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1266 1267 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1268 # indicate that we're not being run via "check". There may be 1269 # other things set up by "check" that individual test cases rely 1270 # on. 1271 if test_dir is None or qemu_default_machine is None: 1272 sys.stderr.write('Please run this test via the "check" script\n') 1273 sys.exit(os.EX_USAGE) 1274 1275 debug = '-d' in sys.argv 1276 if debug: 1277 sys.argv.remove('-d') 1278 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1279 1280 _verify_image_format(supported_fmts, unsupported_fmts) 1281 _verify_protocol(supported_protocols, unsupported_protocols) 1282 _verify_platform(supported=supported_platforms) 1283 _verify_cache_mode(supported_cache_modes) 1284 _verify_aio_mode(supported_aio_modes) 1285 1286 return debug 1287 1288def execute_test(*args, test_function=None, **kwargs): 1289 """Run either unittest or script-style tests.""" 1290 1291 debug = execute_setup_common(*args, **kwargs) 1292 if not test_function: 1293 execute_unittest(debug) 1294 else: 1295 test_function() 1296 1297def activate_logging(): 1298 """Activate iotests.log() output to stdout for script-style tests.""" 1299 handler = logging.StreamHandler(stream=sys.stdout) 1300 formatter = logging.Formatter('%(message)s') 1301 handler.setFormatter(formatter) 1302 test_logger.addHandler(handler) 1303 test_logger.setLevel(logging.INFO) 1304 test_logger.propagate = False 1305 1306# This is called from script-style iotests without a single point of entry 1307def script_initialize(*args, **kwargs): 1308 """Initialize script-style tests without running any tests.""" 1309 activate_logging() 1310 execute_setup_common(*args, **kwargs) 1311 1312# This is called from script-style iotests with a single point of entry 1313def script_main(test_function, *args, **kwargs): 1314 """Run script-style tests outside of the unittest framework""" 1315 activate_logging() 1316 execute_test(*args, test_function=test_function, **kwargs) 1317 1318# This is called from unittest style iotests 1319def main(*args, **kwargs): 1320 """Run tests using the unittest framework""" 1321 execute_test(*args, **kwargs) 1322