xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision c7daa57e)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39# pylint: disable=import-error, wrong-import-position
40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41from qemu.machine import qtest
42from qemu.qmp import QMPMessage
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump a traceback if the interpreter receives a fatal signal
faulthandler.enable()

# Command lines for the external tools.  Each list starts with the program
# name (overridable through the environment) followed by any extra
# arguments taken from the matching *_OPTIONS environment variable.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: if QEMU_OPTIONS is unset or blank, this evaluates to [''] (a list
# holding one empty string), not to an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# When GDB_OPTIONS is set, qemu_gdb holds a gdbserver command prefix;
# otherwise it stays an empty list.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Either False (variable unset) or the raw string value of PRINT_QEMU
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# When valgrind is requested (and not vetoed by NO_VALGRIND), qemu_valgrind
# holds a valgrind command prefix; otherwise it stays an empty list.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a luks image is handled without an explicit key-secret
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# NOTE: unlike the variables in the try block above, a missing
# SAMPLE_IMG_DIR surfaces as an uncaught KeyError at import time.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
119
120
def unarchive_sample_image(sample, fname):
    """
    Decompress the bzip2 archive "<sample>.bz2" found in sample_img_dir
    and write the decompressed data to the file @fname.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as src:
        with open(fname, 'wb') as dst:
            shutil.copyfileobj(src, dst)
125
126
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name; only used in the diagnostic
                 written to stderr when the process dies from a signal.
    :param args: Full command line, including the program to execute.
    :param connect_stderr: When true, the tool's stderr is merged into the
                           captured output; otherwise it is inherited from
                           this process.
    :return: Tuple of (captured output, exit code).  A negative exit code
             means the process was terminated by that signal number.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Use adjacent literals rather than a backslash continuation
            # inside the string: the latter would embed the source
            # indentation in the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
141
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img (default arguments first, then @args) and hand back
    a tuple of its output and its exit code
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
148
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img with @args; only its exit code is returned'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
152
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of @qmsg in which every dict is an OrderedDict sorted
    by key.  With @conv_keys set, underscores in the keys of the dict
    being processed become dashes.  Dicts nested directly as dict values
    keep their keys; list elements are processed with the default.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key, value in sorted(qmsg.items()):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(value, conv_keys=False)
    return ordered
165
def qemu_img_create(*args):
    """
    Run "qemu-img create" with @args and return the exit code.

    When the format given with -f is luks and no key-secret is present
    in the -o option string (or there is no -o at all), the default
    key-secret option and the matching secret object are added to the
    command line.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a single comma-separated option string,
                # so extend it rather than calling list methods on it.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
184
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output; return the decoded result'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
187
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output; return the decoded result'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
190
def qemu_img_verbose(*args):
    '''Invoke qemu-img with its output left untouched; return the exit code'''
    cmdline = qemu_img_args + list(args)
    exitcode = subprocess.call(cmdline)
    if exitcode < 0:
        # A negative exit code means the process was killed by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmdline)))
    return exitcode
198
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img with @args; only its output is returned'''
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
202
def qemu_img_log(*args):
    '''Invoke qemu-img, log its output through filter_testfiles and
       return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
