xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 0fbb5d2d)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable, Iterator,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39from qemu.machine import qtest
40from qemu.qmp import QMPMessage
41
# Use this logger for logging messages directly from the iotests module.
# Only a NullHandler is attached here; any real output handler has to be
# configured elsewhere.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
# log() in this module writes all of its output through it.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks when the interpreter dies from a fatal
# signal (see the faulthandler module documentation).
faulthandler.enable()
51
# Command lines of the qemu tools, taken from the environment.
#
# Each *_args list starts with the program name (overridable through the
# matching *_PROG variable) followed by the words of the matching
# *_OPTIONS variable.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Variant used by qemu_io_wrap_args() when the caller passes -f or
# --image-opts itself.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: unlike the lists above this split is unconditional, so an unset or
# empty QEMU_OPTIONS yields [''] (one empty argument), not [].
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Non-empty GDB_OPTIONS turns qemu_gdb into a 'gdbserver ...' command
# prefix; otherwise it stays an empty list.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Raw value of PRINT_QEMU (a string) or False when unset; only its
# truthiness is tested in this file.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')
85
# Settings without a fallback value: if any of them is missing the module
# prints a hint and terminates the interpreter at import time.
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Command prefix for running QEMU under valgrind; stays empty unless
# VALGRIND_QEMU is "y" and NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    # Exit code 99 is what VM._post_shutdown() checks for before printing
    # the valgrind log.
    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object and the option referring to it; both are added
# automatically for luks images by qemu_img_create() and VM.add_drive().
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# NOTE: read outside the try block above, so a missing SAMPLE_IMG_DIR
# raises a plain KeyError instead of printing the "check" hint.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
115
116
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Context manager that runs its body with a different logger level.

    The logger called *logger_name* is switched to *level* (CRITICAL by
    default) and gets its previous level back when the context is left,
    whether or not the body raised.  Handy for muting errors that are
    expected or uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        # Always put the level back that was configured on entry
        target.setLevel(saved_level)
133
134
def unarchive_sample_image(sample, fname):
    """
    Decompress the sample image '<sample>.bz2' from sample_img_dir and
    write the result to the file *fname*.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
139
140
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start the command *args* with its stdout captured in a text-mode pipe.

    With connect_stderr (the default) stderr is merged into that pipe;
    otherwise the child's stderr is not redirected.  The caller owns the
    returned Popen object.
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None
    # pylint: disable=consider-using-with
    process = subprocess.Popen(args,
                               stdout=subprocess.PIPE,
                               stderr=stderr_target,
                               universal_newlines=True)
    return process
149
150
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic message.
    :param args: Complete command line, program name first.
    :param connect_stderr: Passed on to qemu_tool_popen(); when True the
                           tool's stderr is part of the returned output.
    :return: Tuple of (captured output, return code).  A negative return
             code means the tool was terminated by a signal; in that case
             a one-line notice is also written to stderr.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Two adjacent literals instead of a backslash continuation
            # inside the string: the latter would embed the next line's
            # indentation in the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
163
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img (qemu_img_args followed by *args*) and hand back the
    pair of its captured output and its exit code.
    """
    command = [*qemu_img_args, *args]
    return qemu_tool_pipe_and_status('qemu-img', command)
170
def qemu_img(*args: str) -> int:
    '''Run qemu-img with the given arguments; only the exit code is kept'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
174
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message in which every dict is an OrderedDict
    sorted by key.

    With conv_keys, '_' in the keys of the outermost dict becomes '-';
    dicts nested inside it keep their keys.  Elements of a list are each
    processed with the default conv_keys=True.  Anything that is neither
    list nor dict is returned as it is.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(qmsg[key], conv_keys=False)
    return ordered
187
def qemu_img_create(*args):
    """
    Run "qemu-img create" with *args* and return the exit code.

    For images created with "-f luks" the default key secret is filled in
    when the caller did not provide one:

    - if there is a -o option string without 'key-secret', the default
      key-secret option is appended to that string and the default secret
      object is added with --object;
    - if there is no -o at all, both -o and --object are prepended.

    An -o string that already mentions 'key-secret' is left untouched.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is one comma-separated string, so the
                # default option is joined onto it (a str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
206
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the decoded JSON'''
    report = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(report)
