1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43assert sys.version_info >= (3, 6) 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79test_dir = os.environ.get('TEST_DIR') 80sock_dir = os.environ.get('SOCK_DIR') 81output_dir = os.environ.get('OUTPUT_DIR', '.') 82cachemode = os.environ.get('CACHEMODE') 83aiomode = os.environ.get('AIOMODE') 84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 85 86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 87 88luks_default_secret_object = 'secret,id=keysec0,data=' + \ 89 os.environ.get('IMGKEYSECRET', '') 90luks_default_key_secret_opt = 'key-secret=keysec0' 91 92 93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 94 """ 95 Run qemu-img and return both its output and its exit code 96 """ 97 subp = subprocess.Popen(qemu_img_args + list(args), 98 stdout=subprocess.PIPE, 99 stderr=subprocess.STDOUT, 100 universal_newlines=True) 101 output = subp.communicate()[0] 102 if subp.returncode < 0: 103 sys.stderr.write('qemu-img received signal %i: %s\n' 104 % (-subp.returncode, 105 ' '.join(qemu_img_args + list(args)))) 106 return (output, subp.returncode) 107 108def qemu_img(*args: str) -> int: 109 '''Run qemu-img and return the exit code''' 110 return qemu_img_pipe_and_status(*args)[1] 111 112def ordered_qmp(qmsg, conv_keys=True): 113 # Dictionaries are not ordered prior to 3.6, therefore: 114 if isinstance(qmsg, list): 115 return [ordered_qmp(atom) for atom in qmsg] 116 if isinstance(qmsg, dict): 117 od = OrderedDict() 118 for k, v in sorted(qmsg.items()): 119 if conv_keys: 120 k = k.replace('_', '-') 121 od[k] = ordered_qmp(v, conv_keys=False) 122 return od 123 return qmsg 124 125def qemu_img_create(*args): 126 args = list(args) 127 128 # default luks support 129 if '-f' in args and args[args.index('-f') + 1] == 'luks': 130 if '-o' in args: 131 i = args.index('-o') 132 if 'key-secret' not in args[i + 1]: 133 args[i + 1].append(luks_default_key_secret_opt) 134 args.insert(i + 2, '--object') 135 args.insert(i + 3, luks_default_secret_object) 136 else: 137 args = ['-o', luks_default_key_secret_opt, 138 '--object', luks_default_secret_object] + args 139 140 args.insert(0, 'create') 141 142 return qemu_img(*args) 143 144def qemu_img_measure(*args): 145 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 146 147def qemu_img_check(*args): 148 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 149 150def qemu_img_verbose(*args): 151 '''Run qemu-img without suppressing its output and return the exit code''' 152 exitcode = subprocess.call(qemu_img_args + list(args)) 153 if exitcode < 0: 154 sys.stderr.write('qemu-img received signal %i: %s\n' 155 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 156 return exitcode 157 158def qemu_img_pipe(*args: str) -> str: 159 '''Run qemu-img and return its output''' 160 return qemu_img_pipe_and_status(*args)[0] 161 162def qemu_img_log(*args): 163 result = qemu_img_pipe(*args) 164 log(result, filters=[filter_testfiles]) 165 return result 166 167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 168 args = ['info'] 169 if imgopts: 170 args.append('--image-opts') 171 else: 172 args += ['-f', imgfmt] 173 args += extra_args 174 args.append(filename) 175 176 output = qemu_img_pipe(*args) 177 if not filter_path: 178 filter_path = filename 179 log(filter_img_info(output, filter_path)) 180 181def qemu_io(*args): 182 '''Run qemu-io and return the stdout data''' 183 args = qemu_io_args + list(args) 184 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 185 stderr=subprocess.STDOUT, 186 universal_newlines=True) 187 output = subp.communicate()[0] 188 if subp.returncode < 0: 189 sys.stderr.write('qemu-io received signal %i: %s\n' 190 % (-subp.returncode, ' '.join(args))) 191 return output 192 193def qemu_io_log(*args): 194 result = qemu_io(*args) 195 log(result, filters=[filter_testfiles, filter_qemu_io]) 196 return result 197 198def qemu_io_silent(*args): 199 '''Run qemu-io and return the exit code, suppressing stdout''' 200 args = qemu_io_args + list(args) 201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 202 if exitcode < 0: 203 sys.stderr.write('qemu-io received signal %i: %s\n' % 204 (-exitcode, ' '.join(args))) 205 return exitcode 206 207def qemu_io_silent_check(*args): 208 '''Run qemu-io and return the true if subprocess returned 0''' 209 args = qemu_io_args + list(args) 210 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 211 stderr=subprocess.STDOUT) 212 return exitcode == 0 213 214def get_virtio_scsi_device(): 215 if qemu_default_machine == 's390-ccw-virtio': 216 return 'virtio-scsi-ccw' 217 return 'virtio-scsi-pci' 218 219class QemuIoInteractive: 220 def __init__(self, *args): 221 self.args = qemu_io_args_no_fmt + list(args) 222 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 223 stdout=subprocess.PIPE, 224 stderr=subprocess.STDOUT, 225 universal_newlines=True) 226 out = self._p.stdout.read(9) 227 if out != 'qemu-io> ': 228 # Most probably qemu-io just failed to start. 229 # Let's collect the whole output and exit. 230 out += self._p.stdout.read() 231 self._p.wait(timeout=1) 232 raise ValueError(out) 233 234 def close(self): 235 self._p.communicate('q\n') 236 237 def _read_output(self): 238 pattern = 'qemu-io> ' 239 n = len(pattern) 240 pos = 0 241 s = [] 242 while pos != n: 243 c = self._p.stdout.read(1) 244 # check unexpected EOF 245 assert c != '' 246 s.append(c) 247 if c == pattern[pos]: 248 pos += 1 249 else: 250 pos = 0 251 252 return ''.join(s[:-n]) 253 254 def cmd(self, cmd): 255 # quit command is in close(), '\n' is added automatically 256 assert '\n' not in cmd 257 cmd = cmd.strip() 258 assert cmd not in ('q', 'quit') 259 self._p.stdin.write(cmd + '\n') 260 self._p.stdin.flush() 261 return self._read_output() 262 263 264def qemu_nbd(*args): 265 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 266 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 267 268def qemu_nbd_early_pipe(*args): 269 '''Run qemu-nbd in daemon mode and return both the parent's exit code 270 and its output in case of an error''' 271 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 272 stdout=subprocess.PIPE, 273 universal_newlines=True) 274 output = subp.communicate()[0] 275 if subp.returncode < 0: 276 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 277 (-subp.returncode, 278 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 279 280 return subp.returncode, output if subp.returncode else '' 281 282@contextmanager 283def qemu_nbd_popen(*args): 284 '''Context manager running qemu-nbd within the context''' 285 pid_file = file_path("pid") 286 287 cmd = list(qemu_nbd_args) 288 cmd.extend(('--persistent', '--pid-file', pid_file)) 289 cmd.extend(args) 290 291 log('Start NBD server') 292 p = subprocess.Popen(cmd) 293 try: 294 while not os.path.exists(pid_file): 295 if p.poll() is not None: 296 raise RuntimeError( 297 "qemu-nbd terminated with exit code {}: {}" 298 .format(p.returncode, ' '.join(cmd))) 299 300 time.sleep(0.01) 301 yield 302 finally: 303 log('Kill NBD server') 304 p.kill() 305 p.wait() 306 307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 308 '''Return True if two image files are identical''' 309 return qemu_img('compare', '-f', fmt1, 310 '-F', fmt2, img1, img2) == 0 311 312def create_image(name, size): 313 '''Create a fully-allocated raw image with sector markers''' 314 file = open(name, 'wb') 315 i = 0 316 while i < size: 317 sector = struct.pack('>l504xl', i // 512, i // 512) 318 file.write(sector) 319 i = i + 512 320 file.close() 321 322def image_size(img): 323 '''Return image's virtual size''' 324 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 325 return json.loads(r)['virtual-size'] 326 327def is_str(val): 328 return isinstance(val, str) 329 330test_dir_re = re.compile(r"%s" % test_dir) 331def filter_test_dir(msg): 332 return test_dir_re.sub("TEST_DIR", msg) 333 334win32_re = re.compile(r"\r") 335def filter_win32(msg): 336 return win32_re.sub("", msg) 337 338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 339 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 340 r"and [0-9\/.inf]* ops\/sec\)") 341def filter_qemu_io(msg): 342 msg = filter_win32(msg) 343 return qemu_io_re.sub("X ops; XX:XX:XX.X " 344 "(XXX YYY/sec and XXX ops/sec)", msg) 345 346chown_re = re.compile(r"chown [0-9]+:[0-9]+") 347def filter_chown(msg): 348 return chown_re.sub("chown UID:GID", msg) 349 350def filter_qmp_event(event): 351 '''Filter a QMP event dict''' 352 event = dict(event) 353 if 'timestamp' in event: 354 event['timestamp']['seconds'] = 'SECS' 355 event['timestamp']['microseconds'] = 'USECS' 356 return event 357 358def filter_qmp(qmsg, filter_fn): 359 '''Given a string filter, filter a QMP object's values. 360 filter_fn takes a (key, value) pair.''' 361 # Iterate through either lists or dicts; 362 if isinstance(qmsg, list): 363 items = enumerate(qmsg) 364 else: 365 items = qmsg.items() 366 367 for k, v in items: 368 if isinstance(v, (dict, list)): 369 qmsg[k] = filter_qmp(v, filter_fn) 370 else: 371 qmsg[k] = filter_fn(k, v) 372 return qmsg 373 374def filter_testfiles(msg): 375 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 376 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 377 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 378 379def filter_qmp_testfiles(qmsg): 380 def _filter(_key, value): 381 if is_str(value): 382 return filter_testfiles(value) 383 return value 384 return filter_qmp(qmsg, _filter) 385 386def filter_generated_node_ids(msg): 387 return re.sub("#block[0-9]+", "NODE_NAME", msg) 388 389def filter_img_info(output, filename): 390 lines = [] 391 for line in output.split('\n'): 392 if 'disk size' in line or 'actual-size' in line: 393 continue 394 line = line.replace(filename, 'TEST_IMG') 395 line = filter_testfiles(line) 396 line = line.replace(imgfmt, 'IMGFMT') 397 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 398 line = re.sub('uuid: [-a-f0-9]+', 399 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 400 line) 401 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 402 lines.append(line) 403 return '\n'.join(lines) 404 405def filter_imgfmt(msg): 406 return msg.replace(imgfmt, 'IMGFMT') 407 408def filter_qmp_imgfmt(qmsg): 409 def _filter(_key, value): 410 if is_str(value): 411 return filter_imgfmt(value) 412 return value 413 return filter_qmp(qmsg, _filter) 414 415 416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 417 418def log(msg: Msg, 419 filters: Iterable[Callable[[Msg], Msg]] = (), 420 indent: Optional[int] = None) -> None: 421 """ 422 Logs either a string message or a JSON serializable message (like QMP). 423 If indent is provided, JSON serializable messages are pretty-printed. 424 """ 425 for flt in filters: 426 msg = flt(msg) 427 if isinstance(msg, (dict, list)): 428 # Don't sort if it's already sorted 429 do_sort = not isinstance(msg, OrderedDict) 430 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 431 else: 432 test_logger.info(msg) 433 434class Timeout: 435 def __init__(self, seconds, errmsg="Timeout"): 436 self.seconds = seconds 437 self.errmsg = errmsg 438 def __enter__(self): 439 signal.signal(signal.SIGALRM, self.timeout) 440 signal.setitimer(signal.ITIMER_REAL, self.seconds) 441 return self 442 def __exit__(self, exc_type, value, traceback): 443 signal.setitimer(signal.ITIMER_REAL, 0) 444 return False 445 def timeout(self, signum, frame): 446 raise Exception(self.errmsg) 447 448def file_pattern(name): 449 return "{0}-{1}".format(os.getpid(), name) 450 451class FilePaths: 452 """ 453 FilePaths is an auto-generated filename that cleans itself up. 454 455 Use this context manager to generate filenames and ensure that the file 456 gets deleted:: 457 458 with FilePaths(['test.img']) as img_path: 459 qemu_img('create', img_path, '1G') 460 # migration_sock_path is automatically deleted 461 """ 462 def __init__(self, names, base_dir=test_dir): 463 self.paths = [] 464 for name in names: 465 self.paths.append(os.path.join(base_dir, file_pattern(name))) 466 467 def __enter__(self): 468 return self.paths 469 470 def __exit__(self, exc_type, exc_val, exc_tb): 471 try: 472 for path in self.paths: 473 os.remove(path) 474 except OSError: 475 pass 476 return False 477 478class FilePath(FilePaths): 479 """ 480 FilePath is a specialization of FilePaths that takes a single filename. 481 """ 482 def __init__(self, name, base_dir=test_dir): 483 super(FilePath, self).__init__([name], base_dir) 484 485 def __enter__(self): 486 return self.paths[0] 487 488def file_path_remover(): 489 for path in reversed(file_path_remover.paths): 490 try: 491 os.remove(path) 492 except OSError: 493 pass 494 495 496def file_path(*names, base_dir=test_dir): 497 ''' Another way to get auto-generated filename that cleans itself up. 498 499 Use is as simple as: 500 501 img_a, img_b = file_path('a.img', 'b.img') 502 sock = file_path('socket') 503 ''' 504 505 if not hasattr(file_path_remover, 'paths'): 506 file_path_remover.paths = [] 507 atexit.register(file_path_remover) 508 509 paths = [] 510 for name in names: 511 filename = file_pattern(name) 512 path = os.path.join(base_dir, filename) 513 file_path_remover.paths.append(path) 514 paths.append(path) 515 516 return paths[0] if len(paths) == 1 else paths 517 518def remote_filename(path): 519 if imgproto == 'file': 520 return path 521 elif imgproto == 'ssh': 522 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 523 else: 524 raise Exception("Protocol %s not supported" % (imgproto)) 525 526class VM(qtest.QEMUQtestMachine): 527 '''A QEMU VM''' 528 529 def __init__(self, path_suffix=''): 530 name = "qemu%s-%d" % (path_suffix, os.getpid()) 531 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 532 test_dir=test_dir, 533 socket_scm_helper=socket_scm_helper, 534 sock_dir=sock_dir) 535 self._num_drives = 0 536 537 def add_object(self, opts): 538 self._args.append('-object') 539 self._args.append(opts) 540 return self 541 542 def add_device(self, opts): 543 self._args.append('-device') 544 self._args.append(opts) 545 return self 546 547 def add_drive_raw(self, opts): 548 self._args.append('-drive') 549 self._args.append(opts) 550 return self 551 552 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 553 '''Add a virtio-blk drive to the VM''' 554 options = ['if=%s' % interface, 555 'id=drive%d' % self._num_drives] 556 557 if path is not None: 558 options.append('file=%s' % path) 559 options.append('format=%s' % img_format) 560 options.append('cache=%s' % cachemode) 561 options.append('aio=%s' % aiomode) 562 563 if opts: 564 options.append(opts) 565 566 if img_format == 'luks' and 'key-secret' not in opts: 567 # default luks support 568 if luks_default_secret_object not in self._args: 569 self.add_object(luks_default_secret_object) 570 571 options.append(luks_default_key_secret_opt) 572 573 self._args.append('-drive') 574 self._args.append(','.join(options)) 575 self._num_drives += 1 576 return self 577 578 def add_blockdev(self, opts): 579 self._args.append('-blockdev') 580 if isinstance(opts, str): 581 self._args.append(opts) 582 else: 583 self._args.append(','.join(opts)) 584 return self 585 586 def add_incoming(self, addr): 587 self._args.append('-incoming') 588 self._args.append(addr) 589 return self 590 591 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 592 cmd = 'human-monitor-command' 593 kwargs = {'command-line': command_line} 594 if use_log: 595 return self.qmp_log(cmd, **kwargs) 596 else: 597 return self.qmp(cmd, **kwargs) 598 599 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 600 """Pause drive r/w operations""" 601 if not event: 602 self.pause_drive(drive, "read_aio") 603 self.pause_drive(drive, "write_aio") 604 return 605 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 606 607 def resume_drive(self, drive: str) -> None: 608 """Resume drive r/w operations""" 609 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 610 611 def hmp_qemu_io(self, drive: str, cmd: str, 612 use_log: bool = False) -> QMPMessage: 613 """Write to a given drive using an HMP command""" 614 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 615 616 def flatten_qmp_object(self, obj, output=None, basestr=''): 617 if output is None: 618 output = dict() 619 if isinstance(obj, list): 620 for i, item in enumerate(obj): 621 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 622 elif isinstance(obj, dict): 623 for key in obj: 624 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 625 else: 626 output[basestr[:-1]] = obj # Strip trailing '.' 627 return output 628 629 def qmp_to_opts(self, obj): 630 obj = self.flatten_qmp_object(obj) 631 output_list = list() 632 for key in obj: 633 output_list += [key + '=' + obj[key]] 634 return ','.join(output_list) 635 636 def get_qmp_events_filtered(self, wait=60.0): 637 result = [] 638 for ev in self.get_qmp_events(wait=wait): 639 result.append(filter_qmp_event(ev)) 640 return result 641 642 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 643 full_cmd = OrderedDict(( 644 ("execute", cmd), 645 ("arguments", ordered_qmp(kwargs)) 646 )) 647 log(full_cmd, filters, indent=indent) 648 result = self.qmp(cmd, **kwargs) 649 log(result, filters, indent=indent) 650 return result 651 652 # Returns None on success, and an error string on failure 653 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 654 pre_finalize=None, cancel=False, wait=60.0): 655 """ 656 run_job moves a job from creation through to dismissal. 657 658 :param job: String. ID of recently-launched job 659 :param auto_finalize: Bool. True if the job was launched with 660 auto_finalize. Defaults to True. 661 :param auto_dismiss: Bool. True if the job was launched with 662 auto_dismiss=True. Defaults to False. 663 :param pre_finalize: Callback. A callable that takes no arguments to be 664 invoked prior to issuing job-finalize, if any. 665 :param cancel: Bool. When true, cancels the job after the pre_finalize 666 callback. 667 :param wait: Float. Timeout value specifying how long to wait for any 668 event, in seconds. Defaults to 60.0. 669 """ 670 match_device = {'data': {'device': job}} 671 match_id = {'data': {'id': job}} 672 events = [ 673 ('BLOCK_JOB_COMPLETED', match_device), 674 ('BLOCK_JOB_CANCELLED', match_device), 675 ('BLOCK_JOB_ERROR', match_device), 676 ('BLOCK_JOB_READY', match_device), 677 ('BLOCK_JOB_PENDING', match_id), 678 ('JOB_STATUS_CHANGE', match_id) 679 ] 680 error = None 681 while True: 682 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 683 if ev['event'] != 'JOB_STATUS_CHANGE': 684 log(ev) 685 continue 686 status = ev['data']['status'] 687 if status == 'aborting': 688 result = self.qmp('query-jobs') 689 for j in result['return']: 690 if j['id'] == job: 691 error = j['error'] 692 log('Job failed: %s' % (j['error'])) 693 elif status == 'ready': 694 self.qmp_log('job-complete', id=job) 695 elif status == 'pending' and not auto_finalize: 696 if pre_finalize: 697 pre_finalize() 698 if cancel: 699 self.qmp_log('job-cancel', id=job) 700 else: 701 self.qmp_log('job-finalize', id=job) 702 elif status == 'concluded' and not auto_dismiss: 703 self.qmp_log('job-dismiss', id=job) 704 elif status == 'null': 705 return error 706 707 # Returns None on success, and an error string on failure 708 def blockdev_create(self, options, job_id='job0', filters=None): 709 if filters is None: 710 filters = [filter_qmp_testfiles] 711 result = self.qmp_log('blockdev-create', filters=filters, 712 job_id=job_id, options=options) 713 714 if 'return' in result: 715 assert result['return'] == {} 716 job_result = self.run_job(job_id) 717 else: 718 job_result = result['error'] 719 720 log("") 721 return job_result 722 723 def enable_migration_events(self, name): 724 log('Enabling migration QMP events on %s...' % name) 725 log(self.qmp('migrate-set-capabilities', capabilities=[ 726 { 727 'capability': 'events', 728 'state': True 729 } 730 ])) 731 732 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 733 while True: 734 event = self.event_wait('MIGRATION') 735 log(event, filters=[filter_qmp_event]) 736 if event['data']['status'] in ('completed', 'failed'): 737 break 738 739 if event['data']['status'] == 'completed': 740 # The event may occur in finish-migrate, so wait for the expected 741 # post-migration runstate 742 runstate = None 743 while runstate != expect_runstate: 744 runstate = self.qmp('query-status')['return']['status'] 745 return True 746 else: 747 return False 748 749 def node_info(self, node_name): 750 nodes = self.qmp('query-named-block-nodes') 751 for x in nodes['return']: 752 if x['node-name'] == node_name: 753 return x 754 return None 755 756 def query_bitmaps(self): 757 res = self.qmp("query-named-block-nodes") 758 return {device['node-name']: device['dirty-bitmaps'] 759 for device in res['return'] if 'dirty-bitmaps' in device} 760 761 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 762 """ 763 get a specific bitmap from the object returned by query_bitmaps. 764 :param recording: If specified, filter results by the specified value. 765 :param bitmaps: If specified, use it instead of call query_bitmaps() 766 """ 767 if bitmaps is None: 768 bitmaps = self.query_bitmaps() 769 770 for bitmap in bitmaps[node_name]: 771 if bitmap.get('name', '') == bitmap_name: 772 if recording is None or bitmap.get('recording') == recording: 773 return bitmap 774 return None 775 776 def check_bitmap_status(self, node_name, bitmap_name, fields): 777 ret = self.get_bitmap(node_name, bitmap_name) 778 779 return fields.items() <= ret.items() 780 781 def assert_block_path(self, root, path, expected_node, graph=None): 782 """ 783 Check whether the node under the given path in the block graph 784 is @expected_node. 785 786 @root is the node name of the node where the @path is rooted. 787 788 @path is a string that consists of child names separated by 789 slashes. It must begin with a slash. 790 791 Examples for @root + @path: 792 - root="qcow2-node", path="/backing/file" 793 - root="quorum-node", path="/children.2/file" 794 795 Hypothetically, @path could be empty, in which case it would 796 point to @root. However, in practice this case is not useful 797 and hence not allowed. 798 799 @expected_node may be None. (All elements of the path but the 800 leaf must still exist.) 801 802 @graph may be None or the result of an x-debug-query-block-graph 803 call that has already been performed. 804 """ 805 if graph is None: 806 graph = self.qmp('x-debug-query-block-graph')['return'] 807 808 iter_path = iter(path.split('/')) 809 810 # Must start with a / 811 assert next(iter_path) == '' 812 813 node = next((node for node in graph['nodes'] if node['name'] == root), 814 None) 815 816 # An empty @path is not allowed, so the root node must be present 817 assert node is not None, 'Root node %s not found' % root 818 819 for child_name in iter_path: 820 assert node is not None, 'Cannot follow path %s%s' % (root, path) 821 822 try: 823 node_id = next(edge['child'] for edge in graph['edges'] 824 if (edge['parent'] == node['id'] and 825 edge['name'] == child_name)) 826 827 node = next(node for node in graph['nodes'] 828 if node['id'] == node_id) 829 830 except StopIteration: 831 node = None 832 833 if node is None: 834 assert expected_node is None, \ 835 'No node found under %s (but expected %s)' % \ 836 (path, expected_node) 837 else: 838 assert node['name'] == expected_node, \ 839 'Found node %s under %s (but expected %s)' % \ 840 (node['name'], path, expected_node) 841 842index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 843 844class QMPTestCase(unittest.TestCase): 845 '''Abstract base class for QMP test cases''' 846 847 def __init__(self, *args, **kwargs): 848 super().__init__(*args, **kwargs) 849 # Many users of this class set a VM property we rely on heavily 850 # in the methods below. 851 self.vm = None 852 853 def dictpath(self, d, path): 854 '''Traverse a path in a nested dict''' 855 for component in path.split('/'): 856 m = index_re.match(component) 857 if m: 858 component, idx = m.groups() 859 idx = int(idx) 860 861 if not isinstance(d, dict) or component not in d: 862 self.fail(f'failed path traversal for "{path}" in "{d}"') 863 d = d[component] 864 865 if m: 866 if not isinstance(d, list): 867 self.fail(f'path component "{component}" in "{path}" ' 868 f'is not a list in "{d}"') 869 try: 870 d = d[idx] 871 except IndexError: 872 self.fail(f'invalid index "{idx}" in path "{path}" ' 873 f'in "{d}"') 874 return d 875 876 def assert_qmp_absent(self, d, path): 877 try: 878 result = self.dictpath(d, path) 879 except AssertionError: 880 return 881 self.fail('path "%s" has value "%s"' % (path, str(result))) 882 883 def assert_qmp(self, d, path, value): 884 '''Assert that the value for a specific path in a QMP dict 885 matches. When given a list of values, assert that any of 886 them matches.''' 887 888 result = self.dictpath(d, path) 889 890 # [] makes no sense as a list of valid values, so treat it as 891 # an actual single value. 892 if isinstance(value, list) and value != []: 893 for v in value: 894 if result == v: 895 return 896 self.fail('no match for "%s" in %s' % (str(result), str(value))) 897 else: 898 self.assertEqual(result, value, 899 '"%s" is "%s", expected "%s"' 900 % (path, str(result), str(value))) 901 902 def assert_no_active_block_jobs(self): 903 result = self.vm.qmp('query-block-jobs') 904 self.assert_qmp(result, 'return', []) 905 906 def assert_has_block_node(self, node_name=None, file_name=None): 907 """Issue a query-named-block-nodes and assert node_name and/or 908 file_name is present in the result""" 909 def check_equal_or_none(a, b): 910 return a is None or b is None or a == b 911 assert node_name or file_name 912 result = self.vm.qmp('query-named-block-nodes') 913 for x in result["return"]: 914 if check_equal_or_none(x.get("node-name"), node_name) and \ 915 check_equal_or_none(x.get("file"), file_name): 916 return 917 self.fail("Cannot find %s %s in result:\n%s" % 918 (node_name, file_name, result)) 919 920 def assert_json_filename_equal(self, json_filename, reference): 921 '''Asserts that the given filename is a json: filename and that its 922 content is equal to the given reference object''' 923 self.assertEqual(json_filename[:5], 'json:') 924 self.assertEqual( 925 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 926 self.vm.flatten_qmp_object(reference) 927 ) 928 929 def cancel_and_wait(self, drive='drive0', force=False, 930 resume=False, wait=60.0): 931 '''Cancel a block job and wait for it to finish, returning the event''' 932 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 933 self.assert_qmp(result, 'return', {}) 934 935 if resume: 936 self.vm.resume_drive(drive) 937 938 cancelled = False 939 result = None 940 while not cancelled: 941 for event in self.vm.get_qmp_events(wait=wait): 942 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 943 event['event'] == 'BLOCK_JOB_CANCELLED': 944 self.assert_qmp(event, 'data/device', drive) 945 result = event 946 cancelled = True 947 elif event['event'] == 'JOB_STATUS_CHANGE': 948 self.assert_qmp(event, 'data/id', drive) 949 950 951 self.assert_no_active_block_jobs() 952 return result 953 954 def wait_until_completed(self, drive='drive0', check_offset=True, 955 wait=60.0, error=None): 956 '''Wait for a block job to finish, returning the event''' 957 while True: 958 for event in self.vm.get_qmp_events(wait=wait): 959 if event['event'] == 'BLOCK_JOB_COMPLETED': 960 self.assert_qmp(event, 'data/device', drive) 961 if error is None: 962 self.assert_qmp_absent(event, 'data/error') 963 if check_offset: 964 self.assert_qmp(event, 'data/offset', 965 event['data']['len']) 966 else: 967 self.assert_qmp(event, 'data/error', error) 968 self.assert_no_active_block_jobs() 969 return event 970 if event['event'] == 'JOB_STATUS_CHANGE': 971 self.assert_qmp(event, 'data/id', drive) 972 973 def wait_ready(self, drive='drive0'): 974 """Wait until a BLOCK_JOB_READY event, and return the event.""" 975 f = {'data': {'type': 'mirror', 'device': drive}} 976 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 977 978 def wait_ready_and_cancel(self, drive='drive0'): 979 self.wait_ready(drive=drive) 980 event = self.cancel_and_wait(drive=drive) 981 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 982 self.assert_qmp(event, 'data/type', 'mirror') 983 self.assert_qmp(event, 'data/offset', event['data']['len']) 984 985 def complete_and_wait(self, drive='drive0', wait_ready=True, 986 completion_error=None): 987 '''Complete a block job and wait for it to finish''' 988 if wait_ready: 989 self.wait_ready(drive=drive) 990 991 result = self.vm.qmp('block-job-complete', device=drive) 992 self.assert_qmp(result, 'return', {}) 993 994 event = self.wait_until_completed(drive=drive, error=completion_error) 995 self.assert_qmp(event, 'data/type', 'mirror') 996 997 def pause_wait(self, job_id='job0'): 998 with Timeout(3, "Timeout waiting for job to pause"): 999 while True: 1000 result = self.vm.qmp('query-block-jobs') 1001 found = False 1002 for job in result['return']: 1003 if job['device'] == job_id: 1004 found = True 1005 if job['paused'] and not job['busy']: 1006 return job 1007 break 1008 assert found 1009 1010 def pause_job(self, job_id='job0', wait=True): 1011 result = self.vm.qmp('block-job-pause', device=job_id) 1012 self.assert_qmp(result, 'return', {}) 1013 if wait: 1014 return self.pause_wait(job_id) 1015 return result 1016 1017 def case_skip(self, reason): 1018 '''Skip this test case''' 1019 case_notrun(reason) 1020 self.skipTest(reason) 1021 1022 1023def notrun(reason): 1024 '''Skip this test suite''' 1025 # Each test in qemu-iotests has a number ("seq") 1026 seq = os.path.basename(sys.argv[0]) 1027 1028 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1029 logger.warning("%s not run: %s", seq, reason) 1030 sys.exit(0) 1031 1032def case_notrun(reason): 1033 '''Mark this test case as not having been run (without actually 1034 skipping it, that is left to the caller). See 1035 QMPTestCase.case_skip() for a variant that actually skips the 1036 current test case.''' 1037 1038 # Each test in qemu-iotests has a number ("seq") 1039 seq = os.path.basename(sys.argv[0]) 1040 1041 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1042 ' [case not run] ' + reason + '\n') 1043 1044def _verify_image_format(supported_fmts: Sequence[str] = (), 1045 unsupported_fmts: Sequence[str] = ()) -> None: 1046 assert not (supported_fmts and unsupported_fmts) 1047 1048 if 'generic' in supported_fmts and \ 1049 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1050 # similar to 1051 # _supported_fmt generic 1052 # for bash tests 1053 if imgfmt == 'luks': 1054 verify_working_luks() 1055 return 1056 1057 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1058 if not_sup or (imgfmt in unsupported_fmts): 1059 notrun('not suitable for this image format: %s' % imgfmt) 1060 1061 if imgfmt == 'luks': 1062 verify_working_luks() 1063 1064def _verify_protocol(supported: Sequence[str] = (), 1065 unsupported: Sequence[str] = ()) -> None: 1066 assert not (supported and unsupported) 1067 1068 if 'generic' in supported: 1069 return 1070 1071 not_sup = supported and (imgproto not in supported) 1072 if not_sup or (imgproto in unsupported): 1073 notrun('not suitable for this protocol: %s' % imgproto) 1074 1075def _verify_platform(supported: Sequence[str] = (), 1076 unsupported: Sequence[str] = ()) -> None: 1077 if any((sys.platform.startswith(x) for x in unsupported)): 1078 notrun('not suitable for this OS: %s' % sys.platform) 1079 1080 if supported: 1081 if not any((sys.platform.startswith(x) for x in supported)): 1082 notrun('not suitable for this OS: %s' % sys.platform) 1083 1084def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1085 if supported_cache_modes and (cachemode not in supported_cache_modes): 1086 notrun('not suitable for this cache mode: %s' % cachemode) 1087 1088def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1089 if supported_aio_modes and (aiomode not in supported_aio_modes): 1090 notrun('not suitable for this aio mode: %s' % aiomode) 1091 1092def supports_quorum(): 1093 return 'quorum' in qemu_img_pipe('--help') 1094 1095def verify_quorum(): 1096 '''Skip test suite if quorum support is not available''' 1097 if not supports_quorum(): 1098 notrun('quorum support missing') 1099 1100def has_working_luks() -> Tuple[bool, str]: 1101 """ 1102 Check whether our LUKS driver can actually create images 1103 (this extends to LUKS encryption for qcow2). 1104 1105 If not, return the reason why. 1106 """ 1107 1108 img_file = f'{test_dir}/luks-test.luks' 1109 (output, status) = \ 1110 qemu_img_pipe_and_status('create', '-f', 'luks', 1111 '--object', luks_default_secret_object, 1112 '-o', luks_default_key_secret_opt, 1113 '-o', 'iter-time=10', 1114 img_file, '1G') 1115 try: 1116 os.remove(img_file) 1117 except OSError: 1118 pass 1119 1120 if status != 0: 1121 reason = output 1122 for line in output.splitlines(): 1123 if img_file + ':' in line: 1124 reason = line.split(img_file + ':', 1)[1].strip() 1125 break 1126 1127 return (False, reason) 1128 else: 1129 return (True, '') 1130 1131def verify_working_luks(): 1132 """ 1133 Skip test suite if LUKS does not work 1134 """ 1135 (working, reason) = has_working_luks() 1136 if not working: 1137 notrun(reason) 1138 1139def qemu_pipe(*args): 1140 """ 1141 Run qemu with an option to print something and exit (e.g. a help option). 1142 1143 :return: QEMU's stdout output. 1144 """ 1145 args = [qemu_prog] + qemu_opts + list(args) 1146 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 1147 stderr=subprocess.STDOUT, 1148 universal_newlines=True) 1149 output = subp.communicate()[0] 1150 if subp.returncode < 0: 1151 sys.stderr.write('qemu received signal %i: %s\n' % 1152 (-subp.returncode, ' '.join(args))) 1153 return output 1154 1155def supported_formats(read_only=False): 1156 '''Set 'read_only' to True to check ro-whitelist 1157 Otherwise, rw-whitelist is checked''' 1158 1159 if not hasattr(supported_formats, "formats"): 1160 supported_formats.formats = {} 1161 1162 if read_only not in supported_formats.formats: 1163 format_message = qemu_pipe("-drive", "format=help") 1164 line = 1 if read_only else 0 1165 supported_formats.formats[read_only] = \ 1166 format_message.splitlines()[line].split(":")[1].split() 1167 1168 return supported_formats.formats[read_only] 1169 1170def skip_if_unsupported(required_formats=(), read_only=False): 1171 '''Skip Test Decorator 1172 Runs the test if all the required formats are whitelisted''' 1173 def skip_test_decorator(func): 1174 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1175 **kwargs: Dict[str, Any]) -> None: 1176 if callable(required_formats): 1177 fmts = required_formats(test_case) 1178 else: 1179 fmts = required_formats 1180 1181 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1182 if usf_list: 1183 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1184 test_case.case_skip(msg) 1185 else: 1186 func(test_case, *args, **kwargs) 1187 return func_wrapper 1188 return skip_test_decorator 1189 1190def skip_for_formats(formats: Sequence[str] = ()) \ 1191 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1192 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1193 '''Skip Test Decorator 1194 Skips the test for the given formats''' 1195 def skip_test_decorator(func): 1196 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1197 **kwargs: Dict[str, Any]) -> None: 1198 if imgfmt in formats: 1199 msg = f'{test_case}: Skipped for format {imgfmt}' 1200 test_case.case_skip(msg) 1201 else: 1202 func(test_case, *args, **kwargs) 1203 return func_wrapper 1204 return skip_test_decorator 1205 1206def skip_if_user_is_root(func): 1207 '''Skip Test Decorator 1208 Runs the test only without root permissions''' 1209 def func_wrapper(*args, **kwargs): 1210 if os.getuid() == 0: 1211 case_notrun('{}: cannot be run as root'.format(args[0])) 1212 return None 1213 else: 1214 return func(*args, **kwargs) 1215 return func_wrapper 1216 1217def execute_unittest(debug=False): 1218 """Executes unittests within the calling module.""" 1219 1220 verbosity = 2 if debug else 1 1221 1222 if debug: 1223 output = sys.stdout 1224 else: 1225 # We need to filter out the time taken from the output so that 1226 # qemu-iotest can reliably diff the results against master output. 1227 output = io.StringIO() 1228 1229 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1230 verbosity=verbosity) 1231 try: 1232 # unittest.main() will use sys.exit(); so expect a SystemExit 1233 # exception 1234 unittest.main(testRunner=runner) 1235 finally: 1236 # We need to filter out the time taken from the output so that 1237 # qemu-iotest can reliably diff the results against master output. 1238 if not debug: 1239 out = output.getvalue() 1240 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1241 1242 # Hide skipped tests from the reference output 1243 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1244 out_first_line, out_rest = out.split('\n', 1) 1245 out = out_first_line.replace('s', '.') + '\n' + out_rest 1246 1247 sys.stderr.write(out) 1248 1249def execute_setup_common(supported_fmts: Sequence[str] = (), 1250 supported_platforms: Sequence[str] = (), 1251 supported_cache_modes: Sequence[str] = (), 1252 supported_aio_modes: Sequence[str] = (), 1253 unsupported_fmts: Sequence[str] = (), 1254 supported_protocols: Sequence[str] = (), 1255 unsupported_protocols: Sequence[str] = ()) -> bool: 1256 """ 1257 Perform necessary setup for either script-style or unittest-style tests. 1258 1259 :return: Bool; Whether or not debug mode has been requested via the CLI. 1260 """ 1261 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1262 1263 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1264 # indicate that we're not being run via "check". There may be 1265 # other things set up by "check" that individual test cases rely 1266 # on. 1267 if test_dir is None or qemu_default_machine is None: 1268 sys.stderr.write('Please run this test via the "check" script\n') 1269 sys.exit(os.EX_USAGE) 1270 1271 debug = '-d' in sys.argv 1272 if debug: 1273 sys.argv.remove('-d') 1274 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1275 1276 _verify_image_format(supported_fmts, unsupported_fmts) 1277 _verify_protocol(supported_protocols, unsupported_protocols) 1278 _verify_platform(supported=supported_platforms) 1279 _verify_cache_mode(supported_cache_modes) 1280 _verify_aio_mode(supported_aio_modes) 1281 1282 return debug 1283 1284def execute_test(*args, test_function=None, **kwargs): 1285 """Run either unittest or script-style tests.""" 1286 1287 debug = execute_setup_common(*args, **kwargs) 1288 if not test_function: 1289 execute_unittest(debug) 1290 else: 1291 test_function() 1292 1293def activate_logging(): 1294 """Activate iotests.log() output to stdout for script-style tests.""" 1295 handler = logging.StreamHandler(stream=sys.stdout) 1296 formatter = logging.Formatter('%(message)s') 1297 handler.setFormatter(formatter) 1298 test_logger.addHandler(handler) 1299 test_logger.setLevel(logging.INFO) 1300 test_logger.propagate = False 1301 1302# This is called from script-style iotests without a single point of entry 1303def script_initialize(*args, **kwargs): 1304 """Initialize script-style tests without running any tests.""" 1305 activate_logging() 1306 execute_setup_common(*args, **kwargs) 1307 1308# This is called from script-style iotests with a single point of entry 1309def script_main(test_function, *args, **kwargs): 1310 """Run script-style tests outside of the unittest framework""" 1311 activate_logging() 1312 execute_test(*args, test_function=test_function, **kwargs) 1313 1314# This is called from unittest style iotests 1315def main(*args, **kwargs): 1316 """Run tests using the unittest framework""" 1317 execute_test(*args, **kwargs) 1318