207
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Run "qemu-img info" on @filename and log the output after passing it
    through filter_img_info.

    @imgopts selects --image-opts instead of "-f <imgfmt>"; @extra_args
    are inserted before the filename.  @filter_path is the path handed to
    filter_img_info, defaulting to @filename.
    """
    fmt_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd_args = ['info', *fmt_args, *extra_args, filename]

    info = qemu_img_pipe(*cmd_args)
    log(filter_img_info(info, filter_path or filename))
221
def qemu_io(*args):
    '''Invoke qemu-io (default arguments first, then @args) and return
       the output captured by qemu_tool_pipe_and_status'''
    cmdline = qemu_io_args + list(args)
    output, _exitcode = qemu_tool_pipe_and_status('qemu-io', cmdline)
    return output
226
def qemu_io_log(*args):
    '''Invoke qemu-io, log its output through filter_testfiles and
       filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
231
def qemu_io_silent(*args):
    '''Invoke qemu-io with stdout discarded and return its exit code.

    If @args already carries -f or --image-opts, the "no format" default
    arguments are used instead of the regular ones.'''
    has_format = '-f' in args or '--image-opts' in args
    base_args = qemu_io_args_no_fmt if has_format else qemu_io_args

    cmdline = base_args + list(args)
    exitcode = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                              check=False).returncode
    if exitcode < 0:
        # A negative exit code means the process was killed by a signal
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(cmdline)))
    return exitcode
245
def qemu_io_silent_check(*args):
    '''Invoke qemu-io with all output discarded; return True if and only
       if the process exited with status 0'''
    cmdline = qemu_io_args + list(args)
    completed = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    return completed.returncode == 0
252
class QemuIoInteractive:
    """
    Wrapper around an interactive qemu-io process driven through pipes.

    The constructor starts qemu-io with the "no format" default arguments
    followed by @args and waits for the first 'qemu-io> ' prompt; a
    ValueError carrying the process output is raised if something else is
    read instead.  cmd() sends one command and returns the text printed
    before the next prompt; close() sends the quit command.
    """
    def __init__(self, *args):
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read from the process until the next prompt and return everything
        that preceded it.  Hitting EOF first trips an assertion.
        """
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        # Compare the tail of what has been read against the prompt after
        # every character.  A simple "reset the match position on
        # mismatch" scan misses a prompt that directly follows a partial
        # prompt prefix (e.g. output ending in 'q').
        while ''.join(s[-n:]) != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Run a single qemu-io command and return its output.  @cmd must not
        contain a newline and must not be the quit command.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
299
300
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode); the value returned is
       the exit code of the parent process'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
304
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork (daemon mode).  Returns the parent's
       exit code together with its output; the output is replaced by an
       empty string when the exit code is 0'''
    full_args = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
312
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" with @args, log the output through
       filter_testfiles and filter_nbd_exports, and return it unfiltered'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmdline)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
319
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running for the
       duration of the context and kills it on exit'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    nbd_cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(nbd_cmd) as server:
        try:
            # Enter the context only once the pid file has appeared; give
            # up if the server exits before that happens.
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(nbd_cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
348
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Tell whether "qemu-img compare" exits with status 0 for the two
       images, i.e. whether it considers them identical'''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
353
def create_image(name, size):
    '''Write a raw image file made of 512-byte sectors, each carrying its
       own sector number at the start and at the end'''
    # 4-byte big-endian number, 504 padding bytes, 4-byte number again
    sector_fmt = struct.Struct('>l504xl')
    with open(name, 'wb') as img:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            img.write(sector_fmt.pack(sector_num, sector_num))
            offset += 512
362
def image_size(img):
    '''Look up the "virtual-size" field reported by "qemu-img info"'''
    info = json.loads(qemu_img_pipe('info', '--output=json',
                                    '-f', imgfmt, img))
    return info['virtual-size']
367
def is_str(val):
    '''Tell whether @val is an instance of str'''
    matches = isinstance(val, str)
    return matches
370
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name (such as '.' or '+') only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in @msg with
       the placeholder "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
374
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip every carriage return character from @msg'''
    cleaned = win32_re.sub("", msg)
    return cleaned
378
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Drop carriage returns from @msg and replace every match of
       qemu_io_re (the ops/time/throughput summary) with placeholders'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
386
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric "uid:gid" pair after "chown" with "UID:GID"'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
390
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of @event in which the 'seconds' and 'microseconds'
    fields of the timestamp (if there is one) are replaced by the
    placeholders 'SECS' and 'USECS'.  @event itself is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so build a new timestamp
        # dict instead of writing into the one owned by the caller.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
398
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    @qmsg is a list or a dict; nested lists and dicts are descended into.
    filter_fn is called as filter_fn(key, value), where key is the dict
    key or the list index, and its result replaces the value.  The
    (modified) @qmsg is returned.'''
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
414
def filter_testfiles(msg):
    '''Replace "<test_dir>/<pid>-" with "TEST_DIR/PID-" and
       "<sock_dir>/<pid>-" with "SOCK_DIR/PID-" in @msg, where <pid> is
       the ID of the current process'''
    pid_prefix = "%s-" % (os.getpid())
    replacements = ((test_dir, 'TEST_DIR/PID-'), (sock_dir, 'SOCK_DIR/PID-'))
    for directory, placeholder in replacements:
        msg = msg.replace(os.path.join(directory, pid_prefix), placeholder)
    return msg