209
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the decoded JSON'''
    report = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(report)
212
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code.

    A negative exit code (termination by a signal) is additionally
    reported on stderr.'''
    command = qemu_img_args + list(args)
    exitcode = subprocess.call(command)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(command)))
    return exitcode
220
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img with the given arguments; only the output is kept'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
224
def qemu_img_log(*args):
    '''Run qemu-img and log its output through filter_testfiles.

    The unfiltered output is returned.'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
229
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for *filename*.

    The image is opened with --image-opts when imgopts is set and with
    "-f imgfmt" otherwise; extra_args go between these options and the
    file name.  filter_path is the string that filter_img_info() replaces
    in the output; it defaults to *filename*.'''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    command = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*command)
    log(filter_img_info(output, filter_path or filename))
243
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''Build a full qemu-io command line for *args*.

    When the caller selects the format itself (-f or --image-opts is among
    the arguments) the no-format base command is used, otherwise the
    default one.'''
    caller_picks_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if caller_picks_format else qemu_io_args
    return base + list(args)
249
def qemu_io_popen(*args):
    '''Start qemu-io with the given arguments and return the Popen object'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
252
def qemu_io(*args):
    '''Run qemu-io and return what it printed (stderr is merged in)'''
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
256
def qemu_io_log(*args):
    '''Run qemu-io and log its output through the test-file and qemu-io
    filters.

    The unfiltered output is returned.'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
261
def qemu_io_silent(*args):
    '''Run qemu-io with stdout discarded and return the exit code.

    A negative exit code (termination by a signal) is additionally
    reported on stderr.'''
    command = qemu_io_wrap_args(args)
    status = subprocess.run(command, stdout=subprocess.DEVNULL,
                            check=False).returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
270
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded; True if it exited with 0'''
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
277
class QemuIoInteractive:
    """
    A qemu-io process that is driven through its interactive prompt.

    The constructor starts qemu-io and waits for the first prompt, cmd()
    sends one command and returns everything printed up to the next
    prompt, and close() sends the quit command.

    :raises ValueError: from the constructor when the process does not
                        start with the prompt; the message is the output
                        collected from the process.
    """

    # Printed by qemu-io whenever it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read the process output one character at a time until the prompt
        shows up, and return what came before the prompt.

        The last len(prompt) characters are compared against the prompt
        after every read, so a prompt that directly follows a partial
        look-alike (for example output ending in 'q') is still found.
        """
        prompt = self._PROMPT
        n = len(prompt)
        s = []
        tail = ''
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            tail = (tail + c)[-n:]

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Send the single-line command *cmd* and return its output.

        'q' and 'quit' are rejected here; use close() instead.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
324
325
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode); return the exit code of
    the process that was launched'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
329
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork (daemon mode) and capture its stdout.

    Returns (exit code, output); the output is replaced by an empty
    string when the exit code is 0.'''
    command = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
337
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports and log the result.

    Only the program name is used here, not the extra options held in
    qemu_nbd_args.  The unfiltered output is returned.'''
    command = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
344
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    qemu-nbd is started with --persistent and a --pid-file.  The body of
    the with-statement runs once that pid file exists; on exit the pid
    file is removed and the process is killed.

    Raises RuntimeError if qemu-nbd terminates before the pid file shows
    up.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    # The pid file doubles as the start-up signal below, so it must not
    # be left over from an earlier run.
    assert not os.path.exists(pid_file)

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # Poll until the pid file appears, bailing out if the
            # process dies first.
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            # Runs on every exit path, including the start-up failure
            # above and exceptions raised by the with-body.
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()
373
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Tell whether "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are the formats passed with -f and -F respectively.'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
378
def create_image(name, size):
    '''Write a raw image file made of 512-byte sectors with markers.

    Every sector carries its own index as a big-endian 32-bit integer in
    its first and in its last four bytes, with 504 zero bytes between.
    Whole sectors are written for as long as the sector start lies below
    *size*.'''
    sector = 0
    with open(name, 'wb') as image:
        while sector * 512 < size:
            image.write(struct.pack('>l504xl', sector, sector))
            sector += 1
387
def image_size(img):
    '''Return the 'virtual-size' that "qemu-img info" reports for *img*

    The image is opened with the format in imgfmt.'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
392
def is_str(val):
    '''Tell whether *val* is a text string (an instance of str)'''
    verdict = isinstance(val, str)
    return verdict
395
# Matches the test directory path literally: the path is escaped so that
# characters with a regex meaning (such as '.' or '+') only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in *msg* with
    the placeholder TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
399
# Carriage returns, as found in Windows line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return *msg* with all carriage returns removed'''
    cleaned = win32_re.sub("", msg)
    return cleaned
403
# The statistics line that qemu-io prints: operation count, elapsed time,
# throughput and operations per second
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Remove carriage returns from *msg* and replace the numbers in
    qemu-io statistics lines with fixed placeholders'''
    without_cr = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X "
                          "(XXX YYY/sec and XXX ops/sec)", without_cr)
411
# "chown" followed by a numeric uid:gid pair
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" in *msg* by "chown UID:GID"'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
415
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of *event* whose timestamp fields 'seconds' and
    'microseconds' are replaced by the placeholders 'SECS' and 'USECS'.
    The event passed in is left unmodified; an event without 'timestamp'
    is copied unchanged.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # too, so that the caller's event keeps its real values.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
423
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a list or a dict.  Nested lists and dicts are walked
    recursively; every other value is replaced by filter_fn(key, value),
    where key is the dict key or the list index.  The (modified) qmsg
    itself is returned.'''
    # Lists are walked by index, dicts by key
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
439
def filter_testfiles(msg):
    '''Replace the per-process file name prefixes in *msg*.

    '<test_dir>/<pid>-' becomes 'TEST_DIR/PID-' and '<sock_dir>/<pid>-'
    becomes 'SOCK_DIR/PID-', where <pid> is the id of this process.'''
    substitutions = (
        (os.path.join(test_dir, "%s-" % (os.getpid())), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, "%s-" % (os.getpid())), 'SOCK_DIR/PID-'),
    )
    for prefix, placeholder in substitutions:
        msg = msg.replace(prefix, placeholder)
    return msg
444
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object

    The object is modified in place (see filter_qmp) and returned.'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
451
def filter_virtio_scsi(output: str) -> str:
    '''Strip the "-ccw" / "-pci" suffix from "virtio-scsi" device names'''
    generic_name = re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
    return generic_name
454
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object

    The object is modified in place (see filter_qmp) and returned.'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
461
def filter_generated_node_ids(msg):
    '''Replace names of the form "#block<number>" in *msg* by NODE_NAME'''
    anonymised = re.sub("#block[0-9]+", "NODE_NAME", msg)
    return anonymised
464
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible.

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines *filename* becomes TEST_IMG, test file prefixes are
    filtered, the image format name becomes IMGFMT, and the values of
    "iters:", "uuid:" and "cid:" are replaced by placeholders.'''
    value_masks = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for raw in output.split('\n'):
        if 'disk size' in raw or 'actual-size' in raw:
            continue
        text = raw.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in value_masks:
            text = re.sub(pattern, placeholder, text)
        kept.append(text)
    return '\n'.join(kept)
480
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in *msg* by
    IMGFMT'''
    generic = msg.replace(imgfmt, 'IMGFMT')
    return generic
483
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object

    The object is modified in place (see filter_qmp) and returned.'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
490
def filter_nbd_exports(output: str) -> str:
    '''Replace the numbers after "min block:", "opt block:" and
    "max block:" by XXX'''
    masked = re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
    return masked
493
494
# The message kinds that log() and its filters deal with
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the diff-output logger.

    The filters are applied to the message in order first.  A dict or
    list is then serialized as JSON (pretty-printed if indent is given,
    with keys sorted unless it is an OrderedDict); anything else is
    logged as it is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialized)
512
class Timeout:
    """
    Context manager that raises an Exception with *errmsg* when its body
    runs for longer than *seconds*, using SIGALRM and the real-time
    interval timer.

    When QEMU runs under gdb or valgrind (qemu_gdb / qemu_valgrind set)
    no timer is armed and the context does nothing.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer; the signal handler stays installed
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """SIGALRM handler: the time is up."""
        raise Exception(self.errmsg)
