1# Common utilities and Python wrappers for qemu-iotests 2# 3# Copyright (C) 2012 IBM Corp. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License as published by 7# the Free Software Foundation; either version 2 of the License, or 8# (at your option) any later version. 9# 10# This program is distributed in the hope that it will be useful, 11# but WITHOUT ANY WARRANTY; without even the implied warranty of 12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13# GNU General Public License for more details. 14# 15# You should have received a copy of the GNU General Public License 16# along with this program. If not, see <http://www.gnu.org/licenses/>. 17# 18 19import atexit 20from collections import OrderedDict 21import faulthandler 22import io 23import json 24import logging 25import os 26import re 27import signal 28import struct 29import subprocess 30import sys 31import time 32from typing import (Any, Callable, Dict, Iterable, 33 List, Optional, Sequence, Tuple, TypeVar) 34import unittest 35 36from contextlib import contextmanager 37 38# pylint: disable=import-error, wrong-import-position 39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) 40from qemu import qtest 41from qemu.qmp import QMPMessage 42 43# Use this logger for logging messages directly from the iotests module 44logger = logging.getLogger('qemu.iotests') 45logger.addHandler(logging.NullHandler()) 46 47# Use this logger for messages that ought to be used for diff output. 48test_logger = logging.getLogger('qemu.iotests.diff_io') 49 50 51faulthandler.enable() 52 53# This will not work if arguments contain spaces but is necessary if we 54# want to support the override options that ./check supports. 55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 56if os.environ.get('QEMU_IMG_OPTIONS'): 57 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 58 59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 60if os.environ.get('QEMU_IO_OPTIONS'): 61 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 62 63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'): 65 qemu_io_args_no_fmt += \ 66 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ') 67 68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd') 69qemu_nbd_args = [qemu_nbd_prog] 70if os.environ.get('QEMU_NBD_OPTIONS'): 71 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 72 73qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 75 76imgfmt = os.environ.get('IMGFMT', 'raw') 77imgproto = os.environ.get('IMGPROTO', 'file') 78test_dir = os.environ.get('TEST_DIR') 79sock_dir = os.environ.get('SOCK_DIR') 80output_dir = os.environ.get('OUTPUT_DIR', '.') 81cachemode = os.environ.get('CACHEMODE') 82aiomode = os.environ.get('AIOMODE') 83qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 84 85socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 86 87luks_default_secret_object = 'secret,id=keysec0,data=' + \ 88 os.environ.get('IMGKEYSECRET', '') 89luks_default_key_secret_opt = 'key-secret=keysec0' 90 91 92def qemu_tool_pipe_and_status(tool: str, args: Sequence[str], 93 connect_stderr: bool = True) -> Tuple[str, int]: 94 """ 95 Run a tool and return both its output and its exit code 96 """ 97 stderr = subprocess.STDOUT if connect_stderr else None 98 subp = subprocess.Popen(args, 99 stdout=subprocess.PIPE, 100 stderr=stderr, 101 universal_newlines=True) 102 output = subp.communicate()[0] 103 if subp.returncode < 0: 104 sys.stderr.write('%s received signal %i: %s\n' 105 % (tool, -subp.returncode, 106 ' '.join(qemu_img_args + list(args)))) 107 return (output, subp.returncode) 108 109def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]: 110 """ 111 Run qemu-img and return both its output and its exit code 112 """ 113 full_args = qemu_img_args + list(args) 114 return qemu_tool_pipe_and_status('qemu-img', full_args) 115 116def qemu_img(*args: str) -> int: 117 '''Run qemu-img and return the exit code''' 118 return qemu_img_pipe_and_status(*args)[1] 119 120def ordered_qmp(qmsg, conv_keys=True): 121 # Dictionaries are not ordered prior to 3.6, therefore: 122 if isinstance(qmsg, list): 123 return [ordered_qmp(atom) for atom in qmsg] 124 if isinstance(qmsg, dict): 125 od = OrderedDict() 126 for k, v in sorted(qmsg.items()): 127 if conv_keys: 128 k = k.replace('_', '-') 129 od[k] = ordered_qmp(v, conv_keys=False) 130 return od 131 return qmsg 132 133def qemu_img_create(*args): 134 args = list(args) 135 136 # default luks support 137 if '-f' in args and args[args.index('-f') + 1] == 'luks': 138 if '-o' in args: 139 i = args.index('-o') 140 if 'key-secret' not in args[i + 1]: 141 args[i + 1].append(luks_default_key_secret_opt) 142 args.insert(i + 2, '--object') 143 args.insert(i + 3, luks_default_secret_object) 144 else: 145 args = ['-o', luks_default_key_secret_opt, 146 '--object', luks_default_secret_object] + args 147 148 args.insert(0, 'create') 149 150 return qemu_img(*args) 151 152def qemu_img_measure(*args): 153 return json.loads(qemu_img_pipe("measure", "--output", "json", *args)) 154 155def qemu_img_check(*args): 156 return json.loads(qemu_img_pipe("check", "--output", "json", *args)) 157 158def qemu_img_verbose(*args): 159 '''Run qemu-img without suppressing its output and return the exit code''' 160 exitcode = subprocess.call(qemu_img_args + list(args)) 161 if exitcode < 0: 162 sys.stderr.write('qemu-img received signal %i: %s\n' 163 % (-exitcode, ' '.join(qemu_img_args + list(args)))) 164 return exitcode 165 166def qemu_img_pipe(*args: str) -> str: 167 '''Run qemu-img and return its output''' 168 return qemu_img_pipe_and_status(*args)[0] 169 170def qemu_img_log(*args): 171 result = qemu_img_pipe(*args) 172 log(result, filters=[filter_testfiles]) 173 return result 174 175def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): 176 args = ['info'] 177 if imgopts: 178 args.append('--image-opts') 179 else: 180 args += ['-f', imgfmt] 181 args += extra_args 182 args.append(filename) 183 184 output = qemu_img_pipe(*args) 185 if not filter_path: 186 filter_path = filename 187 log(filter_img_info(output, filter_path)) 188 189def qemu_io(*args): 190 '''Run qemu-io and return the stdout data''' 191 args = qemu_io_args + list(args) 192 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 193 stderr=subprocess.STDOUT, 194 universal_newlines=True) 195 output = subp.communicate()[0] 196 if subp.returncode < 0: 197 sys.stderr.write('qemu-io received signal %i: %s\n' 198 % (-subp.returncode, ' '.join(args))) 199 return output 200 201def qemu_io_log(*args): 202 result = qemu_io(*args) 203 log(result, filters=[filter_testfiles, filter_qemu_io]) 204 return result 205 206def qemu_io_silent(*args): 207 '''Run qemu-io and return the exit code, suppressing stdout''' 208 args = qemu_io_args + list(args) 209 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 210 if exitcode < 0: 211 sys.stderr.write('qemu-io received signal %i: %s\n' % 212 (-exitcode, ' '.join(args))) 213 return exitcode 214 215def qemu_io_silent_check(*args): 216 '''Run qemu-io and return the true if subprocess returned 0''' 217 args = qemu_io_args + list(args) 218 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'), 219 stderr=subprocess.STDOUT) 220 return exitcode == 0 221 222def get_virtio_scsi_device(): 223 if qemu_default_machine == 's390-ccw-virtio': 224 return 'virtio-scsi-ccw' 225 return 'virtio-scsi-pci' 226 227class QemuIoInteractive: 228 def __init__(self, *args): 229 self.args = qemu_io_args_no_fmt + list(args) 230 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 231 stdout=subprocess.PIPE, 232 stderr=subprocess.STDOUT, 233 universal_newlines=True) 234 out = self._p.stdout.read(9) 235 if out != 'qemu-io> ': 236 # Most probably qemu-io just failed to start. 237 # Let's collect the whole output and exit. 238 out += self._p.stdout.read() 239 self._p.wait(timeout=1) 240 raise ValueError(out) 241 242 def close(self): 243 self._p.communicate('q\n') 244 245 def _read_output(self): 246 pattern = 'qemu-io> ' 247 n = len(pattern) 248 pos = 0 249 s = [] 250 while pos != n: 251 c = self._p.stdout.read(1) 252 # check unexpected EOF 253 assert c != '' 254 s.append(c) 255 if c == pattern[pos]: 256 pos += 1 257 else: 258 pos = 0 259 260 return ''.join(s[:-n]) 261 262 def cmd(self, cmd): 263 # quit command is in close(), '\n' is added automatically 264 assert '\n' not in cmd 265 cmd = cmd.strip() 266 assert cmd not in ('q', 'quit') 267 self._p.stdin.write(cmd + '\n') 268 self._p.stdin.flush() 269 return self._read_output() 270 271 272def qemu_nbd(*args): 273 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 274 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 275 276def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]: 277 '''Run qemu-nbd in daemon mode and return both the parent's exit code 278 and its output in case of an error''' 279 full_args = qemu_nbd_args + ['--fork'] + list(args) 280 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args, 281 connect_stderr=False) 282 return returncode, output if returncode else '' 283 284def qemu_nbd_list_log(*args: str) -> str: 285 '''Run qemu-nbd to list remote exports''' 286 full_args = [qemu_nbd_prog, '-L'] + list(args) 287 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args) 288 log(output, filters=[filter_testfiles, filter_nbd_exports]) 289 return output 290 291@contextmanager 292def qemu_nbd_popen(*args): 293 '''Context manager running qemu-nbd within the context''' 294 pid_file = file_path("pid") 295 296 cmd = list(qemu_nbd_args) 297 cmd.extend(('--persistent', '--pid-file', pid_file)) 298 cmd.extend(args) 299 300 log('Start NBD server') 301 p = subprocess.Popen(cmd) 302 try: 303 while not os.path.exists(pid_file): 304 if p.poll() is not None: 305 raise RuntimeError( 306 "qemu-nbd terminated with exit code {}: {}" 307 .format(p.returncode, ' '.join(cmd))) 308 309 time.sleep(0.01) 310 yield 311 finally: 312 log('Kill NBD server') 313 p.kill() 314 p.wait() 315 316def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 317 '''Return True if two image files are identical''' 318 return qemu_img('compare', '-f', fmt1, 319 '-F', fmt2, img1, img2) == 0 320 321def create_image(name, size): 322 '''Create a fully-allocated raw image with sector markers''' 323 file = open(name, 'wb') 324 i = 0 325 while i < size: 326 sector = struct.pack('>l504xl', i // 512, i // 512) 327 file.write(sector) 328 i = i + 512 329 file.close() 330 331def image_size(img): 332 '''Return image's virtual size''' 333 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 334 return json.loads(r)['virtual-size'] 335 336def is_str(val): 337 return isinstance(val, str) 338 339test_dir_re = re.compile(r"%s" % test_dir) 340def filter_test_dir(msg): 341 return test_dir_re.sub("TEST_DIR", msg) 342 343win32_re = re.compile(r"\r") 344def filter_win32(msg): 345 return win32_re.sub("", msg) 346 347qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " 348 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " 349 r"and [0-9\/.inf]* ops\/sec\)") 350def filter_qemu_io(msg): 351 msg = filter_win32(msg) 352 return qemu_io_re.sub("X ops; XX:XX:XX.X " 353 "(XXX YYY/sec and XXX ops/sec)", msg) 354 355chown_re = re.compile(r"chown [0-9]+:[0-9]+") 356def filter_chown(msg): 357 return chown_re.sub("chown UID:GID", msg) 358 359def filter_qmp_event(event): 360 '''Filter a QMP event dict''' 361 event = dict(event) 362 if 'timestamp' in event: 363 event['timestamp']['seconds'] = 'SECS' 364 event['timestamp']['microseconds'] = 'USECS' 365 return event 366 367def filter_qmp(qmsg, filter_fn): 368 '''Given a string filter, filter a QMP object's values. 369 filter_fn takes a (key, value) pair.''' 370 # Iterate through either lists or dicts; 371 if isinstance(qmsg, list): 372 items = enumerate(qmsg) 373 else: 374 items = qmsg.items() 375 376 for k, v in items: 377 if isinstance(v, (dict, list)): 378 qmsg[k] = filter_qmp(v, filter_fn) 379 else: 380 qmsg[k] = filter_fn(k, v) 381 return qmsg 382 383def filter_testfiles(msg): 384 pref1 = os.path.join(test_dir, "%s-" % (os.getpid())) 385 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid())) 386 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-') 387 388def filter_qmp_testfiles(qmsg): 389 def _filter(_key, value): 390 if is_str(value): 391 return filter_testfiles(value) 392 return value 393 return filter_qmp(qmsg, _filter) 394 395def filter_generated_node_ids(msg): 396 return re.sub("#block[0-9]+", "NODE_NAME", msg) 397 398def filter_img_info(output, filename): 399 lines = [] 400 for line in output.split('\n'): 401 if 'disk size' in line or 'actual-size' in line: 402 continue 403 line = line.replace(filename, 'TEST_IMG') 404 line = filter_testfiles(line) 405 line = line.replace(imgfmt, 'IMGFMT') 406 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 407 line = re.sub('uuid: [-a-f0-9]+', 408 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 409 line) 410 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) 411 lines.append(line) 412 return '\n'.join(lines) 413 414def filter_imgfmt(msg): 415 return msg.replace(imgfmt, 'IMGFMT') 416 417def filter_qmp_imgfmt(qmsg): 418 def _filter(_key, value): 419 if is_str(value): 420 return filter_imgfmt(value) 421 return value 422 return filter_qmp(qmsg, _filter) 423 424def filter_nbd_exports(output: str) -> str: 425 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output) 426 427 428Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) 429 430def log(msg: Msg, 431 filters: Iterable[Callable[[Msg], Msg]] = (), 432 indent: Optional[int] = None) -> None: 433 """ 434 Logs either a string message or a JSON serializable message (like QMP). 435 If indent is provided, JSON serializable messages are pretty-printed. 436 """ 437 for flt in filters: 438 msg = flt(msg) 439 if isinstance(msg, (dict, list)): 440 # Don't sort if it's already sorted 441 do_sort = not isinstance(msg, OrderedDict) 442 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) 443 else: 444 test_logger.info(msg) 445 446class Timeout: 447 def __init__(self, seconds, errmsg="Timeout"): 448 self.seconds = seconds 449 self.errmsg = errmsg 450 def __enter__(self): 451 signal.signal(signal.SIGALRM, self.timeout) 452 signal.setitimer(signal.ITIMER_REAL, self.seconds) 453 return self 454 def __exit__(self, exc_type, value, traceback): 455 signal.setitimer(signal.ITIMER_REAL, 0) 456 return False 457 def timeout(self, signum, frame): 458 raise Exception(self.errmsg) 459 460def file_pattern(name): 461 return "{0}-{1}".format(os.getpid(), name) 462 463class FilePath: 464 """ 465 Context manager generating multiple file names. The generated files are 466 removed when exiting the context. 467 468 Example usage: 469 470 with FilePath('a.img', 'b.img') as (img_a, img_b): 471 # Use img_a and img_b here... 472 473 # a.img and b.img are automatically removed here. 474 475 By default images are created in iotests.test_dir. To create sockets use 476 iotests.sock_dir: 477 478 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock: 479 480 For convenience, calling with one argument yields a single file instead of 481 a tuple with one item. 482 483 """ 484 def __init__(self, *names, base_dir=test_dir): 485 self.paths = [os.path.join(base_dir, file_pattern(name)) 486 for name in names] 487 488 def __enter__(self): 489 if len(self.paths) == 1: 490 return self.paths[0] 491 else: 492 return self.paths 493 494 def __exit__(self, exc_type, exc_val, exc_tb): 495 for path in self.paths: 496 try: 497 os.remove(path) 498 except OSError: 499 pass 500 return False 501 502 503def file_path_remover(): 504 for path in reversed(file_path_remover.paths): 505 try: 506 os.remove(path) 507 except OSError: 508 pass 509 510 511def file_path(*names, base_dir=test_dir): 512 ''' Another way to get auto-generated filename that cleans itself up. 513 514 Use is as simple as: 515 516 img_a, img_b = file_path('a.img', 'b.img') 517 sock = file_path('socket') 518 ''' 519 520 if not hasattr(file_path_remover, 'paths'): 521 file_path_remover.paths = [] 522 atexit.register(file_path_remover) 523 524 paths = [] 525 for name in names: 526 filename = file_pattern(name) 527 path = os.path.join(base_dir, filename) 528 file_path_remover.paths.append(path) 529 paths.append(path) 530 531 return paths[0] if len(paths) == 1 else paths 532 533def remote_filename(path): 534 if imgproto == 'file': 535 return path 536 elif imgproto == 'ssh': 537 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path) 538 else: 539 raise Exception("Protocol %s not supported" % (imgproto)) 540 541class VM(qtest.QEMUQtestMachine): 542 '''A QEMU VM''' 543 544 def __init__(self, path_suffix=''): 545 name = "qemu%s-%d" % (path_suffix, os.getpid()) 546 super().__init__(qemu_prog, qemu_opts, name=name, 547 test_dir=test_dir, 548 socket_scm_helper=socket_scm_helper, 549 sock_dir=sock_dir) 550 self._num_drives = 0 551 552 def add_object(self, opts): 553 self._args.append('-object') 554 self._args.append(opts) 555 return self 556 557 def add_device(self, opts): 558 self._args.append('-device') 559 self._args.append(opts) 560 return self 561 562 def add_drive_raw(self, opts): 563 self._args.append('-drive') 564 self._args.append(opts) 565 return self 566 567 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): 568 '''Add a virtio-blk drive to the VM''' 569 options = ['if=%s' % interface, 570 'id=drive%d' % self._num_drives] 571 572 if path is not None: 573 options.append('file=%s' % path) 574 options.append('format=%s' % img_format) 575 options.append('cache=%s' % cachemode) 576 options.append('aio=%s' % aiomode) 577 578 if opts: 579 options.append(opts) 580 581 if img_format == 'luks' and 'key-secret' not in opts: 582 # default luks support 583 if luks_default_secret_object not in self._args: 584 self.add_object(luks_default_secret_object) 585 586 options.append(luks_default_key_secret_opt) 587 588 self._args.append('-drive') 589 self._args.append(','.join(options)) 590 self._num_drives += 1 591 return self 592 593 def add_blockdev(self, opts): 594 self._args.append('-blockdev') 595 if isinstance(opts, str): 596 self._args.append(opts) 597 else: 598 self._args.append(','.join(opts)) 599 return self 600 601 def add_incoming(self, addr): 602 self._args.append('-incoming') 603 self._args.append(addr) 604 return self 605 606 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage: 607 cmd = 'human-monitor-command' 608 kwargs: Dict[str, Any] = {'command-line': command_line} 609 if use_log: 610 return self.qmp_log(cmd, **kwargs) 611 else: 612 return self.qmp(cmd, **kwargs) 613 614 def pause_drive(self, drive: str, event: Optional[str] = None) -> None: 615 """Pause drive r/w operations""" 616 if not event: 617 self.pause_drive(drive, "read_aio") 618 self.pause_drive(drive, "write_aio") 619 return 620 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') 621 622 def resume_drive(self, drive: str) -> None: 623 """Resume drive r/w operations""" 624 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') 625 626 def hmp_qemu_io(self, drive: str, cmd: str, 627 use_log: bool = False) -> QMPMessage: 628 """Write to a given drive using an HMP command""" 629 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) 630 631 def flatten_qmp_object(self, obj, output=None, basestr=''): 632 if output is None: 633 output = dict() 634 if isinstance(obj, list): 635 for i, item in enumerate(obj): 636 self.flatten_qmp_object(item, output, basestr + str(i) + '.') 637 elif isinstance(obj, dict): 638 for key in obj: 639 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 640 else: 641 output[basestr[:-1]] = obj # Strip trailing '.' 642 return output 643 644 def qmp_to_opts(self, obj): 645 obj = self.flatten_qmp_object(obj) 646 output_list = list() 647 for key in obj: 648 output_list += [key + '=' + obj[key]] 649 return ','.join(output_list) 650 651 def get_qmp_events_filtered(self, wait=60.0): 652 result = [] 653 for ev in self.get_qmp_events(wait=wait): 654 result.append(filter_qmp_event(ev)) 655 return result 656 657 def qmp_log(self, cmd, filters=(), indent=None, **kwargs): 658 full_cmd = OrderedDict(( 659 ("execute", cmd), 660 ("arguments", ordered_qmp(kwargs)) 661 )) 662 log(full_cmd, filters, indent=indent) 663 result = self.qmp(cmd, **kwargs) 664 log(result, filters, indent=indent) 665 return result 666 667 # Returns None on success, and an error string on failure 668 def run_job(self, job, auto_finalize=True, auto_dismiss=False, 669 pre_finalize=None, cancel=False, wait=60.0): 670 """ 671 run_job moves a job from creation through to dismissal. 672 673 :param job: String. ID of recently-launched job 674 :param auto_finalize: Bool. True if the job was launched with 675 auto_finalize. Defaults to True. 676 :param auto_dismiss: Bool. True if the job was launched with 677 auto_dismiss=True. Defaults to False. 678 :param pre_finalize: Callback. A callable that takes no arguments to be 679 invoked prior to issuing job-finalize, if any. 680 :param cancel: Bool. When true, cancels the job after the pre_finalize 681 callback. 682 :param wait: Float. Timeout value specifying how long to wait for any 683 event, in seconds. Defaults to 60.0. 684 """ 685 match_device = {'data': {'device': job}} 686 match_id = {'data': {'id': job}} 687 events = [ 688 ('BLOCK_JOB_COMPLETED', match_device), 689 ('BLOCK_JOB_CANCELLED', match_device), 690 ('BLOCK_JOB_ERROR', match_device), 691 ('BLOCK_JOB_READY', match_device), 692 ('BLOCK_JOB_PENDING', match_id), 693 ('JOB_STATUS_CHANGE', match_id) 694 ] 695 error = None 696 while True: 697 ev = filter_qmp_event(self.events_wait(events, timeout=wait)) 698 if ev['event'] != 'JOB_STATUS_CHANGE': 699 log(ev) 700 continue 701 status = ev['data']['status'] 702 if status == 'aborting': 703 result = self.qmp('query-jobs') 704 for j in result['return']: 705 if j['id'] == job: 706 error = j['error'] 707 log('Job failed: %s' % (j['error'])) 708 elif status == 'ready': 709 self.qmp_log('job-complete', id=job) 710 elif status == 'pending' and not auto_finalize: 711 if pre_finalize: 712 pre_finalize() 713 if cancel: 714 self.qmp_log('job-cancel', id=job) 715 else: 716 self.qmp_log('job-finalize', id=job) 717 elif status == 'concluded' and not auto_dismiss: 718 self.qmp_log('job-dismiss', id=job) 719 elif status == 'null': 720 return error 721 722 # Returns None on success, and an error string on failure 723 def blockdev_create(self, options, job_id='job0', filters=None): 724 if filters is None: 725 filters = [filter_qmp_testfiles] 726 result = self.qmp_log('blockdev-create', filters=filters, 727 job_id=job_id, options=options) 728 729 if 'return' in result: 730 assert result['return'] == {} 731 job_result = self.run_job(job_id) 732 else: 733 job_result = result['error'] 734 735 log("") 736 return job_result 737 738 def enable_migration_events(self, name): 739 log('Enabling migration QMP events on %s...' % name) 740 log(self.qmp('migrate-set-capabilities', capabilities=[ 741 { 742 'capability': 'events', 743 'state': True 744 } 745 ])) 746 747 def wait_migration(self, expect_runstate: Optional[str]) -> bool: 748 while True: 749 event = self.event_wait('MIGRATION') 750 # We use the default timeout, and with a timeout, event_wait() 751 # never returns None 752 assert event 753 754 log(event, filters=[filter_qmp_event]) 755 if event['data']['status'] in ('completed', 'failed'): 756 break 757 758 if event['data']['status'] == 'completed': 759 # The event may occur in finish-migrate, so wait for the expected 760 # post-migration runstate 761 runstate = None 762 while runstate != expect_runstate: 763 runstate = self.qmp('query-status')['return']['status'] 764 return True 765 else: 766 return False 767 768 def node_info(self, node_name): 769 nodes = self.qmp('query-named-block-nodes') 770 for x in nodes['return']: 771 if x['node-name'] == node_name: 772 return x 773 return None 774 775 def query_bitmaps(self): 776 res = self.qmp("query-named-block-nodes") 777 return {device['node-name']: device['dirty-bitmaps'] 778 for device in res['return'] if 'dirty-bitmaps' in device} 779 780 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None): 781 """ 782 get a specific bitmap from the object returned by query_bitmaps. 783 :param recording: If specified, filter results by the specified value. 784 :param bitmaps: If specified, use it instead of call query_bitmaps() 785 """ 786 if bitmaps is None: 787 bitmaps = self.query_bitmaps() 788 789 for bitmap in bitmaps[node_name]: 790 if bitmap.get('name', '') == bitmap_name: 791 if recording is None or bitmap.get('recording') == recording: 792 return bitmap 793 return None 794 795 def check_bitmap_status(self, node_name, bitmap_name, fields): 796 ret = self.get_bitmap(node_name, bitmap_name) 797 798 return fields.items() <= ret.items() 799 800 def assert_block_path(self, root, path, expected_node, graph=None): 801 """ 802 Check whether the node under the given path in the block graph 803 is @expected_node. 804 805 @root is the node name of the node where the @path is rooted. 806 807 @path is a string that consists of child names separated by 808 slashes. It must begin with a slash. 809 810 Examples for @root + @path: 811 - root="qcow2-node", path="/backing/file" 812 - root="quorum-node", path="/children.2/file" 813 814 Hypothetically, @path could be empty, in which case it would 815 point to @root. However, in practice this case is not useful 816 and hence not allowed. 817 818 @expected_node may be None. (All elements of the path but the 819 leaf must still exist.) 820 821 @graph may be None or the result of an x-debug-query-block-graph 822 call that has already been performed. 823 """ 824 if graph is None: 825 graph = self.qmp('x-debug-query-block-graph')['return'] 826 827 iter_path = iter(path.split('/')) 828 829 # Must start with a / 830 assert next(iter_path) == '' 831 832 node = next((node for node in graph['nodes'] if node['name'] == root), 833 None) 834 835 # An empty @path is not allowed, so the root node must be present 836 assert node is not None, 'Root node %s not found' % root 837 838 for child_name in iter_path: 839 assert node is not None, 'Cannot follow path %s%s' % (root, path) 840 841 try: 842 node_id = next(edge['child'] for edge in graph['edges'] 843 if (edge['parent'] == node['id'] and 844 edge['name'] == child_name)) 845 846 node = next(node for node in graph['nodes'] 847 if node['id'] == node_id) 848 849 except StopIteration: 850 node = None 851 852 if node is None: 853 assert expected_node is None, \ 854 'No node found under %s (but expected %s)' % \ 855 (path, expected_node) 856 else: 857 assert node['name'] == expected_node, \ 858 'Found node %s under %s (but expected %s)' % \ 859 (node['name'], path, expected_node) 860 861index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 862 863class QMPTestCase(unittest.TestCase): 864 '''Abstract base class for QMP test cases''' 865 866 def __init__(self, *args, **kwargs): 867 super().__init__(*args, **kwargs) 868 # Many users of this class set a VM property we rely on heavily 869 # in the methods below. 870 self.vm = None 871 872 def dictpath(self, d, path): 873 '''Traverse a path in a nested dict''' 874 for component in path.split('/'): 875 m = index_re.match(component) 876 if m: 877 component, idx = m.groups() 878 idx = int(idx) 879 880 if not isinstance(d, dict) or component not in d: 881 self.fail(f'failed path traversal for "{path}" in "{d}"') 882 d = d[component] 883 884 if m: 885 if not isinstance(d, list): 886 self.fail(f'path component "{component}" in "{path}" ' 887 f'is not a list in "{d}"') 888 try: 889 d = d[idx] 890 except IndexError: 891 self.fail(f'invalid index "{idx}" in path "{path}" ' 892 f'in "{d}"') 893 return d 894 895 def assert_qmp_absent(self, d, path): 896 try: 897 result = self.dictpath(d, path) 898 except AssertionError: 899 return 900 self.fail('path "%s" has value "%s"' % (path, str(result))) 901 902 def assert_qmp(self, d, path, value): 903 '''Assert that the value for a specific path in a QMP dict 904 matches. When given a list of values, assert that any of 905 them matches.''' 906 907 result = self.dictpath(d, path) 908 909 # [] makes no sense as a list of valid values, so treat it as 910 # an actual single value. 911 if isinstance(value, list) and value != []: 912 for v in value: 913 if result == v: 914 return 915 self.fail('no match for "%s" in %s' % (str(result), str(value))) 916 else: 917 self.assertEqual(result, value, 918 '"%s" is "%s", expected "%s"' 919 % (path, str(result), str(value))) 920 921 def assert_no_active_block_jobs(self): 922 result = self.vm.qmp('query-block-jobs') 923 self.assert_qmp(result, 'return', []) 924 925 def assert_has_block_node(self, node_name=None, file_name=None): 926 """Issue a query-named-block-nodes and assert node_name and/or 927 file_name is present in the result""" 928 def check_equal_or_none(a, b): 929 return a is None or b is None or a == b 930 assert node_name or file_name 931 result = self.vm.qmp('query-named-block-nodes') 932 for x in result["return"]: 933 if check_equal_or_none(x.get("node-name"), node_name) and \ 934 check_equal_or_none(x.get("file"), file_name): 935 return 936 self.fail("Cannot find %s %s in result:\n%s" % 937 (node_name, file_name, result)) 938 939 def assert_json_filename_equal(self, json_filename, reference): 940 '''Asserts that the given filename is a json: filename and that its 941 content is equal to the given reference object''' 942 self.assertEqual(json_filename[:5], 'json:') 943 self.assertEqual( 944 self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 945 self.vm.flatten_qmp_object(reference) 946 ) 947 948 def cancel_and_wait(self, drive='drive0', force=False, 949 resume=False, wait=60.0): 950 '''Cancel a block job and wait for it to finish, returning the event''' 951 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 952 self.assert_qmp(result, 'return', {}) 953 954 if resume: 955 self.vm.resume_drive(drive) 956 957 cancelled = False 958 result = None 959 while not cancelled: 960 for event in self.vm.get_qmp_events(wait=wait): 961 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 962 event['event'] == 'BLOCK_JOB_CANCELLED': 963 self.assert_qmp(event, 'data/device', drive) 964 result = event 965 cancelled = True 966 elif event['event'] == 'JOB_STATUS_CHANGE': 967 self.assert_qmp(event, 'data/id', drive) 968 969 970 self.assert_no_active_block_jobs() 971 return result 972 973 def wait_until_completed(self, drive='drive0', check_offset=True, 974 wait=60.0, error=None): 975 '''Wait for a block job to finish, returning the event''' 976 while True: 977 for event in self.vm.get_qmp_events(wait=wait): 978 if event['event'] == 'BLOCK_JOB_COMPLETED': 979 self.assert_qmp(event, 'data/device', drive) 980 if error is None: 981 self.assert_qmp_absent(event, 'data/error') 982 if check_offset: 983 self.assert_qmp(event, 'data/offset', 984 event['data']['len']) 985 else: 986 self.assert_qmp(event, 'data/error', error) 987 self.assert_no_active_block_jobs() 988 return event 989 if event['event'] == 'JOB_STATUS_CHANGE': 990 self.assert_qmp(event, 'data/id', drive) 991 992 def wait_ready(self, drive='drive0'): 993 """Wait until a BLOCK_JOB_READY event, and return the event.""" 994 return self.vm.events_wait([ 995 ('BLOCK_JOB_READY', 996 {'data': {'type': 'mirror', 'device': drive}}), 997 ('BLOCK_JOB_READY', 998 {'data': {'type': 'commit', 'device': drive}}) 999 ]) 1000 1001 def wait_ready_and_cancel(self, drive='drive0'): 1002 self.wait_ready(drive=drive) 1003 event = self.cancel_and_wait(drive=drive) 1004 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 1005 self.assert_qmp(event, 'data/type', 'mirror') 1006 self.assert_qmp(event, 'data/offset', event['data']['len']) 1007 1008 def complete_and_wait(self, drive='drive0', wait_ready=True, 1009 completion_error=None): 1010 '''Complete a block job and wait for it to finish''' 1011 if wait_ready: 1012 self.wait_ready(drive=drive) 1013 1014 result = self.vm.qmp('block-job-complete', device=drive) 1015 self.assert_qmp(result, 'return', {}) 1016 1017 event = self.wait_until_completed(drive=drive, error=completion_error) 1018 self.assertTrue(event['data']['type'] in ['mirror', 'commit']) 1019 1020 def pause_wait(self, job_id='job0'): 1021 with Timeout(3, "Timeout waiting for job to pause"): 1022 while True: 1023 result = self.vm.qmp('query-block-jobs') 1024 found = False 1025 for job in result['return']: 1026 if job['device'] == job_id: 1027 found = True 1028 if job['paused'] and not job['busy']: 1029 return job 1030 break 1031 assert found 1032 1033 def pause_job(self, job_id='job0', wait=True): 1034 result = self.vm.qmp('block-job-pause', device=job_id) 1035 self.assert_qmp(result, 'return', {}) 1036 if wait: 1037 return self.pause_wait(job_id) 1038 return result 1039 1040 def case_skip(self, reason): 1041 '''Skip this test case''' 1042 case_notrun(reason) 1043 self.skipTest(reason) 1044 1045 1046def notrun(reason): 1047 '''Skip this test suite''' 1048 # Each test in qemu-iotests has a number ("seq") 1049 seq = os.path.basename(sys.argv[0]) 1050 1051 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') 1052 logger.warning("%s not run: %s", seq, reason) 1053 sys.exit(0) 1054 1055def case_notrun(reason): 1056 '''Mark this test case as not having been run (without actually 1057 skipping it, that is left to the caller). See 1058 QMPTestCase.case_skip() for a variant that actually skips the 1059 current test case.''' 1060 1061 # Each test in qemu-iotests has a number ("seq") 1062 seq = os.path.basename(sys.argv[0]) 1063 1064 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( 1065 ' [case not run] ' + reason + '\n') 1066 1067def _verify_image_format(supported_fmts: Sequence[str] = (), 1068 unsupported_fmts: Sequence[str] = ()) -> None: 1069 if 'generic' in supported_fmts and \ 1070 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 1071 # similar to 1072 # _supported_fmt generic 1073 # for bash tests 1074 supported_fmts = () 1075 1076 not_sup = supported_fmts and (imgfmt not in supported_fmts) 1077 if not_sup or (imgfmt in unsupported_fmts): 1078 notrun('not suitable for this image format: %s' % imgfmt) 1079 1080 if imgfmt == 'luks': 1081 verify_working_luks() 1082 1083def _verify_protocol(supported: Sequence[str] = (), 1084 unsupported: Sequence[str] = ()) -> None: 1085 assert not (supported and unsupported) 1086 1087 if 'generic' in supported: 1088 return 1089 1090 not_sup = supported and (imgproto not in supported) 1091 if not_sup or (imgproto in unsupported): 1092 notrun('not suitable for this protocol: %s' % imgproto) 1093 1094def _verify_platform(supported: Sequence[str] = (), 1095 unsupported: Sequence[str] = ()) -> None: 1096 if any((sys.platform.startswith(x) for x in unsupported)): 1097 notrun('not suitable for this OS: %s' % sys.platform) 1098 1099 if supported: 1100 if not any((sys.platform.startswith(x) for x in supported)): 1101 notrun('not suitable for this OS: %s' % sys.platform) 1102 1103def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: 1104 if supported_cache_modes and (cachemode not in supported_cache_modes): 1105 notrun('not suitable for this cache mode: %s' % cachemode) 1106 1107def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None: 1108 if supported_aio_modes and (aiomode not in supported_aio_modes): 1109 notrun('not suitable for this aio mode: %s' % aiomode) 1110 1111def supports_quorum(): 1112 return 'quorum' in qemu_img_pipe('--help') 1113 1114def verify_quorum(): 1115 '''Skip test suite if quorum support is not available''' 1116 if not supports_quorum(): 1117 notrun('quorum support missing') 1118 1119def has_working_luks() -> Tuple[bool, str]: 1120 """ 1121 Check whether our LUKS driver can actually create images 1122 (this extends to LUKS encryption for qcow2). 1123 1124 If not, return the reason why. 1125 """ 1126 1127 img_file = f'{test_dir}/luks-test.luks' 1128 (output, status) = \ 1129 qemu_img_pipe_and_status('create', '-f', 'luks', 1130 '--object', luks_default_secret_object, 1131 '-o', luks_default_key_secret_opt, 1132 '-o', 'iter-time=10', 1133 img_file, '1G') 1134 try: 1135 os.remove(img_file) 1136 except OSError: 1137 pass 1138 1139 if status != 0: 1140 reason = output 1141 for line in output.splitlines(): 1142 if img_file + ':' in line: 1143 reason = line.split(img_file + ':', 1)[1].strip() 1144 break 1145 1146 return (False, reason) 1147 else: 1148 return (True, '') 1149 1150def verify_working_luks(): 1151 """ 1152 Skip test suite if LUKS does not work 1153 """ 1154 (working, reason) = has_working_luks() 1155 if not working: 1156 notrun(reason) 1157 1158def qemu_pipe(*args: str) -> str: 1159 """ 1160 Run qemu with an option to print something and exit (e.g. a help option). 1161 1162 :return: QEMU's stdout output. 1163 """ 1164 full_args = [qemu_prog] + qemu_opts + list(args) 1165 output, _ = qemu_tool_pipe_and_status('qemu', full_args) 1166 return output 1167 1168def supported_formats(read_only=False): 1169 '''Set 'read_only' to True to check ro-whitelist 1170 Otherwise, rw-whitelist is checked''' 1171 1172 if not hasattr(supported_formats, "formats"): 1173 supported_formats.formats = {} 1174 1175 if read_only not in supported_formats.formats: 1176 format_message = qemu_pipe("-drive", "format=help") 1177 line = 1 if read_only else 0 1178 supported_formats.formats[read_only] = \ 1179 format_message.splitlines()[line].split(":")[1].split() 1180 1181 return supported_formats.formats[read_only] 1182 1183def skip_if_unsupported(required_formats=(), read_only=False): 1184 '''Skip Test Decorator 1185 Runs the test if all the required formats are whitelisted''' 1186 def skip_test_decorator(func): 1187 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1188 **kwargs: Dict[str, Any]) -> None: 1189 if callable(required_formats): 1190 fmts = required_formats(test_case) 1191 else: 1192 fmts = required_formats 1193 1194 usf_list = list(set(fmts) - set(supported_formats(read_only))) 1195 if usf_list: 1196 msg = f'{test_case}: formats {usf_list} are not whitelisted' 1197 test_case.case_skip(msg) 1198 else: 1199 func(test_case, *args, **kwargs) 1200 return func_wrapper 1201 return skip_test_decorator 1202 1203def skip_for_formats(formats: Sequence[str] = ()) \ 1204 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]], 1205 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]: 1206 '''Skip Test Decorator 1207 Skips the test for the given formats''' 1208 def skip_test_decorator(func): 1209 def func_wrapper(test_case: QMPTestCase, *args: List[Any], 1210 **kwargs: Dict[str, Any]) -> None: 1211 if imgfmt in formats: 1212 msg = f'{test_case}: Skipped for format {imgfmt}' 1213 test_case.case_skip(msg) 1214 else: 1215 func(test_case, *args, **kwargs) 1216 return func_wrapper 1217 return skip_test_decorator 1218 1219def skip_if_user_is_root(func): 1220 '''Skip Test Decorator 1221 Runs the test only without root permissions''' 1222 def func_wrapper(*args, **kwargs): 1223 if os.getuid() == 0: 1224 case_notrun('{}: cannot be run as root'.format(args[0])) 1225 return None 1226 else: 1227 return func(*args, **kwargs) 1228 return func_wrapper 1229 1230def execute_unittest(debug=False): 1231 """Executes unittests within the calling module.""" 1232 1233 verbosity = 2 if debug else 1 1234 1235 if debug: 1236 output = sys.stdout 1237 else: 1238 # We need to filter out the time taken from the output so that 1239 # qemu-iotest can reliably diff the results against master output. 1240 output = io.StringIO() 1241 1242 runner = unittest.TextTestRunner(stream=output, descriptions=True, 1243 verbosity=verbosity) 1244 try: 1245 # unittest.main() will use sys.exit(); so expect a SystemExit 1246 # exception 1247 unittest.main(testRunner=runner) 1248 finally: 1249 # We need to filter out the time taken from the output so that 1250 # qemu-iotest can reliably diff the results against master output. 1251 if not debug: 1252 out = output.getvalue() 1253 out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) 1254 1255 # Hide skipped tests from the reference output 1256 out = re.sub(r'OK \(skipped=\d+\)', 'OK', out) 1257 out_first_line, out_rest = out.split('\n', 1) 1258 out = out_first_line.replace('s', '.') + '\n' + out_rest 1259 1260 sys.stderr.write(out) 1261 1262def execute_setup_common(supported_fmts: Sequence[str] = (), 1263 supported_platforms: Sequence[str] = (), 1264 supported_cache_modes: Sequence[str] = (), 1265 supported_aio_modes: Sequence[str] = (), 1266 unsupported_fmts: Sequence[str] = (), 1267 supported_protocols: Sequence[str] = (), 1268 unsupported_protocols: Sequence[str] = ()) -> bool: 1269 """ 1270 Perform necessary setup for either script-style or unittest-style tests. 1271 1272 :return: Bool; Whether or not debug mode has been requested via the CLI. 1273 """ 1274 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. 1275 1276 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 1277 # indicate that we're not being run via "check". There may be 1278 # other things set up by "check" that individual test cases rely 1279 # on. 1280 if test_dir is None or qemu_default_machine is None: 1281 sys.stderr.write('Please run this test via the "check" script\n') 1282 sys.exit(os.EX_USAGE) 1283 1284 debug = '-d' in sys.argv 1285 if debug: 1286 sys.argv.remove('-d') 1287 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 1288 1289 _verify_image_format(supported_fmts, unsupported_fmts) 1290 _verify_protocol(supported_protocols, unsupported_protocols) 1291 _verify_platform(supported=supported_platforms) 1292 _verify_cache_mode(supported_cache_modes) 1293 _verify_aio_mode(supported_aio_modes) 1294 1295 return debug 1296 1297def execute_test(*args, test_function=None, **kwargs): 1298 """Run either unittest or script-style tests.""" 1299 1300 debug = execute_setup_common(*args, **kwargs) 1301 if not test_function: 1302 execute_unittest(debug) 1303 else: 1304 test_function() 1305 1306def activate_logging(): 1307 """Activate iotests.log() output to stdout for script-style tests.""" 1308 handler = logging.StreamHandler(stream=sys.stdout) 1309 formatter = logging.Formatter('%(message)s') 1310 handler.setFormatter(formatter) 1311 test_logger.addHandler(handler) 1312 test_logger.setLevel(logging.INFO) 1313 test_logger.propagate = False 1314 1315# This is called from script-style iotests without a single point of entry 1316def script_initialize(*args, **kwargs): 1317 """Initialize script-style tests without running any tests.""" 1318 activate_logging() 1319 execute_setup_common(*args, **kwargs) 1320 1321# This is called from script-style iotests with a single point of entry 1322def script_main(test_function, *args, **kwargs): 1323 """Run script-style tests outside of the unittest framework""" 1324 activate_logging() 1325 execute_test(*args, test_function=test_function, **kwargs) 1326 1327# This is called from unittest style iotests 1328def main(*args, **kwargs): 1329 """Run tests using the unittest framework""" 1330 execute_test(*args, **kwargs) 1331