1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31from typing import (Any, Callable, Dict, Iterable, 32 List, Optional, Sequence, Tuple, TypeVar) 33import unittest 34 35# pylint: disable=import-error, wrong-import-position 36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 37from qemu import qtest 38 39assert sys.version_info >= (3, 6) 40 41# Type Aliases 42QMPResponse = Dict[str, Any] 43 44 45# Use this logger for logging messages directly from the iotests module 46logger = logging.getLogger('qemu.iotests') 47logger.addHandler(logging.NullHandler()) 48 49# Use this logger for messages that ought to be used for diff output. 50test_logger = logging.getLogger('qemu.iotests.diff_io') 51 52 53faulthandler.enable() 54 55# This will not work if arguments contain spaces but is necessary if we 56# want to support the override options that ./check supports. 57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 58if os.environ.get('QEMU_IMG_OPTIONS'): 59 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 60 61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 62if os.environ.get('QEMU_IO_OPTIONS'): 63 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 64 65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 67 qemu_io_args_no_fmt += \ 68 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 69 70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 71if os.environ.get('QEMU_NBD_OPTIONS'): 72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 73 74qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 76 77imgfmt = os.environ.get('IMGFMT', 'raw') 78imgproto = os.environ.get('IMGPROTO', 'file') 79test_dir = os.environ.get('TEST_DIR') 80sock_dir = os.environ.get('SOCK_DIR') 81output_dir = os.environ.get('OUTPUT_DIR', '.') 82cachemode = os.environ.get('CACHEMODE') 83aiomode = os.environ.get('AIOMODE') 84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 85 86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 87 88luks_default_secret_object = 'secret,id=keysec0,data=' + \ 89 os.environ.get('IMGKEYSECRET', '') 90luks_default_key_secret_opt = 'key-secret=keysec0' 91 92 93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 94 """ 95 Run qemu-img and return both its output and its exit code 96 """ 97 subp = subprocess.Popen(qemu_img_args + list(args), 98 stdout=subprocess.PIPE, 99 stderr=subprocess.STDOUT, 100 universal_newlines=True) 101 output = subp.communicate()[0] 102 if subp.returncode < 0: 103 sys.stderr.write('qemu-img received signal %i: %s\n' 104 % (-subp.returncode, 105 ' '.join(qemu_img_args + list(args)))) 106 return (output, subp.returncode) 107 108def qemu_img(*args: str) -> int: 109 '''Run qemu-img and return the exit code''' 110 return qemu_img_pipe_and_status(*args)[1] 111 112def ordered_qmp(qmsg, conv_keys=True): 113 # Dictionaries are not ordered prior to 3.6, therefore: 114 if isinstance(qmsg, list): 115 return [ordered_qmp(atom) for atom in qmsg] 116 if isinstance(qmsg, dict): 117 od = OrderedDict() 118 for k, v in sorted(qmsg.items()): 119 if conv_keys: 120 k = k.replace('_', '-') 121 od[k] = ordered_qmp(v, conv_keys=False) 122 return od 123 return qmsg 124 125def qemu_img_create(*args): 126 args = list(args) 127 128 # default luks support 129 if '-f' in args and args[args.index('-f') + 1] == 'luks': 130 if '-o' in args: 131 i = args.index('-o') 132 if 'key-secret' not in args[i + 1]: 133 args[i + 1].append(luks_default_key_secret_opt) 134 args.insert(i + 2, '--object') 135 args.insert(i + 3, luks_default_secret_object) 136 else: 137 args = ['-o', luks_default_key_secret_opt, 138 '--object', luks_default_secret_object] + args 139 140 args.insert(0, 'create') 141 142 return qemu_img(*args) 143 144def qemu_img_verbose(*args): 145 '''Run qemu-img without suppressing its output and return the exit code''' 146 exitcode = subprocess.call(qemu_img_args + list(args)) 147 if exitcode < 0: 148 sys.stderr.write('qemu-img received signal %i: %s\n' 149 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 150 return exitcode 151 152def qemu_img_pipe(*args: str) -> str: 153 '''Run qemu-img and return its output''' 154 return qemu_img_pipe_and_status(*args)[0] 155 156def qemu_img_log(*args): 157 result = qemu_img_pipe(*args) 158 log(result, filters=[filter_testfiles]) 159 return result 160 161def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 162 args = ['info'] 163 if imgopts: 164 args.append('--image-opts') 165 else: 166 args += ['-f', imgfmt] 167 args += extra_args 168 args.append(filename) 169 170 output = qemu_img_pipe(*args) 171 if not filter_path: 172 filter_path = filename 173 log(filter_img_info(output, filter_path)) 174 175def qemu_io(*args): 176 '''Run qemu-io and return the stdout data''' 177 args = qemu_io_args + list(args) 178 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 179 stderr=subprocess.STDOUT, 180 universal_newlines=True) 181 output = subp.communicate()[0] 182 if subp.returncode < 0: 183 sys.stderr.write('qemu-io received signal %i: %s\n' 184 % (-subp.returncode, ' '.join(args))) 185 return output 186 187def qemu_io_log(*args): 188 result = qemu_io(*args) 189 log(result, filters=[filter_testfiles, filter_qemu_io]) 190 return result 191 192def qemu_io_silent(*args): 193 '''Run qemu-io and return the exit code, suppressing stdout''' 194 args = qemu_io_args + list(args) 195 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 196 if exitcode < 0: 197 sys.stderr.write('qemu-io received signal %i: %s\n' % 198 (-exitcode, ' '.join(args))) 199 return exitcode 200 201def qemu_io_silent_check(*args): 202 '''Run qemu-io and return the true if subprocess returned 0''' 203 args = qemu_io_args + list(args) 204 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 205 stderr=subprocess.STDOUT) 206 return exitcode == 0 207 208def get_virtio_scsi_device(): 209 if qemu_default_machine == 's390-ccw-virtio': 210 return 'virtio-scsi-ccw' 211 return 'virtio-scsi-pci' 212 213class QemuIoInteractive: 214 def __init__(self, *args): 215 self.args = qemu_io_args_no_fmt + list(args) 216 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 217 stdout=subprocess.PIPE, 218 stderr=subprocess.STDOUT, 219 universal_newlines=True) 220 out = self._p.stdout.read(9) 221 if out != 'qemu-io> ': 222 # Most probably qemu-io just failed to start. 223 # Let's collect the whole output and exit. 224 out += self._p.stdout.read() 225 self._p.wait(timeout=1) 226 raise ValueError(out) 227 228 def close(self): 229 self._p.communicate('q\n') 230 231 def _read_output(self): 232 pattern = 'qemu-io> ' 233 n = len(pattern) 234 pos = 0 235 s = [] 236 while pos != n: 237 c = self._p.stdout.read(1) 238 # check unexpected EOF 239 assert c != '' 240 s.append(c) 241 if c == pattern[pos]: 242 pos += 1 243 else: 244 pos = 0 245 246 return ''.join(s[:-n]) 247 248 def cmd(self, cmd): 249 # quit command is in close(), '\n' is added automatically 250 assert '\n' not in cmd 251 cmd = cmd.strip() 252 assert cmd not in ('q', 'quit') 253 self._p.stdin.write(cmd + '\n') 254 self._p.stdin.flush() 255 return self._read_output() 256 257 258def qemu_nbd(*args): 259 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 260 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 261 262def qemu_nbd_early_pipe(*args): 263 '''Run qemu-nbd in daemon mode and return both the parent's exit code 264 and its output in case of an error''' 265 subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args), 266 stdout=subprocess.PIPE, 267 universal_newlines=True) 268 output = subp.communicate()[0] 269 if subp.returncode < 0: 270 sys.stderr.write('qemu-nbd received signal %i: %s\n' % 271 (-subp.returncode, 272 ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) 273 274 return subp.returncode, output if subp.returncode else '' 275 276def qemu_nbd_popen(*args): 277 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 278 return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args)) 279 280def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 281 '''Return True if two image files are identical''' 282 return qemu_img('compare', '-f', fmt1, 283 '-F', fmt2, img1, img2) == 0 284 285def create_image(name, size): 286 '''Create a fully-allocated raw image with sector markers''' 287 file = open(name, 'wb') 288 i = 0 289 while i < size: 290 sector = struct.pack('>l504xl', i // 512, i // 512) 291 file.write(sector) 292 i = i + 512 293 file.close() 294 295def image_size(img): 296 '''Return image's virtual size''' 297 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 298 return json.loads(r)['virtual-size'] 299 300def is_str(val): 301 return isinstance(val, str) 302 303test_dir_re = re.compile(r"%s" % test_dir) 304def filter_test_dir(msg): 305 return test_dir_re.sub("TEST_DIR", msg) 306 307win32_re = re.compile(r"\r") 308def filter_win32(msg): 309 return win32_re.sub("", msg) 310 311qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 312 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 313 r"and [0-9\/.inf]* ops\/sec\)") 314def filter_qemu_io(msg): 315 msg = filter_win32(msg) 316 return qemu_io_re.sub("X ops; XX:XX:XX.X " 317 "(XXX YYY/sec and XXX ops/sec)", msg) 318 319chown_re = re.compile(r"chown [0-9]+:[0-9]+") 320def filter_chown(msg): 321 return chown_re.sub("chown UID:GID", msg) 322 323def filter_qmp_event(event): 324 '''Filter a QMP event dict''' 325 event = dict(event) 326 if 'timestamp' in event: 327 event['timestamp']['seconds'] = 'SECS' 328 event['timestamp']['microseconds'] = 'USECS' 329 return event 330 331def filter_qmp(qmsg, filter_fn): 332 '''Given a string filter, filter a QMP object's values. 333 filter_fn takes a (key, value) pair.''' 334 # Iterate through either lists or dicts; 335 if isinstance(qmsg, list): 336 items = enumerate(qmsg) 337 else: 338 items = qmsg.items() 339 340 for k, v in items: 341 if isinstance(v, (dict, list)): 342 qmsg[k] = filter_qmp(v, filter_fn) 343 else: 344 qmsg[k] = filter_fn(k, v) 345 return qmsg 346 347def filter_testfiles(msg): 348 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 349 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 350 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 351 352def filter_qmp_testfiles(qmsg): 353 def _filter(_key, value): 354 if is_str(value): 355 return filter_testfiles(value) 356 return value 357 return filter_qmp(qmsg, _filter) 358 359def filter_generated_node_ids(msg): 360 return re.sub("#block[0-9]+", "NODE_NAME", msg) 361 362def filter_img_info(output, filename): 363 lines = [] 364 for line in output.split('\n'): 365 if 'disk size' in line or 'actual-size' in line: 366 continue 367 line = line.replace(filename, 'TEST_IMG') 368 line = filter_testfiles(line) 369 line = line.replace(imgfmt, 'IMGFMT') 370 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 371 line = re.sub('uuid: [-a-f0-9]+', 372 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 373 line) 374 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 375 lines.append(line) 376 return '\n'.join(lines) 377 378def filter_imgfmt(msg): 379 return msg.replace(imgfmt, 'IMGFMT') 380 381def filter_qmp_imgfmt(qmsg): 382 def _filter(_key, value): 383 if is_str(value): 384 return filter_imgfmt(value) 385 return value 386 return filter_qmp(qmsg, _filter) 387 388 389Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 390 391def log(msg: Msg, 392 filters: Iterable[Callable[[Msg], Msg]] = (), 393 indent: Optional[int] = None) -> None: 394 """ 395 Logs either a string message or a JSON serializable message (like QMP). 396 If indent is provided, JSON serializable messages are pretty-printed. 397 """ 398 for flt in filters: 399 msg = flt(msg) 400 if isinstance(msg, (dict, list)): 401 # Don't sort if it's already sorted 402 do_sort = not isinstance(msg, OrderedDict) 403 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 404 else: 405 test_logger.info(msg) 406 407class Timeout: 408 def __init__(self, seconds, errmsg="Timeout"): 409 self.seconds = seconds 410 self.errmsg = errmsg 411 def __enter__(self): 412 signal.signal(signal.SIGALRM, self.timeout) 413 signal.setitimer(signal.ITIMER_REAL, self.seconds) 414 return self 415 def __exit__(self, exc_type, value, traceback): 416 signal.setitimer(signal.ITIMER_REAL, 0) 417 return False 418 def timeout(self, signum, frame): 419 raise Exception(self.errmsg) 420 421def file_pattern(name): 422 return "{0}-{1}".format(os.getpid(), name) 423 424class FilePaths: 425 """ 426 FilePaths is an auto-generated filename that cleans itself up. 427 428 Use this context manager to generate filenames and ensure that the file 429 gets deleted:: 430 431 with FilePaths(['test.img']) as img_path: 432 qemu_img('create', img_path, '1G') 433 # migration_sock_path is automatically deleted 434 """ 435 def __init__(self, names, base_dir=test_dir): 436 self.paths = [] 437 for name in names: 438 self.paths.append(os.path.join(base_dir, file_pattern(name))) 439 440 def __enter__(self): 441 return self.paths 442 443 def __exit__(self, exc_type, exc_val, exc_tb): 444 try: 445 for path in self.paths: 446 os.remove(path) 447 except OSError: 448 pass 449 return False 450 451class FilePath(FilePaths): 452 """ 453 FilePath is a specialization of FilePaths that takes a single filename. 454 """ 455 def __init__(self, name, base_dir=test_dir): 456 super(FilePath, self).__init__([name], base_dir) 457 458 def __enter__(self): 459 return self.paths[0] 460 461def file_path_remover(): 462 for path in reversed(file_path_remover.paths): 463 try: 464 os.remove(path) 465 except OSError: 466 pass 467 468 469def file_path(*names, base_dir=test_dir): 470 ''' Another way to get auto-generated filename that cleans itself up. 471 472 Use is as simple as: 473 474 img_a, img_b = file_path('a.img', 'b.img') 475 sock = file_path('socket') 476 ''' 477 478 if not hasattr(file_path_remover, 'paths'): 479 file_path_remover.paths = [] 480 atexit.register(file_path_remover) 481 482 paths = [] 483 for name in names: 484 filename = file_pattern(name) 485 path = os.path.join(base_dir, filename) 486 file_path_remover.paths.append(path) 487 paths.append(path) 488 489 return paths[0] if len(paths) == 1 else paths 490 491def remote_filename(path): 492 if imgproto == 'file': 493 return path 494 elif imgproto == 'ssh': 495 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 496 else: 497 raise Exception("Protocol %s not supported" % (imgproto)) 498 499class VM(qtest.QEMUQtestMachine): 500 '''A QEMU VM''' 501 502 def __init__(self, path_suffix=''): 503 name = "qemu%s-%d" % (path_suffix, os.getpid()) 504 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 505 test_dir=test_dir, 506 socket_scm_helper=socket_scm_helper, 507 sock_dir=sock_dir) 508 self._num_drives = 0 509 510 def add_object(self, opts): 511 self._args.append('-object') 512 self._args.append(opts) 513 return self 514 515 def add_device(self, opts): 516 self._args.append('-device') 517 self._args.append(opts) 518 return self 519 520 def add_drive_raw(self, opts): 521 self._args.append('-drive') 522 self._args.append(opts) 523 return self 524 525 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 526 '''Add a virtio-blk drive to the VM''' 527 options = ['if=%s' % interface, 528 'id=drive%d' % self._num_drives] 529 530 if path is not None: 531 options.append('file=%s' % path) 532 options.append('format=%s' % img_format) 533 options.append('cache=%s' % cachemode) 534 options.append('aio=%s' % aiomode) 535 536 if opts: 537 options.append(opts) 538 539 if img_format == 'luks' and 'key-secret' not in opts: 540 # default luks support 541 if luks_default_secret_object not in self._args: 542 self.add_object(luks_default_secret_object) 543 544 options.append(luks_default_key_secret_opt) 545 546 self._args.append('-drive') 547 self._args.append(','.join(options)) 548 self._num_drives += 1 549 return self 550 551 def add_blockdev(self, opts): 552 self._args.append('-blockdev') 553 if isinstance(opts, str): 554 self._args.append(opts) 555 else: 556 self._args.append(','.join(opts)) 557 return self 558 559 def add_incoming(self, addr): 560 self._args.append('-incoming') 561 self._args.append(addr) 562 return self 563 564 def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse: 565 cmd = 'human-monitor-command' 566 kwargs = {'command-line': command_line} 567 if use_log: 568 return self.qmp_log(cmd, **kwargs) 569 else: 570 return self.qmp(cmd, **kwargs) 571 572 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 573 """Pause drive r/w operations""" 574 if not event: 575 self.pause_drive(drive, "read_aio") 576 self.pause_drive(drive, "write_aio") 577 return 578 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 579 580 def resume_drive(self, drive: str) -> None: 581 """Resume drive r/w operations""" 582 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 583 584 def hmp_qemu_io(self, drive: str, cmd: str, 585 use_log: bool = False) -> QMPResponse: 586 """Write to a given drive using an HMP command""" 587 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 588 589 def flatten_qmp_object(self, obj, output=None, basestr=''): 590 if output is None: 591 output = dict() 592 if isinstance(obj, list): 593 for i, item in enumerate(obj): 594 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 595 elif isinstance(obj, dict): 596 for key in obj: 597 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 598 else: 599 output[basestr[:-1]] = obj # Strip trailing '.' 600 return output 601 602 def qmp_to_opts(self, obj): 603 obj = self.flatten_qmp_object(obj) 604 output_list = list() 605 for key in obj: 606 output_list += [key + '=' + obj[key]] 607 return ','.join(output_list) 608 609 def get_qmp_events_filtered(self, wait=60.0): 610 result = [] 611 for ev in self.get_qmp_events(wait=wait): 612 result.append(filter_qmp_event(ev)) 613 return result 614 615 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 616 full_cmd = OrderedDict(( 617 ("execute", cmd), 618 ("arguments", ordered_qmp(kwargs)) 619 )) 620 log(full_cmd, filters, indent=indent) 621 result = self.qmp(cmd, **kwargs) 622 log(result, filters, indent=indent) 623 return result 624 625 # Returns None on success, and an error string on failure 626 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 627 pre_finalize=None, cancel=False, wait=60.0): 628 """ 629 run_job moves a job from creation through to dismissal. 630 631 :param job: String. ID of recently-launched job 632 :param auto_finalize: Bool. True if the job was launched with 633 auto_finalize. Defaults to True. 634 :param auto_dismiss: Bool. True if the job was launched with 635 auto_dismiss=True. Defaults to False. 636 :param pre_finalize: Callback. A callable that takes no arguments to be 637 invoked prior to issuing job-finalize, if any. 638 :param cancel: Bool. When true, cancels the job after the pre_finalize 639 callback. 640 :param wait: Float. Timeout value specifying how long to wait for any 641 event, in seconds. Defaults to 60.0. 642 """ 643 match_device = {'data': {'device': job}} 644 match_id = {'data': {'id': job}} 645 events = [ 646 ('BLOCK_JOB_COMPLETED', match_device), 647 ('BLOCK_JOB_CANCELLED', match_device), 648 ('BLOCK_JOB_ERROR', match_device), 649 ('BLOCK_JOB_READY', match_device), 650 ('BLOCK_JOB_PENDING', match_id), 651 ('JOB_STATUS_CHANGE', match_id) 652 ] 653 error = None 654 while True: 655 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 656 if ev['event'] != 'JOB_STATUS_CHANGE': 657 log(ev) 658 continue 659 status = ev['data']['status'] 660 if status == 'aborting': 661 result = self.qmp('query-jobs') 662 for j in result['return']: 663 if j['id'] == job: 664 error = j['error'] 665 log('Job failed: %s' % (j['error'])) 666 elif status == 'ready': 667 self.qmp_log('job-complete', id=job) 668 elif status == 'pending' and not auto_finalize: 669 if pre_finalize: 670 pre_finalize() 671 if cancel: 672 self.qmp_log('job-cancel', id=job) 673 else: 674 self.qmp_log('job-finalize', id=job) 675 elif status == 'concluded' and not auto_dismiss: 676 self.qmp_log('job-dismiss', id=job) 677 elif status == 'null': 678 return error 679 680 # Returns None on success, and an error string on failure 681 def blockdev_create(self, options, job_id='job0', filters=None): 682 if filters is None: 683 filters = [filter_qmp_testfiles] 684 result = self.qmp_log('blockdev-create', filters=filters, 685 job_id=job_id, options=options) 686 687 if 'return' in result: 688 assert result['return'] == {} 689 job_result = self.run_job(job_id) 690 else: 691 job_result = result['error'] 692 693 log("") 694 return job_result 695 696 def enable_migration_events(self, name): 697 log('Enabling migration QMP events on %s...' % name) 698 log(self.qmp('migrate-set-capabilities', capabilities=[ 699 { 700 'capability': 'events', 701 'state': True 702 } 703 ])) 704 705 def wait_migration(self, expect_runstate): 706 while True: 707 event = self.event_wait('MIGRATION') 708 log(event, filters=[filter_qmp_event]) 709 if event['data']['status'] == 'completed': 710 break 711 # The event may occur in finish-migrate, so wait for the expected 712 # post-migration runstate 713 while self.qmp('query-status')['return']['status'] != expect_runstate: 714 pass 715 716 def node_info(self, node_name): 717 nodes = self.qmp('query-named-block-nodes') 718 for x in nodes['return']: 719 if x['node-name'] == node_name: 720 return x 721 return None 722 723 def query_bitmaps(self): 724 res = self.qmp("query-named-block-nodes") 725 return {device['node-name']: device['dirty-bitmaps'] 726 for device in res['return'] if 'dirty-bitmaps' in device} 727 728 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 729 """ 730 get a specific bitmap from the object returned by query_bitmaps. 731 :param recording: If specified, filter results by the specified value. 732 :param bitmaps: If specified, use it instead of call query_bitmaps() 733 """ 734 if bitmaps is None: 735 bitmaps = self.query_bitmaps() 736 737 for bitmap in bitmaps[node_name]: 738 if bitmap.get('name', '') == bitmap_name: 739 if recording is None or bitmap.get('recording') == recording: 740 return bitmap 741 return None 742 743 def check_bitmap_status(self, node_name, bitmap_name, fields): 744 ret = self.get_bitmap(node_name, bitmap_name) 745 746 return fields.items() <= ret.items() 747 748 def assert_block_path(self, root, path, expected_node, graph=None): 749 """ 750 Check whether the node under the given path in the block graph 751 is @expected_node. 752 753 @root is the node name of the node where the @path is rooted. 754 755 @path is a string that consists of child names separated by 756 slashes. It must begin with a slash. 757 758 Examples for @root + @path: 759 - root="qcow2-node", path="/backing/file" 760 - root="quorum-node", path="/children.2/file" 761 762 Hypothetically, @path could be empty, in which case it would 763 point to @root. However, in practice this case is not useful 764 and hence not allowed. 765 766 @expected_node may be None. (All elements of the path but the 767 leaf must still exist.) 768 769 @graph may be None or the result of an x-debug-query-block-graph 770 call that has already been performed. 771 """ 772 if graph is None: 773 graph = self.qmp('x-debug-query-block-graph')['return'] 774 775 iter_path = iter(path.split('/')) 776 777 # Must start with a / 778 assert next(iter_path) == '' 779 780 node = next((node for node in graph['nodes'] if node['name'] == root), 781 None) 782 783 # An empty @path is not allowed, so the root node must be present 784 assert node is not None, 'Root node %s not found' % root 785 786 for child_name in iter_path: 787 assert node is not None, 'Cannot follow path %s%s' % (root, path) 788 789 try: 790 node_id = next(edge['child'] for edge in graph['edges'] 791 if (edge['parent'] == node['id'] and 792 edge['name'] == child_name)) 793 794 node = next(node for node in graph['nodes'] 795 if node['id'] == node_id) 796 797 except StopIteration: 798 node = None 799 800 if node is None: 801 assert expected_node is None, \ 802 'No node found under %s (but expected %s)' % \ 803 (path, expected_node) 804 else: 805 assert node['name'] == expected_node, \ 806 'Found node %s under %s (but expected %s)' % \ 807 (node['name'], path, expected_node) 808 809index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 810 811class QMPTestCase(unittest.TestCase): 812 '''Abstract base class for QMP test cases''' 813 814 def __init__(self, *args, **kwargs): 815 super().__init__(*args, **kwargs) 816 # Many users of this class set a VM property we rely on heavily 817 # in the methods below. 818 self.vm = None 819 820 def dictpath(self, d, path): 821 '''Traverse a path in a nested dict''' 822 for component in path.split('/'): 823 m = index_re.match(component) 824 if m: 825 component, idx = m.groups() 826 idx = int(idx) 827 828 if not isinstance(d, dict) or component not in d: 829 self.fail(f'failed path traversal for "{path}" in "{d}"') 830 d = d[component] 831 832 if m: 833 if not isinstance(d, list): 834 self.fail(f'path component "{component}" in "{path}" ' 835 f'is not a list in "{d}"') 836 try: 837 d = d[idx] 838 except IndexError: 839 self.fail(f'invalid index "{idx}" in path "{path}" ' 840 f'in "{d}"') 841 return d 842 843 def assert_qmp_absent(self, d, path): 844 try: 845 result = self.dictpath(d, path) 846 except AssertionError: 847 return 848 self.fail('path "%s" has value "%s"' % (path, str(result))) 849 850 def assert_qmp(self, d, path, value): 851 '''Assert that the value for a specific path in a QMP dict 852 matches. When given a list of values, assert that any of 853 them matches.''' 854 855 result = self.dictpath(d, path) 856 857 # [] makes no sense as a list of valid values, so treat it as 858 # an actual single value. 859 if isinstance(value, list) and value != []: 860 for v in value: 861 if result == v: 862 return 863 self.fail('no match for "%s" in %s' % (str(result), str(value))) 864 else: 865 self.assertEqual(result, value, 866 '"%s" is "%s", expected "%s"' 867 % (path, str(result), str(value))) 868 869 def assert_no_active_block_jobs(self): 870 result = self.vm.qmp('query-block-jobs') 871 self.assert_qmp(result, 'return', []) 872 873 def assert_has_block_node(self, node_name=None, file_name=None): 874 """Issue a query-named-block-nodes and assert node_name and/or 875 file_name is present in the result""" 876 def check_equal_or_none(a, b): 877 return a is None or b is None or a == b 878 assert node_name or file_name 879 result = self.vm.qmp('query-named-block-nodes') 880 for x in result["return"]: 881 if check_equal_or_none(x.get("node-name"), node_name) and \ 882 check_equal_or_none(x.get("file"), file_name): 883 return 884 self.fail("Cannot find %s %s in result:\n%s" % 885 (node_name, file_name, result)) 886 887 def assert_json_filename_equal(self, json_filename, reference): 888 '''Asserts that the given filename is a json: filename and that its 889 content is equal to the given reference object''' 890 self.assertEqual(json_filename[:5], 'json:') 891 self.assertEqual( 892 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 893 self.vm.flatten_qmp_object(reference) 894 ) 895 896 def cancel_and_wait(self, drive='drive0', force=False, 897 resume=False, wait=60.0): 898 '''Cancel a block job and wait for it to finish, returning the event''' 899 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 900 self.assert_qmp(result, 'return', {}) 901 902 if resume: 903 self.vm.resume_drive(drive) 904 905 cancelled = False 906 result = None 907 while not cancelled: 908 for event in self.vm.get_qmp_events(wait=wait): 909 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 910 event['event'] == 'BLOCK_JOB_CANCELLED': 911 self.assert_qmp(event, 'data/device', drive) 912 result = event 913 cancelled = True 914 elif event['event'] == 'JOB_STATUS_CHANGE': 915 self.assert_qmp(event, 'data/id', drive) 916 917 918 self.assert_no_active_block_jobs() 919 return result 920 921 def wait_until_completed(self, drive='drive0', check_offset=True, 922 wait=60.0, error=None): 923 '''Wait for a block job to finish, returning the event''' 924 while True: 925 for event in self.vm.get_qmp_events(wait=wait): 926 if event['event'] == 'BLOCK_JOB_COMPLETED': 927 self.assert_qmp(event, 'data/device', drive) 928 if error is None: 929 self.assert_qmp_absent(event, 'data/error') 930 if check_offset: 931 self.assert_qmp(event, 'data/offset', 932 event['data']['len']) 933 else: 934 self.assert_qmp(event, 'data/error', error) 935 self.assert_no_active_block_jobs() 936 return event 937 if event['event'] == 'JOB_STATUS_CHANGE': 938 self.assert_qmp(event, 'data/id', drive) 939 940 def wait_ready(self, drive='drive0'): 941 """Wait until a BLOCK_JOB_READY event, and return the event.""" 942 f = {'data': {'type': 'mirror', 'device': drive}} 943 return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 944 945 def wait_ready_and_cancel(self, drive='drive0'): 946 self.wait_ready(drive=drive) 947 event = self.cancel_and_wait(drive=drive) 948 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 949 self.assert_qmp(event, 'data/type', 'mirror') 950 self.assert_qmp(event, 'data/offset', event['data']['len']) 951 952 def complete_and_wait(self, drive='drive0', wait_ready=True, 953 completion_error=None): 954 '''Complete a block job and wait for it to finish''' 955 if wait_ready: 956 self.wait_ready(drive=drive) 957 958 result = self.vm.qmp('block-job-complete', device=drive) 959 self.assert_qmp(result, 'return', {}) 960 961 event = self.wait_until_completed(drive=drive, error=completion_error) 962 self.assert_qmp(event, 'data/type', 'mirror') 963 964 def pause_wait(self, job_id='job0'): 965 with Timeout(3, "Timeout waiting for job to pause"): 966 while True: 967 result = self.vm.qmp('query-block-jobs') 968 found = False 969 for job in result['return']: 970 if job['device'] == job_id: 971 found = True 972 if job['paused'] and not job['busy']: 973 return job 974 break 975 assert found 976 977 def pause_job(self, job_id='job0', wait=True): 978 result = self.vm.qmp('block-job-pause', device=job_id) 979 self.assert_qmp(result, 'return', {}) 980 if wait: 981 return self.pause_wait(job_id) 982 return result 983 984 def case_skip(self, reason): 985 '''Skip this test case''' 986 case_notrun(reason) 987 self.skipTest(reason) 988 989 990def notrun(reason): 991 '''Skip this test suite''' 992 # Each test in qemu-iotests has a number ("seq") 993 seq = os.path.basename(sys.argv[0]) 994 995 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 996 logger.warning("%s not run: %s", seq, reason) 997 sys.exit(0) 998 999def case_notrun(reason): 1000 '''Mark this test case as not having been run (without actually 1001 skipping it, that is left to the caller). See 1002 QMPTestCase.case_skip() for a variant that actually skips the 1003 current test case.''' 1004 1005 # Each test in qemu-iotests has a number ("seq") 1006 seq = os.path.basename(sys.argv[0]) 1007 1008 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1009 ' [case not run] ' + reason + '\n') 1010 1011def _verify_image_format(supported_fmts: Sequence[str] = (), 1012 unsupported_fmts: Sequence[str] = ()) -> None: 1013 assert not (supported_fmts and unsupported_fmts) 1014 1015 if 'generic' in supported_fmts and \ 1016 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1017 # similar to 1018 # _supported_fmt generic 1019 # for bash tests 1020 if imgfmt == 'luks': 1021 verify_working_luks() 1022 return 1023 1024 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1025 if not_sup or (imgfmt in unsupported_fmts): 1026 notrun('not suitable for this image format: %s' % imgfmt) 1027 1028 if imgfmt == 'luks': 1029 verify_working_luks() 1030 1031def _verify_protocol(supported: Sequence[str] = (), 1032 unsupported: Sequence[str] = ()) -> None: 1033 assert not (supported and unsupported) 1034 1035 if 'generic' in supported: 1036 return 1037 1038 not_sup = supported and (imgproto not in supported) 1039 if not_sup or (imgproto in unsupported): 1040 notrun('not suitable for this protocol: %s' % imgproto) 1041 1042def _verify_platform(supported: Sequence[str] = (), 1043 unsupported: Sequence[str] = ()) -> None: 1044 if any((sys.platform.startswith(x) for x in unsupported)): 1045 notrun('not suitable for this OS: %s' % sys.platform) 1046 1047 if supported: 1048 if not any((sys.platform.startswith(x) for x in supported)): 1049 notrun('not suitable for this OS: %s' % sys.platform) 1050 1051def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1052 if supported_cache_modes and (cachemode not in supported_cache_modes): 1053 notrun('not suitable for this cache mode: %s' % cachemode) 1054 1055def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1056 if supported_aio_modes and (aiomode not in supported_aio_modes): 1057 notrun('not suitable for this aio mode: %s' % aiomode) 1058 1059def supports_quorum(): 1060 return 'quorum' in qemu_img_pipe('--help') 1061 1062def verify_quorum(): 1063 '''Skip test suite if quorum support is not available''' 1064 if not supports_quorum(): 1065 notrun('quorum support missing') 1066 1067def has_working_luks() -> Tuple[bool, str]: 1068 """ 1069 Check whether our LUKS driver can actually create images 1070 (this extends to LUKS encryption for qcow2). 1071 1072 If not, return the reason why. 1073 """ 1074 1075 img_file = f'{test_dir}/luks-test.luks' 1076 (output, status) = \ 1077 qemu_img_pipe_and_status('create', '-f', 'luks', 1078 '--object', luks_default_secret_object, 1079 '-o', luks_default_key_secret_opt, 1080 '-o', 'iter-time=10', 1081 img_file, '1G') 1082 try: 1083 os.remove(img_file) 1084 except OSError: 1085 pass 1086 1087 if status != 0: 1088 reason = output 1089 for line in output.splitlines(): 1090 if img_file + ':' in line: 1091 reason = line.split(img_file + ':', 1)[1].strip() 1092 break 1093 1094 return (False, reason) 1095 else: 1096 return (True, '') 1097 1098def verify_working_luks(): 1099 """ 1100 Skip test suite if LUKS does not work 1101 """ 1102 (working, reason) = has_working_luks() 1103 if not working: 1104 notrun(reason) 1105 1106def qemu_pipe(*args): 1107 """ 1108 Run qemu with an option to print something and exit (e.g. a help option). 1109 1110 :return: QEMU's stdout output. 1111 """ 1112 args = [qemu_prog] + qemu_opts + list(args) 1113 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 1114 stderr=subprocess.STDOUT, 1115 universal_newlines=True) 1116 output = subp.communicate()[0] 1117 if subp.returncode < 0: 1118 sys.stderr.write('qemu received signal %i: %s\n' % 1119 (-subp.returncode, ' '.join(args))) 1120 return output 1121 1122def supported_formats(read_only=False): 1123 '''Set 'read_only' to True to check ro-whitelist 1124 Otherwise, rw-whitelist is checked''' 1125 1126 if not hasattr(supported_formats, "formats"): 1127 supported_formats.formats = {} 1128 1129 if read_only not in supported_formats.formats: 1130 format_message = qemu_pipe("-drive", "format=help") 1131 line = 1 if read_only else 0 1132 supported_formats.formats[read_only] = \ 1133 format_message.splitlines()[line].split(":")[1].split() 1134 1135 return supported_formats.formats[read_only] 1136 1137def skip_if_unsupported(required_formats=(), read_only=False): 1138 '''Skip Test Decorator 1139 Runs the test if all the required formats are whitelisted''' 1140 def skip_test_decorator(func): 1141 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1142 **kwargs: Dict[str, Any]) -> None: 1143 if callable(required_formats): 1144 fmts = required_formats(test_case) 1145 else: 1146 fmts = required_formats 1147 1148 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1149 if usf_list: 1150 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1151 test_case.case_skip(msg) 1152 else: 1153 func(test_case, *args, **kwargs) 1154 return func_wrapper 1155 return skip_test_decorator 1156 1157def skip_for_formats(formats: Sequence[str] = ()) \ 1158 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1159 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1160 '''Skip Test Decorator 1161 Skips the test for the given formats''' 1162 def skip_test_decorator(func): 1163 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1164 **kwargs: Dict[str, Any]) -> None: 1165 if imgfmt in formats: 1166 msg = f'{test_case}: Skipped for format {imgfmt}' 1167 test_case.case_skip(msg) 1168 else: 1169 func(test_case, *args, **kwargs) 1170 return func_wrapper 1171 return skip_test_decorator 1172 1173def skip_if_user_is_root(func): 1174 '''Skip Test Decorator 1175 Runs the test only without root permissions''' 1176 def func_wrapper(*args, **kwargs): 1177 if os.getuid() == 0: 1178 case_notrun('{}: cannot be run as root'.format(args[0])) 1179 return None 1180 else: 1181 return func(*args, **kwargs) 1182 return func_wrapper 1183 1184def execute_unittest(debug=False): 1185 """Executes unittests within the calling module.""" 1186 1187 verbosity = 2 if debug else 1 1188 1189 if debug: 1190 output = sys.stdout 1191 else: 1192 # We need to filter out the time taken from the output so that 1193 # qemu-iotest can reliably diff the results against master output. 1194 output = io.StringIO() 1195 1196 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1197 verbosity=verbosity) 1198 try: 1199 # unittest.main() will use sys.exit(); so expect a SystemExit 1200 # exception 1201 unittest.main(testRunner=runner) 1202 finally: 1203 # We need to filter out the time taken from the output so that 1204 # qemu-iotest can reliably diff the results against master output. 1205 if not debug: 1206 out = output.getvalue() 1207 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1208 1209 # Hide skipped tests from the reference output 1210 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1211 out_first_line, out_rest = out.split('\n', 1) 1212 out = out_first_line.replace('s', '.') + '\n' + out_rest 1213 1214 sys.stderr.write(out) 1215 1216def execute_setup_common(supported_fmts: Sequence[str] = (), 1217 supported_platforms: Sequence[str] = (), 1218 supported_cache_modes: Sequence[str] = (), 1219 supported_aio_modes: Sequence[str] = (), 1220 unsupported_fmts: Sequence[str] = (), 1221 supported_protocols: Sequence[str] = (), 1222 unsupported_protocols: Sequence[str] = ()) -> bool: 1223 """ 1224 Perform necessary setup for either script-style or unittest-style tests. 1225 1226 :return: Bool; Whether or not debug mode has been requested via the CLI. 1227 """ 1228 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1229 1230 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1231 # indicate that we're not being run via "check". There may be 1232 # other things set up by "check" that individual test cases rely 1233 # on. 1234 if test_dir is None or qemu_default_machine is None: 1235 sys.stderr.write('Please run this test via the "check" script\n') 1236 sys.exit(os.EX_USAGE) 1237 1238 debug = '-d' in sys.argv 1239 if debug: 1240 sys.argv.remove('-d') 1241 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1242 1243 _verify_image_format(supported_fmts, unsupported_fmts) 1244 _verify_protocol(supported_protocols, unsupported_protocols) 1245 _verify_platform(supported=supported_platforms) 1246 _verify_cache_mode(supported_cache_modes) 1247 _verify_aio_mode(supported_aio_modes) 1248 1249 return debug 1250 1251def execute_test(*args, test_function=None, **kwargs): 1252 """Run either unittest or script-style tests.""" 1253 1254 debug = execute_setup_common(*args, **kwargs) 1255 if not test_function: 1256 execute_unittest(debug) 1257 else: 1258 test_function() 1259 1260def activate_logging(): 1261 """Activate iotests.log() output to stdout for script-style tests.""" 1262 handler = logging.StreamHandler(stream=sys.stdout) 1263 formatter = logging.Formatter('%(message)s') 1264 handler.setFormatter(formatter) 1265 test_logger.addHandler(handler) 1266 test_logger.setLevel(logging.INFO) 1267 test_logger.propagate = False 1268 1269# This is called from script-style iotests without a single point of entry 1270def script_initialize(*args, **kwargs): 1271 """Initialize script-style tests without running any tests.""" 1272 activate_logging() 1273 execute_setup_common(*args, **kwargs) 1274 1275# This is called from script-style iotests with a single point of entry 1276def script_main(test_function, *args, **kwargs): 1277 """Run script-style tests outside of the unittest framework""" 1278 activate_logging() 1279 execute_test(*args, test_function=test_function, **kwargs) 1280 1281# This is called from unittest style iotests 1282def main(*args, **kwargs): 1283 """Run tests using the unittest framework""" 1284 execute_test(*args, **kwargs) 1285