419
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles over every string value of a QMP object'''
    def _apply(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
426
def filter_virtio_scsi(output: str) -> str:
    '''Drop the "-ccw" / "-pci" suffix from "virtio-scsi-ccw" and
       "virtio-scsi-pci"'''
    pattern = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return pattern.sub(r'\1', output)
429
def filter_qmp_virtio_scsi(qmsg):
    '''Run filter_virtio_scsi over every string value of a QMP object'''
    def _apply(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
436
def filter_generated_node_ids(msg):
    '''Replace every "#block<digits>" in @msg with "NODE_NAME"'''
    node_id_re = re.compile("#block[0-9]+")
    return node_id_re.sub("NODE_NAME", msg)
439
def filter_img_info(output, filename):
    """
    Filter "qemu-img info" style output, line by line: lines containing
    'disk size' or 'actual-size' are dropped; in the remaining lines
    @filename becomes TEST_IMG, test file prefixes are filtered, the image
    format name becomes IMGFMT, and the values after 'iters:', 'uuid:' and
    'cid:' are replaced by X placeholders.
    """
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
455
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with "IMGFMT"'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
458
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt over every string value of a QMP object'''
    def _apply(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _apply)
465
def filter_nbd_exports(output: str) -> str:
    '''Replace the numbers after "min block:", "opt block:" and
       "max block:" with "XXX"'''
    block_size_re = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size_re.sub(r'\1: XXX', output)
468
469
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Run @msg through each of @filters in order and emit the result on the
    test logger.  Dicts and lists are serialized to JSON first (using
    @indent for pretty-printing, and with keys sorted unless the message
    is an OrderedDict); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=not already_ordered,
                                indent=indent))
487
class Timeout:
    """
    Context manager that arms a SIGALRM-based timer for @seconds on entry
    and disarms it on exit; when the timer fires, an Exception carrying
    @errmsg is raised.  Nothing is armed when a gdb or valgrind wrapper
    is configured.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # An interval of 0 clears the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        raise Exception(self.errmsg)
505
def file_pattern(name):
    '''Prefix @name with the ID of the current process and a dash'''
    return f"{os.getpid()}-{name}"
508
class FilePath:
    """
    Context manager that generates one or more per-process file names and
    removes the corresponding files when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir says otherwise,
    e.g. for sockets:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When exactly one name is given, the context yields that single path;
    otherwise it yields the list of paths.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Removal is best-effort: the file may never have been created
            try:
                os.remove(path)
            except OSError:
                continue
        return False
547
548
def try_remove(img):
    '''Remove the file @img, ignoring any OSError raised while doing so'''
    try:
        os.remove(img)
    except OSError:
        return
554
def file_path_remover():
    '''Remove the files recorded in file_path_remover.paths, newest first'''
    registered = file_path_remover.paths
    for path in registered[::-1]:
        try_remove(path)
558
559
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are cleaned up at exit.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path; several names yield a list.
    '''

    # Lazily set up the removal list and the atexit hook on first use
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
581
def remote_filename(path):
    '''Translate @path for the configured image protocol: unchanged for
       "file", an ssh:// URL on 127.0.0.1:22 for "ssh".  Any other
       protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
