xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision c306cdb0)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39from qemu.machine import qtest
40from qemu.qmp import QMPMessage
41
42# Use this logger for logging messages directly from the iotests module
43logger = logging.getLogger('qemu.iotests')
44logger.addHandler(logging.NullHandler())
45
46# Use this logger for messages that ought to be used for diff output.
47test_logger = logging.getLogger('qemu.iotests.diff_io')
48
49
50faulthandler.enable()
51
52# This will not work if arguments contain spaces but is necessary if we
53# want to support the override options that ./check supports.
54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
55if os.environ.get('QEMU_IMG_OPTIONS'):
56    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
57
58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
59if os.environ.get('QEMU_IO_OPTIONS'):
60    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
61
62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
64    qemu_io_args_no_fmt += \
65        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
66
67qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
68qemu_nbd_args = [qemu_nbd_prog]
69if os.environ.get('QEMU_NBD_OPTIONS'):
70    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
71
72qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
73qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
74
75gdb_qemu_env = os.environ.get('GDB_OPTIONS')
76qemu_gdb = []
77if gdb_qemu_env:
78    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
79
80qemu_print = os.environ.get('PRINT_QEMU', False)
81
82imgfmt = os.environ.get('IMGFMT', 'raw')
83imgproto = os.environ.get('IMGPROTO', 'file')
84output_dir = os.environ.get('OUTPUT_DIR', '.')
85
86try:
87    test_dir = os.environ['TEST_DIR']
88    sock_dir = os.environ['SOCK_DIR']
89    cachemode = os.environ['CACHEMODE']
90    aiomode = os.environ['AIOMODE']
91    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
92except KeyError:
93    # We are using these variables as proxies to indicate that we're
94    # not being run via "check". There may be other things set up by
95    # "check" that individual test cases rely on.
96    sys.stderr.write('Please run this test via the "check" script\n')
97    sys.exit(os.EX_USAGE)
98
99qemu_valgrind = []
100if os.environ.get('VALGRIND_QEMU') == "y" and \
101    os.environ.get('NO_VALGRIND') != "y":
102    valgrind_logfile = "--log-file=" + test_dir
103    # %p allows to put the valgrind process PID, since
104    # we don't know it a priori (subprocess.Popen is
105    # not yet invoked)
106    valgrind_logfile += "/%p.valgrind"
107
108    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
109
110luks_default_secret_object = 'secret,id=keysec0,data=' + \
111                             os.environ.get('IMGKEYSECRET', '')
112luks_default_key_secret_opt = 'key-secret=keysec0'
113
114sample_img_dir = os.environ['SAMPLE_IMG_DIR']
115
116
117def unarchive_sample_image(sample, fname):
118    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
119    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
120        shutil.copyfileobj(f_in, f_out)
121
122
123def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
124                              connect_stderr: bool = True) -> Tuple[str, int]:
125    """
126    Run a tool and return both its output and its exit code
127    """
128    stderr = subprocess.STDOUT if connect_stderr else None
129    with subprocess.Popen(args, stdout=subprocess.PIPE,
130                          stderr=stderr, universal_newlines=True) as subp:
131        output = subp.communicate()[0]
132        if subp.returncode < 0:
133            cmd = ' '.join(args)
134            sys.stderr.write(f'{tool} received signal \
135                               {-subp.returncode}: {cmd}\n')
136        return (output, subp.returncode)
137
138def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
139    """
140    Run qemu-img and return both its output and its exit code
141    """
142    full_args = qemu_img_args + list(args)
143    return qemu_tool_pipe_and_status('qemu-img', full_args)
144
145def qemu_img(*args: str) -> int:
146    '''Run qemu-img and return the exit code'''
147    return qemu_img_pipe_and_status(*args)[1]
148
149def ordered_qmp(qmsg, conv_keys=True):
150    # Dictionaries are not ordered prior to 3.6, therefore:
151    if isinstance(qmsg, list):
152        return [ordered_qmp(atom) for atom in qmsg]
153    if isinstance(qmsg, dict):
154        od = OrderedDict()
155        for k, v in sorted(qmsg.items()):
156            if conv_keys:
157                k = k.replace('_', '-')
158            od[k] = ordered_qmp(v, conv_keys=False)
159        return od
160    return qmsg
161
162def qemu_img_create(*args):
163    args = list(args)
164
165    # default luks support
166    if '-f' in args and args[args.index('-f') + 1] == 'luks':
167        if '-o' in args:
168            i = args.index('-o')
169            if 'key-secret' not in args[i + 1]:
170                args[i + 1].append(luks_default_key_secret_opt)
171                args.insert(i + 2, '--object')
172                args.insert(i + 3, luks_default_secret_object)
173        else:
174            args = ['-o', luks_default_key_secret_opt,
175                    '--object', luks_default_secret_object] + args
176
177    args.insert(0, 'create')
178
179    return qemu_img(*args)
180
181def qemu_img_measure(*args):
182    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
183
184def qemu_img_check(*args):
185    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
186
187def qemu_img_verbose(*args):
188    '''Run qemu-img without suppressing its output and return the exit code'''
189    exitcode = subprocess.call(qemu_img_args + list(args))
190    if exitcode < 0:
191        sys.stderr.write('qemu-img received signal %i: %s\n'
192                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
193    return exitcode
194
195def qemu_img_pipe(*args: str) -> str:
196    '''Run qemu-img and return its output'''
197    return qemu_img_pipe_and_status(*args)[0]
198
199def qemu_img_log(*args):
200    result = qemu_img_pipe(*args)
201    log(result, filters=[filter_testfiles])
202    return result
203
204def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
205    args = ['info']
206    if imgopts:
207        args.append('--image-opts')
208    else:
209        args += ['-f', imgfmt]
210    args += extra_args
211    args.append(filename)
212
213    output = qemu_img_pipe(*args)
214    if not filter_path:
215        filter_path = filename
216    log(filter_img_info(output, filter_path))
217
218def qemu_io(*args):
219    '''Run qemu-io and return the stdout data'''
220    args = qemu_io_args + list(args)
221    return qemu_tool_pipe_and_status('qemu-io', args)[0]
222
223def qemu_io_log(*args):
224    result = qemu_io(*args)
225    log(result, filters=[filter_testfiles, filter_qemu_io])
226    return result
227
228def qemu_io_silent(*args):
229    '''Run qemu-io and return the exit code, suppressing stdout'''
230    if '-f' in args or '--image-opts' in args:
231        default_args = qemu_io_args_no_fmt
232    else:
233        default_args = qemu_io_args
234
235    args = default_args + list(args)
236    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
237    if result.returncode < 0:
238        sys.stderr.write('qemu-io received signal %i: %s\n' %
239                         (-result.returncode, ' '.join(args)))
240    return result.returncode
241
242def qemu_io_silent_check(*args):
243    '''Run qemu-io and return the true if subprocess returned 0'''
244    args = qemu_io_args + list(args)
245    result = subprocess.run(args, stdout=subprocess.DEVNULL,
246                            stderr=subprocess.STDOUT, check=False)
247    return result.returncode == 0
248
249class QemuIoInteractive:
250    def __init__(self, *args):
251        self.args = qemu_io_args_no_fmt + list(args)
252        # We need to keep the Popen objext around, and not
253        # close it immediately. Therefore, disable the pylint check:
254        # pylint: disable=consider-using-with
255        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
256                                   stdout=subprocess.PIPE,
257                                   stderr=subprocess.STDOUT,
258                                   universal_newlines=True)
259        out = self._p.stdout.read(9)
260        if out != 'qemu-io> ':
261            # Most probably qemu-io just failed to start.
262            # Let's collect the whole output and exit.
263            out += self._p.stdout.read()
264            self._p.wait(timeout=1)
265            raise ValueError(out)
266
267    def close(self):
268        self._p.communicate('q\n')
269
270    def _read_output(self):
271        pattern = 'qemu-io> '
272        n = len(pattern)
273        pos = 0
274        s = []
275        while pos != n:
276            c = self._p.stdout.read(1)
277            # check unexpected EOF
278            assert c != ''
279            s.append(c)
280            if c == pattern[pos]:
281                pos += 1
282            else:
283                pos = 0
284
285        return ''.join(s[:-n])
286
287    def cmd(self, cmd):
288        # quit command is in close(), '\n' is added automatically
289        assert '\n' not in cmd
290        cmd = cmd.strip()
291        assert cmd not in ('q', 'quit')
292        self._p.stdin.write(cmd + '\n')
293        self._p.stdin.flush()
294        return self._read_output()
295
296
297def qemu_nbd(*args):
298    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
299    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
300
301def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
302    '''Run qemu-nbd in daemon mode and return both the parent's exit code
303       and its output in case of an error'''
304    full_args = qemu_nbd_args + ['--fork'] + list(args)
305    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
306                                                   connect_stderr=False)
307    return returncode, output if returncode else ''
308
309def qemu_nbd_list_log(*args: str) -> str:
310    '''Run qemu-nbd to list remote exports'''
311    full_args = [qemu_nbd_prog, '-L'] + list(args)
312    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
313    log(output, filters=[filter_testfiles, filter_nbd_exports])
314    return output
315
316@contextmanager
317def qemu_nbd_popen(*args):
318    '''Context manager running qemu-nbd within the context'''
319    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
320
321    assert not os.path.exists(pid_file)
322
323    cmd = list(qemu_nbd_args)
324    cmd.extend(('--persistent', '--pid-file', pid_file))
325    cmd.extend(args)
326
327    log('Start NBD server')
328    with subprocess.Popen(cmd) as p:
329        try:
330            while not os.path.exists(pid_file):
331                if p.poll() is not None:
332                    raise RuntimeError(
333                        "qemu-nbd terminated with exit code {}: {}"
334                        .format(p.returncode, ' '.join(cmd)))
335
336                time.sleep(0.01)
337            yield
338        finally:
339            if os.path.exists(pid_file):
340                os.remove(pid_file)
341            log('Kill NBD server')
342            p.kill()
343            p.wait()
344
345def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
346    '''Return True if two image files are identical'''
347    return qemu_img('compare', '-f', fmt1,
348                    '-F', fmt2, img1, img2) == 0
349
350def create_image(name, size):
351    '''Create a fully-allocated raw image with sector markers'''
352    with open(name, 'wb') as file:
353        i = 0
354        while i < size:
355            sector = struct.pack('>l504xl', i // 512, i // 512)
356            file.write(sector)
357            i = i + 512
358
359def image_size(img):
360    '''Return image's virtual size'''
361    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
362    return json.loads(r)['virtual-size']
363
364def is_str(val):
365    return isinstance(val, str)
366
367test_dir_re = re.compile(r"%s" % test_dir)
368def filter_test_dir(msg):
369    return test_dir_re.sub("TEST_DIR", msg)
370
371win32_re = re.compile(r"\r")
372def filter_win32(msg):
373    return win32_re.sub("", msg)
374
375qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
376                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
377                        r"and [0-9\/.inf]* ops\/sec\)")
378def filter_qemu_io(msg):
379    msg = filter_win32(msg)
380    return qemu_io_re.sub("X ops; XX:XX:XX.X "
381                          "(XXX YYY/sec and XXX ops/sec)", msg)
382
383chown_re = re.compile(r"chown [0-9]+:[0-9]+")
384def filter_chown(msg):
385    return chown_re.sub("chown UID:GID", msg)
386
387def filter_qmp_event(event):
388    '''Filter a QMP event dict'''
389    event = dict(event)
390    if 'timestamp' in event:
391        event['timestamp']['seconds'] = 'SECS'
392        event['timestamp']['microseconds'] = 'USECS'
393    return event
394
395def filter_qmp(qmsg, filter_fn):
396    '''Given a string filter, filter a QMP object's values.
397    filter_fn takes a (key, value) pair.'''
398    # Iterate through either lists or dicts;
399    if isinstance(qmsg, list):
400        items = enumerate(qmsg)
401    else:
402        items = qmsg.items()
403
404    for k, v in items:
405        if isinstance(v, (dict, list)):
406            qmsg[k] = filter_qmp(v, filter_fn)
407        else:
408            qmsg[k] = filter_fn(k, v)
409    return qmsg
410
411def filter_testfiles(msg):
412    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
413    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
414    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
415
416def filter_qmp_testfiles(qmsg):
417    def _filter(_key, value):
418        if is_str(value):
419            return filter_testfiles(value)
420        return value
421    return filter_qmp(qmsg, _filter)
422
423def filter_virtio_scsi(output: str) -> str:
424    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
425
426def filter_qmp_virtio_scsi(qmsg):
427    def _filter(_key, value):
428        if is_str(value):
429            return filter_virtio_scsi(value)
430        return value
431    return filter_qmp(qmsg, _filter)
432
433def filter_generated_node_ids(msg):
434    return re.sub("#block[0-9]+", "NODE_NAME", msg)
435
436def filter_img_info(output, filename):
437    lines = []
438    for line in output.split('\n'):
439        if 'disk size' in line or 'actual-size' in line:
440            continue
441        line = line.replace(filename, 'TEST_IMG')
442        line = filter_testfiles(line)
443        line = line.replace(imgfmt, 'IMGFMT')
444        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
445        line = re.sub('uuid: [-a-f0-9]+',
446                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
447                      line)
448        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
449        lines.append(line)
450    return '\n'.join(lines)
451
452def filter_imgfmt(msg):
453    return msg.replace(imgfmt, 'IMGFMT')
454
455def filter_qmp_imgfmt(qmsg):
456    def _filter(_key, value):
457        if is_str(value):
458            return filter_imgfmt(value)
459        return value
460    return filter_qmp(qmsg, _filter)
461
462def filter_nbd_exports(output: str) -> str:
463    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
464
465
466Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
467
468def log(msg: Msg,
469        filters: Iterable[Callable[[Msg], Msg]] = (),
470        indent: Optional[int] = None) -> None:
471    """
472    Logs either a string message or a JSON serializable message (like QMP).
473    If indent is provided, JSON serializable messages are pretty-printed.
474    """
475    for flt in filters:
476        msg = flt(msg)
477    if isinstance(msg, (dict, list)):
478        # Don't sort if it's already sorted
479        do_sort = not isinstance(msg, OrderedDict)
480        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
481    else:
482        test_logger.info(msg)
483
484class Timeout:
485    def __init__(self, seconds, errmsg="Timeout"):
486        self.seconds = seconds
487        self.errmsg = errmsg
488    def __enter__(self):
489        if qemu_gdb or qemu_valgrind:
490            return self
491        signal.signal(signal.SIGALRM, self.timeout)
492        signal.setitimer(signal.ITIMER_REAL, self.seconds)
493        return self
494    def __exit__(self, exc_type, value, traceback):
495        if qemu_gdb or qemu_valgrind:
496            return False
497        signal.setitimer(signal.ITIMER_REAL, 0)
498        return False
499    def timeout(self, signum, frame):
500        raise Exception(self.errmsg)
501
502def file_pattern(name):
503    return "{0}-{1}".format(os.getpid(), name)
504
505class FilePath:
506    """
507    Context manager generating multiple file names. The generated files are
508    removed when exiting the context.
509
510    Example usage:
511
512        with FilePath('a.img', 'b.img') as (img_a, img_b):
513            # Use img_a and img_b here...
514
515        # a.img and b.img are automatically removed here.
516
517    By default images are created in iotests.test_dir. To create sockets use
518    iotests.sock_dir:
519
520       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
521
522    For convenience, calling with one argument yields a single file instead of
523    a tuple with one item.
524
525    """
526    def __init__(self, *names, base_dir=test_dir):
527        self.paths = [os.path.join(base_dir, file_pattern(name))
528                      for name in names]
529
530    def __enter__(self):
531        if len(self.paths) == 1:
532            return self.paths[0]
533        else:
534            return self.paths
535
536    def __exit__(self, exc_type, exc_val, exc_tb):
537        for path in self.paths:
538            try:
539                os.remove(path)
540            except OSError:
541                pass
542        return False
543
544
545def try_remove(img):
546    try:
547        os.remove(img)
548    except OSError:
549        pass
550
551def file_path_remover():
552    for path in reversed(file_path_remover.paths):
553        try_remove(path)
554
555
556def file_path(*names, base_dir=test_dir):
557    ''' Another way to get auto-generated filename that cleans itself up.
558
559    Use is as simple as:
560
561    img_a, img_b = file_path('a.img', 'b.img')
562    sock = file_path('socket')
563    '''
564
565    if not hasattr(file_path_remover, 'paths'):
566        file_path_remover.paths = []
567        atexit.register(file_path_remover)
568
569    paths = []
570    for name in names:
571        filename = file_pattern(name)
572        path = os.path.join(base_dir, filename)
573        file_path_remover.paths.append(path)
574        paths.append(path)
575
576    return paths[0] if len(paths) == 1 else paths
577
578def remote_filename(path):
579    if imgproto == 'file':
580        return path
581    elif imgproto == 'ssh':
582        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
583    else:
584        raise Exception("Protocol %s not supported" % (imgproto))
585
586class VM(qtest.QEMUQtestMachine):
587    '''A QEMU VM'''
588
589    def __init__(self, path_suffix=''):
590        name = "qemu%s-%d" % (path_suffix, os.getpid())
591        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
592        if qemu_gdb and qemu_valgrind:
593            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
594            sys.exit(1)
595        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
596        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
597                         name=name,
598                         base_temp_dir=test_dir,
599                         sock_dir=sock_dir, qmp_timer=timer)
600        self._num_drives = 0
601
602    def _post_shutdown(self) -> None:
603        super()._post_shutdown()
604        if not qemu_valgrind or not self._popen:
605            return
606        valgrind_filename =  f"{test_dir}/{self._popen.pid}.valgrind"
607        if self.exitcode() == 99:
608            with open(valgrind_filename, encoding='utf-8') as f:
609                print(f.read())
610        else:
611            os.remove(valgrind_filename)
612
613    def _pre_launch(self) -> None:
614        super()._pre_launch()
615        if qemu_print:
616            # set QEMU binary output to stdout
617            self._close_qemu_log_file()
618
619    def add_object(self, opts):
620        self._args.append('-object')
621        self._args.append(opts)
622        return self
623
624    def add_device(self, opts):
625        self._args.append('-device')
626        self._args.append(opts)
627        return self
628
629    def add_drive_raw(self, opts):
630        self._args.append('-drive')
631        self._args.append(opts)
632        return self
633
634    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
635        '''Add a virtio-blk drive to the VM'''
636        options = ['if=%s' % interface,
637                   'id=drive%d' % self._num_drives]
638
639        if path is not None:
640            options.append('file=%s' % path)
641            options.append('format=%s' % img_format)
642            options.append('cache=%s' % cachemode)
643            options.append('aio=%s' % aiomode)
644
645        if opts:
646            options.append(opts)
647
648        if img_format == 'luks' and 'key-secret' not in opts:
649            # default luks support
650            if luks_default_secret_object not in self._args:
651                self.add_object(luks_default_secret_object)
652
653            options.append(luks_default_key_secret_opt)
654
655        self._args.append('-drive')
656        self._args.append(','.join(options))
657        self._num_drives += 1
658        return self
659
660    def add_blockdev(self, opts):
661        self._args.append('-blockdev')
662        if isinstance(opts, str):
663            self._args.append(opts)
664        else:
665            self._args.append(','.join(opts))
666        return self
667
668    def add_incoming(self, addr):
669        self._args.append('-incoming')
670        self._args.append(addr)
671        return self
672
673    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
674        cmd = 'human-monitor-command'
675        kwargs: Dict[str, Any] = {'command-line': command_line}
676        if use_log:
677            return self.qmp_log(cmd, **kwargs)
678        else:
679            return self.qmp(cmd, **kwargs)
680
681    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
682        """Pause drive r/w operations"""
683        if not event:
684            self.pause_drive(drive, "read_aio")
685            self.pause_drive(drive, "write_aio")
686            return
687        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
688
689    def resume_drive(self, drive: str) -> None:
690        """Resume drive r/w operations"""
691        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
692
693    def hmp_qemu_io(self, drive: str, cmd: str,
694                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
695        """Write to a given drive using an HMP command"""
696        d = '-d ' if qdev else ''
697        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
698
699    def flatten_qmp_object(self, obj, output=None, basestr=''):
700        if output is None:
701            output = {}
702        if isinstance(obj, list):
703            for i, item in enumerate(obj):
704                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
705        elif isinstance(obj, dict):
706            for key in obj:
707                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
708        else:
709            output[basestr[:-1]] = obj # Strip trailing '.'
710        return output
711
712    def qmp_to_opts(self, obj):
713        obj = self.flatten_qmp_object(obj)
714        output_list = []
715        for key in obj:
716            output_list += [key + '=' + obj[key]]
717        return ','.join(output_list)
718
719    def get_qmp_events_filtered(self, wait=60.0):
720        result = []
721        for ev in self.get_qmp_events(wait=wait):
722            result.append(filter_qmp_event(ev))
723        return result
724
725    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
726        full_cmd = OrderedDict((
727            ("execute", cmd),
728            ("arguments", ordered_qmp(kwargs))
729        ))
730        log(full_cmd, filters, indent=indent)
731        result = self.qmp(cmd, **kwargs)
732        log(result, filters, indent=indent)
733        return result
734
735    # Returns None on success, and an error string on failure
736    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
737                pre_finalize=None, cancel=False, wait=60.0):
738        """
739        run_job moves a job from creation through to dismissal.
740
741        :param job: String. ID of recently-launched job
742        :param auto_finalize: Bool. True if the job was launched with
743                              auto_finalize. Defaults to True.
744        :param auto_dismiss: Bool. True if the job was launched with
745                             auto_dismiss=True. Defaults to False.
746        :param pre_finalize: Callback. A callable that takes no arguments to be
747                             invoked prior to issuing job-finalize, if any.
748        :param cancel: Bool. When true, cancels the job after the pre_finalize
749                       callback.
750        :param wait: Float. Timeout value specifying how long to wait for any
751                     event, in seconds. Defaults to 60.0.
752        """
753        match_device = {'data': {'device': job}}
754        match_id = {'data': {'id': job}}
755        events = [
756            ('BLOCK_JOB_COMPLETED', match_device),
757            ('BLOCK_JOB_CANCELLED', match_device),
758            ('BLOCK_JOB_ERROR', match_device),
759            ('BLOCK_JOB_READY', match_device),
760            ('BLOCK_JOB_PENDING', match_id),
761            ('JOB_STATUS_CHANGE', match_id)
762        ]
763        error = None
764        while True:
765            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
766            if ev['event'] != 'JOB_STATUS_CHANGE':
767                log(ev)
768                continue
769            status = ev['data']['status']
770            if status == 'aborting':
771                result = self.qmp('query-jobs')
772                for j in result['return']:
773                    if j['id'] == job:
774                        error = j['error']
775                        log('Job failed: %s' % (j['error']))
776            elif status == 'ready':
777                self.qmp_log('job-complete', id=job)
778            elif status == 'pending' and not auto_finalize:
779                if pre_finalize:
780                    pre_finalize()
781                if cancel:
782                    self.qmp_log('job-cancel', id=job)
783                else:
784                    self.qmp_log('job-finalize', id=job)
785            elif status == 'concluded' and not auto_dismiss:
786                self.qmp_log('job-dismiss', id=job)
787            elif status == 'null':
788                return error
789
790    # Returns None on success, and an error string on failure
791    def blockdev_create(self, options, job_id='job0', filters=None):
792        if filters is None:
793            filters = [filter_qmp_testfiles]
794        result = self.qmp_log('blockdev-create', filters=filters,
795                              job_id=job_id, options=options)
796
797        if 'return' in result:
798            assert result['return'] == {}
799            job_result = self.run_job(job_id)
800        else:
801            job_result = result['error']
802
803        log("")
804        return job_result
805
806    def enable_migration_events(self, name):
807        log('Enabling migration QMP events on %s...' % name)
808        log(self.qmp('migrate-set-capabilities', capabilities=[
809            {
810                'capability': 'events',
811                'state': True
812            }
813        ]))
814
815    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
816        while True:
817            event = self.event_wait('MIGRATION')
818            # We use the default timeout, and with a timeout, event_wait()
819            # never returns None
820            assert event
821
822            log(event, filters=[filter_qmp_event])
823            if event['data']['status'] in ('completed', 'failed'):
824                break
825
826        if event['data']['status'] == 'completed':
827            # The event may occur in finish-migrate, so wait for the expected
828            # post-migration runstate
829            runstate = None
830            while runstate != expect_runstate:
831                runstate = self.qmp('query-status')['return']['status']
832            return True
833        else:
834            return False
835
836    def node_info(self, node_name):
837        nodes = self.qmp('query-named-block-nodes')
838        for x in nodes['return']:
839            if x['node-name'] == node_name:
840                return x
841        return None
842
843    def query_bitmaps(self):
844        res = self.qmp("query-named-block-nodes")
845        return {device['node-name']: device['dirty-bitmaps']
846                for device in res['return'] if 'dirty-bitmaps' in device}
847
848    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
849        """
850        get a specific bitmap from the object returned by query_bitmaps.
851        :param recording: If specified, filter results by the specified value.
852        :param bitmaps: If specified, use it instead of call query_bitmaps()
853        """
854        if bitmaps is None:
855            bitmaps = self.query_bitmaps()
856
857        for bitmap in bitmaps[node_name]:
858            if bitmap.get('name', '') == bitmap_name:
859                if recording is None or bitmap.get('recording') == recording:
860                    return bitmap
861        return None
862
863    def check_bitmap_status(self, node_name, bitmap_name, fields):
864        ret = self.get_bitmap(node_name, bitmap_name)
865
866        return fields.items() <= ret.items()
867
868    def assert_block_path(self, root, path, expected_node, graph=None):
869        """
870        Check whether the node under the given path in the block graph
871        is @expected_node.
872
873        @root is the node name of the node where the @path is rooted.
874
875        @path is a string that consists of child names separated by
876        slashes.  It must begin with a slash.
877
878        Examples for @root + @path:
879          - root="qcow2-node", path="/backing/file"
880          - root="quorum-node", path="/children.2/file"
881
882        Hypothetically, @path could be empty, in which case it would
883        point to @root.  However, in practice this case is not useful
884        and hence not allowed.
885
886        @expected_node may be None.  (All elements of the path but the
887        leaf must still exist.)
888
889        @graph may be None or the result of an x-debug-query-block-graph
890        call that has already been performed.
891        """
892        if graph is None:
893            graph = self.qmp('x-debug-query-block-graph')['return']
894
895        iter_path = iter(path.split('/'))
896
897        # Must start with a /
898        assert next(iter_path) == ''
899
900        node = next((node for node in graph['nodes'] if node['name'] == root),
901                    None)
902
903        # An empty @path is not allowed, so the root node must be present
904        assert node is not None, 'Root node %s not found' % root
905
906        for child_name in iter_path:
907            assert node is not None, 'Cannot follow path %s%s' % (root, path)
908
909            try:
910                node_id = next(edge['child'] for edge in graph['edges']
911                               if (edge['parent'] == node['id'] and
912                                   edge['name'] == child_name))
913
914                node = next(node for node in graph['nodes']
915                            if node['id'] == node_id)
916
917            except StopIteration:
918                node = None
919
920        if node is None:
921            assert expected_node is None, \
922                   'No node found under %s (but expected %s)' % \
923                   (path, expected_node)
924        else:
925            assert node['name'] == expected_node, \
926                   'Found node %s under %s (but expected %s)' % \
927                   (node['name'], path, expected_node)
928
929index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
930
931class QMPTestCase(unittest.TestCase):
932    '''Abstract base class for QMP test cases'''
933
934    def __init__(self, *args, **kwargs):
935        super().__init__(*args, **kwargs)
936        # Many users of this class set a VM property we rely on heavily
937        # in the methods below.
938        self.vm = None
939
940    def dictpath(self, d, path):
941        '''Traverse a path in a nested dict'''
942        for component in path.split('/'):
943            m = index_re.match(component)
944            if m:
945                component, idx = m.groups()
946                idx = int(idx)
947
948            if not isinstance(d, dict) or component not in d:
949                self.fail(f'failed path traversal for "{path}" in "{d}"')
950            d = d[component]
951
952            if m:
953                if not isinstance(d, list):
954                    self.fail(f'path component "{component}" in "{path}" '
955                              f'is not a list in "{d}"')
956                try:
957                    d = d[idx]
958                except IndexError:
959                    self.fail(f'invalid index "{idx}" in path "{path}" '
960                              f'in "{d}"')
961        return d
962
963    def assert_qmp_absent(self, d, path):
964        try:
965            result = self.dictpath(d, path)
966        except AssertionError:
967            return
968        self.fail('path "%s" has value "%s"' % (path, str(result)))
969
970    def assert_qmp(self, d, path, value):
971        '''Assert that the value for a specific path in a QMP dict
972           matches.  When given a list of values, assert that any of
973           them matches.'''
974
975        result = self.dictpath(d, path)
976
977        # [] makes no sense as a list of valid values, so treat it as
978        # an actual single value.
979        if isinstance(value, list) and value != []:
980            for v in value:
981                if result == v:
982                    return
983            self.fail('no match for "%s" in %s' % (str(result), str(value)))
984        else:
985            self.assertEqual(result, value,
986                             '"%s" is "%s", expected "%s"'
987                             % (path, str(result), str(value)))
988
989    def assert_no_active_block_jobs(self):
990        result = self.vm.qmp('query-block-jobs')
991        self.assert_qmp(result, 'return', [])
992
993    def assert_has_block_node(self, node_name=None, file_name=None):
994        """Issue a query-named-block-nodes and assert node_name and/or
995        file_name is present in the result"""
996        def check_equal_or_none(a, b):
997            return a is None or b is None or a == b
998        assert node_name or file_name
999        result = self.vm.qmp('query-named-block-nodes')
1000        for x in result["return"]:
1001            if check_equal_or_none(x.get("node-name"), node_name) and \
1002                    check_equal_or_none(x.get("file"), file_name):
1003                return
1004        self.fail("Cannot find %s %s in result:\n%s" %
1005                  (node_name, file_name, result))
1006
1007    def assert_json_filename_equal(self, json_filename, reference):
1008        '''Asserts that the given filename is a json: filename and that its
1009           content is equal to the given reference object'''
1010        self.assertEqual(json_filename[:5], 'json:')
1011        self.assertEqual(
1012            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1013            self.vm.flatten_qmp_object(reference)
1014        )
1015
1016    def cancel_and_wait(self, drive='drive0', force=False,
1017                        resume=False, wait=60.0):
1018        '''Cancel a block job and wait for it to finish, returning the event'''
1019        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1020        self.assert_qmp(result, 'return', {})
1021
1022        if resume:
1023            self.vm.resume_drive(drive)
1024
1025        cancelled = False
1026        result = None
1027        while not cancelled:
1028            for event in self.vm.get_qmp_events(wait=wait):
1029                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1030                   event['event'] == 'BLOCK_JOB_CANCELLED':
1031                    self.assert_qmp(event, 'data/device', drive)
1032                    result = event
1033                    cancelled = True
1034                elif event['event'] == 'JOB_STATUS_CHANGE':
1035                    self.assert_qmp(event, 'data/id', drive)
1036
1037
1038        self.assert_no_active_block_jobs()
1039        return result
1040
1041    def wait_until_completed(self, drive='drive0', check_offset=True,
1042                             wait=60.0, error=None):
1043        '''Wait for a block job to finish, returning the event'''
1044        while True:
1045            for event in self.vm.get_qmp_events(wait=wait):
1046                if event['event'] == 'BLOCK_JOB_COMPLETED':
1047                    self.assert_qmp(event, 'data/device', drive)
1048                    if error is None:
1049                        self.assert_qmp_absent(event, 'data/error')
1050                        if check_offset:
1051                            self.assert_qmp(event, 'data/offset',
1052                                            event['data']['len'])
1053                    else:
1054                        self.assert_qmp(event, 'data/error', error)
1055                    self.assert_no_active_block_jobs()
1056                    return event
1057                if event['event'] == 'JOB_STATUS_CHANGE':
1058                    self.assert_qmp(event, 'data/id', drive)
1059
1060    def wait_ready(self, drive='drive0'):
1061        """Wait until a BLOCK_JOB_READY event, and return the event."""
1062        return self.vm.events_wait([
1063            ('BLOCK_JOB_READY',
1064             {'data': {'type': 'mirror', 'device': drive}}),
1065            ('BLOCK_JOB_READY',
1066             {'data': {'type': 'commit', 'device': drive}})
1067        ])
1068
1069    def wait_ready_and_cancel(self, drive='drive0'):
1070        self.wait_ready(drive=drive)
1071        event = self.cancel_and_wait(drive=drive)
1072        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1073        self.assert_qmp(event, 'data/type', 'mirror')
1074        self.assert_qmp(event, 'data/offset', event['data']['len'])
1075
1076    def complete_and_wait(self, drive='drive0', wait_ready=True,
1077                          completion_error=None):
1078        '''Complete a block job and wait for it to finish'''
1079        if wait_ready:
1080            self.wait_ready(drive=drive)
1081
1082        result = self.vm.qmp('block-job-complete', device=drive)
1083        self.assert_qmp(result, 'return', {})
1084
1085        event = self.wait_until_completed(drive=drive, error=completion_error)
1086        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1087
1088    def pause_wait(self, job_id='job0'):
1089        with Timeout(3, "Timeout waiting for job to pause"):
1090            while True:
1091                result = self.vm.qmp('query-block-jobs')
1092                found = False
1093                for job in result['return']:
1094                    if job['device'] == job_id:
1095                        found = True
1096                        if job['paused'] and not job['busy']:
1097                            return job
1098                        break
1099                assert found
1100
1101    def pause_job(self, job_id='job0', wait=True):
1102        result = self.vm.qmp('block-job-pause', device=job_id)
1103        self.assert_qmp(result, 'return', {})
1104        if wait:
1105            return self.pause_wait(job_id)
1106        return result
1107
1108    def case_skip(self, reason):
1109        '''Skip this test case'''
1110        case_notrun(reason)
1111        self.skipTest(reason)
1112
1113
1114def notrun(reason):
1115    '''Skip this test suite'''
1116    # Each test in qemu-iotests has a number ("seq")
1117    seq = os.path.basename(sys.argv[0])
1118
1119    with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
1120            as outfile:
1121        outfile.write(reason + '\n')
1122    logger.warning("%s not run: %s", seq, reason)
1123    sys.exit(0)
1124
1125def case_notrun(reason):
1126    '''Mark this test case as not having been run (without actually
1127    skipping it, that is left to the caller).  See
1128    QMPTestCase.case_skip() for a variant that actually skips the
1129    current test case.'''
1130
1131    # Each test in qemu-iotests has a number ("seq")
1132    seq = os.path.basename(sys.argv[0])
1133
1134    with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
1135            as outfile:
1136        outfile.write('    [case not run] ' + reason + '\n')
1137
1138def _verify_image_format(supported_fmts: Sequence[str] = (),
1139                         unsupported_fmts: Sequence[str] = ()) -> None:
1140    if 'generic' in supported_fmts and \
1141            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1142        # similar to
1143        #   _supported_fmt generic
1144        # for bash tests
1145        supported_fmts = ()
1146
1147    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1148    if not_sup or (imgfmt in unsupported_fmts):
1149        notrun('not suitable for this image format: %s' % imgfmt)
1150
1151    if imgfmt == 'luks':
1152        verify_working_luks()
1153
1154def _verify_protocol(supported: Sequence[str] = (),
1155                     unsupported: Sequence[str] = ()) -> None:
1156    assert not (supported and unsupported)
1157
1158    if 'generic' in supported:
1159        return
1160
1161    not_sup = supported and (imgproto not in supported)
1162    if not_sup or (imgproto in unsupported):
1163        notrun('not suitable for this protocol: %s' % imgproto)
1164
1165def _verify_platform(supported: Sequence[str] = (),
1166                     unsupported: Sequence[str] = ()) -> None:
1167    if any((sys.platform.startswith(x) for x in unsupported)):
1168        notrun('not suitable for this OS: %s' % sys.platform)
1169
1170    if supported:
1171        if not any((sys.platform.startswith(x) for x in supported)):
1172            notrun('not suitable for this OS: %s' % sys.platform)
1173
1174def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1175    if supported_cache_modes and (cachemode not in supported_cache_modes):
1176        notrun('not suitable for this cache mode: %s' % cachemode)
1177
1178def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1179    if supported_aio_modes and (aiomode not in supported_aio_modes):
1180        notrun('not suitable for this aio mode: %s' % aiomode)
1181
1182def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1183    usf_list = list(set(required_formats) - set(supported_formats()))
1184    if usf_list:
1185        notrun(f'formats {usf_list} are not whitelisted')
1186
1187
1188def _verify_virtio_blk() -> None:
1189    out = qemu_pipe('-M', 'none', '-device', 'help')
1190    if 'virtio-blk' not in out:
1191        notrun('Missing virtio-blk in QEMU binary')
1192
1193def _verify_virtio_scsi_pci_or_ccw() -> None:
1194    out = qemu_pipe('-M', 'none', '-device', 'help')
1195    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1196        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1197
1198
1199def supports_quorum():
1200    return 'quorum' in qemu_img_pipe('--help')
1201
1202def verify_quorum():
1203    '''Skip test suite if quorum support is not available'''
1204    if not supports_quorum():
1205        notrun('quorum support missing')
1206
1207def has_working_luks() -> Tuple[bool, str]:
1208    """
1209    Check whether our LUKS driver can actually create images
1210    (this extends to LUKS encryption for qcow2).
1211
1212    If not, return the reason why.
1213    """
1214
1215    img_file = f'{test_dir}/luks-test.luks'
1216    (output, status) = \
1217        qemu_img_pipe_and_status('create', '-f', 'luks',
1218                                 '--object', luks_default_secret_object,
1219                                 '-o', luks_default_key_secret_opt,
1220                                 '-o', 'iter-time=10',
1221                                 img_file, '1G')
1222    try:
1223        os.remove(img_file)
1224    except OSError:
1225        pass
1226
1227    if status != 0:
1228        reason = output
1229        for line in output.splitlines():
1230            if img_file + ':' in line:
1231                reason = line.split(img_file + ':', 1)[1].strip()
1232                break
1233
1234        return (False, reason)
1235    else:
1236        return (True, '')
1237
1238def verify_working_luks():
1239    """
1240    Skip test suite if LUKS does not work
1241    """
1242    (working, reason) = has_working_luks()
1243    if not working:
1244        notrun(reason)
1245
1246def qemu_pipe(*args: str) -> str:
1247    """
1248    Run qemu with an option to print something and exit (e.g. a help option).
1249
1250    :return: QEMU's stdout output.
1251    """
1252    full_args = [qemu_prog] + qemu_opts + list(args)
1253    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1254    return output
1255
1256def supported_formats(read_only=False):
1257    '''Set 'read_only' to True to check ro-whitelist
1258       Otherwise, rw-whitelist is checked'''
1259
1260    if not hasattr(supported_formats, "formats"):
1261        supported_formats.formats = {}
1262
1263    if read_only not in supported_formats.formats:
1264        format_message = qemu_pipe("-drive", "format=help")
1265        line = 1 if read_only else 0
1266        supported_formats.formats[read_only] = \
1267            format_message.splitlines()[line].split(":")[1].split()
1268
1269    return supported_formats.formats[read_only]
1270
1271def skip_if_unsupported(required_formats=(), read_only=False):
1272    '''Skip Test Decorator
1273       Runs the test if all the required formats are whitelisted'''
1274    def skip_test_decorator(func):
1275        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1276                         **kwargs: Dict[str, Any]) -> None:
1277            if callable(required_formats):
1278                fmts = required_formats(test_case)
1279            else:
1280                fmts = required_formats
1281
1282            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1283            if usf_list:
1284                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1285                test_case.case_skip(msg)
1286            else:
1287                func(test_case, *args, **kwargs)
1288        return func_wrapper
1289    return skip_test_decorator
1290
1291def skip_for_formats(formats: Sequence[str] = ()) \
1292    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1293                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1294    '''Skip Test Decorator
1295       Skips the test for the given formats'''
1296    def skip_test_decorator(func):
1297        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1298                         **kwargs: Dict[str, Any]) -> None:
1299            if imgfmt in formats:
1300                msg = f'{test_case}: Skipped for format {imgfmt}'
1301                test_case.case_skip(msg)
1302            else:
1303                func(test_case, *args, **kwargs)
1304        return func_wrapper
1305    return skip_test_decorator
1306
1307def skip_if_user_is_root(func):
1308    '''Skip Test Decorator
1309       Runs the test only without root permissions'''
1310    def func_wrapper(*args, **kwargs):
1311        if os.getuid() == 0:
1312            case_notrun('{}: cannot be run as root'.format(args[0]))
1313            return None
1314        else:
1315            return func(*args, **kwargs)
1316    return func_wrapper
1317
1318# We need to filter out the time taken from the output so that
1319# qemu-iotest can reliably diff the results against master output,
1320# and hide skipped tests from the reference output.
1321
1322class ReproducibleTestResult(unittest.TextTestResult):
1323    def addSkip(self, test, reason):
1324        # Same as TextTestResult, but print dot instead of "s"
1325        unittest.TestResult.addSkip(self, test, reason)
1326        if self.showAll:
1327            self.stream.writeln("skipped {0!r}".format(reason))
1328        elif self.dots:
1329            self.stream.write(".")
1330            self.stream.flush()
1331
1332class ReproducibleStreamWrapper:
1333    def __init__(self, stream: TextIO):
1334        self.stream = stream
1335
1336    def __getattr__(self, attr):
1337        if attr in ('stream', '__getstate__'):
1338            raise AttributeError(attr)
1339        return getattr(self.stream, attr)
1340
1341    def write(self, arg=None):
1342        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1343        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1344        self.stream.write(arg)
1345
1346class ReproducibleTestRunner(unittest.TextTestRunner):
1347    def __init__(self, stream: Optional[TextIO] = None,
1348             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1349             **kwargs: Any) -> None:
1350        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1351        super().__init__(stream=rstream,           # type: ignore
1352                         descriptions=True,
1353                         resultclass=resultclass,
1354                         **kwargs)
1355
1356def execute_unittest(argv: List[str], debug: bool = False) -> None:
1357    """Executes unittests within the calling module."""
1358
1359    # Some tests have warnings, especially ResourceWarnings for unclosed
1360    # files and sockets.  Ignore them for now to ensure reproducibility of
1361    # the test output.
1362    unittest.main(argv=argv,
1363                  testRunner=ReproducibleTestRunner,
1364                  verbosity=2 if debug else 1,
1365                  warnings=None if sys.warnoptions else 'ignore')
1366
1367def execute_setup_common(supported_fmts: Sequence[str] = (),
1368                         supported_platforms: Sequence[str] = (),
1369                         supported_cache_modes: Sequence[str] = (),
1370                         supported_aio_modes: Sequence[str] = (),
1371                         unsupported_fmts: Sequence[str] = (),
1372                         supported_protocols: Sequence[str] = (),
1373                         unsupported_protocols: Sequence[str] = (),
1374                         required_fmts: Sequence[str] = ()) -> bool:
1375    """
1376    Perform necessary setup for either script-style or unittest-style tests.
1377
1378    :return: Bool; Whether or not debug mode has been requested via the CLI.
1379    """
1380    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1381
1382    debug = '-d' in sys.argv
1383    if debug:
1384        sys.argv.remove('-d')
1385    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1386
1387    _verify_image_format(supported_fmts, unsupported_fmts)
1388    _verify_protocol(supported_protocols, unsupported_protocols)
1389    _verify_platform(supported=supported_platforms)
1390    _verify_cache_mode(supported_cache_modes)
1391    _verify_aio_mode(supported_aio_modes)
1392    _verify_formats(required_fmts)
1393    _verify_virtio_blk()
1394
1395    return debug
1396
1397def execute_test(*args, test_function=None, **kwargs):
1398    """Run either unittest or script-style tests."""
1399
1400    debug = execute_setup_common(*args, **kwargs)
1401    if not test_function:
1402        execute_unittest(sys.argv, debug)
1403    else:
1404        test_function()
1405
1406def activate_logging():
1407    """Activate iotests.log() output to stdout for script-style tests."""
1408    handler = logging.StreamHandler(stream=sys.stdout)
1409    formatter = logging.Formatter('%(message)s')
1410    handler.setFormatter(formatter)
1411    test_logger.addHandler(handler)
1412    test_logger.setLevel(logging.INFO)
1413    test_logger.propagate = False
1414
1415# This is called from script-style iotests without a single point of entry
1416def script_initialize(*args, **kwargs):
1417    """Initialize script-style tests without running any tests."""
1418    activate_logging()
1419    execute_setup_common(*args, **kwargs)
1420
1421# This is called from script-style iotests with a single point of entry
1422def script_main(test_function, *args, **kwargs):
1423    """Run script-style tests outside of the unittest framework"""
1424    activate_logging()
1425    execute_test(*args, test_function=test_function, **kwargs)
1426
1427# This is called from unittest style iotests
1428def main(*args, **kwargs):
1429    """Run tests using the unittest framework"""
1430    execute_test(*args, **kwargs)
1431