xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 30b6852c)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39from qemu.machine import qtest
40from qemu.qmp import QMPMessage
41
42# Use this logger for logging messages directly from the iotests module
43logger = logging.getLogger('qemu.iotests')
44logger.addHandler(logging.NullHandler())
45
46# Use this logger for messages that ought to be used for diff output.
47test_logger = logging.getLogger('qemu.iotests.diff_io')
48
49
50faulthandler.enable()
51
52# This will not work if arguments contain spaces but is necessary if we
53# want to support the override options that ./check supports.
54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
55if os.environ.get('QEMU_IMG_OPTIONS'):
56    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
57
58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
59if os.environ.get('QEMU_IO_OPTIONS'):
60    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
61
62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
64    qemu_io_args_no_fmt += \
65        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
66
67qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
68qemu_nbd_args = [qemu_nbd_prog]
69if os.environ.get('QEMU_NBD_OPTIONS'):
70    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
71
72qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
73qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
74
75gdb_qemu_env = os.environ.get('GDB_OPTIONS')
76qemu_gdb = []
77if gdb_qemu_env:
78    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
79
80qemu_print = os.environ.get('PRINT_QEMU', False)
81
82imgfmt = os.environ.get('IMGFMT', 'raw')
83imgproto = os.environ.get('IMGPROTO', 'file')
84output_dir = os.environ.get('OUTPUT_DIR', '.')
85
86try:
87    test_dir = os.environ['TEST_DIR']
88    sock_dir = os.environ['SOCK_DIR']
89    cachemode = os.environ['CACHEMODE']
90    aiomode = os.environ['AIOMODE']
91    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
92except KeyError:
93    # We are using these variables as proxies to indicate that we're
94    # not being run via "check". There may be other things set up by
95    # "check" that individual test cases rely on.
96    sys.stderr.write('Please run this test via the "check" script\n')
97    sys.exit(os.EX_USAGE)
98
99qemu_valgrind = []
100if os.environ.get('VALGRIND_QEMU') == "y" and \
101    os.environ.get('NO_VALGRIND') != "y":
102    valgrind_logfile = "--log-file=" + test_dir
103    # %p allows to put the valgrind process PID, since
104    # we don't know it a priori (subprocess.Popen is
105    # not yet invoked)
106    valgrind_logfile += "/%p.valgrind"
107
108    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
109
110socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
111
112luks_default_secret_object = 'secret,id=keysec0,data=' + \
113                             os.environ.get('IMGKEYSECRET', '')
114luks_default_key_secret_opt = 'key-secret=keysec0'
115
116sample_img_dir = os.environ['SAMPLE_IMG_DIR']
117
118
119def unarchive_sample_image(sample, fname):
120    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
121    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
122        shutil.copyfileobj(f_in, f_out)
123
124
125def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
126                              connect_stderr: bool = True) -> Tuple[str, int]:
127    """
128    Run a tool and return both its output and its exit code
129    """
130    stderr = subprocess.STDOUT if connect_stderr else None
131    with subprocess.Popen(args, stdout=subprocess.PIPE,
132                          stderr=stderr, universal_newlines=True) as subp:
133        output = subp.communicate()[0]
134        if subp.returncode < 0:
135            cmd = ' '.join(args)
136            sys.stderr.write(f'{tool} received signal \
137                               {-subp.returncode}: {cmd}\n')
138        return (output, subp.returncode)
139
140def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
141    """
142    Run qemu-img and return both its output and its exit code
143    """
144    full_args = qemu_img_args + list(args)
145    return qemu_tool_pipe_and_status('qemu-img', full_args)
146
147def qemu_img(*args: str) -> int:
148    '''Run qemu-img and return the exit code'''
149    return qemu_img_pipe_and_status(*args)[1]
150
151def ordered_qmp(qmsg, conv_keys=True):
152    # Dictionaries are not ordered prior to 3.6, therefore:
153    if isinstance(qmsg, list):
154        return [ordered_qmp(atom) for atom in qmsg]
155    if isinstance(qmsg, dict):
156        od = OrderedDict()
157        for k, v in sorted(qmsg.items()):
158            if conv_keys:
159                k = k.replace('_', '-')
160            od[k] = ordered_qmp(v, conv_keys=False)
161        return od
162    return qmsg
163
164def qemu_img_create(*args):
165    args = list(args)
166
167    # default luks support
168    if '-f' in args and args[args.index('-f') + 1] == 'luks':
169        if '-o' in args:
170            i = args.index('-o')
171            if 'key-secret' not in args[i + 1]:
172                args[i + 1].append(luks_default_key_secret_opt)
173                args.insert(i + 2, '--object')
174                args.insert(i + 3, luks_default_secret_object)
175        else:
176            args = ['-o', luks_default_key_secret_opt,
177                    '--object', luks_default_secret_object] + args
178
179    args.insert(0, 'create')
180
181    return qemu_img(*args)
182
183def qemu_img_measure(*args):
184    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
185
186def qemu_img_check(*args):
187    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
188
189def qemu_img_verbose(*args):
190    '''Run qemu-img without suppressing its output and return the exit code'''
191    exitcode = subprocess.call(qemu_img_args + list(args))
192    if exitcode < 0:
193        sys.stderr.write('qemu-img received signal %i: %s\n'
194                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
195    return exitcode
196
197def qemu_img_pipe(*args: str) -> str:
198    '''Run qemu-img and return its output'''
199    return qemu_img_pipe_and_status(*args)[0]
200
201def qemu_img_log(*args):
202    result = qemu_img_pipe(*args)
203    log(result, filters=[filter_testfiles])
204    return result
205
206def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
207    args = ['info']
208    if imgopts:
209        args.append('--image-opts')
210    else:
211        args += ['-f', imgfmt]
212    args += extra_args
213    args.append(filename)
214
215    output = qemu_img_pipe(*args)
216    if not filter_path:
217        filter_path = filename
218    log(filter_img_info(output, filter_path))
219
220def qemu_io(*args):
221    '''Run qemu-io and return the stdout data'''
222    args = qemu_io_args + list(args)
223    return qemu_tool_pipe_and_status('qemu-io', args)[0]
224
225def qemu_io_log(*args):
226    result = qemu_io(*args)
227    log(result, filters=[filter_testfiles, filter_qemu_io])
228    return result
229
230def qemu_io_silent(*args):
231    '''Run qemu-io and return the exit code, suppressing stdout'''
232    if '-f' in args or '--image-opts' in args:
233        default_args = qemu_io_args_no_fmt
234    else:
235        default_args = qemu_io_args
236
237    args = default_args + list(args)
238    result = subprocess.run(args, stdout=subprocess.DEVNULL, check=False)
239    if result.returncode < 0:
240        sys.stderr.write('qemu-io received signal %i: %s\n' %
241                         (-result.returncode, ' '.join(args)))
242    return result.returncode
243
244def qemu_io_silent_check(*args):
245    '''Run qemu-io and return the true if subprocess returned 0'''
246    args = qemu_io_args + list(args)
247    result = subprocess.run(args, stdout=subprocess.DEVNULL,
248                            stderr=subprocess.STDOUT, check=False)
249    return result.returncode == 0
250
251class QemuIoInteractive:
252    def __init__(self, *args):
253        self.args = qemu_io_args_no_fmt + list(args)
254        # We need to keep the Popen objext around, and not
255        # close it immediately. Therefore, disable the pylint check:
256        # pylint: disable=consider-using-with
257        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
258                                   stdout=subprocess.PIPE,
259                                   stderr=subprocess.STDOUT,
260                                   universal_newlines=True)
261        out = self._p.stdout.read(9)
262        if out != 'qemu-io> ':
263            # Most probably qemu-io just failed to start.
264            # Let's collect the whole output and exit.
265            out += self._p.stdout.read()
266            self._p.wait(timeout=1)
267            raise ValueError(out)
268
269    def close(self):
270        self._p.communicate('q\n')
271
272    def _read_output(self):
273        pattern = 'qemu-io> '
274        n = len(pattern)
275        pos = 0
276        s = []
277        while pos != n:
278            c = self._p.stdout.read(1)
279            # check unexpected EOF
280            assert c != ''
281            s.append(c)
282            if c == pattern[pos]:
283                pos += 1
284            else:
285                pos = 0
286
287        return ''.join(s[:-n])
288
289    def cmd(self, cmd):
290        # quit command is in close(), '\n' is added automatically
291        assert '\n' not in cmd
292        cmd = cmd.strip()
293        assert cmd not in ('q', 'quit')
294        self._p.stdin.write(cmd + '\n')
295        self._p.stdin.flush()
296        return self._read_output()
297
298
299def qemu_nbd(*args):
300    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
301    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
302
303def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
304    '''Run qemu-nbd in daemon mode and return both the parent's exit code
305       and its output in case of an error'''
306    full_args = qemu_nbd_args + ['--fork'] + list(args)
307    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
308                                                   connect_stderr=False)
309    return returncode, output if returncode else ''
310
311def qemu_nbd_list_log(*args: str) -> str:
312    '''Run qemu-nbd to list remote exports'''
313    full_args = [qemu_nbd_prog, '-L'] + list(args)
314    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
315    log(output, filters=[filter_testfiles, filter_nbd_exports])
316    return output
317
318@contextmanager
319def qemu_nbd_popen(*args):
320    '''Context manager running qemu-nbd within the context'''
321    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
322
323    assert not os.path.exists(pid_file)
324
325    cmd = list(qemu_nbd_args)
326    cmd.extend(('--persistent', '--pid-file', pid_file))
327    cmd.extend(args)
328
329    log('Start NBD server')
330    with subprocess.Popen(cmd) as p:
331        try:
332            while not os.path.exists(pid_file):
333                if p.poll() is not None:
334                    raise RuntimeError(
335                        "qemu-nbd terminated with exit code {}: {}"
336                        .format(p.returncode, ' '.join(cmd)))
337
338                time.sleep(0.01)
339            yield
340        finally:
341            if os.path.exists(pid_file):
342                os.remove(pid_file)
343            log('Kill NBD server')
344            p.kill()
345            p.wait()
346
347def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
348    '''Return True if two image files are identical'''
349    return qemu_img('compare', '-f', fmt1,
350                    '-F', fmt2, img1, img2) == 0
351
352def create_image(name, size):
353    '''Create a fully-allocated raw image with sector markers'''
354    with open(name, 'wb') as file:
355        i = 0
356        while i < size:
357            sector = struct.pack('>l504xl', i // 512, i // 512)
358            file.write(sector)
359            i = i + 512
360
361def image_size(img):
362    '''Return image's virtual size'''
363    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
364    return json.loads(r)['virtual-size']
365
366def is_str(val):
367    return isinstance(val, str)
368
369test_dir_re = re.compile(r"%s" % test_dir)
370def filter_test_dir(msg):
371    return test_dir_re.sub("TEST_DIR", msg)
372
373win32_re = re.compile(r"\r")
374def filter_win32(msg):
375    return win32_re.sub("", msg)
376
377qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
378                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
379                        r"and [0-9\/.inf]* ops\/sec\)")
380def filter_qemu_io(msg):
381    msg = filter_win32(msg)
382    return qemu_io_re.sub("X ops; XX:XX:XX.X "
383                          "(XXX YYY/sec and XXX ops/sec)", msg)
384
385chown_re = re.compile(r"chown [0-9]+:[0-9]+")
386def filter_chown(msg):
387    return chown_re.sub("chown UID:GID", msg)
388
389def filter_qmp_event(event):
390    '''Filter a QMP event dict'''
391    event = dict(event)
392    if 'timestamp' in event:
393        event['timestamp']['seconds'] = 'SECS'
394        event['timestamp']['microseconds'] = 'USECS'
395    return event
396
397def filter_qmp(qmsg, filter_fn):
398    '''Given a string filter, filter a QMP object's values.
399    filter_fn takes a (key, value) pair.'''
400    # Iterate through either lists or dicts;
401    if isinstance(qmsg, list):
402        items = enumerate(qmsg)
403    else:
404        items = qmsg.items()
405
406    for k, v in items:
407        if isinstance(v, (dict, list)):
408            qmsg[k] = filter_qmp(v, filter_fn)
409        else:
410            qmsg[k] = filter_fn(k, v)
411    return qmsg
412
413def filter_testfiles(msg):
414    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
415    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
416    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
417
418def filter_qmp_testfiles(qmsg):
419    def _filter(_key, value):
420        if is_str(value):
421            return filter_testfiles(value)
422        return value
423    return filter_qmp(qmsg, _filter)
424
425def filter_virtio_scsi(output: str) -> str:
426    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
427
428def filter_qmp_virtio_scsi(qmsg):
429    def _filter(_key, value):
430        if is_str(value):
431            return filter_virtio_scsi(value)
432        return value
433    return filter_qmp(qmsg, _filter)
434
435def filter_generated_node_ids(msg):
436    return re.sub("#block[0-9]+", "NODE_NAME", msg)
437
438def filter_img_info(output, filename):
439    lines = []
440    for line in output.split('\n'):
441        if 'disk size' in line or 'actual-size' in line:
442            continue
443        line = line.replace(filename, 'TEST_IMG')
444        line = filter_testfiles(line)
445        line = line.replace(imgfmt, 'IMGFMT')
446        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
447        line = re.sub('uuid: [-a-f0-9]+',
448                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
449                      line)
450        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
451        lines.append(line)
452    return '\n'.join(lines)
453
454def filter_imgfmt(msg):
455    return msg.replace(imgfmt, 'IMGFMT')
456
457def filter_qmp_imgfmt(qmsg):
458    def _filter(_key, value):
459        if is_str(value):
460            return filter_imgfmt(value)
461        return value
462    return filter_qmp(qmsg, _filter)
463
464def filter_nbd_exports(output: str) -> str:
465    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
466
467
468Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
469
470def log(msg: Msg,
471        filters: Iterable[Callable[[Msg], Msg]] = (),
472        indent: Optional[int] = None) -> None:
473    """
474    Logs either a string message or a JSON serializable message (like QMP).
475    If indent is provided, JSON serializable messages are pretty-printed.
476    """
477    for flt in filters:
478        msg = flt(msg)
479    if isinstance(msg, (dict, list)):
480        # Don't sort if it's already sorted
481        do_sort = not isinstance(msg, OrderedDict)
482        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
483    else:
484        test_logger.info(msg)
485
486class Timeout:
487    def __init__(self, seconds, errmsg="Timeout"):
488        self.seconds = seconds
489        self.errmsg = errmsg
490    def __enter__(self):
491        if qemu_gdb or qemu_valgrind:
492            return self
493        signal.signal(signal.SIGALRM, self.timeout)
494        signal.setitimer(signal.ITIMER_REAL, self.seconds)
495        return self
496    def __exit__(self, exc_type, value, traceback):
497        if qemu_gdb or qemu_valgrind:
498            return False
499        signal.setitimer(signal.ITIMER_REAL, 0)
500        return False
501    def timeout(self, signum, frame):
502        raise Exception(self.errmsg)
503
504def file_pattern(name):
505    return "{0}-{1}".format(os.getpid(), name)
506
507class FilePath:
508    """
509    Context manager generating multiple file names. The generated files are
510    removed when exiting the context.
511
512    Example usage:
513
514        with FilePath('a.img', 'b.img') as (img_a, img_b):
515            # Use img_a and img_b here...
516
517        # a.img and b.img are automatically removed here.
518
519    By default images are created in iotests.test_dir. To create sockets use
520    iotests.sock_dir:
521
522       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
523
524    For convenience, calling with one argument yields a single file instead of
525    a tuple with one item.
526
527    """
528    def __init__(self, *names, base_dir=test_dir):
529        self.paths = [os.path.join(base_dir, file_pattern(name))
530                      for name in names]
531
532    def __enter__(self):
533        if len(self.paths) == 1:
534            return self.paths[0]
535        else:
536            return self.paths
537
538    def __exit__(self, exc_type, exc_val, exc_tb):
539        for path in self.paths:
540            try:
541                os.remove(path)
542            except OSError:
543                pass
544        return False
545
546
547def try_remove(img):
548    try:
549        os.remove(img)
550    except OSError:
551        pass
552
553def file_path_remover():
554    for path in reversed(file_path_remover.paths):
555        try_remove(path)
556
557
558def file_path(*names, base_dir=test_dir):
559    ''' Another way to get auto-generated filename that cleans itself up.
560
561    Use is as simple as:
562
563    img_a, img_b = file_path('a.img', 'b.img')
564    sock = file_path('socket')
565    '''
566
567    if not hasattr(file_path_remover, 'paths'):
568        file_path_remover.paths = []
569        atexit.register(file_path_remover)
570
571    paths = []
572    for name in names:
573        filename = file_pattern(name)
574        path = os.path.join(base_dir, filename)
575        file_path_remover.paths.append(path)
576        paths.append(path)
577
578    return paths[0] if len(paths) == 1 else paths
579
580def remote_filename(path):
581    if imgproto == 'file':
582        return path
583    elif imgproto == 'ssh':
584        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
585    else:
586        raise Exception("Protocol %s not supported" % (imgproto))
587
588class VM(qtest.QEMUQtestMachine):
589    '''A QEMU VM'''
590
591    def __init__(self, path_suffix=''):
592        name = "qemu%s-%d" % (path_suffix, os.getpid())
593        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
594        if qemu_gdb and qemu_valgrind:
595            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
596            sys.exit(1)
597        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
598        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
599                         name=name,
600                         base_temp_dir=test_dir,
601                         socket_scm_helper=socket_scm_helper,
602                         sock_dir=sock_dir, qmp_timer=timer)
603        self._num_drives = 0
604
605    def _post_shutdown(self) -> None:
606        super()._post_shutdown()
607        if not qemu_valgrind or not self._popen:
608            return
609        valgrind_filename =  f"{test_dir}/{self._popen.pid}.valgrind"
610        if self.exitcode() == 99:
611            with open(valgrind_filename, encoding='utf-8') as f:
612                print(f.read())
613        else:
614            os.remove(valgrind_filename)
615
616    def _pre_launch(self) -> None:
617        super()._pre_launch()
618        if qemu_print:
619            # set QEMU binary output to stdout
620            self._close_qemu_log_file()
621
622    def add_object(self, opts):
623        self._args.append('-object')
624        self._args.append(opts)
625        return self
626
627    def add_device(self, opts):
628        self._args.append('-device')
629        self._args.append(opts)
630        return self
631
632    def add_drive_raw(self, opts):
633        self._args.append('-drive')
634        self._args.append(opts)
635        return self
636
637    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
638        '''Add a virtio-blk drive to the VM'''
639        options = ['if=%s' % interface,
640                   'id=drive%d' % self._num_drives]
641
642        if path is not None:
643            options.append('file=%s' % path)
644            options.append('format=%s' % img_format)
645            options.append('cache=%s' % cachemode)
646            options.append('aio=%s' % aiomode)
647
648        if opts:
649            options.append(opts)
650
651        if img_format == 'luks' and 'key-secret' not in opts:
652            # default luks support
653            if luks_default_secret_object not in self._args:
654                self.add_object(luks_default_secret_object)
655
656            options.append(luks_default_key_secret_opt)
657
658        self._args.append('-drive')
659        self._args.append(','.join(options))
660        self._num_drives += 1
661        return self
662
663    def add_blockdev(self, opts):
664        self._args.append('-blockdev')
665        if isinstance(opts, str):
666            self._args.append(opts)
667        else:
668            self._args.append(','.join(opts))
669        return self
670
671    def add_incoming(self, addr):
672        self._args.append('-incoming')
673        self._args.append(addr)
674        return self
675
676    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
677        cmd = 'human-monitor-command'
678        kwargs: Dict[str, Any] = {'command-line': command_line}
679        if use_log:
680            return self.qmp_log(cmd, **kwargs)
681        else:
682            return self.qmp(cmd, **kwargs)
683
684    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
685        """Pause drive r/w operations"""
686        if not event:
687            self.pause_drive(drive, "read_aio")
688            self.pause_drive(drive, "write_aio")
689            return
690        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
691
692    def resume_drive(self, drive: str) -> None:
693        """Resume drive r/w operations"""
694        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
695
696    def hmp_qemu_io(self, drive: str, cmd: str,
697                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
698        """Write to a given drive using an HMP command"""
699        d = '-d ' if qdev else ''
700        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
701
702    def flatten_qmp_object(self, obj, output=None, basestr=''):
703        if output is None:
704            output = {}
705        if isinstance(obj, list):
706            for i, item in enumerate(obj):
707                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
708        elif isinstance(obj, dict):
709            for key in obj:
710                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
711        else:
712            output[basestr[:-1]] = obj # Strip trailing '.'
713        return output
714
715    def qmp_to_opts(self, obj):
716        obj = self.flatten_qmp_object(obj)
717        output_list = []
718        for key in obj:
719            output_list += [key + '=' + obj[key]]
720        return ','.join(output_list)
721
722    def get_qmp_events_filtered(self, wait=60.0):
723        result = []
724        for ev in self.get_qmp_events(wait=wait):
725            result.append(filter_qmp_event(ev))
726        return result
727
728    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
729        full_cmd = OrderedDict((
730            ("execute", cmd),
731            ("arguments", ordered_qmp(kwargs))
732        ))
733        log(full_cmd, filters, indent=indent)
734        result = self.qmp(cmd, **kwargs)
735        log(result, filters, indent=indent)
736        return result
737
738    # Returns None on success, and an error string on failure
739    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
740                pre_finalize=None, cancel=False, wait=60.0):
741        """
742        run_job moves a job from creation through to dismissal.
743
744        :param job: String. ID of recently-launched job
745        :param auto_finalize: Bool. True if the job was launched with
746                              auto_finalize. Defaults to True.
747        :param auto_dismiss: Bool. True if the job was launched with
748                             auto_dismiss=True. Defaults to False.
749        :param pre_finalize: Callback. A callable that takes no arguments to be
750                             invoked prior to issuing job-finalize, if any.
751        :param cancel: Bool. When true, cancels the job after the pre_finalize
752                       callback.
753        :param wait: Float. Timeout value specifying how long to wait for any
754                     event, in seconds. Defaults to 60.0.
755        """
756        match_device = {'data': {'device': job}}
757        match_id = {'data': {'id': job}}
758        events = [
759            ('BLOCK_JOB_COMPLETED', match_device),
760            ('BLOCK_JOB_CANCELLED', match_device),
761            ('BLOCK_JOB_ERROR', match_device),
762            ('BLOCK_JOB_READY', match_device),
763            ('BLOCK_JOB_PENDING', match_id),
764            ('JOB_STATUS_CHANGE', match_id)
765        ]
766        error = None
767        while True:
768            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
769            if ev['event'] != 'JOB_STATUS_CHANGE':
770                log(ev)
771                continue
772            status = ev['data']['status']
773            if status == 'aborting':
774                result = self.qmp('query-jobs')
775                for j in result['return']:
776                    if j['id'] == job:
777                        error = j['error']
778                        log('Job failed: %s' % (j['error']))
779            elif status == 'ready':
780                self.qmp_log('job-complete', id=job)
781            elif status == 'pending' and not auto_finalize:
782                if pre_finalize:
783                    pre_finalize()
784                if cancel:
785                    self.qmp_log('job-cancel', id=job)
786                else:
787                    self.qmp_log('job-finalize', id=job)
788            elif status == 'concluded' and not auto_dismiss:
789                self.qmp_log('job-dismiss', id=job)
790            elif status == 'null':
791                return error
792
793    # Returns None on success, and an error string on failure
794    def blockdev_create(self, options, job_id='job0', filters=None):
795        if filters is None:
796            filters = [filter_qmp_testfiles]
797        result = self.qmp_log('blockdev-create', filters=filters,
798                              job_id=job_id, options=options)
799
800        if 'return' in result:
801            assert result['return'] == {}
802            job_result = self.run_job(job_id)
803        else:
804            job_result = result['error']
805
806        log("")
807        return job_result
808
809    def enable_migration_events(self, name):
810        log('Enabling migration QMP events on %s...' % name)
811        log(self.qmp('migrate-set-capabilities', capabilities=[
812            {
813                'capability': 'events',
814                'state': True
815            }
816        ]))
817
818    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
819        while True:
820            event = self.event_wait('MIGRATION')
821            # We use the default timeout, and with a timeout, event_wait()
822            # never returns None
823            assert event
824
825            log(event, filters=[filter_qmp_event])
826            if event['data']['status'] in ('completed', 'failed'):
827                break
828
829        if event['data']['status'] == 'completed':
830            # The event may occur in finish-migrate, so wait for the expected
831            # post-migration runstate
832            runstate = None
833            while runstate != expect_runstate:
834                runstate = self.qmp('query-status')['return']['status']
835            return True
836        else:
837            return False
838
839    def node_info(self, node_name):
840        nodes = self.qmp('query-named-block-nodes')
841        for x in nodes['return']:
842            if x['node-name'] == node_name:
843                return x
844        return None
845
846    def query_bitmaps(self):
847        res = self.qmp("query-named-block-nodes")
848        return {device['node-name']: device['dirty-bitmaps']
849                for device in res['return'] if 'dirty-bitmaps' in device}
850
851    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
852        """
853        get a specific bitmap from the object returned by query_bitmaps.
854        :param recording: If specified, filter results by the specified value.
855        :param bitmaps: If specified, use it instead of call query_bitmaps()
856        """
857        if bitmaps is None:
858            bitmaps = self.query_bitmaps()
859
860        for bitmap in bitmaps[node_name]:
861            if bitmap.get('name', '') == bitmap_name:
862                if recording is None or bitmap.get('recording') == recording:
863                    return bitmap
864        return None
865
866    def check_bitmap_status(self, node_name, bitmap_name, fields):
867        ret = self.get_bitmap(node_name, bitmap_name)
868
869        return fields.items() <= ret.items()
870
871    def assert_block_path(self, root, path, expected_node, graph=None):
872        """
873        Check whether the node under the given path in the block graph
874        is @expected_node.
875
876        @root is the node name of the node where the @path is rooted.
877
878        @path is a string that consists of child names separated by
879        slashes.  It must begin with a slash.
880
881        Examples for @root + @path:
882          - root="qcow2-node", path="/backing/file"
883          - root="quorum-node", path="/children.2/file"
884
885        Hypothetically, @path could be empty, in which case it would
886        point to @root.  However, in practice this case is not useful
887        and hence not allowed.
888
889        @expected_node may be None.  (All elements of the path but the
890        leaf must still exist.)
891
892        @graph may be None or the result of an x-debug-query-block-graph
893        call that has already been performed.
894        """
895        if graph is None:
896            graph = self.qmp('x-debug-query-block-graph')['return']
897
898        iter_path = iter(path.split('/'))
899
900        # Must start with a /
901        assert next(iter_path) == ''
902
903        node = next((node for node in graph['nodes'] if node['name'] == root),
904                    None)
905
906        # An empty @path is not allowed, so the root node must be present
907        assert node is not None, 'Root node %s not found' % root
908
909        for child_name in iter_path:
910            assert node is not None, 'Cannot follow path %s%s' % (root, path)
911
912            try:
913                node_id = next(edge['child'] for edge in graph['edges']
914                               if (edge['parent'] == node['id'] and
915                                   edge['name'] == child_name))
916
917                node = next(node for node in graph['nodes']
918                            if node['id'] == node_id)
919
920            except StopIteration:
921                node = None
922
923        if node is None:
924            assert expected_node is None, \
925                   'No node found under %s (but expected %s)' % \
926                   (path, expected_node)
927        else:
928            assert node['name'] == expected_node, \
929                   'Found node %s under %s (but expected %s)' % \
930                   (node['name'], path, expected_node)
931
932index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
933
934class QMPTestCase(unittest.TestCase):
935    '''Abstract base class for QMP test cases'''
936
937    def __init__(self, *args, **kwargs):
938        super().__init__(*args, **kwargs)
939        # Many users of this class set a VM property we rely on heavily
940        # in the methods below.
941        self.vm = None
942
943    def dictpath(self, d, path):
944        '''Traverse a path in a nested dict'''
945        for component in path.split('/'):
946            m = index_re.match(component)
947            if m:
948                component, idx = m.groups()
949                idx = int(idx)
950
951            if not isinstance(d, dict) or component not in d:
952                self.fail(f'failed path traversal for "{path}" in "{d}"')
953            d = d[component]
954
955            if m:
956                if not isinstance(d, list):
957                    self.fail(f'path component "{component}" in "{path}" '
958                              f'is not a list in "{d}"')
959                try:
960                    d = d[idx]
961                except IndexError:
962                    self.fail(f'invalid index "{idx}" in path "{path}" '
963                              f'in "{d}"')
964        return d
965
966    def assert_qmp_absent(self, d, path):
967        try:
968            result = self.dictpath(d, path)
969        except AssertionError:
970            return
971        self.fail('path "%s" has value "%s"' % (path, str(result)))
972
973    def assert_qmp(self, d, path, value):
974        '''Assert that the value for a specific path in a QMP dict
975           matches.  When given a list of values, assert that any of
976           them matches.'''
977
978        result = self.dictpath(d, path)
979
980        # [] makes no sense as a list of valid values, so treat it as
981        # an actual single value.
982        if isinstance(value, list) and value != []:
983            for v in value:
984                if result == v:
985                    return
986            self.fail('no match for "%s" in %s' % (str(result), str(value)))
987        else:
988            self.assertEqual(result, value,
989                             '"%s" is "%s", expected "%s"'
990                             % (path, str(result), str(value)))
991
992    def assert_no_active_block_jobs(self):
993        result = self.vm.qmp('query-block-jobs')
994        self.assert_qmp(result, 'return', [])
995
996    def assert_has_block_node(self, node_name=None, file_name=None):
997        """Issue a query-named-block-nodes and assert node_name and/or
998        file_name is present in the result"""
999        def check_equal_or_none(a, b):
1000            return a is None or b is None or a == b
1001        assert node_name or file_name
1002        result = self.vm.qmp('query-named-block-nodes')
1003        for x in result["return"]:
1004            if check_equal_or_none(x.get("node-name"), node_name) and \
1005                    check_equal_or_none(x.get("file"), file_name):
1006                return
1007        self.fail("Cannot find %s %s in result:\n%s" %
1008                  (node_name, file_name, result))
1009
1010    def assert_json_filename_equal(self, json_filename, reference):
1011        '''Asserts that the given filename is a json: filename and that its
1012           content is equal to the given reference object'''
1013        self.assertEqual(json_filename[:5], 'json:')
1014        self.assertEqual(
1015            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1016            self.vm.flatten_qmp_object(reference)
1017        )
1018
1019    def cancel_and_wait(self, drive='drive0', force=False,
1020                        resume=False, wait=60.0):
1021        '''Cancel a block job and wait for it to finish, returning the event'''
1022        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1023        self.assert_qmp(result, 'return', {})
1024
1025        if resume:
1026            self.vm.resume_drive(drive)
1027
1028        cancelled = False
1029        result = None
1030        while not cancelled:
1031            for event in self.vm.get_qmp_events(wait=wait):
1032                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1033                   event['event'] == 'BLOCK_JOB_CANCELLED':
1034                    self.assert_qmp(event, 'data/device', drive)
1035                    result = event
1036                    cancelled = True
1037                elif event['event'] == 'JOB_STATUS_CHANGE':
1038                    self.assert_qmp(event, 'data/id', drive)
1039
1040
1041        self.assert_no_active_block_jobs()
1042        return result
1043
1044    def wait_until_completed(self, drive='drive0', check_offset=True,
1045                             wait=60.0, error=None):
1046        '''Wait for a block job to finish, returning the event'''
1047        while True:
1048            for event in self.vm.get_qmp_events(wait=wait):
1049                if event['event'] == 'BLOCK_JOB_COMPLETED':
1050                    self.assert_qmp(event, 'data/device', drive)
1051                    if error is None:
1052                        self.assert_qmp_absent(event, 'data/error')
1053                        if check_offset:
1054                            self.assert_qmp(event, 'data/offset',
1055                                            event['data']['len'])
1056                    else:
1057                        self.assert_qmp(event, 'data/error', error)
1058                    self.assert_no_active_block_jobs()
1059                    return event
1060                if event['event'] == 'JOB_STATUS_CHANGE':
1061                    self.assert_qmp(event, 'data/id', drive)
1062
1063    def wait_ready(self, drive='drive0'):
1064        """Wait until a BLOCK_JOB_READY event, and return the event."""
1065        return self.vm.events_wait([
1066            ('BLOCK_JOB_READY',
1067             {'data': {'type': 'mirror', 'device': drive}}),
1068            ('BLOCK_JOB_READY',
1069             {'data': {'type': 'commit', 'device': drive}})
1070        ])
1071
1072    def wait_ready_and_cancel(self, drive='drive0'):
1073        self.wait_ready(drive=drive)
1074        event = self.cancel_and_wait(drive=drive)
1075        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1076        self.assert_qmp(event, 'data/type', 'mirror')
1077        self.assert_qmp(event, 'data/offset', event['data']['len'])
1078
1079    def complete_and_wait(self, drive='drive0', wait_ready=True,
1080                          completion_error=None):
1081        '''Complete a block job and wait for it to finish'''
1082        if wait_ready:
1083            self.wait_ready(drive=drive)
1084
1085        result = self.vm.qmp('block-job-complete', device=drive)
1086        self.assert_qmp(result, 'return', {})
1087
1088        event = self.wait_until_completed(drive=drive, error=completion_error)
1089        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1090
1091    def pause_wait(self, job_id='job0'):
1092        with Timeout(3, "Timeout waiting for job to pause"):
1093            while True:
1094                result = self.vm.qmp('query-block-jobs')
1095                found = False
1096                for job in result['return']:
1097                    if job['device'] == job_id:
1098                        found = True
1099                        if job['paused'] and not job['busy']:
1100                            return job
1101                        break
1102                assert found
1103
1104    def pause_job(self, job_id='job0', wait=True):
1105        result = self.vm.qmp('block-job-pause', device=job_id)
1106        self.assert_qmp(result, 'return', {})
1107        if wait:
1108            return self.pause_wait(job_id)
1109        return result
1110
1111    def case_skip(self, reason):
1112        '''Skip this test case'''
1113        case_notrun(reason)
1114        self.skipTest(reason)
1115
1116
1117def notrun(reason):
1118    '''Skip this test suite'''
1119    # Each test in qemu-iotests has a number ("seq")
1120    seq = os.path.basename(sys.argv[0])
1121
1122    with open('%s/%s.notrun' % (output_dir, seq), 'w', encoding='utf-8') \
1123            as outfile:
1124        outfile.write(reason + '\n')
1125    logger.warning("%s not run: %s", seq, reason)
1126    sys.exit(0)
1127
1128def case_notrun(reason):
1129    '''Mark this test case as not having been run (without actually
1130    skipping it, that is left to the caller).  See
1131    QMPTestCase.case_skip() for a variant that actually skips the
1132    current test case.'''
1133
1134    # Each test in qemu-iotests has a number ("seq")
1135    seq = os.path.basename(sys.argv[0])
1136
1137    with open('%s/%s.casenotrun' % (output_dir, seq), 'a', encoding='utf-8') \
1138            as outfile:
1139        outfile.write('    [case not run] ' + reason + '\n')
1140
1141def _verify_image_format(supported_fmts: Sequence[str] = (),
1142                         unsupported_fmts: Sequence[str] = ()) -> None:
1143    if 'generic' in supported_fmts and \
1144            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1145        # similar to
1146        #   _supported_fmt generic
1147        # for bash tests
1148        supported_fmts = ()
1149
1150    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1151    if not_sup or (imgfmt in unsupported_fmts):
1152        notrun('not suitable for this image format: %s' % imgfmt)
1153
1154    if imgfmt == 'luks':
1155        verify_working_luks()
1156
1157def _verify_protocol(supported: Sequence[str] = (),
1158                     unsupported: Sequence[str] = ()) -> None:
1159    assert not (supported and unsupported)
1160
1161    if 'generic' in supported:
1162        return
1163
1164    not_sup = supported and (imgproto not in supported)
1165    if not_sup or (imgproto in unsupported):
1166        notrun('not suitable for this protocol: %s' % imgproto)
1167
1168def _verify_platform(supported: Sequence[str] = (),
1169                     unsupported: Sequence[str] = ()) -> None:
1170    if any((sys.platform.startswith(x) for x in unsupported)):
1171        notrun('not suitable for this OS: %s' % sys.platform)
1172
1173    if supported:
1174        if not any((sys.platform.startswith(x) for x in supported)):
1175            notrun('not suitable for this OS: %s' % sys.platform)
1176
1177def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1178    if supported_cache_modes and (cachemode not in supported_cache_modes):
1179        notrun('not suitable for this cache mode: %s' % cachemode)
1180
1181def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1182    if supported_aio_modes and (aiomode not in supported_aio_modes):
1183        notrun('not suitable for this aio mode: %s' % aiomode)
1184
1185def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1186    usf_list = list(set(required_formats) - set(supported_formats()))
1187    if usf_list:
1188        notrun(f'formats {usf_list} are not whitelisted')
1189
1190
1191def _verify_virtio_blk() -> None:
1192    out = qemu_pipe('-M', 'none', '-device', 'help')
1193    if 'virtio-blk' not in out:
1194        notrun('Missing virtio-blk in QEMU binary')
1195
1196def _verify_virtio_scsi_pci_or_ccw() -> None:
1197    out = qemu_pipe('-M', 'none', '-device', 'help')
1198    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1199        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1200
1201
1202def supports_quorum():
1203    return 'quorum' in qemu_img_pipe('--help')
1204
1205def verify_quorum():
1206    '''Skip test suite if quorum support is not available'''
1207    if not supports_quorum():
1208        notrun('quorum support missing')
1209
1210def has_working_luks() -> Tuple[bool, str]:
1211    """
1212    Check whether our LUKS driver can actually create images
1213    (this extends to LUKS encryption for qcow2).
1214
1215    If not, return the reason why.
1216    """
1217
1218    img_file = f'{test_dir}/luks-test.luks'
1219    (output, status) = \
1220        qemu_img_pipe_and_status('create', '-f', 'luks',
1221                                 '--object', luks_default_secret_object,
1222                                 '-o', luks_default_key_secret_opt,
1223                                 '-o', 'iter-time=10',
1224                                 img_file, '1G')
1225    try:
1226        os.remove(img_file)
1227    except OSError:
1228        pass
1229
1230    if status != 0:
1231        reason = output
1232        for line in output.splitlines():
1233            if img_file + ':' in line:
1234                reason = line.split(img_file + ':', 1)[1].strip()
1235                break
1236
1237        return (False, reason)
1238    else:
1239        return (True, '')
1240
1241def verify_working_luks():
1242    """
1243    Skip test suite if LUKS does not work
1244    """
1245    (working, reason) = has_working_luks()
1246    if not working:
1247        notrun(reason)
1248
1249def qemu_pipe(*args: str) -> str:
1250    """
1251    Run qemu with an option to print something and exit (e.g. a help option).
1252
1253    :return: QEMU's stdout output.
1254    """
1255    full_args = [qemu_prog] + qemu_opts + list(args)
1256    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1257    return output
1258
1259def supported_formats(read_only=False):
1260    '''Set 'read_only' to True to check ro-whitelist
1261       Otherwise, rw-whitelist is checked'''
1262
1263    if not hasattr(supported_formats, "formats"):
1264        supported_formats.formats = {}
1265
1266    if read_only not in supported_formats.formats:
1267        format_message = qemu_pipe("-drive", "format=help")
1268        line = 1 if read_only else 0
1269        supported_formats.formats[read_only] = \
1270            format_message.splitlines()[line].split(":")[1].split()
1271
1272    return supported_formats.formats[read_only]
1273
1274def skip_if_unsupported(required_formats=(), read_only=False):
1275    '''Skip Test Decorator
1276       Runs the test if all the required formats are whitelisted'''
1277    def skip_test_decorator(func):
1278        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1279                         **kwargs: Dict[str, Any]) -> None:
1280            if callable(required_formats):
1281                fmts = required_formats(test_case)
1282            else:
1283                fmts = required_formats
1284
1285            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1286            if usf_list:
1287                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1288                test_case.case_skip(msg)
1289            else:
1290                func(test_case, *args, **kwargs)
1291        return func_wrapper
1292    return skip_test_decorator
1293
1294def skip_for_formats(formats: Sequence[str] = ()) \
1295    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1296                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1297    '''Skip Test Decorator
1298       Skips the test for the given formats'''
1299    def skip_test_decorator(func):
1300        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1301                         **kwargs: Dict[str, Any]) -> None:
1302            if imgfmt in formats:
1303                msg = f'{test_case}: Skipped for format {imgfmt}'
1304                test_case.case_skip(msg)
1305            else:
1306                func(test_case, *args, **kwargs)
1307        return func_wrapper
1308    return skip_test_decorator
1309
1310def skip_if_user_is_root(func):
1311    '''Skip Test Decorator
1312       Runs the test only without root permissions'''
1313    def func_wrapper(*args, **kwargs):
1314        if os.getuid() == 0:
1315            case_notrun('{}: cannot be run as root'.format(args[0]))
1316            return None
1317        else:
1318            return func(*args, **kwargs)
1319    return func_wrapper
1320
1321# We need to filter out the time taken from the output so that
1322# qemu-iotest can reliably diff the results against master output,
1323# and hide skipped tests from the reference output.
1324
1325class ReproducibleTestResult(unittest.TextTestResult):
1326    def addSkip(self, test, reason):
1327        # Same as TextTestResult, but print dot instead of "s"
1328        unittest.TestResult.addSkip(self, test, reason)
1329        if self.showAll:
1330            self.stream.writeln("skipped {0!r}".format(reason))
1331        elif self.dots:
1332            self.stream.write(".")
1333            self.stream.flush()
1334
1335class ReproducibleStreamWrapper:
1336    def __init__(self, stream: TextIO):
1337        self.stream = stream
1338
1339    def __getattr__(self, attr):
1340        if attr in ('stream', '__getstate__'):
1341            raise AttributeError(attr)
1342        return getattr(self.stream, attr)
1343
1344    def write(self, arg=None):
1345        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1346        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1347        self.stream.write(arg)
1348
1349class ReproducibleTestRunner(unittest.TextTestRunner):
1350    def __init__(self, stream: Optional[TextIO] = None,
1351             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1352             **kwargs: Any) -> None:
1353        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1354        super().__init__(stream=rstream,           # type: ignore
1355                         descriptions=True,
1356                         resultclass=resultclass,
1357                         **kwargs)
1358
1359def execute_unittest(argv: List[str], debug: bool = False) -> None:
1360    """Executes unittests within the calling module."""
1361
1362    # Some tests have warnings, especially ResourceWarnings for unclosed
1363    # files and sockets.  Ignore them for now to ensure reproducibility of
1364    # the test output.
1365    unittest.main(argv=argv,
1366                  testRunner=ReproducibleTestRunner,
1367                  verbosity=2 if debug else 1,
1368                  warnings=None if sys.warnoptions else 'ignore')
1369
1370def execute_setup_common(supported_fmts: Sequence[str] = (),
1371                         supported_platforms: Sequence[str] = (),
1372                         supported_cache_modes: Sequence[str] = (),
1373                         supported_aio_modes: Sequence[str] = (),
1374                         unsupported_fmts: Sequence[str] = (),
1375                         supported_protocols: Sequence[str] = (),
1376                         unsupported_protocols: Sequence[str] = (),
1377                         required_fmts: Sequence[str] = ()) -> bool:
1378    """
1379    Perform necessary setup for either script-style or unittest-style tests.
1380
1381    :return: Bool; Whether or not debug mode has been requested via the CLI.
1382    """
1383    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1384
1385    debug = '-d' in sys.argv
1386    if debug:
1387        sys.argv.remove('-d')
1388    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1389
1390    _verify_image_format(supported_fmts, unsupported_fmts)
1391    _verify_protocol(supported_protocols, unsupported_protocols)
1392    _verify_platform(supported=supported_platforms)
1393    _verify_cache_mode(supported_cache_modes)
1394    _verify_aio_mode(supported_aio_modes)
1395    _verify_formats(required_fmts)
1396    _verify_virtio_blk()
1397
1398    return debug
1399
1400def execute_test(*args, test_function=None, **kwargs):
1401    """Run either unittest or script-style tests."""
1402
1403    debug = execute_setup_common(*args, **kwargs)
1404    if not test_function:
1405        execute_unittest(sys.argv, debug)
1406    else:
1407        test_function()
1408
1409def activate_logging():
1410    """Activate iotests.log() output to stdout for script-style tests."""
1411    handler = logging.StreamHandler(stream=sys.stdout)
1412    formatter = logging.Formatter('%(message)s')
1413    handler.setFormatter(formatter)
1414    test_logger.addHandler(handler)
1415    test_logger.setLevel(logging.INFO)
1416    test_logger.propagate = False
1417
1418# This is called from script-style iotests without a single point of entry
1419def script_initialize(*args, **kwargs):
1420    """Initialize script-style tests without running any tests."""
1421    activate_logging()
1422    execute_setup_common(*args, **kwargs)
1423
1424# This is called from script-style iotests with a single point of entry
1425def script_main(test_function, *args, **kwargs):
1426    """Run script-style tests outside of the unittest framework"""
1427    activate_logging()
1428    execute_test(*args, test_function=test_function, **kwargs)
1429
1430# This is called from unittest style iotests
1431def main(*args, **kwargs):
1432    """Run tests using the unittest framework"""
1433    execute_test(*args, **kwargs)
1434