# Common utilities and Python wrappers for qemu-iotests
#
# Copyright (C) 2012 IBM Corp.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

import atexit
from collections import OrderedDict
import faulthandler
import io
import json
import logging
import os
import re
import signal
import struct
import subprocess
import sys
import time
from typing import (Any, Callable, Dict, Iterable,
                    List, Optional, Sequence, Tuple, TypeVar)
import unittest

from contextlib import contextmanager

# The in-tree 'qemu' package lives in ../../python relative to this file,
# so it has to be put on sys.path before it can be imported.
# pylint: disable=import-error, wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
from qemu import qtest
from qemu.qmp import QMPMessage

assert sys.version_info >= (3, 6)

# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()

# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79test_dir = os.environ.get('TEST_DIR') 80sock_dir = os.environ.get('SOCK_DIR') 81output_dir = os.environ.get('OUTPUT_DIR', '.') 82cachemode = os.environ.get('CACHEMODE') 83aiomode = os.environ.get('AIOMODE') 84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 85 86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 87 88luks_default_secret_object = 'secret,id=keysec0,data=' + \ 89 os.environ.get('IMGKEYSECRET', '') 90luks_default_key_secret_opt = 'key-secret=keysec0' 91 92 93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 94 """ 95 Run qemu-img and return both its output and its exit code 96 """ 97 subp = subprocess.Popen(qemu_img_args + list(args), 98 stdout=subprocess.PIPE, 99 stderr=subprocess.STDOUT, 100 universal_newlines=True) 101 output = subp.communicate()[0] 102 if subp.returncode < 0: 103 sys.stderr.write('qemu-img received signal %i: %s\n' 104 % (-subp.returncode, 105 ' '.join(qemu_img_args + list(args)))) 106 return (output, subp.returncode) 107 108def qemu_img(*args: str) -> 
int: 109 '''Run qemu-img and return the exit code''' 110 return qemu_img_pipe_and_status(*args)[1] 111 112def ordered_qmp(qmsg, conv_keys=True): 113 # Dictionaries are not ordered prior to 3.6, therefore: 114 if isinstance(qmsg, list): 115 return [ordered_qmp(atom) for atom in qmsg] 116 if isinstance(qmsg, dict): 117 od = OrderedDict() 118 for k, v in sorted(qmsg.items()): 119 if conv_keys: 120 k = k.replace('_', '-') 121 od[k] = ordered_qmp(v, conv_keys=False) 122 return od 123 return qmsg 124 125def qemu_img_create(*args): 126 args = list(args) 127 128 # default luks support 129 if '-f' in args and args[args.index('-f') + 1] == 'luks': 130 if '-o' in args: 131 i = args.index('-o') 132 if 'key-secret' not in args[i + 1]: 133 args[i + 1].append(luks_default_key_secret_opt) 134 args.insert(i + 2, '--object') 135 args.insert(i + 3, luks_default_secret_object) 136 else: 137 args = ['-o', luks_default_key_secret_opt, 138 '--object', luks_default_secret_object] + args 139 140 args.insert(0, 'create') 141 142 return qemu_img(*args) 143 144def qemu_img_measure(*args): 145 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 146 147def qemu_img_check(*args): 148 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 149 150def qemu_img_verbose(*args): 151 '''Run qemu-img without suppressing its output and return the exit code''' 152 exitcode = subprocess.call(qemu_img_args + list(args)) 153 if exitcode < 0: 154 sys.stderr.write('qemu-img received signal %i: %s\n' 155 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 156 return exitcode 157 158def qemu_img_pipe(*args: str) -> str: 159 '''Run qemu-img and return its output''' 160 return qemu_img_pipe_and_status(*args)[0] 161 162def qemu_img_log(*args): 163 result = qemu_img_pipe(*args) 164 log(result, filters=[filter_testfiles]) 165 return result 166 167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 168 args = ['info'] 169 if imgopts: 170 
args.append('--image-opts') 171 else: 172 args += ['-f', imgfmt] 173 args += extra_args 174 args.append(filename) 175 176 output = qemu_img_pipe(*args) 177 if not filter_path: 178 filter_path = filename 179 log(filter_img_info(output, filter_path)) 180 181def qemu_io(*args): 182 '''Run qemu-io and return the stdout data''' 183 args = qemu_io_args + list(args) 184 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 185 stderr=subprocess.STDOUT, 186 universal_newlines=True) 187 output = subp.communicate()[0] 188 if subp.returncode < 0: 189 sys.stderr.write('qemu-io received signal %i: %s\n' 190 % (-subp.returncode, ' '.join(args))) 191 return output 192 193def qemu_io_log(*args): 194 result = qemu_io(*args) 195 log(result, filters=[filter_testfiles, filter_qemu_io]) 196 return result 197 198def qemu_io_silent(*args): 199 '''Run qemu-io and return the exit code, suppressing stdout''' 200 args = qemu_io_args + list(args) 201 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 202 if exitcode < 0: 203 sys.stderr.write('qemu-io received signal %i: %s\n' % 204 (-exitcode, ' '.join(args))) 205 return exitcode 206 207def qemu_io_silent_check(*args): 208 '''Run qemu-io and return the true if subprocess returned 0''' 209 args = qemu_io_args + list(args) 210 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 211 stderr=subprocess.STDOUT) 212 return exitcode == 0 213 214def get_virtio_scsi_device(): 215 if qemu_default_machine == 's390-ccw-virtio': 216 return 'virtio-scsi-ccw' 217 return 'virtio-scsi-pci' 218 219class QemuIoInteractive: 220 def __init__(self, *args): 221 self.args = qemu_io_args_no_fmt + list(args) 222 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 223 stdout=subprocess.PIPE, 224 stderr=subprocess.STDOUT, 225 universal_newlines=True) 226 out = self._p.stdout.read(9) 227 if out != 'qemu-io> ': 228 # Most probably qemu-io just failed to start. 229 # Let's collect the whole output and exit. 
230 out += self._p.stdout.read() 231 self._p.wait(timeout=1) 232 raise ValueError(out) 233 234 def close(self): 235 self._p.communicate('q\n') 236 237 def _read_output(self): 238 pattern = 'qemu-io> ' 239 n = len(pattern) 240 pos = 0 241 s = [] 242 while pos != n: 243 c = self._p.stdout.read(1) 244 # check unexpected EOF 245 assert c != '' 246 s.append(c) 247 if c == pattern[pos]: 248 pos += 1 249 else: 250 pos = 0 251 252 return ''.join(s[:-n]) 253 254 def cmd(self, cmd): 255 # quit command is in close(), '\n' is added automatically 256 assert '\n' not in cmd 257 cmd = cmd.strip() 258 assert cmd not in ('q', 'quit') 259 self._p.stdin.write(cmd + '\n') 260 self._p.stdin.flush() 261 return self._read_output() 262 263 264def qemu_nbd(*args): 265 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 266 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 267 268def qemu_nbd_early_pipe(*args): 269 '''Run qemu-nbd in daemon mode and return both the parent's exit code 270 and its output in case of an error''' 271 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 272 stdout=subprocess.PIPE, 273 universal_newlines=True) 274 output = subp.communicate()[0] 275 if subp.returncode < 0: 276 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 277 (-subp.returncode, 278 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 279 280 return subp.returncode, output if subp.returncode else '' 281 282@contextmanager 283def qemu_nbd_popen(*args): 284 '''Context manager running qemu-nbd within the context''' 285 pid_file = file_path("pid") 286 287 cmd = list(qemu_nbd_args) 288 cmd.extend(('--persistent', '--pid-file', pid_file)) 289 cmd.extend(args) 290 291 log('Start NBD server') 292 p = subprocess.Popen(cmd) 293 try: 294 while not os.path.exists(pid_file): 295 if p.poll() is not None: 296 raise RuntimeError( 297 "qemu-nbd terminated with exit code {}: {}" 298 .format(p.returncode, ' '.join(cmd))) 299 300 time.sleep(0.01) 301 yield 302 finally: 
303 log('Kill NBD server') 304 p.kill() 305 p.wait() 306 307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 308 '''Return True if two image files are identical''' 309 return qemu_img('compare', '-f', fmt1, 310 '-F', fmt2, img1, img2) == 0 311 312def create_image(name, size): 313 '''Create a fully-allocated raw image with sector markers''' 314 file = open(name, 'wb') 315 i = 0 316 while i < size: 317 sector = struct.pack('>l504xl', i // 512, i // 512) 318 file.write(sector) 319 i = i + 512 320 file.close() 321 322def image_size(img): 323 '''Return image's virtual size''' 324 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 325 return json.loads(r)['virtual-size'] 326 327def is_str(val): 328 return isinstance(val, str) 329 330test_dir_re = re.compile(r"%s" % test_dir) 331def filter_test_dir(msg): 332 return test_dir_re.sub("TEST_DIR", msg) 333 334win32_re = re.compile(r"\r") 335def filter_win32(msg): 336 return win32_re.sub("", msg) 337 338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 339 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 340 r"and [0-9\/.inf]* ops\/sec\)") 341def filter_qemu_io(msg): 342 msg = filter_win32(msg) 343 return qemu_io_re.sub("X ops; XX:XX:XX.X " 344 "(XXX YYY/sec and XXX ops/sec)", msg) 345 346chown_re = re.compile(r"chown [0-9]+:[0-9]+") 347def filter_chown(msg): 348 return chown_re.sub("chown UID:GID", msg) 349 350def filter_qmp_event(event): 351 '''Filter a QMP event dict''' 352 event = dict(event) 353 if 'timestamp' in event: 354 event['timestamp']['seconds'] = 'SECS' 355 event['timestamp']['microseconds'] = 'USECS' 356 return event 357 358def filter_qmp(qmsg, filter_fn): 359 '''Given a string filter, filter a QMP object's values. 
360 filter_fn takes a (key, value) pair.''' 361 # Iterate through either lists or dicts; 362 if isinstance(qmsg, list): 363 items = enumerate(qmsg) 364 else: 365 items = qmsg.items() 366 367 for k, v in items: 368 if isinstance(v, (dict, list)): 369 qmsg[k] = filter_qmp(v, filter_fn) 370 else: 371 qmsg[k] = filter_fn(k, v) 372 return qmsg 373 374def filter_testfiles(msg): 375 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 376 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 377 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 378 379def filter_qmp_testfiles(qmsg): 380 def _filter(_key, value): 381 if is_str(value): 382 return filter_testfiles(value) 383 return value 384 return filter_qmp(qmsg, _filter) 385 386def filter_generated_node_ids(msg): 387 return re.sub("#block[0-9]+", "NODE_NAME", msg) 388 389def filter_img_info(output, filename): 390 lines = [] 391 for line in output.split('\n'): 392 if 'disk size' in line or 'actual-size' in line: 393 continue 394 line = line.replace(filename, 'TEST_IMG') 395 line = filter_testfiles(line) 396 line = line.replace(imgfmt, 'IMGFMT') 397 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 398 line = re.sub('uuid: [-a-f0-9]+', 399 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 400 line) 401 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 402 lines.append(line) 403 return '\n'.join(lines) 404 405def filter_imgfmt(msg): 406 return msg.replace(imgfmt, 'IMGFMT') 407 408def filter_qmp_imgfmt(qmsg): 409 def _filter(_key, value): 410 if is_str(value): 411 return filter_imgfmt(value) 412 return value 413 return filter_qmp(qmsg, _filter) 414 415 416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 417 418def log(msg: Msg, 419 filters: Iterable[Callable[[Msg], Msg]] = (), 420 indent: Optional[int] = None) -> None: 421 """ 422 Logs either a string message or a JSON serializable message (like QMP). 423 If indent is provided, JSON serializable messages are pretty-printed. 
424 """ 425 for flt in filters: 426 msg = flt(msg) 427 if isinstance(msg, (dict, list)): 428 # Don't sort if it's already sorted 429 do_sort = not isinstance(msg, OrderedDict) 430 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 431 else: 432 test_logger.info(msg) 433 434class Timeout: 435 def __init__(self, seconds, errmsg="Timeout"): 436 self.seconds = seconds 437 self.errmsg = errmsg 438 def __enter__(self): 439 signal.signal(signal.SIGALRM, self.timeout) 440 signal.setitimer(signal.ITIMER_REAL, self.seconds) 441 return self 442 def __exit__(self, exc_type, value, traceback): 443 signal.setitimer(signal.ITIMER_REAL, 0) 444 return False 445 def timeout(self, signum, frame): 446 raise Exception(self.errmsg) 447 448def file_pattern(name): 449 return "{0}-{1}".format(os.getpid(), name) 450 451class FilePaths: 452 """ 453 FilePaths is an auto-generated filename that cleans itself up. 454 455 Use this context manager to generate filenames and ensure that the file 456 gets deleted:: 457 458 with FilePaths(['test.img']) as img_path: 459 qemu_img('create', img_path, '1G') 460 # migration_sock_path is automatically deleted 461 """ 462 def __init__(self, names, base_dir=test_dir): 463 self.paths = [] 464 for name in names: 465 self.paths.append(os.path.join(base_dir, file_pattern(name))) 466 467 def __enter__(self): 468 return self.paths 469 470 def __exit__(self, exc_type, exc_val, exc_tb): 471 try: 472 for path in self.paths: 473 os.remove(path) 474 except OSError: 475 pass 476 return False 477 478class FilePath(FilePaths): 479 """ 480 FilePath is a specialization of FilePaths that takes a single filename. 
481 """ 482 def __init__(self, name, base_dir=test_dir): 483 super(FilePath, self).__init__([name], base_dir) 484 485 def __enter__(self): 486 return self.paths[0] 487 488def file_path_remover(): 489 for path in reversed(file_path_remover.paths): 490 try: 491 os.remove(path) 492 except OSError: 493 pass 494 495 496def file_path(*names, base_dir=test_dir): 497 ''' Another way to get auto-generated filename that cleans itself up. 498 499 Use is as simple as: 500 501 img_a, img_b = file_path('a.img', 'b.img') 502 sock = file_path('socket') 503 ''' 504 505 if not hasattr(file_path_remover, 'paths'): 506 file_path_remover.paths = [] 507 atexit.register(file_path_remover) 508 509 paths = [] 510 for name in names: 511 filename = file_pattern(name) 512 path = os.path.join(base_dir, filename) 513 file_path_remover.paths.append(path) 514 paths.append(path) 515 516 return paths[0] if len(paths) == 1 else paths 517 518def remote_filename(path): 519 if imgproto == 'file': 520 return path 521 elif imgproto == 'ssh': 522 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 523 else: 524 raise Exception("Protocol %s not supported" % (imgproto)) 525 526class VM(qtest.QEMUQtestMachine): 527 '''A QEMU VM''' 528 529 def __init__(self, path_suffix=''): 530 name = "qemu%s-%d" % (path_suffix, os.getpid()) 531 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 532 test_dir=test_dir, 533 socket_scm_helper=socket_scm_helper, 534 sock_dir=sock_dir) 535 self._num_drives = 0 536 537 def add_object(self, opts): 538 self._args.append('-object') 539 self._args.append(opts) 540 return self 541 542 def add_device(self, opts): 543 self._args.append('-device') 544 self._args.append(opts) 545 return self 546 547 def add_drive_raw(self, opts): 548 self._args.append('-drive') 549 self._args.append(opts) 550 return self 551 552 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 553 '''Add a virtio-blk drive to the VM''' 554 options = ['if=%s' % interface, 555 
'id=drive%d' % self._num_drives] 556 557 if path is not None: 558 options.append('file=%s' % path) 559 options.append('format=%s' % img_format) 560 options.append('cache=%s' % cachemode) 561 options.append('aio=%s' % aiomode) 562 563 if opts: 564 options.append(opts) 565 566 if img_format == 'luks' and 'key-secret' not in opts: 567 # default luks support 568 if luks_default_secret_object not in self._args: 569 self.add_object(luks_default_secret_object) 570 571 options.append(luks_default_key_secret_opt) 572 573 self._args.append('-drive') 574 self._args.append(','.join(options)) 575 self._num_drives += 1 576 return self 577 578 def add_blockdev(self, opts): 579 self._args.append('-blockdev') 580 if isinstance(opts, str): 581 self._args.append(opts) 582 else: 583 self._args.append(','.join(opts)) 584 return self 585 586 def add_incoming(self, addr): 587 self._args.append('-incoming') 588 self._args.append(addr) 589 return self 590 591 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 592 cmd = 'human-monitor-command' 593 kwargs = {'command-line': command_line} 594 if use_log: 595 return self.qmp_log(cmd, **kwargs) 596 else: 597 return self.qmp(cmd, **kwargs) 598 599 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 600 """Pause drive r/w operations""" 601 if not event: 602 self.pause_drive(drive, "read_aio") 603 self.pause_drive(drive, "write_aio") 604 return 605 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 606 607 def resume_drive(self, drive: str) -> None: 608 """Resume drive r/w operations""" 609 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 610 611 def hmp_qemu_io(self, drive: str, cmd: str, 612 use_log: bool = False) -> QMPMessage: 613 """Write to a given drive using an HMP command""" 614 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 615 616 def flatten_qmp_object(self, obj, output=None, basestr=''): 617 if output is None: 618 output = dict() 619 if isinstance(obj, list): 620 for i, 
item in enumerate(obj): 621 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 622 elif isinstance(obj, dict): 623 for key in obj: 624 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 625 else: 626 output[basestr[:-1]] = obj # Strip trailing '.' 627 return output 628 629 def qmp_to_opts(self, obj): 630 obj = self.flatten_qmp_object(obj) 631 output_list = list() 632 for key in obj: 633 output_list += [key + '=' + obj[key]] 634 return ','.join(output_list) 635 636 def get_qmp_events_filtered(self, wait=60.0): 637 result = [] 638 for ev in self.get_qmp_events(wait=wait): 639 result.append(filter_qmp_event(ev)) 640 return result 641 642 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 643 full_cmd = OrderedDict(( 644 ("execute", cmd), 645 ("arguments", ordered_qmp(kwargs)) 646 )) 647 log(full_cmd, filters, indent=indent) 648 result = self.qmp(cmd, **kwargs) 649 log(result, filters, indent=indent) 650 return result 651 652 # Returns None on success, and an error string on failure 653 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 654 pre_finalize=None, cancel=False, wait=60.0): 655 """ 656 run_job moves a job from creation through to dismissal. 657 658 :param job: String. ID of recently-launched job 659 :param auto_finalize: Bool. True if the job was launched with 660 auto_finalize. Defaults to True. 661 :param auto_dismiss: Bool. True if the job was launched with 662 auto_dismiss=True. Defaults to False. 663 :param pre_finalize: Callback. A callable that takes no arguments to be 664 invoked prior to issuing job-finalize, if any. 665 :param cancel: Bool. When true, cancels the job after the pre_finalize 666 callback. 667 :param wait: Float. Timeout value specifying how long to wait for any 668 event, in seconds. Defaults to 60.0. 
669 """ 670 match_device = {'data': {'device': job}} 671 match_id = {'data': {'id': job}} 672 events = [ 673 ('BLOCK_JOB_COMPLETED', match_device), 674 ('BLOCK_JOB_CANCELLED', match_device), 675 ('BLOCK_JOB_ERROR', match_device), 676 ('BLOCK_JOB_READY', match_device), 677 ('BLOCK_JOB_PENDING', match_id), 678 ('JOB_STATUS_CHANGE', match_id) 679 ] 680 error = None 681 while True: 682 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 683 if ev['event'] != 'JOB_STATUS_CHANGE': 684 log(ev) 685 continue 686 status = ev['data']['status'] 687 if status == 'aborting': 688 result = self.qmp('query-jobs') 689 for j in result['return']: 690 if j['id'] == job: 691 error = j['error'] 692 log('Job failed: %s' % (j['error'])) 693 elif status == 'ready': 694 self.qmp_log('job-complete', id=job) 695 elif status == 'pending' and not auto_finalize: 696 if pre_finalize: 697 pre_finalize() 698 if cancel: 699 self.qmp_log('job-cancel', id=job) 700 else: 701 self.qmp_log('job-finalize', id=job) 702 elif status == 'concluded' and not auto_dismiss: 703 self.qmp_log('job-dismiss', id=job) 704 elif status == 'null': 705 return error 706 707 # Returns None on success, and an error string on failure 708 def blockdev_create(self, options, job_id='job0', filters=None): 709 if filters is None: 710 filters = [filter_qmp_testfiles] 711 result = self.qmp_log('blockdev-create', filters=filters, 712 job_id=job_id, options=options) 713 714 if 'return' in result: 715 assert result['return'] == {} 716 job_result = self.run_job(job_id) 717 else: 718 job_result = result['error'] 719 720 log("") 721 return job_result 722 723 def enable_migration_events(self, name): 724 log('Enabling migration QMP events on %s...' 
% name) 725 log(self.qmp('migrate-set-capabilities', capabilities=[ 726 { 727 'capability': 'events', 728 'state': True 729 } 730 ])) 731 732 def wait_migration(self, expect_runstate): 733 while True: 734 event = self.event_wait('MIGRATION') 735 log(event, filters=[filter_qmp_event]) 736 if event['data']['status'] == 'completed': 737 break 738 # The event may occur in finish-migrate, so wait for the expected 739 # post-migration runstate 740 while self.qmp('query-status')['return']['status'] != expect_runstate: 741 pass 742 743 def node_info(self, node_name): 744 nodes = self.qmp('query-named-block-nodes') 745 for x in nodes['return']: 746 if x['node-name'] == node_name: 747 return x 748 return None 749 750 def query_bitmaps(self): 751 res = self.qmp("query-named-block-nodes") 752 return {device['node-name']: device['dirty-bitmaps'] 753 for device in res['return'] if 'dirty-bitmaps' in device} 754 755 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 756 """ 757 get a specific bitmap from the object returned by query_bitmaps. 758 :param recording: If specified, filter results by the specified value. 759 :param bitmaps: If specified, use it instead of call query_bitmaps() 760 """ 761 if bitmaps is None: 762 bitmaps = self.query_bitmaps() 763 764 for bitmap in bitmaps[node_name]: 765 if bitmap.get('name', '') == bitmap_name: 766 if recording is None or bitmap.get('recording') == recording: 767 return bitmap 768 return None 769 770 def check_bitmap_status(self, node_name, bitmap_name, fields): 771 ret = self.get_bitmap(node_name, bitmap_name) 772 773 return fields.items() <= ret.items() 774 775 def assert_block_path(self, root, path, expected_node, graph=None): 776 """ 777 Check whether the node under the given path in the block graph 778 is @expected_node. 779 780 @root is the node name of the node where the @path is rooted. 781 782 @path is a string that consists of child names separated by 783 slashes. It must begin with a slash. 
784 785 Examples for @root + @path: 786 - root="qcow2-node", path="/backing/file" 787 - root="quorum-node", path="/children.2/file" 788 789 Hypothetically, @path could be empty, in which case it would 790 point to @root. However, in practice this case is not useful 791 and hence not allowed. 792 793 @expected_node may be None. (All elements of the path but the 794 leaf must still exist.) 795 796 @graph may be None or the result of an x-debug-query-block-graph 797 call that has already been performed. 798 """ 799 if graph is None: 800 graph = self.qmp('x-debug-query-block-graph')['return'] 801 802 iter_path = iter(path.split('/')) 803 804 # Must start with a / 805 assert next(iter_path) == '' 806 807 node = next((node for node in graph['nodes'] if node['name'] == root), 808 None) 809 810 # An empty @path is not allowed, so the root node must be present 811 assert node is not None, 'Root node %s not found' % root 812 813 for child_name in iter_path: 814 assert node is not None, 'Cannot follow path %s%s' % (root, path) 815 816 try: 817 node_id = next(edge['child'] for edge in graph['edges'] 818 if (edge['parent'] == node['id'] and 819 edge['name'] == child_name)) 820 821 node = next(node for node in graph['nodes'] 822 if node['id'] == node_id) 823 824 except StopIteration: 825 node = None 826 827 if node is None: 828 assert expected_node is None, \ 829 'No node found under %s (but expected %s)' % \ 830 (path, expected_node) 831 else: 832 assert node['name'] == expected_node, \ 833 'Found node %s under %s (but expected %s)' % \ 834 (node['name'], path, expected_node) 835 836index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 837 838class QMPTestCase(unittest.TestCase): 839 '''Abstract base class for QMP test cases''' 840 841 def __init__(self, *args, **kwargs): 842 super().__init__(*args, **kwargs) 843 # Many users of this class set a VM property we rely on heavily 844 # in the methods below. 
845 self.vm = None 846 847 def dictpath(self, d, path): 848 '''Traverse a path in a nested dict''' 849 for component in path.split('/'): 850 m = index_re.match(component) 851 if m: 852 component, idx = m.groups() 853 idx = int(idx) 854 855 if not isinstance(d, dict) or component not in d: 856 self.fail(f'failed path traversal for "{path}" in "{d}"') 857 d = d[component] 858 859 if m: 860 if not isinstance(d, list): 861 self.fail(f'path component "{component}" in "{path}" ' 862 f'is not a list in "{d}"') 863 try: 864 d = d[idx] 865 except IndexError: 866 self.fail(f'invalid index "{idx}" in path "{path}" ' 867 f'in "{d}"') 868 return d 869 870 def assert_qmp_absent(self, d, path): 871 try: 872 result = self.dictpath(d, path) 873 except AssertionError: 874 return 875 self.fail('path "%s" has value "%s"' % (path, str(result))) 876 877 def assert_qmp(self, d, path, value): 878 '''Assert that the value for a specific path in a QMP dict 879 matches. When given a list of values, assert that any of 880 them matches.''' 881 882 result = self.dictpath(d, path) 883 884 # [] makes no sense as a list of valid values, so treat it as 885 # an actual single value. 
886 if isinstance(value, list) and value != []: 887 for v in value: 888 if result == v: 889 return 890 self.fail('no match for "%s" in %s' % (str(result), str(value))) 891 else: 892 self.assertEqual(result, value, 893 '"%s" is "%s", expected "%s"' 894 % (path, str(result), str(value))) 895 896 def assert_no_active_block_jobs(self): 897 result = self.vm.qmp('query-block-jobs') 898 self.assert_qmp(result, 'return', []) 899 900 def assert_has_block_node(self, node_name=None, file_name=None): 901 """Issue a query-named-block-nodes and assert node_name and/or 902 file_name is present in the result""" 903 def check_equal_or_none(a, b): 904 return a is None or b is None or a == b 905 assert node_name or file_name 906 result = self.vm.qmp('query-named-block-nodes') 907 for x in result["return"]: 908 if check_equal_or_none(x.get("node-name"), node_name) and \ 909 check_equal_or_none(x.get("file"), file_name): 910 return 911 self.fail("Cannot find %s %s in result:\n%s" % 912 (node_name, file_name, result)) 913 914 def assert_json_filename_equal(self, json_filename, reference): 915 '''Asserts that the given filename is a json: filename and that its 916 content is equal to the given reference object''' 917 self.assertEqual(json_filename[:5], 'json:') 918 self.assertEqual( 919 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 920 self.vm.flatten_qmp_object(reference) 921 ) 922 923 def cancel_and_wait(self, drive='drive0', force=False, 924 resume=False, wait=60.0): 925 '''Cancel a block job and wait for it to finish, returning the event''' 926 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 927 self.assert_qmp(result, 'return', {}) 928 929 if resume: 930 self.vm.resume_drive(drive) 931 932 cancelled = False 933 result = None 934 while not cancelled: 935 for event in self.vm.get_qmp_events(wait=wait): 936 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 937 event['event'] == 'BLOCK_JOB_CANCELLED': 938 self.assert_qmp(event, 'data/device', drive) 939 
result = event 940 cancelled = True 941 elif event['event'] == 'JOB_STATUS_CHANGE': 942 self.assert_qmp(event, 'data/id', drive) 943 944 945 self.assert_no_active_block_jobs() 946 return result 947 948 def wait_until_completed(self, drive='drive0', check_offset=True, 949 wait=60.0, error=None): 950 '''Wait for a block job to finish, returning the event''' 951 while True: 952 for event in self.vm.get_qmp_events(wait=wait): 953 if event['event'] == 'BLOCK_JOB_COMPLETED': 954 self.assert_qmp(event, 'data/device', drive) 955 if error is None: 956 self.assert_qmp_absent(event, 'data/error') 957 if check_offset: 958 self.assert_qmp(event, 'data/offset', 959 event['data']['len']) 960 else: 961 self.assert_qmp(event, 'data/error', error) 962 self.assert_no_active_block_jobs() 963 return event 964 if event['event'] == 'JOB_STATUS_CHANGE': 965 self.assert_qmp(event, 'data/id', drive) 966 967 def wait_ready(self, drive='drive0'): 968 """Wait until a BLOCK_JOB_READY event, and return the event.""" 969 f = {'data': {'type': 'mirror', 'device': drive}} 970 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 971 972 def wait_ready_and_cancel(self, drive='drive0'): 973 self.wait_ready(drive=drive) 974 event = self.cancel_and_wait(drive=drive) 975 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 976 self.assert_qmp(event, 'data/type', 'mirror') 977 self.assert_qmp(event, 'data/offset', event['data']['len']) 978 979 def complete_and_wait(self, drive='drive0', wait_ready=True, 980 completion_error=None): 981 '''Complete a block job and wait for it to finish''' 982 if wait_ready: 983 self.wait_ready(drive=drive) 984 985 result = self.vm.qmp('block-job-complete', device=drive) 986 self.assert_qmp(result, 'return', {}) 987 988 event = self.wait_until_completed(drive=drive, error=completion_error) 989 self.assert_qmp(event, 'data/type', 'mirror') 990 991 def pause_wait(self, job_id='job0'): 992 with Timeout(3, "Timeout waiting for job to pause"): 993 while True: 994 result 
= self.vm.qmp('query-block-jobs') 995 found = False 996 for job in result['return']: 997 if job['device'] == job_id: 998 found = True 999 if job['paused'] and not job['busy']: 1000 return job 1001 break 1002 assert found 1003 1004 def pause_job(self, job_id='job0', wait=True): 1005 result = self.vm.qmp('block-job-pause', device=job_id) 1006 self.assert_qmp(result, 'return', {}) 1007 if wait: 1008 return self.pause_wait(job_id) 1009 return result 1010 1011 def case_skip(self, reason): 1012 '''Skip this test case''' 1013 case_notrun(reason) 1014 self.skipTest(reason) 1015 1016 1017def notrun(reason): 1018 '''Skip this test suite''' 1019 # Each test in qemu-iotests has a number ("seq") 1020 seq = os.path.basename(sys.argv[0]) 1021 1022 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1023 logger.warning("%s not run: %s", seq, reason) 1024 sys.exit(0) 1025 1026def case_notrun(reason): 1027 '''Mark this test case as not having been run (without actually 1028 skipping it, that is left to the caller). 
See 1029 QMPTestCase.case_skip() for a variant that actually skips the 1030 current test case.''' 1031 1032 # Each test in qemu-iotests has a number ("seq") 1033 seq = os.path.basename(sys.argv[0]) 1034 1035 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1036 ' [case not run] ' + reason + '\n') 1037 1038def _verify_image_format(supported_fmts: Sequence[str] = (), 1039 unsupported_fmts: Sequence[str] = ()) -> None: 1040 assert not (supported_fmts and unsupported_fmts) 1041 1042 if 'generic' in supported_fmts and \ 1043 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1044 # similar to 1045 # _supported_fmt generic 1046 # for bash tests 1047 if imgfmt == 'luks': 1048 verify_working_luks() 1049 return 1050 1051 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1052 if not_sup or (imgfmt in unsupported_fmts): 1053 notrun('not suitable for this image format: %s' % imgfmt) 1054 1055 if imgfmt == 'luks': 1056 verify_working_luks() 1057 1058def _verify_protocol(supported: Sequence[str] = (), 1059 unsupported: Sequence[str] = ()) -> None: 1060 assert not (supported and unsupported) 1061 1062 if 'generic' in supported: 1063 return 1064 1065 not_sup = supported and (imgproto not in supported) 1066 if not_sup or (imgproto in unsupported): 1067 notrun('not suitable for this protocol: %s' % imgproto) 1068 1069def _verify_platform(supported: Sequence[str] = (), 1070 unsupported: Sequence[str] = ()) -> None: 1071 if any((sys.platform.startswith(x) for x in unsupported)): 1072 notrun('not suitable for this OS: %s' % sys.platform) 1073 1074 if supported: 1075 if not any((sys.platform.startswith(x) for x in supported)): 1076 notrun('not suitable for this OS: %s' % sys.platform) 1077 1078def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1079 if supported_cache_modes and (cachemode not in supported_cache_modes): 1080 notrun('not suitable for this cache mode: %s' % cachemode) 1081 1082def _verify_aio_mode(supported_aio_modes: Sequence[str] 
def supports_quorum():
    """Return whether 'quorum' appears in the output of qemu-img --help."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text


def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')


def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A throw-away image is created in test_dir with qemu-img and removed
    again right away.

    :return: (True, '') on success, otherwise (False, reason).  The
             reason is the text following "<image file>:" on the first
             output line that mentions the image, or the complete
             output if no line does.
    """
    img_file = f'{test_dir}/luks-test.luks'
    (output, status) = \
        qemu_img_pipe_and_status('create', '-f', 'luks',
                                 '--object', luks_default_secret_object,
                                 '-o', luks_default_key_secret_opt,
                                 '-o', 'iter-time=10',
                                 img_file, '1G')
    # Best-effort cleanup: the image does not exist if creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    reason = next((text.split(marker, 1)[1].strip()
                   for text in output.splitlines() if marker in text),
                  output)
    return (False, reason)


def verify_working_luks():
    """
    Skip test suite if LUKS does not work
    """
    (working, reason) = has_working_luks()
    if working:
        return
    notrun(reason)


def qemu_pipe(*args):
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    the given arguments.  A note is written to stderr if the process
    was terminated by a signal.

    :return: QEMU's stdout output (stderr is redirected into it).
    """
    cmdline = [qemu_prog, *qemu_opts, *args]
    proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    captured, _ = proc.communicate()
    # A negative return code is how Popen reports death by signal.
    if proc.returncode < 0:
        sys.stderr.write('qemu received signal %i: %s\n' %
                         (-proc.returncode, ' '.join(cmdline)))
    return captured
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

    The list is parsed from the output of qemu "-drive format=help":
    line 0 is used for the rw-whitelist and line 1 for the ro-whitelist,
    taking the whitespace-separated words after the first ':'.  Results
    are cached per 'read_only' value in supported_formats.formats.'''

    if not hasattr(supported_formats, "formats"):
        supported_formats.formats = {}
    cache = supported_formats.formats

    if read_only not in cache:
        help_text = qemu_pipe("-drive", "format=help")
        wanted_line = help_text.splitlines()[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]


def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

    :param required_formats: Either a collection of format names, or a
                             callable that takes the test case and
                             returns such a collection.
    :param read_only: Check the ro-whitelist instead of the rw-whitelist.
    '''
    # Imported locally so that this helper does not depend on a
    # module-level import.
    from functools import wraps

    def skip_test_decorator(func):
        # wraps() keeps the name and docstring of the decorated test.
        @wraps(func)
        def func_wrapper(test_case: 'QMPTestCase', *args: Any,
                         **kwargs: Any) -> Any:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            # Sorted, so that the skip message is reproducible.
            usf_list = sorted(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
                return None
            return func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator


def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    '''Skip Test Decorator
       Skips the test for the given formats

    :param formats: Image formats (values of imgfmt) for which the
                    decorated test is skipped.
    '''
    # Imported locally so that this helper does not depend on a
    # module-level import.
    from functools import wraps

    def skip_test_decorator(func):
        # wraps() keeps the name and docstring of the decorated test.
        @wraps(func)
        def func_wrapper(test_case: 'QMPTestCase', *args: Any,
                         **kwargs: Any) -> Any:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
                return None
            return func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

    When running as root (uid 0) the test body is not executed; the
    case is only recorded through case_notrun() and None is returned.
    '''
    # Imported locally so that this helper does not depend on a
    # module-level import.
    from functools import wraps

    # wraps() keeps the name and docstring of the decorated test.
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            # args[0] is the first positional argument of the call.
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper


def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    :param debug: If true, run verbosely and write directly to stdout.
                  Otherwise the runner output is captured, filtered and
                  then written to stderr.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() also copes with output that has no newline
            # (e.g. nothing was written at all), so that the exception
            # currently propagating is not replaced by a ValueError.
            first_line, newline, rest = out.partition('\n')
            out = first_line.replace('s', '.') + newline + rest

            sys.stderr.write(out)
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Exits with os.EX_USAGE unless the environment was prepared by the
    "check" script, consumes a '-d' flag from sys.argv, configures the
    logging level and runs the format, protocol, platform, cache mode
    and aio mode checks with the given lists.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    launched_by_check = not (test_dir is None or qemu_default_machine is None)
    if not launched_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Strip (the first) '-d' from the command line, if there is one.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments are handed to execute_setup_common().  With a
    test_function, that function is called; without one, the unittests
    are executed.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)


def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Bare messages only: no level or logger name prefix.
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))
    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False


# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests."""
    activate_logging()
    execute_setup_common(*args, **kwargs)


# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)


# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework"""
    execute_test(*args, **kwargs)