1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76imgfmt = os.environ.get('IMGFMT', 'raw') 77imgproto = os.environ.get('IMGPROTO', 'file') 78test_dir = os.environ.get('TEST_DIR') 79sock_dir = os.environ.get('SOCK_DIR') 80output_dir = os.environ.get('OUTPUT_DIR', '.') 81cachemode = os.environ.get('CACHEMODE') 82aiomode = os.environ.get('AIOMODE') 83qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 84 85socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 86 87luks_default_secret_object = 'secret,id=keysec0,data=' + \ 88 os.environ.get('IMGKEYSECRET', '') 89luks_default_key_secret_opt = 'key-secret=keysec0' 90 91 92def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 93 connect_stderr: bool = True) -> Tuple[str, int]: 94 """ 95 Run a tool and return both its output and its exit code 96 """ 97 stderr = subprocess.STDOUT if connect_stderr else None 98 subp = subprocess.Popen(args, 99 stdout=subprocess.PIPE, 100 stderr=stderr, 101 universal_newlines=True) 102 output = subp.communicate()[0] 103 if subp.returncode < 0: 104 sys.stderr.write('%s received signal %i: %s\n' 105 % (tool, -subp.returncode, 106 ' '.join(qemu_img_args + list(args)))) 107 return (output, subp.returncode) 108 109def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 110 """ 111 Run qemu-img and return both its output and its exit code 112 """ 113 full_args = qemu_img_args + list(args) 114 return qemu_tool_pipe_and_status('qemu-img', full_args) 115 116def qemu_img(*args: str) -> int: 117 '''Run qemu-img and return the exit code''' 118 return qemu_img_pipe_and_status(*args)[1] 119 120def ordered_qmp(qmsg, conv_keys=True): 121 # Dictionaries are not ordered prior to 3.6, therefore: 122 if isinstance(qmsg, list): 123 return [ordered_qmp(atom) for atom in qmsg] 124 if isinstance(qmsg, dict): 125 od = OrderedDict() 126 for k, v in sorted(qmsg.items()): 127 if conv_keys: 128 k = k.replace('_', '-') 129 od[k] = ordered_qmp(v, conv_keys=False) 130 return od 131 return qmsg 132 133def qemu_img_create(*args): 134 args = list(args) 135 136 # default luks support 137 if '-f' in args and args[args.index('-f') + 1] == 'luks': 138 if '-o' in args: 139 i = args.index('-o') 140 if 'key-secret' not in args[i + 1]: 141 args[i + 1].append(luks_default_key_secret_opt) 142 args.insert(i + 2, '--object') 143 args.insert(i + 3, luks_default_secret_object) 144 else: 145 args = ['-o', luks_default_key_secret_opt, 146 '--object', luks_default_secret_object] + args 147 148 args.insert(0, 'create') 149 150 return qemu_img(*args) 151 152def qemu_img_measure(*args): 153 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 154 155def qemu_img_check(*args): 156 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 157 158def qemu_img_verbose(*args): 159 '''Run qemu-img without suppressing its output and return the exit code''' 160 exitcode = subprocess.call(qemu_img_args + list(args)) 161 if exitcode < 0: 162 sys.stderr.write('qemu-img received signal %i: %s\n' 163 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 164 return exitcode 165 166def qemu_img_pipe(*args: str) -> str: 167 '''Run qemu-img and return its output''' 168 return qemu_img_pipe_and_status(*args)[0] 169 170def qemu_img_log(*args): 171 result = qemu_img_pipe(*args) 172 log(result, filters=[filter_testfiles]) 173 return result 174 175def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 176 args = ['info'] 177 if imgopts: 178 args.append('--image-opts') 179 else: 180 args += ['-f', imgfmt] 181 args += extra_args 182 args.append(filename) 183 184 output = qemu_img_pipe(*args) 185 if not filter_path: 186 filter_path = filename 187 log(filter_img_info(output, filter_path)) 188 189def qemu_io(*args): 190 '''Run qemu-io and return the stdout data''' 191 args = qemu_io_args + list(args) 192 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 193 stderr=subprocess.STDOUT, 194 universal_newlines=True) 195 output = subp.communicate()[0] 196 if subp.returncode < 0: 197 sys.stderr.write('qemu-io received signal %i: %s\n' 198 % (-subp.returncode, ' '.join(args))) 199 return output 200 201def qemu_io_log(*args): 202 result = qemu_io(*args) 203 log(result, filters=[filter_testfiles, filter_qemu_io]) 204 return result 205 206def qemu_io_silent(*args): 207 '''Run qemu-io and return the exit code, suppressing stdout''' 208 if '-f' in args or '--image-opts' in args: 209 default_args = qemu_io_args_no_fmt 210 else: 211 default_args = qemu_io_args 212 213 args = default_args + list(args) 214 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 215 if exitcode < 0: 216 sys.stderr.write('qemu-io received signal %i: %s\n' % 217 (-exitcode, ' '.join(args))) 218 return exitcode 219 220def qemu_io_silent_check(*args): 221 '''Run qemu-io and return the true if subprocess returned 0''' 222 args = qemu_io_args + list(args) 223 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 224 stderr=subprocess.STDOUT) 225 return exitcode == 0 226 227def get_virtio_scsi_device(): 228 if qemu_default_machine == 's390-ccw-virtio': 229 return 'virtio-scsi-ccw' 230 return 'virtio-scsi-pci' 231 232class QemuIoInteractive: 233 def __init__(self, *args): 234 self.args = qemu_io_args_no_fmt + list(args) 235 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 236 stdout=subprocess.PIPE, 237 stderr=subprocess.STDOUT, 238 universal_newlines=True) 239 out = self._p.stdout.read(9) 240 if out != 'qemu-io> ': 241 # Most probably qemu-io just failed to start. 242 # Let's collect the whole output and exit. 243 out += self._p.stdout.read() 244 self._p.wait(timeout=1) 245 raise ValueError(out) 246 247 def close(self): 248 self._p.communicate('q\n') 249 250 def _read_output(self): 251 pattern = 'qemu-io> ' 252 n = len(pattern) 253 pos = 0 254 s = [] 255 while pos != n: 256 c = self._p.stdout.read(1) 257 # check unexpected EOF 258 assert c != '' 259 s.append(c) 260 if c == pattern[pos]: 261 pos += 1 262 else: 263 pos = 0 264 265 return ''.join(s[:-n]) 266 267 def cmd(self, cmd): 268 # quit command is in close(), '\n' is added automatically 269 assert '\n' not in cmd 270 cmd = cmd.strip() 271 assert cmd not in ('q', 'quit') 272 self._p.stdin.write(cmd + '\n') 273 self._p.stdin.flush() 274 return self._read_output() 275 276 277def qemu_nbd(*args): 278 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 279 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 280 281def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 282 '''Run qemu-nbd in daemon mode and return both the parent's exit code 283 and its output in case of an error''' 284 full_args = qemu_nbd_args + ['--fork'] + list(args) 285 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 286 connect_stderr=False) 287 return returncode, output if returncode else '' 288 289def qemu_nbd_list_log(*args: str) -> str: 290 '''Run qemu-nbd to list remote exports''' 291 full_args = [qemu_nbd_prog, '-L'] + list(args) 292 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 293 log(output, filters=[filter_testfiles, filter_nbd_exports]) 294 return output 295 296@contextmanager 297def qemu_nbd_popen(*args): 298 '''Context manager running qemu-nbd within the context''' 299 pid_file = file_path("pid") 300 301 cmd = list(qemu_nbd_args) 302 cmd.extend(('--persistent', '--pid-file', pid_file)) 303 cmd.extend(args) 304 305 log('Start NBD server') 306 p = subprocess.Popen(cmd) 307 try: 308 while not os.path.exists(pid_file): 309 if p.poll() is not None: 310 raise RuntimeError( 311 "qemu-nbd terminated with exit code {}: {}" 312 .format(p.returncode, ' '.join(cmd))) 313 314 time.sleep(0.01) 315 yield 316 finally: 317 log('Kill NBD server') 318 p.kill() 319 p.wait() 320 321def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 322 '''Return True if two image files are identical''' 323 return qemu_img('compare', '-f', fmt1, 324 '-F', fmt2, img1, img2) == 0 325 326def create_image(name, size): 327 '''Create a fully-allocated raw image with sector markers''' 328 file = open(name, 'wb') 329 i = 0 330 while i < size: 331 sector = struct.pack('>l504xl', i // 512, i // 512) 332 file.write(sector) 333 i = i + 512 334 file.close() 335 336def image_size(img): 337 '''Return image's virtual size''' 338 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 339 return json.loads(r)['virtual-size'] 340 341def is_str(val): 342 return isinstance(val, str) 343 344test_dir_re = re.compile(r"%s" % test_dir) 345def filter_test_dir(msg): 346 return test_dir_re.sub("TEST_DIR", msg) 347 348win32_re = re.compile(r"\r") 349def filter_win32(msg): 350 return win32_re.sub("", msg) 351 352qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 353 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 354 r"and [0-9\/.inf]* ops\/sec\)") 355def filter_qemu_io(msg): 356 msg = filter_win32(msg) 357 return qemu_io_re.sub("X ops; XX:XX:XX.X " 358 "(XXX YYY/sec and XXX ops/sec)", msg) 359 360chown_re = re.compile(r"chown [0-9]+:[0-9]+") 361def filter_chown(msg): 362 return chown_re.sub("chown UID:GID", msg) 363 364def filter_qmp_event(event): 365 '''Filter a QMP event dict''' 366 event = dict(event) 367 if 'timestamp' in event: 368 event['timestamp']['seconds'] = 'SECS' 369 event['timestamp']['microseconds'] = 'USECS' 370 return event 371 372def filter_qmp(qmsg, filter_fn): 373 '''Given a string filter, filter a QMP object's values. 374 filter_fn takes a (key, value) pair.''' 375 # Iterate through either lists or dicts; 376 if isinstance(qmsg, list): 377 items = enumerate(qmsg) 378 else: 379 items = qmsg.items() 380 381 for k, v in items: 382 if isinstance(v, (dict, list)): 383 qmsg[k] = filter_qmp(v, filter_fn) 384 else: 385 qmsg[k] = filter_fn(k, v) 386 return qmsg 387 388def filter_testfiles(msg): 389 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 390 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 391 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 392 393def filter_qmp_testfiles(qmsg): 394 def _filter(_key, value): 395 if is_str(value): 396 return filter_testfiles(value) 397 return value 398 return filter_qmp(qmsg, _filter) 399 400def filter_virtio_scsi(output: str) -> str: 401 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output) 402 403def filter_qmp_virtio_scsi(qmsg): 404 def _filter(_key, value): 405 if is_str(value): 406 return filter_virtio_scsi(value) 407 return value 408 return filter_qmp(qmsg, _filter) 409 410def filter_generated_node_ids(msg): 411 return re.sub("#block[0-9]+", "NODE_NAME", msg) 412 413def filter_img_info(output, filename): 414 lines = [] 415 for line in output.split('\n'): 416 if 'disk size' in line or 'actual-size' in line: 417 continue 418 line = line.replace(filename, 'TEST_IMG') 419 line = filter_testfiles(line) 420 line = line.replace(imgfmt, 'IMGFMT') 421 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 422 line = re.sub('uuid: [-a-f0-9]+', 423 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 424 line) 425 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 426 lines.append(line) 427 return '\n'.join(lines) 428 429def filter_imgfmt(msg): 430 return msg.replace(imgfmt, 'IMGFMT') 431 432def filter_qmp_imgfmt(qmsg): 433 def _filter(_key, value): 434 if is_str(value): 435 return filter_imgfmt(value) 436 return value 437 return filter_qmp(qmsg, _filter) 438 439def filter_nbd_exports(output: str) -> str: 440 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 441 442 443Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 444 445def log(msg: Msg, 446 filters: Iterable[Callable[[Msg], Msg]] = (), 447 indent: Optional[int] = None) -> None: 448 """ 449 Logs either a string message or a JSON serializable message (like QMP). 450 If indent is provided, JSON serializable messages are pretty-printed. 451 """ 452 for flt in filters: 453 msg = flt(msg) 454 if isinstance(msg, (dict, list)): 455 # Don't sort if it's already sorted 456 do_sort = not isinstance(msg, OrderedDict) 457 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 458 else: 459 test_logger.info(msg) 460 461class Timeout: 462 def __init__(self, seconds, errmsg="Timeout"): 463 self.seconds = seconds 464 self.errmsg = errmsg 465 def __enter__(self): 466 signal.signal(signal.SIGALRM, self.timeout) 467 signal.setitimer(signal.ITIMER_REAL, self.seconds) 468 return self 469 def __exit__(self, exc_type, value, traceback): 470 signal.setitimer(signal.ITIMER_REAL, 0) 471 return False 472 def timeout(self, signum, frame): 473 raise Exception(self.errmsg) 474 475def file_pattern(name): 476 return "{0}-{1}".format(os.getpid(), name) 477 478class FilePath: 479 """ 480 Context manager generating multiple file names. The generated files are 481 removed when exiting the context. 482 483 Example usage: 484 485 with FilePath('a.img', 'b.img') as (img_a, img_b): 486 # Use img_a and img_b here... 487 488 # a.img and b.img are automatically removed here. 489 490 By default images are created in iotests.test_dir. To create sockets use 491 iotests.sock_dir: 492 493 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 494 495 For convenience, calling with one argument yields a single file instead of 496 a tuple with one item. 497 498 """ 499 def __init__(self, *names, base_dir=test_dir): 500 self.paths = [os.path.join(base_dir, file_pattern(name)) 501 for name in names] 502 503 def __enter__(self): 504 if len(self.paths) == 1: 505 return self.paths[0] 506 else: 507 return self.paths 508 509 def __exit__(self, exc_type, exc_val, exc_tb): 510 for path in self.paths: 511 try: 512 os.remove(path) 513 except OSError: 514 pass 515 return False 516 517 518def file_path_remover(): 519 for path in reversed(file_path_remover.paths): 520 try: 521 os.remove(path) 522 except OSError: 523 pass 524 525 526def file_path(*names, base_dir=test_dir): 527 ''' Another way to get auto-generated filename that cleans itself up. 528 529 Use is as simple as: 530 531 img_a, img_b = file_path('a.img', 'b.img') 532 sock = file_path('socket') 533 ''' 534 535 if not hasattr(file_path_remover, 'paths'): 536 file_path_remover.paths = [] 537 atexit.register(file_path_remover) 538 539 paths = [] 540 for name in names: 541 filename = file_pattern(name) 542 path = os.path.join(base_dir, filename) 543 file_path_remover.paths.append(path) 544 paths.append(path) 545 546 return paths[0] if len(paths) == 1 else paths 547 548def remote_filename(path): 549 if imgproto == 'file': 550 return path 551 elif imgproto == 'ssh': 552 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 553 else: 554 raise Exception("Protocol %s not supported" % (imgproto)) 555 556class VM(qtest.QEMUQtestMachine): 557 '''A QEMU VM''' 558 559 def __init__(self, path_suffix=''): 560 name = "qemu%s-%d" % (path_suffix, os.getpid()) 561 super().__init__(qemu_prog, qemu_opts, name=name, 562 test_dir=test_dir, 563 socket_scm_helper=socket_scm_helper, 564 sock_dir=sock_dir) 565 self._num_drives = 0 566 567 def add_object(self, opts): 568 self._args.append('-object') 569 self._args.append(opts) 570 return self 571 572 def add_device(self, opts): 573 self._args.append('-device') 574 self._args.append(opts) 575 return self 576 577 def add_drive_raw(self, opts): 578 self._args.append('-drive') 579 self._args.append(opts) 580 return self 581 582 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 583 '''Add a virtio-blk drive to the VM''' 584 options = ['if=%s' % interface, 585 'id=drive%d' % self._num_drives] 586 587 if path is not None: 588 options.append('file=%s' % path) 589 options.append('format=%s' % img_format) 590 options.append('cache=%s' % cachemode) 591 options.append('aio=%s' % aiomode) 592 593 if opts: 594 options.append(opts) 595 596 if img_format == 'luks' and 'key-secret' not in opts: 597 # default luks support 598 if luks_default_secret_object not in self._args: 599 self.add_object(luks_default_secret_object) 600 601 options.append(luks_default_key_secret_opt) 602 603 self._args.append('-drive') 604 self._args.append(','.join(options)) 605 self._num_drives += 1 606 return self 607 608 def add_blockdev(self, opts): 609 self._args.append('-blockdev') 610 if isinstance(opts, str): 611 self._args.append(opts) 612 else: 613 self._args.append(','.join(opts)) 614 return self 615 616 def add_incoming(self, addr): 617 self._args.append('-incoming') 618 self._args.append(addr) 619 return self 620 621 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 622 cmd = 'human-monitor-command' 623 kwargs: Dict[str, Any] = {'command-line': command_line} 624 if use_log: 625 return self.qmp_log(cmd, **kwargs) 626 else: 627 return self.qmp(cmd, **kwargs) 628 629 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 630 """Pause drive r/w operations""" 631 if not event: 632 self.pause_drive(drive, "read_aio") 633 self.pause_drive(drive, "write_aio") 634 return 635 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 636 637 def resume_drive(self, drive: str) -> None: 638 """Resume drive r/w operations""" 639 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 640 641 def hmp_qemu_io(self, drive: str, cmd: str, 642 use_log: bool = False) -> QMPMessage: 643 """Write to a given drive using an HMP command""" 644 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 645 646 def flatten_qmp_object(self, obj, output=None, basestr=''): 647 if output is None: 648 output = dict() 649 if isinstance(obj, list): 650 for i, item in enumerate(obj): 651 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 652 elif isinstance(obj, dict): 653 for key in obj: 654 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 655 else: 656 output[basestr[:-1]] = obj # Strip trailing '.' 657 return output 658 659 def qmp_to_opts(self, obj): 660 obj = self.flatten_qmp_object(obj) 661 output_list = list() 662 for key in obj: 663 output_list += [key + '=' + obj[key]] 664 return ','.join(output_list) 665 666 def get_qmp_events_filtered(self, wait=60.0): 667 result = [] 668 for ev in self.get_qmp_events(wait=wait): 669 result.append(filter_qmp_event(ev)) 670 return result 671 672 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 673 full_cmd = OrderedDict(( 674 ("execute", cmd), 675 ("arguments", ordered_qmp(kwargs)) 676 )) 677 log(full_cmd, filters, indent=indent) 678 result = self.qmp(cmd, **kwargs) 679 log(result, filters, indent=indent) 680 return result 681 682 # Returns None on success, and an error string on failure 683 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 684 pre_finalize=None, cancel=False, wait=60.0): 685 """ 686 run_job moves a job from creation through to dismissal. 687 688 :param job: String. ID of recently-launched job 689 :param auto_finalize: Bool. True if the job was launched with 690 auto_finalize. Defaults to True. 691 :param auto_dismiss: Bool. True if the job was launched with 692 auto_dismiss=True. Defaults to False. 693 :param pre_finalize: Callback. A callable that takes no arguments to be 694 invoked prior to issuing job-finalize, if any. 695 :param cancel: Bool. When true, cancels the job after the pre_finalize 696 callback. 697 :param wait: Float. Timeout value specifying how long to wait for any 698 event, in seconds. Defaults to 60.0. 699 """ 700 match_device = {'data': {'device': job}} 701 match_id = {'data': {'id': job}} 702 events = [ 703 ('BLOCK_JOB_COMPLETED', match_device), 704 ('BLOCK_JOB_CANCELLED', match_device), 705 ('BLOCK_JOB_ERROR', match_device), 706 ('BLOCK_JOB_READY', match_device), 707 ('BLOCK_JOB_PENDING', match_id), 708 ('JOB_STATUS_CHANGE', match_id) 709 ] 710 error = None 711 while True: 712 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 713 if ev['event'] != 'JOB_STATUS_CHANGE': 714 log(ev) 715 continue 716 status = ev['data']['status'] 717 if status == 'aborting': 718 result = self.qmp('query-jobs') 719 for j in result['return']: 720 if j['id'] == job: 721 error = j['error'] 722 log('Job failed: %s' % (j['error'])) 723 elif status == 'ready': 724 self.qmp_log('job-complete', id=job) 725 elif status == 'pending' and not auto_finalize: 726 if pre_finalize: 727 pre_finalize() 728 if cancel: 729 self.qmp_log('job-cancel', id=job) 730 else: 731 self.qmp_log('job-finalize', id=job) 732 elif status == 'concluded' and not auto_dismiss: 733 self.qmp_log('job-dismiss', id=job) 734 elif status == 'null': 735 return error 736 737 # Returns None on success, and an error string on failure 738 def blockdev_create(self, options, job_id='job0', filters=None): 739 if filters is None: 740 filters = [filter_qmp_testfiles] 741 result = self.qmp_log('blockdev-create', filters=filters, 742 job_id=job_id, options=options) 743 744 if 'return' in result: 745 assert result['return'] == {} 746 job_result = self.run_job(job_id) 747 else: 748 job_result = result['error'] 749 750 log("") 751 return job_result 752 753 def enable_migration_events(self, name): 754 log('Enabling migration QMP events on %s...' % name) 755 log(self.qmp('migrate-set-capabilities', capabilities=[ 756 { 757 'capability': 'events', 758 'state': True 759 } 760 ])) 761 762 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 763 while True: 764 event = self.event_wait('MIGRATION') 765 # We use the default timeout, and with a timeout, event_wait() 766 # never returns None 767 assert event 768 769 log(event, filters=[filter_qmp_event]) 770 if event['data']['status'] in ('completed', 'failed'): 771 break 772 773 if event['data']['status'] == 'completed': 774 # The event may occur in finish-migrate, so wait for the expected 775 # post-migration runstate 776 runstate = None 777 while runstate != expect_runstate: 778 runstate = self.qmp('query-status')['return']['status'] 779 return True 780 else: 781 return False 782 783 def node_info(self, node_name): 784 nodes = self.qmp('query-named-block-nodes') 785 for x in nodes['return']: 786 if x['node-name'] == node_name: 787 return x 788 return None 789 790 def query_bitmaps(self): 791 res = self.qmp("query-named-block-nodes") 792 return {device['node-name']: device['dirty-bitmaps'] 793 for device in res['return'] if 'dirty-bitmaps' in device} 794 795 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 796 """ 797 get a specific bitmap from the object returned by query_bitmaps. 798 :param recording: If specified, filter results by the specified value. 799 :param bitmaps: If specified, use it instead of call query_bitmaps() 800 """ 801 if bitmaps is None: 802 bitmaps = self.query_bitmaps() 803 804 for bitmap in bitmaps[node_name]: 805 if bitmap.get('name', '') == bitmap_name: 806 if recording is None or bitmap.get('recording') == recording: 807 return bitmap 808 return None 809 810 def check_bitmap_status(self, node_name, bitmap_name, fields): 811 ret = self.get_bitmap(node_name, bitmap_name) 812 813 return fields.items() <= ret.items() 814 815 def assert_block_path(self, root, path, expected_node, graph=None): 816 """ 817 Check whether the node under the given path in the block graph 818 is @expected_node. 819 820 @root is the node name of the node where the @path is rooted. 821 822 @path is a string that consists of child names separated by 823 slashes. It must begin with a slash. 824 825 Examples for @root + @path: 826 - root="qcow2-node", path="/backing/file" 827 - root="quorum-node", path="/children.2/file" 828 829 Hypothetically, @path could be empty, in which case it would 830 point to @root. However, in practice this case is not useful 831 and hence not allowed. 832 833 @expected_node may be None. (All elements of the path but the 834 leaf must still exist.) 835 836 @graph may be None or the result of an x-debug-query-block-graph 837 call that has already been performed. 838 """ 839 if graph is None: 840 graph = self.qmp('x-debug-query-block-graph')['return'] 841 842 iter_path = iter(path.split('/')) 843 844 # Must start with a / 845 assert next(iter_path) == '' 846 847 node = next((node for node in graph['nodes'] if node['name'] == root), 848 None) 849 850 # An empty @path is not allowed, so the root node must be present 851 assert node is not None, 'Root node %s not found' % root 852 853 for child_name in iter_path: 854 assert node is not None, 'Cannot follow path %s%s' % (root, path) 855 856 try: 857 node_id = next(edge['child'] for edge in graph['edges'] 858 if (edge['parent'] == node['id'] and 859 edge['name'] == child_name)) 860 861 node = next(node for node in graph['nodes'] 862 if node['id'] == node_id) 863 864 except StopIteration: 865 node = None 866 867 if node is None: 868 assert expected_node is None, \ 869 'No node found under %s (but expected %s)' % \ 870 (path, expected_node) 871 else: 872 assert node['name'] == expected_node, \ 873 'Found node %s under %s (but expected %s)' % \ 874 (node['name'], path, expected_node) 875 876index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 877 878class QMPTestCase(unittest.TestCase): 879 '''Abstract base class for QMP test cases''' 880 881 def __init__(self, *args, **kwargs): 882 super().__init__(*args, **kwargs) 883 # Many users of this class set a VM property we rely on heavily 884 # in the methods below. 885 self.vm = None 886 887 def dictpath(self, d, path): 888 '''Traverse a path in a nested dict''' 889 for component in path.split('/'): 890 m = index_re.match(component) 891 if m: 892 component, idx = m.groups() 893 idx = int(idx) 894 895 if not isinstance(d, dict) or component not in d: 896 self.fail(f'failed path traversal for "{path}" in "{d}"') 897 d = d[component] 898 899 if m: 900 if not isinstance(d, list): 901 self.fail(f'path component "{component}" in "{path}" ' 902 f'is not a list in "{d}"') 903 try: 904 d = d[idx] 905 except IndexError: 906 self.fail(f'invalid index "{idx}" in path "{path}" ' 907 f'in "{d}"') 908 return d 909 910 def assert_qmp_absent(self, d, path): 911 try: 912 result = self.dictpath(d, path) 913 except AssertionError: 914 return 915 self.fail('path "%s" has value "%s"' % (path, str(result))) 916 917 def assert_qmp(self, d, path, value): 918 '''Assert that the value for a specific path in a QMP dict 919 matches. When given a list of values, assert that any of 920 them matches.''' 921 922 result = self.dictpath(d, path) 923 924 # [] makes no sense as a list of valid values, so treat it as 925 # an actual single value. 926 if isinstance(value, list) and value != []: 927 for v in value: 928 if result == v: 929 return 930 self.fail('no match for "%s" in %s' % (str(result), str(value))) 931 else: 932 self.assertEqual(result, value, 933 '"%s" is "%s", expected "%s"' 934 % (path, str(result), str(value))) 935 936 def assert_no_active_block_jobs(self): 937 result = self.vm.qmp('query-block-jobs') 938 self.assert_qmp(result, 'return', []) 939 940 def assert_has_block_node(self, node_name=None, file_name=None): 941 """Issue a query-named-block-nodes and assert node_name and/or 942 file_name is present in the result""" 943 def check_equal_or_none(a, b): 944 return a is None or b is None or a == b 945 assert node_name or file_name 946 result = self.vm.qmp('query-named-block-nodes') 947 for x in result["return"]: 948 if check_equal_or_none(x.get("node-name"), node_name) and \ 949 check_equal_or_none(x.get("file"), file_name): 950 return 951 self.fail("Cannot find %s %s in result:\n%s" % 952 (node_name, file_name, result)) 953 954 def assert_json_filename_equal(self, json_filename, reference): 955 '''Asserts that the given filename is a json: filename and that its 956 content is equal to the given reference object''' 957 self.assertEqual(json_filename[:5], 'json:') 958 self.assertEqual( 959 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 960 self.vm.flatten_qmp_object(reference) 961 ) 962 963 def cancel_and_wait(self, drive='drive0', force=False, 964 resume=False, wait=60.0): 965 '''Cancel a block job and wait for it to finish, returning the event''' 966 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 967 self.assert_qmp(result, 'return', {}) 968 969 if resume: 970 self.vm.resume_drive(drive) 971 972 cancelled = False 973 result = None 974 while not cancelled: 975 for event in self.vm.get_qmp_events(wait=wait): 976 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 977 event['event'] == 'BLOCK_JOB_CANCELLED': 978 self.assert_qmp(event, 'data/device', drive) 979 result = event 980 cancelled = True 981 elif event['event'] == 'JOB_STATUS_CHANGE': 982 self.assert_qmp(event, 'data/id', drive) 983 984 985 self.assert_no_active_block_jobs() 986 return result 987 988 def wait_until_completed(self, drive='drive0', check_offset=True, 989 wait=60.0, error=None): 990 '''Wait for a block job to finish, returning the event''' 991 while True: 992 for event in self.vm.get_qmp_events(wait=wait): 993 if event['event'] == 'BLOCK_JOB_COMPLETED': 994 self.assert_qmp(event, 'data/device', drive) 995 if error is None: 996 self.assert_qmp_absent(event, 'data/error') 997 if check_offset: 998 self.assert_qmp(event, 'data/offset', 999 event['data']['len']) 1000 else: 1001 self.assert_qmp(event, 'data/error', error) 1002 self.assert_no_active_block_jobs() 1003 return event 1004 if event['event'] == 'JOB_STATUS_CHANGE': 1005 self.assert_qmp(event, 'data/id', drive) 1006 1007 def wait_ready(self, drive='drive0'): 1008 """Wait until a BLOCK_JOB_READY event, and return the event.""" 1009 return self.vm.events_wait([ 1010 ('BLOCK_JOB_READY', 1011 {'data': {'type': 'mirror', 'device': drive}}), 1012 ('BLOCK_JOB_READY', 1013 {'data': {'type': 'commit', 'device': drive}}) 1014 ]) 1015 1016 def wait_ready_and_cancel(self, drive='drive0'): 1017 self.wait_ready(drive=drive) 1018 event = self.cancel_and_wait(drive=drive) 1019 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1020 self.assert_qmp(event, 'data/type', 'mirror') 1021 self.assert_qmp(event, 'data/offset', event['data']['len']) 1022 1023 def complete_and_wait(self, drive='drive0', wait_ready=True, 1024 completion_error=None): 1025 '''Complete a block job and wait for it to finish''' 1026 if wait_ready: 1027 self.wait_ready(drive=drive) 1028 1029 result = self.vm.qmp('block-job-complete', device=drive) 1030 self.assert_qmp(result, 'return', {}) 1031 1032 event = self.wait_until_completed(drive=drive, error=completion_error) 1033 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1034 1035 def pause_wait(self, job_id='job0'): 1036 with Timeout(3, "Timeout waiting for job to pause"): 1037 while True: 1038 result = self.vm.qmp('query-block-jobs') 1039 found = False 1040 for job in result['return']: 1041 if job['device'] == job_id: 1042 found = True 1043 if job['paused'] and not job['busy']: 1044 return job 1045 break 1046 assert found 1047 1048 def pause_job(self, job_id='job0', wait=True): 1049 result = self.vm.qmp('block-job-pause', device=job_id) 1050 self.assert_qmp(result, 'return', {}) 1051 if wait: 1052 return self.pause_wait(job_id) 1053 return result 1054 1055 def case_skip(self, reason): 1056 '''Skip this test case''' 1057 case_notrun(reason) 1058 self.skipTest(reason) 1059 1060 1061def notrun(reason): 1062 '''Skip this test suite''' 1063 # Each test in qemu-iotests has a number ("seq") 1064 seq = os.path.basename(sys.argv[0]) 1065 1066 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1067 logger.warning("%s not run: %s", seq, reason) 1068 sys.exit(0) 1069 1070def case_notrun(reason): 1071 '''Mark this test case as not having been run (without actually 1072 skipping it, that is left to the caller). See 1073 QMPTestCase.case_skip() for a variant that actually skips the 1074 current test case.''' 1075 1076 # Each test in qemu-iotests has a number ("seq") 1077 seq = os.path.basename(sys.argv[0]) 1078 1079 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1080 ' [case not run] ' + reason + '\n') 1081 1082def _verify_image_format(supported_fmts: Sequence[str] = (), 1083 unsupported_fmts: Sequence[str] = ()) -> None: 1084 if 'generic' in supported_fmts and \ 1085 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1086 # similar to 1087 # _supported_fmt generic 1088 # for bash tests 1089 supported_fmts = () 1090 1091 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1092 if not_sup or (imgfmt in unsupported_fmts): 1093 notrun('not suitable for this image format: %s' % imgfmt) 1094 1095 if imgfmt == 'luks': 1096 verify_working_luks() 1097 1098def _verify_protocol(supported: Sequence[str] = (), 1099 unsupported: Sequence[str] = ()) -> None: 1100 assert not (supported and unsupported) 1101 1102 if 'generic' in supported: 1103 return 1104 1105 not_sup = supported and (imgproto not in supported) 1106 if not_sup or (imgproto in unsupported): 1107 notrun('not suitable for this protocol: %s' % imgproto) 1108 1109def _verify_platform(supported: Sequence[str] = (), 1110 unsupported: Sequence[str] = ()) -> None: 1111 if any((sys.platform.startswith(x) for x in unsupported)): 1112 notrun('not suitable for this OS: %s' % sys.platform) 1113 1114 if supported: 1115 if not any((sys.platform.startswith(x) for x in supported)): 1116 notrun('not suitable for this OS: %s' % sys.platform) 1117 1118def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1119 if supported_cache_modes and (cachemode not in supported_cache_modes): 1120 notrun('not suitable for this cache mode: %s' % cachemode) 1121 1122def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1123 if supported_aio_modes and (aiomode not in supported_aio_modes): 1124 notrun('not suitable for this aio mode: %s' % aiomode) 1125 1126def _verify_formats(required_formats: Sequence[str] = ()) -> None: 1127 usf_list = list(set(required_formats) - set(supported_formats())) 1128 if usf_list: 1129 notrun(f'formats {usf_list} are not whitelisted') 1130 1131def supports_quorum(): 1132 return 'quorum' in qemu_img_pipe('--help') 1133 1134def verify_quorum(): 1135 '''Skip test suite if quorum support is not available''' 1136 if not supports_quorum(): 1137 notrun('quorum support missing') 1138 1139def has_working_luks() -> Tuple[bool, str]: 1140 """ 1141 Check whether our LUKS driver can actually create images 1142 (this extends to LUKS encryption for qcow2). 1143 1144 If not, return the reason why. 1145 """ 1146 1147 img_file = f'{test_dir}/luks-test.luks' 1148 (output, status) = \ 1149 qemu_img_pipe_and_status('create', '-f', 'luks', 1150 '--object', luks_default_secret_object, 1151 '-o', luks_default_key_secret_opt, 1152 '-o', 'iter-time=10', 1153 img_file, '1G') 1154 try: 1155 os.remove(img_file) 1156 except OSError: 1157 pass 1158 1159 if status != 0: 1160 reason = output 1161 for line in output.splitlines(): 1162 if img_file + ':' in line: 1163 reason = line.split(img_file + ':', 1)[1].strip() 1164 break 1165 1166 return (False, reason) 1167 else: 1168 return (True, '') 1169 1170def verify_working_luks(): 1171 """ 1172 Skip test suite if LUKS does not work 1173 """ 1174 (working, reason) = has_working_luks() 1175 if not working: 1176 notrun(reason) 1177 1178def qemu_pipe(*args: str) -> str: 1179 """ 1180 Run qemu with an option to print something and exit (e.g. a help option). 1181 1182 :return: QEMU's stdout output. 1183 """ 1184 full_args = [qemu_prog] + qemu_opts + list(args) 1185 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1186 return output 1187 1188def supported_formats(read_only=False): 1189 '''Set 'read_only' to True to check ro-whitelist 1190 Otherwise, rw-whitelist is checked''' 1191 1192 if not hasattr(supported_formats, "formats"): 1193 supported_formats.formats = {} 1194 1195 if read_only not in supported_formats.formats: 1196 format_message = qemu_pipe("-drive", "format=help") 1197 line = 1 if read_only else 0 1198 supported_formats.formats[read_only] = \ 1199 format_message.splitlines()[line].split(":")[1].split() 1200 1201 return supported_formats.formats[read_only] 1202 1203def skip_if_unsupported(required_formats=(), read_only=False): 1204 '''Skip Test Decorator 1205 Runs the test if all the required formats are whitelisted''' 1206 def skip_test_decorator(func): 1207 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1208 **kwargs: Dict[str, Any]) -> None: 1209 if callable(required_formats): 1210 fmts = required_formats(test_case) 1211 else: 1212 fmts = required_formats 1213 1214 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1215 if usf_list: 1216 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1217 test_case.case_skip(msg) 1218 else: 1219 func(test_case, *args, **kwargs) 1220 return func_wrapper 1221 return skip_test_decorator 1222 1223def skip_for_formats(formats: Sequence[str] = ()) \ 1224 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1225 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1226 '''Skip Test Decorator 1227 Skips the test for the given formats''' 1228 def skip_test_decorator(func): 1229 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1230 **kwargs: Dict[str, Any]) -> None: 1231 if imgfmt in formats: 1232 msg = f'{test_case}: Skipped for format {imgfmt}' 1233 test_case.case_skip(msg) 1234 else: 1235 func(test_case, *args, **kwargs) 1236 return func_wrapper 1237 return skip_test_decorator 1238 1239def skip_if_user_is_root(func): 1240 '''Skip Test Decorator 1241 Runs the test only without root permissions''' 1242 def func_wrapper(*args, **kwargs): 1243 if os.getuid() == 0: 1244 case_notrun('{}: cannot be run as root'.format(args[0])) 1245 return None 1246 else: 1247 return func(*args, **kwargs) 1248 return func_wrapper 1249 1250def execute_unittest(debug=False): 1251 """Executes unittests within the calling module.""" 1252 1253 verbosity = 2 if debug else 1 1254 1255 if debug: 1256 output = sys.stdout 1257 else: 1258 # We need to filter out the time taken from the output so that 1259 # qemu-iotest can reliably diff the results against master output. 1260 output = io.StringIO() 1261 1262 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1263 verbosity=verbosity) 1264 try: 1265 # unittest.main() will use sys.exit(); so expect a SystemExit 1266 # exception 1267 unittest.main(testRunner=runner) 1268 finally: 1269 # We need to filter out the time taken from the output so that 1270 # qemu-iotest can reliably diff the results against master output. 1271 if not debug: 1272 out = output.getvalue() 1273 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1274 1275 # Hide skipped tests from the reference output 1276 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1277 out_first_line, out_rest = out.split('\n', 1) 1278 out = out_first_line.replace('s', '.') + '\n' + out_rest 1279 1280 sys.stderr.write(out) 1281 1282def execute_setup_common(supported_fmts: Sequence[str] = (), 1283 supported_platforms: Sequence[str] = (), 1284 supported_cache_modes: Sequence[str] = (), 1285 supported_aio_modes: Sequence[str] = (), 1286 unsupported_fmts: Sequence[str] = (), 1287 supported_protocols: Sequence[str] = (), 1288 unsupported_protocols: Sequence[str] = (), 1289 required_fmts: Sequence[str] = ()) -> bool: 1290 """ 1291 Perform necessary setup for either script-style or unittest-style tests. 1292 1293 :return: Bool; Whether or not debug mode has been requested via the CLI. 1294 """ 1295 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1296 1297 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1298 # indicate that we're not being run via "check". There may be 1299 # other things set up by "check" that individual test cases rely 1300 # on. 1301 if test_dir is None or qemu_default_machine is None: 1302 sys.stderr.write('Please run this test via the "check" script\n') 1303 sys.exit(os.EX_USAGE) 1304 1305 debug = '-d' in sys.argv 1306 if debug: 1307 sys.argv.remove('-d') 1308 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1309 1310 _verify_image_format(supported_fmts, unsupported_fmts) 1311 _verify_protocol(supported_protocols, unsupported_protocols) 1312 _verify_platform(supported=supported_platforms) 1313 _verify_cache_mode(supported_cache_modes) 1314 _verify_aio_mode(supported_aio_modes) 1315 _verify_formats(required_fmts) 1316 1317 return debug 1318 1319def execute_test(*args, test_function=None, **kwargs): 1320 """Run either unittest or script-style tests.""" 1321 1322 debug = execute_setup_common(*args, **kwargs) 1323 if not test_function: 1324 execute_unittest(debug) 1325 else: 1326 test_function() 1327 1328def activate_logging(): 1329 """Activate iotests.log() output to stdout for script-style tests.""" 1330 handler = logging.StreamHandler(stream=sys.stdout) 1331 formatter = logging.Formatter('%(message)s') 1332 handler.setFormatter(formatter) 1333 test_logger.addHandler(handler) 1334 test_logger.setLevel(logging.INFO) 1335 test_logger.propagate = False 1336 1337# This is called from script-style iotests without a single point of entry 1338def script_initialize(*args, **kwargs): 1339 """Initialize script-style tests without running any tests.""" 1340 activate_logging() 1341 execute_setup_common(*args, **kwargs) 1342 1343# This is called from script-style iotests with a single point of entry 1344def script_main(test_function, *args, **kwargs): 1345 """Run script-style tests outside of the unittest framework""" 1346 activate_logging() 1347 execute_test(*args, test_function=test_function, **kwargs) 1348 1349# This is called from unittest style iotests 1350def main(*args, **kwargs): 1351 """Run tests using the unittest framework""" 1352 execute_test(*args, **kwargs) 1353