1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31from typing import (Any, Callable, Dict, Iterable, 32 List, Optional, Sequence, Tuple, TypeVar) 33import unittest 34 35# pylint: disable=import-error, wrong-import-position 36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 37from qemu import qtest 38 39assert sys.version_info >= (3, 6) 40 41# Type Aliases 42QMPResponse = Dict[str, Any] 43 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79test_dir = os.environ.get('TEST_DIR') 80sock_dir = os.environ.get('SOCK_DIR') 81output_dir = os.environ.get('OUTPUT_DIR', '.') 82cachemode = os.environ.get('CACHEMODE') 83aiomode = os.environ.get('AIOMODE') 84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 85 86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 87 88luks_default_secret_object = 'secret,id=keysec0,data=' + \ 89 os.environ.get('IMGKEYSECRET', '') 90luks_default_key_secret_opt = 'key-secret=keysec0' 91 92 93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 94 """ 95 Run qemu-img and return both its output and its exit code 96 """ 97 subp = subprocess.Popen(qemu_img_args + list(args), 98 stdout=subprocess.PIPE, 99 stderr=subprocess.STDOUT, 100 universal_newlines=True) 101 output = subp.communicate()[0] 102 if subp.returncode < 0: 103 sys.stderr.write('qemu-img received signal %i: %s\n' 104 % (-subp.returncode, 105 ' '.join(qemu_img_args + list(args)))) 106 return (output, subp.returncode) 107 108def qemu_img(*args: str) -> int: 109 '''Run qemu-img and return the exit code''' 110 return qemu_img_pipe_and_status(*args)[1] 111 112def ordered_qmp(qmsg, conv_keys=True): 113 # Dictionaries are not ordered prior to 3.6, therefore: 114 if isinstance(qmsg, list): 115 return [ordered_qmp(atom) for atom in qmsg] 116 if isinstance(qmsg, dict): 117 od = OrderedDict() 118 for k, v in sorted(qmsg.items()): 119 if conv_keys: 120 k = k.replace('_', '-') 121 od[k] = ordered_qmp(v, conv_keys=False) 122 return od 123 return qmsg 124 125def qemu_img_create(*args): 126 args = list(args) 127 128 # default luks support 129 if '-f' in args and args[args.index('-f') + 1] == 'luks': 130 if '-o' in args: 131 i = args.index('-o') 132 if 'key-secret' not in args[i + 1]: 133 args[i + 1].append(luks_default_key_secret_opt) 134 args.insert(i + 2, '--object') 135 args.insert(i + 3, luks_default_secret_object) 136 else: 137 args = ['-o', luks_default_key_secret_opt, 138 '--object', luks_default_secret_object] + args 139 140 args.insert(0, 'create') 141 142 return qemu_img(*args) 143 144def qemu_img_verbose(*args): 145 '''Run qemu-img without suppressing its output and return the exit code''' 146 exitcode = subprocess.call(qemu_img_args + list(args)) 147 if exitcode < 0: 148 sys.stderr.write('qemu-img received signal %i: %s\n' 149 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 150 return exitcode 151 152def qemu_img_pipe(*args: str) -> str: 153 '''Run qemu-img and return its output''' 154 return qemu_img_pipe_and_status(*args)[0] 155 156def qemu_img_log(*args): 157 result = qemu_img_pipe(*args) 158 log(result, filters=[filter_testfiles]) 159 return result 160 161def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 162 args = ['info'] 163 if imgopts: 164 args.append('--image-opts') 165 else: 166 args += ['-f', imgfmt] 167 args += extra_args 168 args.append(filename) 169 170 output = qemu_img_pipe(*args) 171 if not filter_path: 172 filter_path = filename 173 log(filter_img_info(output, filter_path)) 174 175def qemu_io(*args): 176 '''Run qemu-io and return the stdout data''' 177 args = qemu_io_args + list(args) 178 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 179 stderr=subprocess.STDOUT, 180 universal_newlines=True) 181 output = subp.communicate()[0] 182 if subp.returncode < 0: 183 sys.stderr.write('qemu-io received signal %i: %s\n' 184 % (-subp.returncode, ' '.join(args))) 185 return output 186 187def qemu_io_log(*args): 188 result = qemu_io(*args) 189 log(result, filters=[filter_testfiles, filter_qemu_io]) 190 return result 191 192def qemu_io_silent(*args): 193 '''Run qemu-io and return the exit code, suppressing stdout''' 194 args = qemu_io_args + list(args) 195 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 196 if exitcode < 0: 197 sys.stderr.write('qemu-io received signal %i: %s\n' % 198 (-exitcode, ' '.join(args))) 199 return exitcode 200 201def qemu_io_silent_check(*args): 202 '''Run qemu-io and return the true if subprocess returned 0''' 203 args = qemu_io_args + list(args) 204 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 205 stderr=subprocess.STDOUT) 206 return exitcode == 0 207 208def get_virtio_scsi_device(): 209 if qemu_default_machine == 's390-ccw-virtio': 210 return 'virtio-scsi-ccw' 211 return 'virtio-scsi-pci' 212 213class QemuIoInteractive: 214 def __init__(self, *args): 215 self.args = qemu_io_args + list(args) 216 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 217 stdout=subprocess.PIPE, 218 stderr=subprocess.STDOUT, 219 universal_newlines=True) 220 assert self._p.stdout.read(9) == 'qemu-io> ' 221 222 def close(self): 223 self._p.communicate('q\n') 224 225 def _read_output(self): 226 pattern = 'qemu-io> ' 227 n = len(pattern) 228 pos = 0 229 s = [] 230 while pos != n: 231 c = self._p.stdout.read(1) 232 # check unexpected EOF 233 assert c != '' 234 s.append(c) 235 if c == pattern[pos]: 236 pos += 1 237 else: 238 pos = 0 239 240 return ''.join(s[:-n]) 241 242 def cmd(self, cmd): 243 # quit command is in close(), '\n' is added automatically 244 assert '\n' not in cmd 245 cmd = cmd.strip() 246 assert cmd not in ('q', 'quit') 247 self._p.stdin.write(cmd + '\n') 248 self._p.stdin.flush() 249 return self._read_output() 250 251 252def qemu_nbd(*args): 253 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 254 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 255 256def qemu_nbd_early_pipe(*args): 257 '''Run qemu-nbd in daemon mode and return both the parent's exit code 258 and its output in case of an error''' 259 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 260 stdout=subprocess.PIPE, 261 universal_newlines=True) 262 output = subp.communicate()[0] 263 if subp.returncode < 0: 264 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 265 (-subp.returncode, 266 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 267 268 return subp.returncode, output if subp.returncode else '' 269 270def qemu_nbd_popen(*args): 271 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 272 return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args)) 273 274def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 275 '''Return True if two image files are identical''' 276 return qemu_img('compare', '-f', fmt1, 277 '-F', fmt2, img1, img2) == 0 278 279def create_image(name, size): 280 '''Create a fully-allocated raw image with sector markers''' 281 file = open(name, 'wb') 282 i = 0 283 while i < size: 284 sector = struct.pack('>l504xl', i // 512, i // 512) 285 file.write(sector) 286 i = i + 512 287 file.close() 288 289def image_size(img): 290 '''Return image's virtual size''' 291 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 292 return json.loads(r)['virtual-size'] 293 294def is_str(val): 295 return isinstance(val, str) 296 297test_dir_re = re.compile(r"%s" % test_dir) 298def filter_test_dir(msg): 299 return test_dir_re.sub("TEST_DIR", msg) 300 301win32_re = re.compile(r"\r") 302def filter_win32(msg): 303 return win32_re.sub("", msg) 304 305qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 306 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 307 r"and [0-9\/.inf]* ops\/sec\)") 308def filter_qemu_io(msg): 309 msg = filter_win32(msg) 310 return qemu_io_re.sub("X ops; XX:XX:XX.X " 311 "(XXX YYY/sec and XXX ops/sec)", msg) 312 313chown_re = re.compile(r"chown [0-9]+:[0-9]+") 314def filter_chown(msg): 315 return chown_re.sub("chown UID:GID", msg) 316 317def filter_qmp_event(event): 318 '''Filter a QMP event dict''' 319 event = dict(event) 320 if 'timestamp' in event: 321 event['timestamp']['seconds'] = 'SECS' 322 event['timestamp']['microseconds'] = 'USECS' 323 return event 324 325def filter_qmp(qmsg, filter_fn): 326 '''Given a string filter, filter a QMP object's values. 327 filter_fn takes a (key, value) pair.''' 328 # Iterate through either lists or dicts; 329 if isinstance(qmsg, list): 330 items = enumerate(qmsg) 331 else: 332 items = qmsg.items() 333 334 for k, v in items: 335 if isinstance(v, (dict, list)): 336 qmsg[k] = filter_qmp(v, filter_fn) 337 else: 338 qmsg[k] = filter_fn(k, v) 339 return qmsg 340 341def filter_testfiles(msg): 342 prefix = os.path.join(test_dir, "%s-" % (os.getpid())) 343 return msg.replace(prefix, 'TEST_DIR/PID-') 344 345def filter_qmp_testfiles(qmsg): 346 def _filter(_key, value): 347 if is_str(value): 348 return filter_testfiles(value) 349 return value 350 return filter_qmp(qmsg, _filter) 351 352def filter_generated_node_ids(msg): 353 return re.sub("#block[0-9]+", "NODE_NAME", msg) 354 355def filter_img_info(output, filename): 356 lines = [] 357 for line in output.split('\n'): 358 if 'disk size' in line or 'actual-size' in line: 359 continue 360 line = line.replace(filename, 'TEST_IMG') 361 line = filter_testfiles(line) 362 line = line.replace(imgfmt, 'IMGFMT') 363 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 364 line = re.sub('uuid: [-a-f0-9]+', 365 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 366 line) 367 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 368 lines.append(line) 369 return '\n'.join(lines) 370 371def filter_imgfmt(msg): 372 return msg.replace(imgfmt, 'IMGFMT') 373 374def filter_qmp_imgfmt(qmsg): 375 def _filter(_key, value): 376 if is_str(value): 377 return filter_imgfmt(value) 378 return value 379 return filter_qmp(qmsg, _filter) 380 381 382Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 383 384def log(msg: Msg, 385 filters: Iterable[Callable[[Msg], Msg]] = (), 386 indent: Optional[int] = None) -> None: 387 """ 388 Logs either a string message or a JSON serializable message (like QMP). 389 If indent is provided, JSON serializable messages are pretty-printed. 390 """ 391 for flt in filters: 392 msg = flt(msg) 393 if isinstance(msg, (dict, list)): 394 # Don't sort if it's already sorted 395 do_sort = not isinstance(msg, OrderedDict) 396 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 397 else: 398 test_logger.info(msg) 399 400class Timeout: 401 def __init__(self, seconds, errmsg="Timeout"): 402 self.seconds = seconds 403 self.errmsg = errmsg 404 def __enter__(self): 405 signal.signal(signal.SIGALRM, self.timeout) 406 signal.setitimer(signal.ITIMER_REAL, self.seconds) 407 return self 408 def __exit__(self, exc_type, value, traceback): 409 signal.setitimer(signal.ITIMER_REAL, 0) 410 return False 411 def timeout(self, signum, frame): 412 raise Exception(self.errmsg) 413 414def file_pattern(name): 415 return "{0}-{1}".format(os.getpid(), name) 416 417class FilePaths: 418 """ 419 FilePaths is an auto-generated filename that cleans itself up. 420 421 Use this context manager to generate filenames and ensure that the file 422 gets deleted:: 423 424 with FilePaths(['test.img']) as img_path: 425 qemu_img('create', img_path, '1G') 426 # migration_sock_path is automatically deleted 427 """ 428 def __init__(self, names, base_dir=test_dir): 429 self.paths = [] 430 for name in names: 431 self.paths.append(os.path.join(base_dir, file_pattern(name))) 432 433 def __enter__(self): 434 return self.paths 435 436 def __exit__(self, exc_type, exc_val, exc_tb): 437 try: 438 for path in self.paths: 439 os.remove(path) 440 except OSError: 441 pass 442 return False 443 444class FilePath(FilePaths): 445 """ 446 FilePath is a specialization of FilePaths that takes a single filename. 447 """ 448 def __init__(self, name, base_dir=test_dir): 449 super(FilePath, self).__init__([name], base_dir) 450 451 def __enter__(self): 452 return self.paths[0] 453 454def file_path_remover(): 455 for path in reversed(file_path_remover.paths): 456 try: 457 os.remove(path) 458 except OSError: 459 pass 460 461 462def file_path(*names, base_dir=test_dir): 463 ''' Another way to get auto-generated filename that cleans itself up. 464 465 Use is as simple as: 466 467 img_a, img_b = file_path('a.img', 'b.img') 468 sock = file_path('socket') 469 ''' 470 471 if not hasattr(file_path_remover, 'paths'): 472 file_path_remover.paths = [] 473 atexit.register(file_path_remover) 474 475 paths = [] 476 for name in names: 477 filename = file_pattern(name) 478 path = os.path.join(base_dir, filename) 479 file_path_remover.paths.append(path) 480 paths.append(path) 481 482 return paths[0] if len(paths) == 1 else paths 483 484def remote_filename(path): 485 if imgproto == 'file': 486 return path 487 elif imgproto == 'ssh': 488 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 489 else: 490 raise Exception("Protocol %s not supported" % (imgproto)) 491 492class VM(qtest.QEMUQtestMachine): 493 '''A QEMU VM''' 494 495 def __init__(self, path_suffix=''): 496 name = "qemu%s-%d" % (path_suffix, os.getpid()) 497 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 498 test_dir=test_dir, 499 socket_scm_helper=socket_scm_helper, 500 sock_dir=sock_dir) 501 self._num_drives = 0 502 503 def add_object(self, opts): 504 self._args.append('-object') 505 self._args.append(opts) 506 return self 507 508 def add_device(self, opts): 509 self._args.append('-device') 510 self._args.append(opts) 511 return self 512 513 def add_drive_raw(self, opts): 514 self._args.append('-drive') 515 self._args.append(opts) 516 return self 517 518 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 519 '''Add a virtio-blk drive to the VM''' 520 options = ['if=%s' % interface, 521 'id=drive%d' % self._num_drives] 522 523 if path is not None: 524 options.append('file=%s' % path) 525 options.append('format=%s' % img_format) 526 options.append('cache=%s' % cachemode) 527 options.append('aio=%s' % aiomode) 528 529 if opts: 530 options.append(opts) 531 532 if img_format == 'luks' and 'key-secret' not in opts: 533 # default luks support 534 if luks_default_secret_object not in self._args: 535 self.add_object(luks_default_secret_object) 536 537 options.append(luks_default_key_secret_opt) 538 539 self._args.append('-drive') 540 self._args.append(','.join(options)) 541 self._num_drives += 1 542 return self 543 544 def add_blockdev(self, opts): 545 self._args.append('-blockdev') 546 if isinstance(opts, str): 547 self._args.append(opts) 548 else: 549 self._args.append(','.join(opts)) 550 return self 551 552 def add_incoming(self, addr): 553 self._args.append('-incoming') 554 self._args.append(addr) 555 return self 556 557 def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse: 558 cmd = 'human-monitor-command' 559 kwargs = {'command-line': command_line} 560 if use_log: 561 return self.qmp_log(cmd, **kwargs) 562 else: 563 return self.qmp(cmd, **kwargs) 564 565 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 566 """Pause drive r/w operations""" 567 if not event: 568 self.pause_drive(drive, "read_aio") 569 self.pause_drive(drive, "write_aio") 570 return 571 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 572 573 def resume_drive(self, drive: str) -> None: 574 """Resume drive r/w operations""" 575 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 576 577 def hmp_qemu_io(self, drive: str, cmd: str, 578 use_log: bool = False) -> QMPResponse: 579 """Write to a given drive using an HMP command""" 580 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 581 582 def flatten_qmp_object(self, obj, output=None, basestr=''): 583 if output is None: 584 output = dict() 585 if isinstance(obj, list): 586 for i, item in enumerate(obj): 587 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 588 elif isinstance(obj, dict): 589 for key in obj: 590 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 591 else: 592 output[basestr[:-1]] = obj # Strip trailing '.' 593 return output 594 595 def qmp_to_opts(self, obj): 596 obj = self.flatten_qmp_object(obj) 597 output_list = list() 598 for key in obj: 599 output_list += [key + '=' + obj[key]] 600 return ','.join(output_list) 601 602 def get_qmp_events_filtered(self, wait=60.0): 603 result = [] 604 for ev in self.get_qmp_events(wait=wait): 605 result.append(filter_qmp_event(ev)) 606 return result 607 608 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 609 full_cmd = OrderedDict(( 610 ("execute", cmd), 611 ("arguments", ordered_qmp(kwargs)) 612 )) 613 log(full_cmd, filters, indent=indent) 614 result = self.qmp(cmd, **kwargs) 615 log(result, filters, indent=indent) 616 return result 617 618 # Returns None on success, and an error string on failure 619 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 620 pre_finalize=None, cancel=False, wait=60.0): 621 """ 622 run_job moves a job from creation through to dismissal. 623 624 :param job: String. ID of recently-launched job 625 :param auto_finalize: Bool. True if the job was launched with 626 auto_finalize. Defaults to True. 627 :param auto_dismiss: Bool. True if the job was launched with 628 auto_dismiss=True. Defaults to False. 629 :param pre_finalize: Callback. A callable that takes no arguments to be 630 invoked prior to issuing job-finalize, if any. 631 :param cancel: Bool. When true, cancels the job after the pre_finalize 632 callback. 633 :param wait: Float. Timeout value specifying how long to wait for any 634 event, in seconds. Defaults to 60.0. 635 """ 636 match_device = {'data': {'device': job}} 637 match_id = {'data': {'id': job}} 638 events = [ 639 ('BLOCK_JOB_COMPLETED', match_device), 640 ('BLOCK_JOB_CANCELLED', match_device), 641 ('BLOCK_JOB_ERROR', match_device), 642 ('BLOCK_JOB_READY', match_device), 643 ('BLOCK_JOB_PENDING', match_id), 644 ('JOB_STATUS_CHANGE', match_id) 645 ] 646 error = None 647 while True: 648 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 649 if ev['event'] != 'JOB_STATUS_CHANGE': 650 log(ev) 651 continue 652 status = ev['data']['status'] 653 if status == 'aborting': 654 result = self.qmp('query-jobs') 655 for j in result['return']: 656 if j['id'] == job: 657 error = j['error'] 658 log('Job failed: %s' % (j['error'])) 659 elif status == 'ready': 660 self.qmp_log('job-complete', id=job) 661 elif status == 'pending' and not auto_finalize: 662 if pre_finalize: 663 pre_finalize() 664 if cancel: 665 self.qmp_log('job-cancel', id=job) 666 else: 667 self.qmp_log('job-finalize', id=job) 668 elif status == 'concluded' and not auto_dismiss: 669 self.qmp_log('job-dismiss', id=job) 670 elif status == 'null': 671 return error 672 673 # Returns None on success, and an error string on failure 674 def blockdev_create(self, options, job_id='job0', filters=None): 675 if filters is None: 676 filters = [filter_qmp_testfiles] 677 result = self.qmp_log('blockdev-create', filters=filters, 678 job_id=job_id, options=options) 679 680 if 'return' in result: 681 assert result['return'] == {} 682 job_result = self.run_job(job_id) 683 else: 684 job_result = result['error'] 685 686 log("") 687 return job_result 688 689 def enable_migration_events(self, name): 690 log('Enabling migration QMP events on %s...' % name) 691 log(self.qmp('migrate-set-capabilities', capabilities=[ 692 { 693 'capability': 'events', 694 'state': True 695 } 696 ])) 697 698 def wait_migration(self, expect_runstate): 699 while True: 700 event = self.event_wait('MIGRATION') 701 log(event, filters=[filter_qmp_event]) 702 if event['data']['status'] == 'completed': 703 break 704 # The event may occur in finish-migrate, so wait for the expected 705 # post-migration runstate 706 while self.qmp('query-status')['return']['status'] != expect_runstate: 707 pass 708 709 def node_info(self, node_name): 710 nodes = self.qmp('query-named-block-nodes') 711 for x in nodes['return']: 712 if x['node-name'] == node_name: 713 return x 714 return None 715 716 def query_bitmaps(self): 717 res = self.qmp("query-named-block-nodes") 718 return {device['node-name']: device['dirty-bitmaps'] 719 for device in res['return'] if 'dirty-bitmaps' in device} 720 721 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 722 """ 723 get a specific bitmap from the object returned by query_bitmaps. 724 :param recording: If specified, filter results by the specified value. 725 :param bitmaps: If specified, use it instead of call query_bitmaps() 726 """ 727 if bitmaps is None: 728 bitmaps = self.query_bitmaps() 729 730 for bitmap in bitmaps[node_name]: 731 if bitmap.get('name', '') == bitmap_name: 732 if recording is None or bitmap.get('recording') == recording: 733 return bitmap 734 return None 735 736 def check_bitmap_status(self, node_name, bitmap_name, fields): 737 ret = self.get_bitmap(node_name, bitmap_name) 738 739 return fields.items() <= ret.items() 740 741 def assert_block_path(self, root, path, expected_node, graph=None): 742 """ 743 Check whether the node under the given path in the block graph 744 is @expected_node. 745 746 @root is the node name of the node where the @path is rooted. 747 748 @path is a string that consists of child names separated by 749 slashes. It must begin with a slash. 750 751 Examples for @root + @path: 752 - root="qcow2-node", path="/backing/file" 753 - root="quorum-node", path="/children.2/file" 754 755 Hypothetically, @path could be empty, in which case it would 756 point to @root. However, in practice this case is not useful 757 and hence not allowed. 758 759 @expected_node may be None. (All elements of the path but the 760 leaf must still exist.) 761 762 @graph may be None or the result of an x-debug-query-block-graph 763 call that has already been performed. 764 """ 765 if graph is None: 766 graph = self.qmp('x-debug-query-block-graph')['return'] 767 768 iter_path = iter(path.split('/')) 769 770 # Must start with a / 771 assert next(iter_path) == '' 772 773 node = next((node for node in graph['nodes'] if node['name'] == root), 774 None) 775 776 # An empty @path is not allowed, so the root node must be present 777 assert node is not None, 'Root node %s not found' % root 778 779 for child_name in iter_path: 780 assert node is not None, 'Cannot follow path %s%s' % (root, path) 781 782 try: 783 node_id = next(edge['child'] for edge in graph['edges'] 784 if (edge['parent'] == node['id'] and 785 edge['name'] == child_name)) 786 787 node = next(node for node in graph['nodes'] 788 if node['id'] == node_id) 789 790 except StopIteration: 791 node = None 792 793 if node is None: 794 assert expected_node is None, \ 795 'No node found under %s (but expected %s)' % \ 796 (path, expected_node) 797 else: 798 assert node['name'] == expected_node, \ 799 'Found node %s under %s (but expected %s)' % \ 800 (node['name'], path, expected_node) 801 802index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 803 804class QMPTestCase(unittest.TestCase): 805 '''Abstract base class for QMP test cases''' 806 807 def __init__(self, *args, **kwargs): 808 super().__init__(*args, **kwargs) 809 # Many users of this class set a VM property we rely on heavily 810 # in the methods below. 811 self.vm = None 812 813 def dictpath(self, d, path): 814 '''Traverse a path in a nested dict''' 815 for component in path.split('/'): 816 m = index_re.match(component) 817 if m: 818 component, idx = m.groups() 819 idx = int(idx) 820 821 if not isinstance(d, dict) or component not in d: 822 self.fail(f'failed path traversal for "{path}" in "{d}"') 823 d = d[component] 824 825 if m: 826 if not isinstance(d, list): 827 self.fail(f'path component "{component}" in "{path}" ' 828 f'is not a list in "{d}"') 829 try: 830 d = d[idx] 831 except IndexError: 832 self.fail(f'invalid index "{idx}" in path "{path}" ' 833 f'in "{d}"') 834 return d 835 836 def assert_qmp_absent(self, d, path): 837 try: 838 result = self.dictpath(d, path) 839 except AssertionError: 840 return 841 self.fail('path "%s" has value "%s"' % (path, str(result))) 842 843 def assert_qmp(self, d, path, value): 844 '''Assert that the value for a specific path in a QMP dict 845 matches. When given a list of values, assert that any of 846 them matches.''' 847 848 result = self.dictpath(d, path) 849 850 # [] makes no sense as a list of valid values, so treat it as 851 # an actual single value. 852 if isinstance(value, list) and value != []: 853 for v in value: 854 if result == v: 855 return 856 self.fail('no match for "%s" in %s' % (str(result), str(value))) 857 else: 858 self.assertEqual(result, value, 859 '"%s" is "%s", expected "%s"' 860 % (path, str(result), str(value))) 861 862 def assert_no_active_block_jobs(self): 863 result = self.vm.qmp('query-block-jobs') 864 self.assert_qmp(result, 'return', []) 865 866 def assert_has_block_node(self, node_name=None, file_name=None): 867 """Issue a query-named-block-nodes and assert node_name and/or 868 file_name is present in the result""" 869 def check_equal_or_none(a, b): 870 return a is None or b is None or a == b 871 assert node_name or file_name 872 result = self.vm.qmp('query-named-block-nodes') 873 for x in result["return"]: 874 if check_equal_or_none(x.get("node-name"), node_name) and \ 875 check_equal_or_none(x.get("file"), file_name): 876 return 877 self.fail("Cannot find %s %s in result:\n%s" % 878 (node_name, file_name, result)) 879 880 def assert_json_filename_equal(self, json_filename, reference): 881 '''Asserts that the given filename is a json: filename and that its 882 content is equal to the given reference object''' 883 self.assertEqual(json_filename[:5], 'json:') 884 self.assertEqual( 885 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 886 self.vm.flatten_qmp_object(reference) 887 ) 888 889 def cancel_and_wait(self, drive='drive0', force=False, 890 resume=False, wait=60.0): 891 '''Cancel a block job and wait for it to finish, returning the event''' 892 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 893 self.assert_qmp(result, 'return', {}) 894 895 if resume: 896 self.vm.resume_drive(drive) 897 898 cancelled = False 899 result = None 900 while not cancelled: 901 for event in self.vm.get_qmp_events(wait=wait): 902 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 903 event['event'] == 'BLOCK_JOB_CANCELLED': 904 self.assert_qmp(event, 'data/device', drive) 905 result = event 906 cancelled = True 907 elif event['event'] == 'JOB_STATUS_CHANGE': 908 self.assert_qmp(event, 'data/id', drive) 909 910 911 self.assert_no_active_block_jobs() 912 return result 913 914 def wait_until_completed(self, drive='drive0', check_offset=True, 915 wait=60.0, error=None): 916 '''Wait for a block job to finish, returning the event''' 917 while True: 918 for event in self.vm.get_qmp_events(wait=wait): 919 if event['event'] == 'BLOCK_JOB_COMPLETED': 920 self.assert_qmp(event, 'data/device', drive) 921 if error is None: 922 self.assert_qmp_absent(event, 'data/error') 923 if check_offset: 924 self.assert_qmp(event, 'data/offset', 925 event['data']['len']) 926 else: 927 self.assert_qmp(event, 'data/error', error) 928 self.assert_no_active_block_jobs() 929 return event 930 if event['event'] == 'JOB_STATUS_CHANGE': 931 self.assert_qmp(event, 'data/id', drive) 932 933 def wait_ready(self, drive='drive0'): 934 """Wait until a BLOCK_JOB_READY event, and return the event.""" 935 f = {'data': {'type': 'mirror', 'device': drive}} 936 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 937 938 def wait_ready_and_cancel(self, drive='drive0'): 939 self.wait_ready(drive=drive) 940 event = self.cancel_and_wait(drive=drive) 941 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 942 self.assert_qmp(event, 'data/type', 'mirror') 943 self.assert_qmp(event, 'data/offset', event['data']['len']) 944 945 def complete_and_wait(self, drive='drive0', wait_ready=True, 946 completion_error=None): 947 '''Complete a block job and wait for it to finish''' 948 if wait_ready: 949 self.wait_ready(drive=drive) 950 951 result = self.vm.qmp('block-job-complete', device=drive) 952 self.assert_qmp(result, 'return', {}) 953 954 event = self.wait_until_completed(drive=drive, error=completion_error) 955 self.assert_qmp(event, 'data/type', 'mirror') 956 957 def pause_wait(self, job_id='job0'): 958 with Timeout(3, "Timeout waiting for job to pause"): 959 while True: 960 result = self.vm.qmp('query-block-jobs') 961 found = False 962 for job in result['return']: 963 if job['device'] == job_id: 964 found = True 965 if job['paused'] and not job['busy']: 966 return job 967 break 968 assert found 969 970 def pause_job(self, job_id='job0', wait=True): 971 result = self.vm.qmp('block-job-pause', device=job_id) 972 self.assert_qmp(result, 'return', {}) 973 if wait: 974 return self.pause_wait(job_id) 975 return result 976 977 def case_skip(self, reason): 978 '''Skip this test case''' 979 case_notrun(reason) 980 self.skipTest(reason) 981 982 983def notrun(reason): 984 '''Skip this test suite''' 985 # Each test in qemu-iotests has a number ("seq") 986 seq = os.path.basename(sys.argv[0]) 987 988 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 989 logger.warning("%s not run: %s", seq, reason) 990 sys.exit(0) 991 992def case_notrun(reason): 993 '''Mark this test case as not having been run (without actually 994 skipping it, that is left to the caller). See 995 QMPTestCase.case_skip() for a variant that actually skips the 996 current test case.''' 997 998 # Each test in qemu-iotests has a number ("seq") 999 seq = os.path.basename(sys.argv[0]) 1000 1001 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1002 ' [case not run] ' + reason + '\n') 1003 1004def _verify_image_format(supported_fmts: Sequence[str] = (), 1005 unsupported_fmts: Sequence[str] = ()) -> None: 1006 assert not (supported_fmts and unsupported_fmts) 1007 1008 if 'generic' in supported_fmts and \ 1009 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1010 # similar to 1011 # _supported_fmt generic 1012 # for bash tests 1013 if imgfmt == 'luks': 1014 verify_working_luks() 1015 return 1016 1017 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1018 if not_sup or (imgfmt in unsupported_fmts): 1019 notrun('not suitable for this image format: %s' % imgfmt) 1020 1021 if imgfmt == 'luks': 1022 verify_working_luks() 1023 1024def _verify_protocol(supported: Sequence[str] = (), 1025 unsupported: Sequence[str] = ()) -> None: 1026 assert not (supported and unsupported) 1027 1028 if 'generic' in supported: 1029 return 1030 1031 not_sup = supported and (imgproto not in supported) 1032 if not_sup or (imgproto in unsupported): 1033 notrun('not suitable for this protocol: %s' % imgproto) 1034 1035def _verify_platform(supported: Sequence[str] = (), 1036 unsupported: Sequence[str] = ()) -> None: 1037 if any((sys.platform.startswith(x) for x in unsupported)): 1038 notrun('not suitable for this OS: %s' % sys.platform) 1039 1040 if supported: 1041 if not any((sys.platform.startswith(x) for x in supported)): 1042 notrun('not suitable for this OS: %s' % sys.platform) 1043 1044def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1045 if supported_cache_modes and (cachemode not in supported_cache_modes): 1046 notrun('not suitable for this cache mode: %s' % cachemode) 1047 1048def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1049 if supported_aio_modes and (aiomode not in supported_aio_modes): 1050 notrun('not suitable for this aio mode: %s' % aiomode) 1051 1052def supports_quorum(): 1053 return 'quorum' in qemu_img_pipe('--help') 1054 1055def verify_quorum(): 1056 '''Skip test suite if quorum support is not available''' 1057 if not supports_quorum(): 1058 notrun('quorum support missing') 1059 1060def has_working_luks() -> Tuple[bool, str]: 1061 """ 1062 Check whether our LUKS driver can actually create images 1063 (this extends to LUKS encryption for qcow2). 1064 1065 If not, return the reason why. 1066 """ 1067 1068 img_file = f'{test_dir}/luks-test.luks' 1069 (output, status) = \ 1070 qemu_img_pipe_and_status('create', '-f', 'luks', 1071 '--object', luks_default_secret_object, 1072 '-o', luks_default_key_secret_opt, 1073 '-o', 'iter-time=10', 1074 img_file, '1G') 1075 try: 1076 os.remove(img_file) 1077 except OSError: 1078 pass 1079 1080 if status != 0: 1081 reason = output 1082 for line in output.splitlines(): 1083 if img_file + ':' in line: 1084 reason = line.split(img_file + ':', 1)[1].strip() 1085 break 1086 1087 return (False, reason) 1088 else: 1089 return (True, '') 1090 1091def verify_working_luks(): 1092 """ 1093 Skip test suite if LUKS does not work 1094 """ 1095 (working, reason) = has_working_luks() 1096 if not working: 1097 notrun(reason) 1098 1099def qemu_pipe(*args): 1100 """ 1101 Run qemu with an option to print something and exit (e.g. a help option). 1102 1103 :return: QEMU's stdout output. 1104 """ 1105 args = [qemu_prog] + qemu_opts + list(args) 1106 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 1107 stderr=subprocess.STDOUT, 1108 universal_newlines=True) 1109 output = subp.communicate()[0] 1110 if subp.returncode < 0: 1111 sys.stderr.write('qemu received signal %i: %s\n' % 1112 (-subp.returncode, ' '.join(args))) 1113 return output 1114 1115def supported_formats(read_only=False): 1116 '''Set 'read_only' to True to check ro-whitelist 1117 Otherwise, rw-whitelist is checked''' 1118 1119 if not hasattr(supported_formats, "formats"): 1120 supported_formats.formats = {} 1121 1122 if read_only not in supported_formats.formats: 1123 format_message = qemu_pipe("-drive", "format=help") 1124 line = 1 if read_only else 0 1125 supported_formats.formats[read_only] = \ 1126 format_message.splitlines()[line].split(":")[1].split() 1127 1128 return supported_formats.formats[read_only] 1129 1130def skip_if_unsupported(required_formats=(), read_only=False): 1131 '''Skip Test Decorator 1132 Runs the test if all the required formats are whitelisted''' 1133 def skip_test_decorator(func): 1134 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1135 **kwargs: Dict[str, Any]) -> None: 1136 if callable(required_formats): 1137 fmts = required_formats(test_case) 1138 else: 1139 fmts = required_formats 1140 1141 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1142 if usf_list: 1143 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1144 test_case.case_skip(msg) 1145 else: 1146 func(test_case, *args, **kwargs) 1147 return func_wrapper 1148 return skip_test_decorator 1149 1150def skip_for_formats(formats: Sequence[str] = ()) \ 1151 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1152 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1153 '''Skip Test Decorator 1154 Skips the test for the given formats''' 1155 def skip_test_decorator(func): 1156 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1157 **kwargs: Dict[str, Any]) -> None: 1158 if imgfmt in formats: 1159 msg = f'{test_case}: Skipped for format {imgfmt}' 1160 test_case.case_skip(msg) 1161 else: 1162 func(test_case, *args, **kwargs) 1163 return func_wrapper 1164 return skip_test_decorator 1165 1166def skip_if_user_is_root(func): 1167 '''Skip Test Decorator 1168 Runs the test only without root permissions''' 1169 def func_wrapper(*args, **kwargs): 1170 if os.getuid() == 0: 1171 case_notrun('{}: cannot be run as root'.format(args[0])) 1172 return None 1173 else: 1174 return func(*args, **kwargs) 1175 return func_wrapper 1176 1177def execute_unittest(debug=False): 1178 """Executes unittests within the calling module.""" 1179 1180 verbosity = 2 if debug else 1 1181 1182 if debug: 1183 output = sys.stdout 1184 else: 1185 # We need to filter out the time taken from the output so that 1186 # qemu-iotest can reliably diff the results against master output. 1187 output = io.StringIO() 1188 1189 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1190 verbosity=verbosity) 1191 try: 1192 # unittest.main() will use sys.exit(); so expect a SystemExit 1193 # exception 1194 unittest.main(testRunner=runner) 1195 finally: 1196 # We need to filter out the time taken from the output so that 1197 # qemu-iotest can reliably diff the results against master output. 1198 if not debug: 1199 out = output.getvalue() 1200 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1201 1202 # Hide skipped tests from the reference output 1203 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1204 out_first_line, out_rest = out.split('\n', 1) 1205 out = out_first_line.replace('s', '.') + '\n' + out_rest 1206 1207 sys.stderr.write(out) 1208 1209def execute_setup_common(supported_fmts: Sequence[str] = (), 1210 supported_platforms: Sequence[str] = (), 1211 supported_cache_modes: Sequence[str] = (), 1212 supported_aio_modes: Sequence[str] = (), 1213 unsupported_fmts: Sequence[str] = (), 1214 supported_protocols: Sequence[str] = (), 1215 unsupported_protocols: Sequence[str] = ()) -> bool: 1216 """ 1217 Perform necessary setup for either script-style or unittest-style tests. 1218 1219 :return: Bool; Whether or not debug mode has been requested via the CLI. 1220 """ 1221 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1222 1223 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1224 # indicate that we're not being run via "check". There may be 1225 # other things set up by "check" that individual test cases rely 1226 # on. 1227 if test_dir is None or qemu_default_machine is None: 1228 sys.stderr.write('Please run this test via the "check" script\n') 1229 sys.exit(os.EX_USAGE) 1230 1231 debug = '-d' in sys.argv 1232 if debug: 1233 sys.argv.remove('-d') 1234 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1235 1236 _verify_image_format(supported_fmts, unsupported_fmts) 1237 _verify_protocol(supported_protocols, unsupported_protocols) 1238 _verify_platform(supported=supported_platforms) 1239 _verify_cache_mode(supported_cache_modes) 1240 _verify_aio_mode(supported_aio_modes) 1241 1242 return debug 1243 1244def execute_test(*args, test_function=None, **kwargs): 1245 """Run either unittest or script-style tests.""" 1246 1247 debug = execute_setup_common(*args, **kwargs) 1248 if not test_function: 1249 execute_unittest(debug) 1250 else: 1251 test_function() 1252 1253def activate_logging(): 1254 """Activate iotests.log() output to stdout for script-style tests.""" 1255 handler = logging.StreamHandler(stream=sys.stdout) 1256 formatter = logging.Formatter('%(message)s') 1257 handler.setFormatter(formatter) 1258 test_logger.addHandler(handler) 1259 test_logger.setLevel(logging.INFO) 1260 test_logger.propagate = False 1261 1262# This is called from script-style iotests without a single point of entry 1263def script_initialize(*args, **kwargs): 1264 """Initialize script-style tests without running any tests.""" 1265 activate_logging() 1266 execute_setup_common(*args, **kwargs) 1267 1268# This is called from script-style iotests with a single point of entry 1269def script_main(test_function, *args, **kwargs): 1270 """Run script-style tests outside of the unittest framework""" 1271 activate_logging() 1272 execute_test(*args, test_function=test_function, **kwargs) 1273 1274# This is called from unittest style iotests 1275def main(*args, **kwargs): 1276 """Run tests using the unittest framework""" 1277 execute_test(*args, **kwargs) 1278