1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31from typing import (Any, Callable, Dict, Iterable, 32 List, Optional, Sequence, Tuple, TypeVar) 33import unittest 34 35# pylint: disable=import-error, wrong-import-position 36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 37from qemu import qtest 38from qemu.qmp import QMPMessage 39 40assert sys.version_info >= (3, 6) 41 42# Use this logger for logging messages directly from the iotests module 43logger = logging.getLogger('qemu.iotests') 44logger.addHandler(logging.NullHandler()) 45 46# Use this logger for messages that ought to be used for diff output. 47test_logger = logging.getLogger('qemu.iotests.diff_io') 48 49 50faulthandler.enable() 51 52# This will not work if arguments contain spaces but is necessary if we 53# want to support the override options that ./check supports. 54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 55if os.environ.get('QEMU_IMG_OPTIONS'): 56 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 57 58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 59if os.environ.get('QEMU_IO_OPTIONS'): 60 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 61 62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 64 qemu_io_args_no_fmt += \ 65 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 66 67qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 68if os.environ.get('QEMU_NBD_OPTIONS'): 69 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 70 71qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 72qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 73 74imgfmt = os.environ.get('IMGFMT', 'raw') 75imgproto = os.environ.get('IMGPROTO', 'file') 76test_dir = os.environ.get('TEST_DIR') 77sock_dir = os.environ.get('SOCK_DIR') 78output_dir = os.environ.get('OUTPUT_DIR', '.') 79cachemode = os.environ.get('CACHEMODE') 80aiomode = os.environ.get('AIOMODE') 81qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 82 83socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 84 85luks_default_secret_object = 'secret,id=keysec0,data=' + \ 86 os.environ.get('IMGKEYSECRET', '') 87luks_default_key_secret_opt = 'key-secret=keysec0' 88 89 90def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 91 """ 92 Run qemu-img and return both its output and its exit code 93 """ 94 subp = subprocess.Popen(qemu_img_args + list(args), 95 stdout=subprocess.PIPE, 96 stderr=subprocess.STDOUT, 97 universal_newlines=True) 98 output = subp.communicate()[0] 99 if subp.returncode < 0: 100 sys.stderr.write('qemu-img received signal %i: %s\n' 101 % (-subp.returncode, 102 ' '.join(qemu_img_args + list(args)))) 103 return (output, subp.returncode) 104 105def qemu_img(*args: str) -> int: 106 '''Run qemu-img and return the exit code''' 107 return qemu_img_pipe_and_status(*args)[1] 108 109def ordered_qmp(qmsg, conv_keys=True): 110 # Dictionaries are not ordered prior to 3.6, therefore: 111 if isinstance(qmsg, list): 112 return [ordered_qmp(atom) for atom in qmsg] 113 if isinstance(qmsg, dict): 114 od = OrderedDict() 115 for k, v in sorted(qmsg.items()): 116 if conv_keys: 117 k = k.replace('_', '-') 118 od[k] = ordered_qmp(v, conv_keys=False) 119 return od 120 return qmsg 121 122def qemu_img_create(*args): 123 args = list(args) 124 125 # default luks support 126 if '-f' in args and args[args.index('-f') + 1] == 'luks': 127 if '-o' in args: 128 i = args.index('-o') 129 if 'key-secret' not in args[i + 1]: 130 args[i + 1].append(luks_default_key_secret_opt) 131 args.insert(i + 2, '--object') 132 args.insert(i + 3, luks_default_secret_object) 133 else: 134 args = ['-o', luks_default_key_secret_opt, 135 '--object', luks_default_secret_object] + args 136 137 args.insert(0, 'create') 138 139 return qemu_img(*args) 140 141def qemu_img_verbose(*args): 142 '''Run qemu-img without suppressing its output and return the exit code''' 143 exitcode = subprocess.call(qemu_img_args + list(args)) 144 if exitcode < 0: 145 sys.stderr.write('qemu-img received signal %i: %s\n' 146 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 147 return exitcode 148 149def qemu_img_pipe(*args: str) -> str: 150 '''Run qemu-img and return its output''' 151 return qemu_img_pipe_and_status(*args)[0] 152 153def qemu_img_log(*args): 154 result = qemu_img_pipe(*args) 155 log(result, filters=[filter_testfiles]) 156 return result 157 158def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 159 args = ['info'] 160 if imgopts: 161 args.append('--image-opts') 162 else: 163 args += ['-f', imgfmt] 164 args += extra_args 165 args.append(filename) 166 167 output = qemu_img_pipe(*args) 168 if not filter_path: 169 filter_path = filename 170 log(filter_img_info(output, filter_path)) 171 172def qemu_io(*args): 173 '''Run qemu-io and return the stdout data''' 174 args = qemu_io_args + list(args) 175 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 176 stderr=subprocess.STDOUT, 177 universal_newlines=True) 178 output = subp.communicate()[0] 179 if subp.returncode < 0: 180 sys.stderr.write('qemu-io received signal %i: %s\n' 181 % (-subp.returncode, ' '.join(args))) 182 return output 183 184def qemu_io_log(*args): 185 result = qemu_io(*args) 186 log(result, filters=[filter_testfiles, filter_qemu_io]) 187 return result 188 189def qemu_io_silent(*args): 190 '''Run qemu-io and return the exit code, suppressing stdout''' 191 args = qemu_io_args + list(args) 192 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 193 if exitcode < 0: 194 sys.stderr.write('qemu-io received signal %i: %s\n' % 195 (-exitcode, ' '.join(args))) 196 return exitcode 197 198def qemu_io_silent_check(*args): 199 '''Run qemu-io and return the true if subprocess returned 0''' 200 args = qemu_io_args + list(args) 201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 202 stderr=subprocess.STDOUT) 203 return exitcode == 0 204 205def get_virtio_scsi_device(): 206 if qemu_default_machine == 's390-ccw-virtio': 207 return 'virtio-scsi-ccw' 208 return 'virtio-scsi-pci' 209 210class QemuIoInteractive: 211 def __init__(self, *args): 212 self.args = qemu_io_args_no_fmt + list(args) 213 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 214 stdout=subprocess.PIPE, 215 stderr=subprocess.STDOUT, 216 universal_newlines=True) 217 out = self._p.stdout.read(9) 218 if out != 'qemu-io> ': 219 # Most probably qemu-io just failed to start. 220 # Let's collect the whole output and exit. 221 out += self._p.stdout.read() 222 self._p.wait(timeout=1) 223 raise ValueError(out) 224 225 def close(self): 226 self._p.communicate('q\n') 227 228 def _read_output(self): 229 pattern = 'qemu-io> ' 230 n = len(pattern) 231 pos = 0 232 s = [] 233 while pos != n: 234 c = self._p.stdout.read(1) 235 # check unexpected EOF 236 assert c != '' 237 s.append(c) 238 if c == pattern[pos]: 239 pos += 1 240 else: 241 pos = 0 242 243 return ''.join(s[:-n]) 244 245 def cmd(self, cmd): 246 # quit command is in close(), '\n' is added automatically 247 assert '\n' not in cmd 248 cmd = cmd.strip() 249 assert cmd not in ('q', 'quit') 250 self._p.stdin.write(cmd + '\n') 251 self._p.stdin.flush() 252 return self._read_output() 253 254 255def qemu_nbd(*args): 256 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 257 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 258 259def qemu_nbd_early_pipe(*args): 260 '''Run qemu-nbd in daemon mode and return both the parent's exit code 261 and its output in case of an error''' 262 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 263 stdout=subprocess.PIPE, 264 universal_newlines=True) 265 output = subp.communicate()[0] 266 if subp.returncode < 0: 267 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 268 (-subp.returncode, 269 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 270 271 return subp.returncode, output if subp.returncode else '' 272 273def qemu_nbd_popen(*args): 274 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 275 return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args)) 276 277def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 278 '''Return True if two image files are identical''' 279 return qemu_img('compare', '-f', fmt1, 280 '-F', fmt2, img1, img2) == 0 281 282def create_image(name, size): 283 '''Create a fully-allocated raw image with sector markers''' 284 file = open(name, 'wb') 285 i = 0 286 while i < size: 287 sector = struct.pack('>l504xl', i // 512, i // 512) 288 file.write(sector) 289 i = i + 512 290 file.close() 291 292def image_size(img): 293 '''Return image's virtual size''' 294 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 295 return json.loads(r)['virtual-size'] 296 297def is_str(val): 298 return isinstance(val, str) 299 300test_dir_re = re.compile(r"%s" % test_dir) 301def filter_test_dir(msg): 302 return test_dir_re.sub("TEST_DIR", msg) 303 304win32_re = re.compile(r"\r") 305def filter_win32(msg): 306 return win32_re.sub("", msg) 307 308qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 309 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 310 r"and [0-9\/.inf]* ops\/sec\)") 311def filter_qemu_io(msg): 312 msg = filter_win32(msg) 313 return qemu_io_re.sub("X ops; XX:XX:XX.X " 314 "(XXX YYY/sec and XXX ops/sec)", msg) 315 316chown_re = re.compile(r"chown [0-9]+:[0-9]+") 317def filter_chown(msg): 318 return chown_re.sub("chown UID:GID", msg) 319 320def filter_qmp_event(event): 321 '''Filter a QMP event dict''' 322 event = dict(event) 323 if 'timestamp' in event: 324 event['timestamp']['seconds'] = 'SECS' 325 event['timestamp']['microseconds'] = 'USECS' 326 return event 327 328def filter_qmp(qmsg, filter_fn): 329 '''Given a string filter, filter a QMP object's values. 330 filter_fn takes a (key, value) pair.''' 331 # Iterate through either lists or dicts; 332 if isinstance(qmsg, list): 333 items = enumerate(qmsg) 334 else: 335 items = qmsg.items() 336 337 for k, v in items: 338 if isinstance(v, (dict, list)): 339 qmsg[k] = filter_qmp(v, filter_fn) 340 else: 341 qmsg[k] = filter_fn(k, v) 342 return qmsg 343 344def filter_testfiles(msg): 345 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 346 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 347 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 348 349def filter_qmp_testfiles(qmsg): 350 def _filter(_key, value): 351 if is_str(value): 352 return filter_testfiles(value) 353 return value 354 return filter_qmp(qmsg, _filter) 355 356def filter_generated_node_ids(msg): 357 return re.sub("#block[0-9]+", "NODE_NAME", msg) 358 359def filter_img_info(output, filename): 360 lines = [] 361 for line in output.split('\n'): 362 if 'disk size' in line or 'actual-size' in line: 363 continue 364 line = line.replace(filename, 'TEST_IMG') 365 line = filter_testfiles(line) 366 line = line.replace(imgfmt, 'IMGFMT') 367 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 368 line = re.sub('uuid: [-a-f0-9]+', 369 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 370 line) 371 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 372 lines.append(line) 373 return '\n'.join(lines) 374 375def filter_imgfmt(msg): 376 return msg.replace(imgfmt, 'IMGFMT') 377 378def filter_qmp_imgfmt(qmsg): 379 def _filter(_key, value): 380 if is_str(value): 381 return filter_imgfmt(value) 382 return value 383 return filter_qmp(qmsg, _filter) 384 385 386Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 387 388def log(msg: Msg, 389 filters: Iterable[Callable[[Msg], Msg]] = (), 390 indent: Optional[int] = None) -> None: 391 """ 392 Logs either a string message or a JSON serializable message (like QMP). 393 If indent is provided, JSON serializable messages are pretty-printed. 394 """ 395 for flt in filters: 396 msg = flt(msg) 397 if isinstance(msg, (dict, list)): 398 # Don't sort if it's already sorted 399 do_sort = not isinstance(msg, OrderedDict) 400 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 401 else: 402 test_logger.info(msg) 403 404class Timeout: 405 def __init__(self, seconds, errmsg="Timeout"): 406 self.seconds = seconds 407 self.errmsg = errmsg 408 def __enter__(self): 409 signal.signal(signal.SIGALRM, self.timeout) 410 signal.setitimer(signal.ITIMER_REAL, self.seconds) 411 return self 412 def __exit__(self, exc_type, value, traceback): 413 signal.setitimer(signal.ITIMER_REAL, 0) 414 return False 415 def timeout(self, signum, frame): 416 raise Exception(self.errmsg) 417 418def file_pattern(name): 419 return "{0}-{1}".format(os.getpid(), name) 420 421class FilePaths: 422 """ 423 FilePaths is an auto-generated filename that cleans itself up. 424 425 Use this context manager to generate filenames and ensure that the file 426 gets deleted:: 427 428 with FilePaths(['test.img']) as img_path: 429 qemu_img('create', img_path, '1G') 430 # migration_sock_path is automatically deleted 431 """ 432 def __init__(self, names, base_dir=test_dir): 433 self.paths = [] 434 for name in names: 435 self.paths.append(os.path.join(base_dir, file_pattern(name))) 436 437 def __enter__(self): 438 return self.paths 439 440 def __exit__(self, exc_type, exc_val, exc_tb): 441 try: 442 for path in self.paths: 443 os.remove(path) 444 except OSError: 445 pass 446 return False 447 448class FilePath(FilePaths): 449 """ 450 FilePath is a specialization of FilePaths that takes a single filename. 451 """ 452 def __init__(self, name, base_dir=test_dir): 453 super(FilePath, self).__init__([name], base_dir) 454 455 def __enter__(self): 456 return self.paths[0] 457 458def file_path_remover(): 459 for path in reversed(file_path_remover.paths): 460 try: 461 os.remove(path) 462 except OSError: 463 pass 464 465 466def file_path(*names, base_dir=test_dir): 467 ''' Another way to get auto-generated filename that cleans itself up. 468 469 Use is as simple as: 470 471 img_a, img_b = file_path('a.img', 'b.img') 472 sock = file_path('socket') 473 ''' 474 475 if not hasattr(file_path_remover, 'paths'): 476 file_path_remover.paths = [] 477 atexit.register(file_path_remover) 478 479 paths = [] 480 for name in names: 481 filename = file_pattern(name) 482 path = os.path.join(base_dir, filename) 483 file_path_remover.paths.append(path) 484 paths.append(path) 485 486 return paths[0] if len(paths) == 1 else paths 487 488def remote_filename(path): 489 if imgproto == 'file': 490 return path 491 elif imgproto == 'ssh': 492 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 493 else: 494 raise Exception("Protocol %s not supported" % (imgproto)) 495 496class VM(qtest.QEMUQtestMachine): 497 '''A QEMU VM''' 498 499 def __init__(self, path_suffix=''): 500 name = "qemu%s-%d" % (path_suffix, os.getpid()) 501 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 502 test_dir=test_dir, 503 socket_scm_helper=socket_scm_helper, 504 sock_dir=sock_dir) 505 self._num_drives = 0 506 507 def add_object(self, opts): 508 self._args.append('-object') 509 self._args.append(opts) 510 return self 511 512 def add_device(self, opts): 513 self._args.append('-device') 514 self._args.append(opts) 515 return self 516 517 def add_drive_raw(self, opts): 518 self._args.append('-drive') 519 self._args.append(opts) 520 return self 521 522 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 523 '''Add a virtio-blk drive to the VM''' 524 options = ['if=%s' % interface, 525 'id=drive%d' % self._num_drives] 526 527 if path is not None: 528 options.append('file=%s' % path) 529 options.append('format=%s' % img_format) 530 options.append('cache=%s' % cachemode) 531 options.append('aio=%s' % aiomode) 532 533 if opts: 534 options.append(opts) 535 536 if img_format == 'luks' and 'key-secret' not in opts: 537 # default luks support 538 if luks_default_secret_object not in self._args: 539 self.add_object(luks_default_secret_object) 540 541 options.append(luks_default_key_secret_opt) 542 543 self._args.append('-drive') 544 self._args.append(','.join(options)) 545 self._num_drives += 1 546 return self 547 548 def add_blockdev(self, opts): 549 self._args.append('-blockdev') 550 if isinstance(opts, str): 551 self._args.append(opts) 552 else: 553 self._args.append(','.join(opts)) 554 return self 555 556 def add_incoming(self, addr): 557 self._args.append('-incoming') 558 self._args.append(addr) 559 return self 560 561 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 562 cmd = 'human-monitor-command' 563 kwargs = {'command-line': command_line} 564 if use_log: 565 return self.qmp_log(cmd, **kwargs) 566 else: 567 return self.qmp(cmd, **kwargs) 568 569 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 570 """Pause drive r/w operations""" 571 if not event: 572 self.pause_drive(drive, "read_aio") 573 self.pause_drive(drive, "write_aio") 574 return 575 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 576 577 def resume_drive(self, drive: str) -> None: 578 """Resume drive r/w operations""" 579 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 580 581 def hmp_qemu_io(self, drive: str, cmd: str, 582 use_log: bool = False) -> QMPMessage: 583 """Write to a given drive using an HMP command""" 584 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 585 586 def flatten_qmp_object(self, obj, output=None, basestr=''): 587 if output is None: 588 output = dict() 589 if isinstance(obj, list): 590 for i, item in enumerate(obj): 591 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 592 elif isinstance(obj, dict): 593 for key in obj: 594 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 595 else: 596 output[basestr[:-1]] = obj # Strip trailing '.' 597 return output 598 599 def qmp_to_opts(self, obj): 600 obj = self.flatten_qmp_object(obj) 601 output_list = list() 602 for key in obj: 603 output_list += [key + '=' + obj[key]] 604 return ','.join(output_list) 605 606 def get_qmp_events_filtered(self, wait=60.0): 607 result = [] 608 for ev in self.get_qmp_events(wait=wait): 609 result.append(filter_qmp_event(ev)) 610 return result 611 612 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 613 full_cmd = OrderedDict(( 614 ("execute", cmd), 615 ("arguments", ordered_qmp(kwargs)) 616 )) 617 log(full_cmd, filters, indent=indent) 618 result = self.qmp(cmd, **kwargs) 619 log(result, filters, indent=indent) 620 return result 621 622 # Returns None on success, and an error string on failure 623 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 624 pre_finalize=None, cancel=False, wait=60.0): 625 """ 626 run_job moves a job from creation through to dismissal. 627 628 :param job: String. ID of recently-launched job 629 :param auto_finalize: Bool. True if the job was launched with 630 auto_finalize. Defaults to True. 631 :param auto_dismiss: Bool. True if the job was launched with 632 auto_dismiss=True. Defaults to False. 633 :param pre_finalize: Callback. A callable that takes no arguments to be 634 invoked prior to issuing job-finalize, if any. 635 :param cancel: Bool. When true, cancels the job after the pre_finalize 636 callback. 637 :param wait: Float. Timeout value specifying how long to wait for any 638 event, in seconds. Defaults to 60.0. 639 """ 640 match_device = {'data': {'device': job}} 641 match_id = {'data': {'id': job}} 642 events = [ 643 ('BLOCK_JOB_COMPLETED', match_device), 644 ('BLOCK_JOB_CANCELLED', match_device), 645 ('BLOCK_JOB_ERROR', match_device), 646 ('BLOCK_JOB_READY', match_device), 647 ('BLOCK_JOB_PENDING', match_id), 648 ('JOB_STATUS_CHANGE', match_id) 649 ] 650 error = None 651 while True: 652 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 653 if ev['event'] != 'JOB_STATUS_CHANGE': 654 log(ev) 655 continue 656 status = ev['data']['status'] 657 if status == 'aborting': 658 result = self.qmp('query-jobs') 659 for j in result['return']: 660 if j['id'] == job: 661 error = j['error'] 662 log('Job failed: %s' % (j['error'])) 663 elif status == 'ready': 664 self.qmp_log('job-complete', id=job) 665 elif status == 'pending' and not auto_finalize: 666 if pre_finalize: 667 pre_finalize() 668 if cancel: 669 self.qmp_log('job-cancel', id=job) 670 else: 671 self.qmp_log('job-finalize', id=job) 672 elif status == 'concluded' and not auto_dismiss: 673 self.qmp_log('job-dismiss', id=job) 674 elif status == 'null': 675 return error 676 677 # Returns None on success, and an error string on failure 678 def blockdev_create(self, options, job_id='job0', filters=None): 679 if filters is None: 680 filters = [filter_qmp_testfiles] 681 result = self.qmp_log('blockdev-create', filters=filters, 682 job_id=job_id, options=options) 683 684 if 'return' in result: 685 assert result['return'] == {} 686 job_result = self.run_job(job_id) 687 else: 688 job_result = result['error'] 689 690 log("") 691 return job_result 692 693 def enable_migration_events(self, name): 694 log('Enabling migration QMP events on %s...' % name) 695 log(self.qmp('migrate-set-capabilities', capabilities=[ 696 { 697 'capability': 'events', 698 'state': True 699 } 700 ])) 701 702 def wait_migration(self, expect_runstate): 703 while True: 704 event = self.event_wait('MIGRATION') 705 log(event, filters=[filter_qmp_event]) 706 if event['data']['status'] == 'completed': 707 break 708 # The event may occur in finish-migrate, so wait for the expected 709 # post-migration runstate 710 while self.qmp('query-status')['return']['status'] != expect_runstate: 711 pass 712 713 def node_info(self, node_name): 714 nodes = self.qmp('query-named-block-nodes') 715 for x in nodes['return']: 716 if x['node-name'] == node_name: 717 return x 718 return None 719 720 def query_bitmaps(self): 721 res = self.qmp("query-named-block-nodes") 722 return {device['node-name']: device['dirty-bitmaps'] 723 for device in res['return'] if 'dirty-bitmaps' in device} 724 725 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 726 """ 727 get a specific bitmap from the object returned by query_bitmaps. 728 :param recording: If specified, filter results by the specified value. 729 :param bitmaps: If specified, use it instead of call query_bitmaps() 730 """ 731 if bitmaps is None: 732 bitmaps = self.query_bitmaps() 733 734 for bitmap in bitmaps[node_name]: 735 if bitmap.get('name', '') == bitmap_name: 736 if recording is None or bitmap.get('recording') == recording: 737 return bitmap 738 return None 739 740 def check_bitmap_status(self, node_name, bitmap_name, fields): 741 ret = self.get_bitmap(node_name, bitmap_name) 742 743 return fields.items() <= ret.items() 744 745 def assert_block_path(self, root, path, expected_node, graph=None): 746 """ 747 Check whether the node under the given path in the block graph 748 is @expected_node. 749 750 @root is the node name of the node where the @path is rooted. 751 752 @path is a string that consists of child names separated by 753 slashes. It must begin with a slash. 754 755 Examples for @root + @path: 756 - root="qcow2-node", path="/backing/file" 757 - root="quorum-node", path="/children.2/file" 758 759 Hypothetically, @path could be empty, in which case it would 760 point to @root. However, in practice this case is not useful 761 and hence not allowed. 762 763 @expected_node may be None. (All elements of the path but the 764 leaf must still exist.) 765 766 @graph may be None or the result of an x-debug-query-block-graph 767 call that has already been performed. 768 """ 769 if graph is None: 770 graph = self.qmp('x-debug-query-block-graph')['return'] 771 772 iter_path = iter(path.split('/')) 773 774 # Must start with a / 775 assert next(iter_path) == '' 776 777 node = next((node for node in graph['nodes'] if node['name'] == root), 778 None) 779 780 # An empty @path is not allowed, so the root node must be present 781 assert node is not None, 'Root node %s not found' % root 782 783 for child_name in iter_path: 784 assert node is not None, 'Cannot follow path %s%s' % (root, path) 785 786 try: 787 node_id = next(edge['child'] for edge in graph['edges'] 788 if (edge['parent'] == node['id'] and 789 edge['name'] == child_name)) 790 791 node = next(node for node in graph['nodes'] 792 if node['id'] == node_id) 793 794 except StopIteration: 795 node = None 796 797 if node is None: 798 assert expected_node is None, \ 799 'No node found under %s (but expected %s)' % \ 800 (path, expected_node) 801 else: 802 assert node['name'] == expected_node, \ 803 'Found node %s under %s (but expected %s)' % \ 804 (node['name'], path, expected_node) 805 806index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 807 808class QMPTestCase(unittest.TestCase): 809 '''Abstract base class for QMP test cases''' 810 811 def __init__(self, *args, **kwargs): 812 super().__init__(*args, **kwargs) 813 # Many users of this class set a VM property we rely on heavily 814 # in the methods below. 815 self.vm = None 816 817 def dictpath(self, d, path): 818 '''Traverse a path in a nested dict''' 819 for component in path.split('/'): 820 m = index_re.match(component) 821 if m: 822 component, idx = m.groups() 823 idx = int(idx) 824 825 if not isinstance(d, dict) or component not in d: 826 self.fail(f'failed path traversal for "{path}" in "{d}"') 827 d = d[component] 828 829 if m: 830 if not isinstance(d, list): 831 self.fail(f'path component "{component}" in "{path}" ' 832 f'is not a list in "{d}"') 833 try: 834 d = d[idx] 835 except IndexError: 836 self.fail(f'invalid index "{idx}" in path "{path}" ' 837 f'in "{d}"') 838 return d 839 840 def assert_qmp_absent(self, d, path): 841 try: 842 result = self.dictpath(d, path) 843 except AssertionError: 844 return 845 self.fail('path "%s" has value "%s"' % (path, str(result))) 846 847 def assert_qmp(self, d, path, value): 848 '''Assert that the value for a specific path in a QMP dict 849 matches. When given a list of values, assert that any of 850 them matches.''' 851 852 result = self.dictpath(d, path) 853 854 # [] makes no sense as a list of valid values, so treat it as 855 # an actual single value. 856 if isinstance(value, list) and value != []: 857 for v in value: 858 if result == v: 859 return 860 self.fail('no match for "%s" in %s' % (str(result), str(value))) 861 else: 862 self.assertEqual(result, value, 863 '"%s" is "%s", expected "%s"' 864 % (path, str(result), str(value))) 865 866 def assert_no_active_block_jobs(self): 867 result = self.vm.qmp('query-block-jobs') 868 self.assert_qmp(result, 'return', []) 869 870 def assert_has_block_node(self, node_name=None, file_name=None): 871 """Issue a query-named-block-nodes and assert node_name and/or 872 file_name is present in the result""" 873 def check_equal_or_none(a, b): 874 return a is None or b is None or a == b 875 assert node_name or file_name 876 result = self.vm.qmp('query-named-block-nodes') 877 for x in result["return"]: 878 if check_equal_or_none(x.get("node-name"), node_name) and \ 879 check_equal_or_none(x.get("file"), file_name): 880 return 881 self.fail("Cannot find %s %s in result:\n%s" % 882 (node_name, file_name, result)) 883 884 def assert_json_filename_equal(self, json_filename, reference): 885 '''Asserts that the given filename is a json: filename and that its 886 content is equal to the given reference object''' 887 self.assertEqual(json_filename[:5], 'json:') 888 self.assertEqual( 889 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 890 self.vm.flatten_qmp_object(reference) 891 ) 892 893 def cancel_and_wait(self, drive='drive0', force=False, 894 resume=False, wait=60.0): 895 '''Cancel a block job and wait for it to finish, returning the event''' 896 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 897 self.assert_qmp(result, 'return', {}) 898 899 if resume: 900 self.vm.resume_drive(drive) 901 902 cancelled = False 903 result = None 904 while not cancelled: 905 for event in self.vm.get_qmp_events(wait=wait): 906 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 907 event['event'] == 'BLOCK_JOB_CANCELLED': 908 self.assert_qmp(event, 'data/device', drive) 909 result = event 910 cancelled = True 911 elif event['event'] == 'JOB_STATUS_CHANGE': 912 self.assert_qmp(event, 'data/id', drive) 913 914 915 self.assert_no_active_block_jobs() 916 return result 917 918 def wait_until_completed(self, drive='drive0', check_offset=True, 919 wait=60.0, error=None): 920 '''Wait for a block job to finish, returning the event''' 921 while True: 922 for event in self.vm.get_qmp_events(wait=wait): 923 if event['event'] == 'BLOCK_JOB_COMPLETED': 924 self.assert_qmp(event, 'data/device', drive) 925 if error is None: 926 self.assert_qmp_absent(event, 'data/error') 927 if check_offset: 928 self.assert_qmp(event, 'data/offset', 929 event['data']['len']) 930 else: 931 self.assert_qmp(event, 'data/error', error) 932 self.assert_no_active_block_jobs() 933 return event 934 if event['event'] == 'JOB_STATUS_CHANGE': 935 self.assert_qmp(event, 'data/id', drive) 936 937 def wait_ready(self, drive='drive0'): 938 """Wait until a BLOCK_JOB_READY event, and return the event.""" 939 f = {'data': {'type': 'mirror', 'device': drive}} 940 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 941 942 def wait_ready_and_cancel(self, drive='drive0'): 943 self.wait_ready(drive=drive) 944 event = self.cancel_and_wait(drive=drive) 945 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 946 self.assert_qmp(event, 'data/type', 'mirror') 947 self.assert_qmp(event, 'data/offset', event['data']['len']) 948 949 def complete_and_wait(self, drive='drive0', wait_ready=True, 950 completion_error=None): 951 '''Complete a block job and wait for it to finish''' 952 if wait_ready: 953 self.wait_ready(drive=drive) 954 955 result = self.vm.qmp('block-job-complete', device=drive) 956 self.assert_qmp(result, 'return', {}) 957 958 event = self.wait_until_completed(drive=drive, error=completion_error) 959 self.assert_qmp(event, 'data/type', 'mirror') 960 961 def pause_wait(self, job_id='job0'): 962 with Timeout(3, "Timeout waiting for job to pause"): 963 while True: 964 result = self.vm.qmp('query-block-jobs') 965 found = False 966 for job in result['return']: 967 if job['device'] == job_id: 968 found = True 969 if job['paused'] and not job['busy']: 970 return job 971 break 972 assert found 973 974 def pause_job(self, job_id='job0', wait=True): 975 result = self.vm.qmp('block-job-pause', device=job_id) 976 self.assert_qmp(result, 'return', {}) 977 if wait: 978 return self.pause_wait(job_id) 979 return result 980 981 def case_skip(self, reason): 982 '''Skip this test case''' 983 case_notrun(reason) 984 self.skipTest(reason) 985 986 987def notrun(reason): 988 '''Skip this test suite''' 989 # Each test in qemu-iotests has a number ("seq") 990 seq = os.path.basename(sys.argv[0]) 991 992 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 993 logger.warning("%s not run: %s", seq, reason) 994 sys.exit(0) 995 996def case_notrun(reason): 997 '''Mark this test case as not having been run (without actually 998 skipping it, that is left to the caller). See 999 QMPTestCase.case_skip() for a variant that actually skips the 1000 current test case.''' 1001 1002 # Each test in qemu-iotests has a number ("seq") 1003 seq = os.path.basename(sys.argv[0]) 1004 1005 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1006 ' [case not run] ' + reason + '\n') 1007 1008def _verify_image_format(supported_fmts: Sequence[str] = (), 1009 unsupported_fmts: Sequence[str] = ()) -> None: 1010 assert not (supported_fmts and unsupported_fmts) 1011 1012 if 'generic' in supported_fmts and \ 1013 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1014 # similar to 1015 # _supported_fmt generic 1016 # for bash tests 1017 if imgfmt == 'luks': 1018 verify_working_luks() 1019 return 1020 1021 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1022 if not_sup or (imgfmt in unsupported_fmts): 1023 notrun('not suitable for this image format: %s' % imgfmt) 1024 1025 if imgfmt == 'luks': 1026 verify_working_luks() 1027 1028def _verify_protocol(supported: Sequence[str] = (), 1029 unsupported: Sequence[str] = ()) -> None: 1030 assert not (supported and unsupported) 1031 1032 if 'generic' in supported: 1033 return 1034 1035 not_sup = supported and (imgproto not in supported) 1036 if not_sup or (imgproto in unsupported): 1037 notrun('not suitable for this protocol: %s' % imgproto) 1038 1039def _verify_platform(supported: Sequence[str] = (), 1040 unsupported: Sequence[str] = ()) -> None: 1041 if any((sys.platform.startswith(x) for x in unsupported)): 1042 notrun('not suitable for this OS: %s' % sys.platform) 1043 1044 if supported: 1045 if not any((sys.platform.startswith(x) for x in supported)): 1046 notrun('not suitable for this OS: %s' % sys.platform) 1047 1048def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1049 if supported_cache_modes and (cachemode not in supported_cache_modes): 1050 notrun('not suitable for this cache mode: %s' % cachemode) 1051 1052def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1053 if supported_aio_modes and (aiomode not in supported_aio_modes): 1054 notrun('not suitable for this aio mode: %s' % aiomode) 1055 1056def supports_quorum(): 1057 return 'quorum' in qemu_img_pipe('--help') 1058 1059def verify_quorum(): 1060 '''Skip test suite if quorum support is not available''' 1061 if not supports_quorum(): 1062 notrun('quorum support missing') 1063 1064def has_working_luks() -> Tuple[bool, str]: 1065 """ 1066 Check whether our LUKS driver can actually create images 1067 (this extends to LUKS encryption for qcow2). 1068 1069 If not, return the reason why. 1070 """ 1071 1072 img_file = f'{test_dir}/luks-test.luks' 1073 (output, status) = \ 1074 qemu_img_pipe_and_status('create', '-f', 'luks', 1075 '--object', luks_default_secret_object, 1076 '-o', luks_default_key_secret_opt, 1077 '-o', 'iter-time=10', 1078 img_file, '1G') 1079 try: 1080 os.remove(img_file) 1081 except OSError: 1082 pass 1083 1084 if status != 0: 1085 reason = output 1086 for line in output.splitlines(): 1087 if img_file + ':' in line: 1088 reason = line.split(img_file + ':', 1)[1].strip() 1089 break 1090 1091 return (False, reason) 1092 else: 1093 return (True, '') 1094 1095def verify_working_luks(): 1096 """ 1097 Skip test suite if LUKS does not work 1098 """ 1099 (working, reason) = has_working_luks() 1100 if not working: 1101 notrun(reason) 1102 1103def qemu_pipe(*args): 1104 """ 1105 Run qemu with an option to print something and exit (e.g. a help option). 1106 1107 :return: QEMU's stdout output. 1108 """ 1109 args = [qemu_prog] + qemu_opts + list(args) 1110 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 1111 stderr=subprocess.STDOUT, 1112 universal_newlines=True) 1113 output = subp.communicate()[0] 1114 if subp.returncode < 0: 1115 sys.stderr.write('qemu received signal %i: %s\n' % 1116 (-subp.returncode, ' '.join(args))) 1117 return output 1118 1119def supported_formats(read_only=False): 1120 '''Set 'read_only' to True to check ro-whitelist 1121 Otherwise, rw-whitelist is checked''' 1122 1123 if not hasattr(supported_formats, "formats"): 1124 supported_formats.formats = {} 1125 1126 if read_only not in supported_formats.formats: 1127 format_message = qemu_pipe("-drive", "format=help") 1128 line = 1 if read_only else 0 1129 supported_formats.formats[read_only] = \ 1130 format_message.splitlines()[line].split(":")[1].split() 1131 1132 return supported_formats.formats[read_only] 1133 1134def skip_if_unsupported(required_formats=(), read_only=False): 1135 '''Skip Test Decorator 1136 Runs the test if all the required formats are whitelisted''' 1137 def skip_test_decorator(func): 1138 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1139 **kwargs: Dict[str, Any]) -> None: 1140 if callable(required_formats): 1141 fmts = required_formats(test_case) 1142 else: 1143 fmts = required_formats 1144 1145 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1146 if usf_list: 1147 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1148 test_case.case_skip(msg) 1149 else: 1150 func(test_case, *args, **kwargs) 1151 return func_wrapper 1152 return skip_test_decorator 1153 1154def skip_for_formats(formats: Sequence[str] = ()) \ 1155 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1156 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1157 '''Skip Test Decorator 1158 Skips the test for the given formats''' 1159 def skip_test_decorator(func): 1160 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1161 **kwargs: Dict[str, Any]) -> None: 1162 if imgfmt in formats: 1163 msg = f'{test_case}: Skipped for format {imgfmt}' 1164 test_case.case_skip(msg) 1165 else: 1166 func(test_case, *args, **kwargs) 1167 return func_wrapper 1168 return skip_test_decorator 1169 1170def skip_if_user_is_root(func): 1171 '''Skip Test Decorator 1172 Runs the test only without root permissions''' 1173 def func_wrapper(*args, **kwargs): 1174 if os.getuid() == 0: 1175 case_notrun('{}: cannot be run as root'.format(args[0])) 1176 return None 1177 else: 1178 return func(*args, **kwargs) 1179 return func_wrapper 1180 1181def execute_unittest(debug=False): 1182 """Executes unittests within the calling module.""" 1183 1184 verbosity = 2 if debug else 1 1185 1186 if debug: 1187 output = sys.stdout 1188 else: 1189 # We need to filter out the time taken from the output so that 1190 # qemu-iotest can reliably diff the results against master output. 1191 output = io.StringIO() 1192 1193 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1194 verbosity=verbosity) 1195 try: 1196 # unittest.main() will use sys.exit(); so expect a SystemExit 1197 # exception 1198 unittest.main(testRunner=runner) 1199 finally: 1200 # We need to filter out the time taken from the output so that 1201 # qemu-iotest can reliably diff the results against master output. 1202 if not debug: 1203 out = output.getvalue() 1204 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1205 1206 # Hide skipped tests from the reference output 1207 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1208 out_first_line, out_rest = out.split('\n', 1) 1209 out = out_first_line.replace('s', '.') + '\n' + out_rest 1210 1211 sys.stderr.write(out) 1212 1213def execute_setup_common(supported_fmts: Sequence[str] = (), 1214 supported_platforms: Sequence[str] = (), 1215 supported_cache_modes: Sequence[str] = (), 1216 supported_aio_modes: Sequence[str] = (), 1217 unsupported_fmts: Sequence[str] = (), 1218 supported_protocols: Sequence[str] = (), 1219 unsupported_protocols: Sequence[str] = ()) -> bool: 1220 """ 1221 Perform necessary setup for either script-style or unittest-style tests. 1222 1223 :return: Bool; Whether or not debug mode has been requested via the CLI. 1224 """ 1225 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1226 1227 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1228 # indicate that we're not being run via "check". There may be 1229 # other things set up by "check" that individual test cases rely 1230 # on. 1231 if test_dir is None or qemu_default_machine is None: 1232 sys.stderr.write('Please run this test via the "check" script\n') 1233 sys.exit(os.EX_USAGE) 1234 1235 debug = '-d' in sys.argv 1236 if debug: 1237 sys.argv.remove('-d') 1238 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1239 1240 _verify_image_format(supported_fmts, unsupported_fmts) 1241 _verify_protocol(supported_protocols, unsupported_protocols) 1242 _verify_platform(supported=supported_platforms) 1243 _verify_cache_mode(supported_cache_modes) 1244 _verify_aio_mode(supported_aio_modes) 1245 1246 return debug 1247 1248def execute_test(*args, test_function=None, **kwargs): 1249 """Run either unittest or script-style tests.""" 1250 1251 debug = execute_setup_common(*args, **kwargs) 1252 if not test_function: 1253 execute_unittest(debug) 1254 else: 1255 test_function() 1256 1257def activate_logging(): 1258 """Activate iotests.log() output to stdout for script-style tests.""" 1259 handler = logging.StreamHandler(stream=sys.stdout) 1260 formatter = logging.Formatter('%(message)s') 1261 handler.setFormatter(formatter) 1262 test_logger.addHandler(handler) 1263 test_logger.setLevel(logging.INFO) 1264 test_logger.propagate = False 1265 1266# This is called from script-style iotests without a single point of entry 1267def script_initialize(*args, **kwargs): 1268 """Initialize script-style tests without running any tests.""" 1269 activate_logging() 1270 execute_setup_common(*args, **kwargs) 1271 1272# This is called from script-style iotests with a single point of entry 1273def script_main(test_function, *args, **kwargs): 1274 """Run script-style tests outside of the unittest framework""" 1275 activate_logging() 1276 execute_test(*args, test_function=test_function, **kwargs) 1277 1278# This is called from unittest style iotests 1279def main(*args, **kwargs): 1280 """Run tests using the unittest framework""" 1281 execute_test(*args, **kwargs) 1282