589
590class VM(qtest.QEMUQtestMachine):
591    '''A QEMU VM'''
592
593    def __init__(self, path_suffix=''):
594        name = "qemu%s-%d" % (path_suffix, os.getpid())
595        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
596        if qemu_gdb and qemu_valgrind:
597            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
598            sys.exit(1)
599        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
600        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
601                         name=name,
602                         base_temp_dir=test_dir,
603                         socket_scm_helper=socket_scm_helper,
604                         sock_dir=sock_dir, qmp_timer=timer)
605        self._num_drives = 0
606
607    def _post_shutdown(self) -> None:
608        super()._post_shutdown()
609        if not qemu_valgrind or not self._popen:
610            return
611        valgrind_filename =  f"{test_dir}/{self._popen.pid}.valgrind"
612        if self.exitcode() == 99:
613            with open(valgrind_filename) as f:
614                print(f.read())
615        else:
616            os.remove(valgrind_filename)
617
618    def _pre_launch(self) -> None:
619        super()._pre_launch()
620        if qemu_print:
621            # set QEMU binary output to stdout
622            self._close_qemu_log_file()
623
624    def add_object(self, opts):
625        self._args.append('-object')
626        self._args.append(opts)
627        return self
628
629    def add_device(self, opts):
630        self._args.append('-device')
631        self._args.append(opts)
632        return self
633
634    def add_drive_raw(self, opts):
635        self._args.append('-drive')
636        self._args.append(opts)
637        return self
638
639    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
640        '''Add a virtio-blk drive to the VM'''
641        options = ['if=%s' % interface,
642                   'id=drive%d' % self._num_drives]
643
644        if path is not None:
645            options.append('file=%s' % path)
646            options.append('format=%s' % img_format)
647            options.append('cache=%s' % cachemode)
648            options.append('aio=%s' % aiomode)
649
650        if opts:
651            options.append(opts)
652
653        if img_format == 'luks' and 'key-secret' not in opts:
654            # default luks support
655            if luks_default_secret_object not in self._args:
656                self.add_object(luks_default_secret_object)
657
658            options.append(luks_default_key_secret_opt)
659
660        self._args.append('-drive')
661        self._args.append(','.join(options))
662        self._num_drives += 1
663        return self
664
665    def add_blockdev(self, opts):
666        self._args.append('-blockdev')
667        if isinstance(opts, str):
668            self._args.append(opts)
669        else:
670            self._args.append(','.join(opts))
671        return self
672
673    def add_incoming(self, addr):
674        self._args.append('-incoming')
675        self._args.append(addr)
676        return self
677
678    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
679        cmd = 'human-monitor-command'
680        kwargs: Dict[str, Any] = {'command-line': command_line}
681        if use_log:
682            return self.qmp_log(cmd, **kwargs)
683        else:
684            return self.qmp(cmd, **kwargs)
685
686    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
687        """Pause drive r/w operations"""
688        if not event:
689            self.pause_drive(drive, "read_aio")
690            self.pause_drive(drive, "write_aio")
691            return
692        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
693
694    def resume_drive(self, drive: str) -> None:
695        """Resume drive r/w operations"""
696        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
697
698    def hmp_qemu_io(self, drive: str, cmd: str,
699                    use_log: bool = False) -> QMPMessage:
700        """Write to a given drive using an HMP command"""
701        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
702
703    def flatten_qmp_object(self, obj, output=None, basestr=''):
704        if output is None:
705            output = dict()
706        if isinstance(obj, list):
707            for i, item in enumerate(obj):
708                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
709        elif isinstance(obj, dict):
710            for key in obj:
711                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
712        else:
713            output[basestr[:-1]] = obj # Strip trailing '.'
714        return output
715
716    def qmp_to_opts(self, obj):
717        obj = self.flatten_qmp_object(obj)
718        output_list = list()
719        for key in obj:
720            output_list += [key + '=' + obj[key]]
721        return ','.join(output_list)
722
723    def get_qmp_events_filtered(self, wait=60.0):
724        result = []
725        for ev in self.get_qmp_events(wait=wait):
726            result.append(filter_qmp_event(ev))
727        return result
728
729    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
730        full_cmd = OrderedDict((
731            ("execute", cmd),
732            ("arguments", ordered_qmp(kwargs))
733        ))
734        log(full_cmd, filters, indent=indent)
735        result = self.qmp(cmd, **kwargs)
736        log(result, filters, indent=indent)
737        return result
738
    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None if the job never entered the 'aborting' state;
                 otherwise the 'error' field that query-jobs reported for
                 the job at that point.
        """
        # Block job events carry the job ID in 'device', job events in 'id'
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but JOB_STATUS_CHANGE is only logged; the job is
            # driven forward by the status changes alone.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the error; it is returned once the job is gone
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The 'null' status ends the loop
                return error
793
794    # Returns None on success, and an error string on failure
795    def blockdev_create(self, options, job_id='job0', filters=None):
796        if filters is None:
797            filters = [filter_qmp_testfiles]
798        result = self.qmp_log('blockdev-create', filters=filters,
799                              job_id=job_id, options=options)
800
801        if 'return' in result:
802            assert result['return'] == {}
803            job_result = self.run_job(job_id)
804        else:
805            job_result = result['error']
806
807        log("")
808        return job_result
809
810    def enable_migration_events(self, name):
811        log('Enabling migration QMP events on %s...' % name)
812        log(self.qmp('migrate-set-capabilities', capabilities=[
813            {
814                'capability': 'events',
815                'state': True
816            }
817        ]))
818
819    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
820        while True:
821            event = self.event_wait('MIGRATION')
822            # We use the default timeout, and with a timeout, event_wait()
823            # never returns None
824            assert event
825
826            log(event, filters=[filter_qmp_event])
827            if event['data']['status'] in ('completed', 'failed'):
828                break
829
830        if event['data']['status'] == 'completed':
831            # The event may occur in finish-migrate, so wait for the expected
832            # post-migration runstate
833            runstate = None
834            while runstate != expect_runstate:
835                runstate = self.qmp('query-status')['return']['status']
836            return True
837        else:
838            return False
839
840    def node_info(self, node_name):
841        nodes = self.qmp('query-named-block-nodes')
842        for x in nodes['return']:
843            if x['node-name'] == node_name:
844                return x
845        return None
846
847    def query_bitmaps(self):
848        res = self.qmp("query-named-block-nodes")
849        return {device['node-name']: device['dirty-bitmaps']
850                for device in res['return'] if 'dirty-bitmaps' in device}
851
852    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
853        """
854        get a specific bitmap from the object returned by query_bitmaps.
855        :param recording: If specified, filter results by the specified value.
856        :param bitmaps: If specified, use it instead of call query_bitmaps()
857        """
858        if bitmaps is None:
859            bitmaps = self.query_bitmaps()
860
861        for bitmap in bitmaps[node_name]:
862            if bitmap.get('name', '') == bitmap_name:
863                if recording is None or bitmap.get('recording') == recording:
864                    return bitmap
865        return None
866
867    def check_bitmap_status(self, node_name, bitmap_name, fields):
868        ret = self.get_bitmap(node_name, bitmap_name)
869
870        return fields.items() <= ret.items()
871
872    def assert_block_path(self, root, path, expected_node, graph=None):
873        """
874        Check whether the node under the given path in the block graph
875        is @expected_node.
876
877        @root is the node name of the node where the @path is rooted.
878
879        @path is a string that consists of child names separated by
880        slashes.  It must begin with a slash.
881
882        Examples for @root + @path:
883          - root="qcow2-node", path="/backing/file"
884          - root="quorum-node", path="/children.2/file"
885
886        Hypothetically, @path could be empty, in which case it would
887        point to @root.  However, in practice this case is not useful
888        and hence not allowed.
889
890        @expected_node may be None.  (All elements of the path but the
891        leaf must still exist.)
892
893        @graph may be None or the result of an x-debug-query-block-graph
894        call that has already been performed.
895        """
896        if graph is None:
897            graph = self.qmp('x-debug-query-block-graph')['return']
898
899        iter_path = iter(path.split('/'))
900
901        # Must start with a /
902        assert next(iter_path) == ''
903
904        node = next((node for node in graph['nodes'] if node['name'] == root),
905                    None)
906
907        # An empty @path is not allowed, so the root node must be present
908        assert node is not None, 'Root node %s not found' % root
909
910        for child_name in iter_path:
911            assert node is not None, 'Cannot follow path %s%s' % (root, path)
912
913            try:
914                node_id = next(edge['child'] for edge in graph['edges']
915                               if (edge['parent'] == node['id'] and
916                                   edge['name'] == child_name))
917
918                node = next(node for node in graph['nodes']
919                            if node['id'] == node_id)
920
921            except StopIteration:
922                node = None
923
924        if node is None:
925            assert expected_node is None, \
926                   'No node found under %s (but expected %s)' % \
927                   (path, expected_node)
928        else:
929            assert node['name'] == expected_node, \
930                   'Found node %s under %s (but expected %s)' % \
931                   (node['name'], path, expected_node)
932
# Splits a path component of the form "name[index]" into name and index
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path is a '/'-separated list of dict keys.  A component of the
        form "key[N]" additionally selects element N of the list stored
        under "key".  Any component that cannot be followed (missing
        key, non-list value, bad or out-of-range index) fails the test.

        Returns the value found at the end of the path.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                try:
                    idx = int(idx)
                except ValueError:
                    # Report a malformed index like any other bad path
                    # instead of letting a raw ValueError escape
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail(f'path component "{component}" in "{path}" '
                              f'is not a list in "{d}"')
                try:
                    d = d[idx]
                except IndexError:
                    self.fail(f'invalid index "{idx}" in path "{path}" '
                              f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in the QMP dict @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a failed traversal through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            for v in value:
                if result == v:
                    return
            self.fail('no match for "%s" in %s' % (str(result), str(value)))
        else:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event

        @drive and @force are passed to block-job-cancel.  If @resume is
        set, self.vm.resume_drive(drive) is called right after the
        cancel request.  @wait is handed to get_qmp_events().'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        With @error None, the BLOCK_JOB_COMPLETED event must carry no
        'error' and (if @check_offset is set) its offset must equal its
        len.  Otherwise the event's 'error' must match @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    if error is None:
                        self.assert_qmp_absent(event, 'data/error')
                        if check_offset:
                            self.assert_qmp(event, 'data/offset',
                                            event['data']['len'])
                    else:
                        self.assert_qmp(event, 'data/error', error)
                    self.assert_no_active_block_jobs()
                    return event
                if event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': 'mirror', 'device': drive}}),
            ('BLOCK_JOB_READY',
             {'data': {'type': 'commit', 'device': drive}})
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that
           it finished as a fully copied mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        # assertIn reports the offending type on failure
        self.assertIn(event['data']['type'], ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
           then return its entry.  The job must stay listed while
           polling.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] and not job['busy']:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause.  With @wait, return the job entry from
           pause_wait(); otherwise return the QMP reply.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1116
1117
def notrun(reason):
    '''Skip this test suite

    Records @reason in "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.notrun' % (output_dir, seq)

    with open(marker_path, 'w') as outfile:
        outfile.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1127
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    record_path = '%s/%s.casenotrun' % (output_dir, seq)

    with open(record_path, 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1139
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite unless the current image format is suitable.

    The format must be in @supported_fmts (if that is non-empty) and
    must not be in @unsupported_fmts.  For 'luks', the LUKS driver must
    additionally be working.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # similar to
            #   _supported_fmt generic
            # for bash tests
            supported_fmts = ()

    not_listed = supported_fmts and (imgfmt not in supported_fmts)
    if not_listed or (imgfmt in unsupported_fmts):
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1155
1156def _verify_protocol(supported: Sequence[str] = (),
1157                     unsupported: Sequence[str] = ()) -> None:
1158    assert not (supported and unsupported)
1159
1160    if 'generic' in supported:
1161        return
1162
1163    not_sup = supported and (imgproto not in supported)
1164    if not_sup or (imgproto in unsupported):
1165        notrun('not suitable for this protocol: %s' % imgproto)
1166
1167def _verify_platform(supported: Sequence[str] = (),
1168                     unsupported: Sequence[str] = ()) -> None:
1169    if any((sys.platform.startswith(x) for x in unsupported)):
1170        notrun('not suitable for this OS: %s' % sys.platform)
1171
1172    if supported:
1173        if not any((sys.platform.startswith(x) for x in supported)):
1174            notrun('not suitable for this OS: %s' % sys.platform)
1175
1176def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1177    if supported_cache_modes and (cachemode not in supported_cache_modes):
1178        notrun('not suitable for this cache mode: %s' % cachemode)
1179
1180def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1181    if supported_aio_modes and (aiomode not in supported_aio_modes):
1182        notrun('not suitable for this aio mode: %s' % aiomode)
1183
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite if any of @required_formats is missing from
    supported_formats()."""
    required = set(required_formats)
    available = set(supported_formats())
    usf_list = list(required - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1188
1189
def _verify_virtio_blk() -> None:
    """Skip the test suite if QEMU's '-device help' output does not
    mention virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1194
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if QEMU's '-device help' output mentions
    neither virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    has_pci = 'virtio-scsi-pci' in device_help
    has_ccw = 'virtio-scsi-ccw' in device_help
    if not (has_pci or has_ccw):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1199
1200
def supports_quorum():
    '''Tell whether "quorum" shows up in the qemu-img --help output'''
    img_help = qemu_img_pipe('--help')
    return 'quorum' in img_help
1203
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1208
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A small test image is created with qemu-img and removed again.

    :return: (True, '') on success; otherwise (False, reason), where
             reason is the text after "<image file>:" in the first
             output line containing it, or the whole output if there
             is no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ['create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G']
    (output, status) = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; a failure to remove the file is ignored
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())

    return (False, output)
1239
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks()
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1247
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, then qemu_opts, then @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1257
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from the output of "-drive format=help"
       (second line for read_only, first line otherwise) and cached in
       supported_formats.formats, keyed by read_only.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only in cache:
        return cache[read_only]

    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    wanted = help_lines[1 if read_only else 0]
    # The format names follow the first colon, separated by whitespace
    cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
1272
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a collection of format names or a
       callable that takes the test case and returns one.  @read_only
       is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1292
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats, i.e. when the current
       image format is one of @formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1308
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       As root, the case is recorded through case_notrun() and the
       wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1319
1320# We need to filter out the time taken from the output so that
1321# qemu-iotest can reliably diff the results against master output,
1322# and hide skipped tests from the reference output.
1323
class ReproducibleTestResult(unittest.TextTestResult):
    """Text test result that reports a skipped test with a dot"""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)

        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return

        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1333
class ReproducibleStreamWrapper:
    """Stream proxy that strips the run time and the skip count from
    the text written through it"""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached for attributes not found the normal way; never
        # forward these two to the wrapped stream
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        without_skips = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(without_skips)
1347
class ReproducibleTestRunner(unittest.TextTestRunner):
    """Text test runner that writes through ReproducibleStreamWrapper
    and defaults to ReproducibleTestResult"""

    def __init__(self, stream: Optional[TextIO] = None,
             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
             **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given
        target = stream or sys.stdout
        rstream = ReproducibleStreamWrapper(target)
        super().__init__(stream=rstream,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1357
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings)
1368
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv, logging is configured,
    and the _verify_*() checks are run with the given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1398
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    After the common setup, @test_function is called if one is given;
    otherwise the unittest runner is started.  All other arguments go
    to execute_setup_common().
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1407
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1416
1417# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates logging to stdout, then hands all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1422
1423# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates logging to stdout, then runs @test_function through
    execute_test(); the remaining arguments are passed along.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1428
1429# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)
1433