530
def file_pattern(name):
    '''Return *name* prefixed with the id of this process and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
533
class FilePath:
    """
    Context manager handing out per-process file names and deleting the
    files again when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    The names are placed in iotests.test_dir unless base_dir says
    otherwise, e.g. for sockets:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    With exactly one name the context yields that single path instead of
    a sequence of paths.

    """
    def __init__(self, *names, base_dir=test_dir):
        paths = []
        for name in names:
            paths.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = paths

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort: the file may never have been created
                continue
        return False
572
573
def try_remove(img):
    '''Delete the file *img*; any OSError from doing so is ignored'''
    try:
        os.remove(img)
    except OSError:
        # Best effort only, e.g. the file may not exist
        return
579
def file_path_remover():
    '''Remove the files registered by file_path(), newest first'''
    for registered in file_path_remover.paths[::-1]:
        try_remove(registered)
583
584
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are cleaned up at exit.

    Usage:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    Every generated path is registered for removal by file_path_remover(),
    which is installed as an atexit handler on first use.  A single name
    yields a single path, several names yield a list of paths.
    '''

    # First call: set up the registry and the exit hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = []
    for name in names:
        full_path = os.path.join(base_dir, file_pattern(name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
606
def remote_filename(path):
    '''Return the name under which *path* is reached with the protocol
    in imgproto.

    'file' returns the path unchanged, 'ssh' builds an ssh:// URL for
    $USER at 127.0.0.1 port 22; any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
614
615class VM(qtest.QEMUQtestMachine):
616    '''A QEMU VM'''
617
618    def __init__(self, path_suffix=''):
619        name = "qemu%s-%d" % (path_suffix, os.getpid())
620        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
621        if qemu_gdb and qemu_valgrind:
622            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
623            sys.exit(1)
624        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
625        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
626                         name=name,
627                         base_temp_dir=test_dir,
628                         sock_dir=sock_dir, qmp_timer=timer)
629        self._num_drives = 0
630
631    def _post_shutdown(self) -> None:
632        super()._post_shutdown()
633        if not qemu_valgrind or not self._popen:
634            return
635        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
636        if self.exitcode() == 99:
637            with open(valgrind_filename, encoding='utf-8') as f:
638                print(f.read())
639        else:
640            os.remove(valgrind_filename)
641
642    def _pre_launch(self) -> None:
643        super()._pre_launch()
644        if qemu_print:
645            # set QEMU binary output to stdout
646            self._close_qemu_log_file()
647
648    def add_object(self, opts):
649        self._args.append('-object')
650        self._args.append(opts)
651        return self
652
653    def add_device(self, opts):
654        self._args.append('-device')
655        self._args.append(opts)
656        return self
657
658    def add_drive_raw(self, opts):
659        self._args.append('-drive')
660        self._args.append(opts)
661        return self
662
663    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
664        '''Add a virtio-blk drive to the VM'''
665        options = ['if=%s' % interface,
666                   'id=drive%d' % self._num_drives]
667
668        if path is not None:
669            options.append('file=%s' % path)
670            options.append('format=%s' % img_format)
671            options.append('cache=%s' % cachemode)
672            options.append('aio=%s' % aiomode)
673
674        if opts:
675            options.append(opts)
676
677        if img_format == 'luks' and 'key-secret' not in opts:
678            # default luks support
679            if luks_default_secret_object not in self._args:
680                self.add_object(luks_default_secret_object)
681
682            options.append(luks_default_key_secret_opt)
683
684        self._args.append('-drive')
685        self._args.append(','.join(options))
686        self._num_drives += 1
687        return self
688
689    def add_blockdev(self, opts):
690        self._args.append('-blockdev')
691        if isinstance(opts, str):
692            self._args.append(opts)
693        else:
694            self._args.append(','.join(opts))
695        return self
696
697    def add_incoming(self, addr):
698        self._args.append('-incoming')
699        self._args.append(addr)
700        return self
701
702    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
703        cmd = 'human-monitor-command'
704        kwargs: Dict[str, Any] = {'command-line': command_line}
705        if use_log:
706            return self.qmp_log(cmd, **kwargs)
707        else:
708            return self.qmp(cmd, **kwargs)
709
710    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
711        """Pause drive r/w operations"""
712        if not event:
713            self.pause_drive(drive, "read_aio")
714            self.pause_drive(drive, "write_aio")
715            return
716        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
717
718    def resume_drive(self, drive: str) -> None:
719        """Resume drive r/w operations"""
720        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
721
722    def hmp_qemu_io(self, drive: str, cmd: str,
723                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
724        """Write to a given drive using an HMP command"""
725        d = '-d ' if qdev else ''
726        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
727
728    def flatten_qmp_object(self, obj, output=None, basestr=''):
729        if output is None:
730            output = {}
731        if isinstance(obj, list):
732            for i, item in enumerate(obj):
733                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
734        elif isinstance(obj, dict):
735            for key in obj:
736                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
737        else:
738            output[basestr[:-1]] = obj # Strip trailing '.'
739        return output
740
741    def qmp_to_opts(self, obj):
742        obj = self.flatten_qmp_object(obj)
743        output_list = []
744        for key in obj:
745            output_list += [key + '=' + obj[key]]
746        return ','.join(output_list)
747
748    def get_qmp_events_filtered(self, wait=60.0):
749        result = []
750        for ev in self.get_qmp_events(wait=wait):
751            result.append(filter_qmp_event(ev))
752        return result
753
754    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
755        full_cmd = OrderedDict((
756            ("execute", cmd),
757            ("arguments", ordered_qmp(kwargs))
758        ))
759        log(full_cmd, filters, indent=indent)
760        result = self.qmp(cmd, **kwargs)
761        log(result, filters, indent=indent)
762        return result
763
    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None unless the job passed through the 'aborting' status;
                 in that case the 'error' value that query-jobs reported
                 for this job.
        """
        # The BLOCK_JOB_* events below are matched on data.device, except
        # BLOCK_JOB_PENDING which, like JOB_STATUS_CHANGE, is matched on
        # data.id.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything except status changes is only logged
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember why the job failed; it is returned at the end
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback, then either
                # cancel or finalize the job
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The only way out of the loop
                return error
818
819    # Returns None on success, and an error string on failure
820    def blockdev_create(self, options, job_id='job0', filters=None):
821        if filters is None:
822            filters = [filter_qmp_testfiles]
823        result = self.qmp_log('blockdev-create', filters=filters,
824                              job_id=job_id, options=options)
825
826        if 'return' in result:
827            assert result['return'] == {}
828            job_result = self.run_job(job_id)
829        else:
830            job_result = result['error']
831
832        log("")
833        return job_result
834
835    def enable_migration_events(self, name):
836        log('Enabling migration QMP events on %s...' % name)
837        log(self.qmp('migrate-set-capabilities', capabilities=[
838            {
839                'capability': 'events',
840                'state': True
841            }
842        ]))
843
844    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
845        while True:
846            event = self.event_wait('MIGRATION')
847            # We use the default timeout, and with a timeout, event_wait()
848            # never returns None
849            assert event
850
851            log(event, filters=[filter_qmp_event])
852            if event['data']['status'] in ('completed', 'failed'):
853                break
854
855        if event['data']['status'] == 'completed':
856            # The event may occur in finish-migrate, so wait for the expected
857            # post-migration runstate
858            runstate = None
859            while runstate != expect_runstate:
860                runstate = self.qmp('query-status')['return']['status']
861            return True
862        else:
863            return False
864
865    def node_info(self, node_name):
866        nodes = self.qmp('query-named-block-nodes')
867        for x in nodes['return']:
868            if x['node-name'] == node_name:
869                return x
870        return None
871
872    def query_bitmaps(self):
873        res = self.qmp("query-named-block-nodes")
874        return {device['node-name']: device['dirty-bitmaps']
875                for device in res['return'] if 'dirty-bitmaps' in device}
876
877    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
878        """
879        get a specific bitmap from the object returned by query_bitmaps.
880        :param recording: If specified, filter results by the specified value.
881        :param bitmaps: If specified, use it instead of call query_bitmaps()
882        """
883        if bitmaps is None:
884            bitmaps = self.query_bitmaps()
885
886        for bitmap in bitmaps[node_name]:
887            if bitmap.get('name', '') == bitmap_name:
888                if recording is None or bitmap.get('recording') == recording:
889                    return bitmap
890        return None
891
892    def check_bitmap_status(self, node_name, bitmap_name, fields):
893        ret = self.get_bitmap(node_name, bitmap_name)
894
895        return fields.items() <= ret.items()
896
897    def assert_block_path(self, root, path, expected_node, graph=None):
898        """
899        Check whether the node under the given path in the block graph
900        is @expected_node.
901
902        @root is the node name of the node where the @path is rooted.
903
904        @path is a string that consists of child names separated by
905        slashes.  It must begin with a slash.
906
907        Examples for @root + @path:
908          - root="qcow2-node", path="/backing/file"
909          - root="quorum-node", path="/children.2/file"
910
911        Hypothetically, @path could be empty, in which case it would
912        point to @root.  However, in practice this case is not useful
913        and hence not allowed.
914
915        @expected_node may be None.  (All elements of the path but the
916        leaf must still exist.)
917
918        @graph may be None or the result of an x-debug-query-block-graph
919        call that has already been performed.
920        """
921        if graph is None:
922            graph = self.qmp('x-debug-query-block-graph')['return']
923
924        iter_path = iter(path.split('/'))
925
926        # Must start with a /
927        assert next(iter_path) == ''
928
929        node = next((node for node in graph['nodes'] if node['name'] == root),
930                    None)
931
932        # An empty @path is not allowed, so the root node must be present
933        assert node is not None, 'Root node %s not found' % root
934
935        for child_name in iter_path:
936            assert node is not None, 'Cannot follow path %s%s' % (root, path)
937
938            try:
939                node_id = next(edge['child'] for edge in graph['edges']
940                               if (edge['parent'] == node['id'] and
941                                   edge['name'] == child_name))
942
943                node = next(node for node in graph['nodes']
944                            if node['id'] == node_id)
945
946            except StopIteration:
947                node = None
948
949        if node is None:
950            assert expected_node is None, \
951                   'No node found under %s (but expected %s)' % \
952                   (path, expected_node)
953        else:
954            assert node['name'] == expected_node, \
955                   'Found node %s under %s (but expected %s)' % \
956                   (node['name'], path, expected_node)
957
# Matches a path component that carries a list subscript, e.g. "foo[3]":
# group 1 is the dict key, group 2 the text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path consists of dict keys separated by slashes.  A component
        of the form "key[N]" first looks up "key" and then takes element
        N of the list found there.

        Any step that cannot be followed (missing key, non-dict or
        non-list value, index out of range, non-integer index) fails
        the test.  Returns the value found at the end of the path.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                try:
                    idx = int(idx)
                except ValueError:
                    # Report a malformed subscript like any other path
                    # that cannot be followed
                    self.fail(f'invalid index "{idx}" in path "{path}"')

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be resolved in the QMP dict @d'''
        try:
            result = self.dictpath(d, path)
        except self.failureException:
            # dictpath() reports an unresolvable path through self.fail(),
            # which raises self.failureException
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            # A value that is None on either side matches anything
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event

        @drive is the job's device, @force is passed to block-job-cancel.
        If @resume is set, vm.resume_drive(@drive) is called right after
        the cancel request.  @wait is passed to vm.get_qmp_events().

        The returned event is the BLOCK_JOB_COMPLETED or
        BLOCK_JOB_CANCELLED event that ended the wait.'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] in ('BLOCK_JOB_COMPLETED',
                                      'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        If @error is None, the BLOCK_JOB_COMPLETED event must not carry
        an error and, with @check_offset, its offset must equal its len.
        Otherwise the event's error must match @error.  @wait is passed
        to vm.get_qmp_events().'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        # Accept the event from either a mirror or a commit job on @drive
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job on @drive to become ready, then cancel it and
           assert that it ended as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish

        With @wait_ready, first wait for the job to become ready.
        @completion_error is passed to wait_until_completed() as the
        expected error.'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        # assertIn reports the offending type on failure
        self.assertIn(event['data']['type'], ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job whose device is @job_id is
           paused and no longer busy, and return its entry.

           The job must be listed on every poll; the whole wait is
           guarded by a Timeout(3, ...) context.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for @job_id.

           With @wait, return the job entry from pause_wait(); otherwise
           return the QMP response of the pause command.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1141
1142
def notrun(reason):
    '''Skip this test suite

    Records @reason in "<output_dir>/<seq>.notrun", logs a warning and
    then exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_path = '%s/%s.notrun' % (output_dir, seq)

    with open(notrun_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1153
def case_notrun(reason):
    '''Record that a single test case has not been run.

    This only appends @reason to "<output_dir>/<seq>.casenotrun"; it
    does not skip anything itself, which is up to the caller.  Use
    QMPTestCase.case_skip() to also skip the current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = '%s/%s.casenotrun' % (output_dir, seq)

    with open(casenotrun_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1166
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    The format is unsuitable if @supported_fmts is non-empty and does
    not list it, or if @unsupported_fmts lists it.  For 'luks', LUKS
    must additionally be working.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    unsuitable = imgfmt in unsupported_fmts
    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1182
1183def _verify_protocol(supported: Sequence[str] = (),
1184                     unsupported: Sequence[str] = ()) -> None:
1185    assert not (supported and unsupported)
1186
1187    if 'generic' in supported:
1188        return
1189
1190    not_sup = supported and (imgproto not in supported)
1191    if not_sup or (imgproto in unsupported):
1192        notrun('not suitable for this protocol: %s' % imgproto)
1193
1194def _verify_platform(supported: Sequence[str] = (),
1195                     unsupported: Sequence[str] = ()) -> None:
1196    if any((sys.platform.startswith(x) for x in unsupported)):
1197        notrun('not suitable for this OS: %s' % sys.platform)
1198
1199    if supported:
1200        if not any((sys.platform.startswith(x) for x in supported)):
1201            notrun('not suitable for this OS: %s' % sys.platform)
1202
1203def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1204    if supported_cache_modes and (cachemode not in supported_cache_modes):
1205        notrun('not suitable for this cache mode: %s' % cachemode)
1206
1207def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1208    if supported_aio_modes and (aiomode not in supported_aio_modes):
1209        notrun('not suitable for this aio mode: %s' % aiomode)
1210
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless every entry of @required_formats is
    returned by supported_formats().
    """
    required = set(required_formats)
    missing = required - set(supported_formats())
    usf_list = list(missing)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1215
1216
def _verify_virtio_blk() -> None:
    """
    Skip the test suite unless 'virtio-blk' appears in the output of
    QEMU's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1221
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Skip the test suite unless 'virtio-scsi-pci' or 'virtio-scsi-ccw'
    appears in the output of QEMU's '-device help'.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    has_pci = 'virtio-scsi-pci' in device_help
    if not (has_pci or 'virtio-scsi-ccw' in device_help):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1226
1227
def supports_quorum():
    '''Return whether "quorum" appears in the qemu-img --help output'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1230
def verify_quorum():
    '''Skip the test suite unless supports_quorum() reports support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1235
def has_working_luks() -> Tuple[bool, str]:
    """
    Probe whether the LUKS driver is able to create images (which also
    covers LUKS encryption for qcow2) by creating a throw-away image
    in the test directory.

    :return: (True, '') if creation succeeded, otherwise (False, reason).
             The reason is the part of qemu-img's message following
             "<image file>:", or the whole output if there is no such
             line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1266
def verify_working_luks():
    """
    Skip the test suite, giving the reason reported by
    has_working_luks(), if LUKS does not work.
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1274
def qemu_pipe(*args: str) -> str:
    """
    Run QEMU (qemu_prog with qemu_opts) plus the given extra arguments,
    typically an option that prints something and exits such as a help
    option.

    :return: The output half of qemu_tool_pipe_and_status()'s result.
    """
    command = [qemu_prog] + qemu_opts
    command += list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1284
def supported_formats(read_only=False):
    '''Return the whitelisted formats listed by "-drive format=help".

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function
       attribute supported_formats.formats.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second line is used for the read-only list, the first one
        # otherwise
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
1299
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a sequence of format names or a
       callable that is given the test case and returns such a sequence.
       'read_only' is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            required = set(fmts)
            usf_list = list(required - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: formats {usf_list} are not whitelisted'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1319
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test if the current image format is one of 'formats';
       runs it otherwise'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1335
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When run as root (uid 0), the case is recorded through
       case_notrun() and the wrapper returns None without calling the
       test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1346
1347# We need to filter out the time taken from the output so that
1348# qemu-iotest can reliably diff the results against master output,
1349# and hide skipped tests from the reference output.
1350
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult variant that reports skipped tests like passed
    ones in dot mode."""

    def addSkip(self, test, reason):
        """Record the skip; print "skipped <reason>" in verbose mode and
        a dot in dot mode."""
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1360
class ReproducibleStreamWrapper:
    """Stream proxy that removes the elapsed time and the skip count
    from written text; everything else is delegated to the wrapped
    stream."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached for attributes not found normally.  Never forward
        # 'stream' or '__getstate__' to the wrapped stream.
        if attr not in ('stream', '__getstate__'):
            return getattr(self.stream, attr)
        raise AttributeError(attr)

    def write(self, arg=None):
        """Write @arg to the wrapped stream after filtering it."""
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        for pattern, replacement in substitutions:
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
1374
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no (or a falsy) stream is given
        target = stream or sys.stdout
        filtered = ReproducibleStreamWrapper(target)
        super().__init__(stream=filtered,          # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1385
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Run the unittests of the calling module through unittest.main().

    :param argv: Argument vector handed to unittest.main().
    :param debug: Use verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings)
1396
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Common setup for both script-style and unittest-style tests.

    Strips a '-d' argument from sys.argv, configures logging
    accordingly, and then checks the image format, protocol, platform,
    cache mode, aio mode, required formats and virtio-blk availability
    through the _verify_*() helpers.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1426
def execute_test(*args, test_function=None, **kwargs):
    """
    Perform the common setup, then run @test_function if one is given
    (script-style) or the calling module's unittests otherwise.

    All other arguments are passed to execute_setup_common().
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1435
def activate_logging():
    """
    Send test_logger output to stdout as bare messages (script-style
    tests), at INFO level and without propagating to parent loggers.
    """
    handler = logging.StreamHandler(stream=sys.stdout)
    handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1444
1445# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Set up a script-style test without running anything: activate
    logging, then perform the common setup with the given arguments.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1450
1451# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run @test_function as a script-style test, i.e. outside of the
    unittest framework, after activating logging.

    The remaining arguments are passed on to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1456
1457# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Entry point for unittest-style tests: pass all arguments on to
    execute_test() without a test function.
    """
    execute_test(*args, **kwargs)
1461