xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 40f23e4e)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39# pylint: disable=import-error, wrong-import-position
40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41from qemu.machine import qtest
42from qemu.qmp import QMPMessage
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Dump Python tracebacks if the interpreter receives a fatal signal.
faulthandler.enable()

# Command lines for the external tools.  Each one starts with the program
# name (overridable through the environment) followed by optional extra
# arguments taken from the matching *_OPTIONS variable.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the QEMU_IO_OPTIONS_NO_FMT options.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''],
# i.e. a list holding one empty string, not an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Image format / protocol under test and the directory for test output.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a test creates or opens a luks image without naming
# a key secret itself: a "secret" object called keysec0 and the option
# that refers to it.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Looked up outside the try block above, so a missing SAMPLE_IMG_DIR
# surfaces as a plain KeyError rather than the "check" hint.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
101
102
def unarchive_sample_image(sample, fname):
    """
    Decompress a bz2-packed sample image into *fname*.

    The archive is expected at ``<sample_img_dir>/<sample>.bz2``; *fname*
    is created or truncated and receives the decompressed bytes.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as packed:
        with open(fname, 'wb') as unpacked:
            shutil.copyfileobj(packed, unpacked)
107
108
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name; only used in the diagnostic
                 written to stderr when the process dies from a signal.
    :param args: Complete command line, program name included.
    :param connect_stderr: When True, the child's stderr is merged into the
                           captured output; otherwise stderr is inherited.
    :return: Tuple (captured output as text, exit code).  A negative exit
             code -N means the child was terminated by signal N.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical string: a backslash
            # continuation inside the literal would embed the next line's
            # indentation in the output.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
123
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the configured default arguments followed by
    *args* and return an (output, exit code) tuple.
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
130
def qemu_img(*args: str) -> int:
    '''Run qemu-img, discard its output and return only the exit code'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
134
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with deterministic key order.

    Dicts become OrderedDicts whose entries are inserted in sorted order;
    lists are processed element by element; anything else is returned as
    is.  With conv_keys set, underscores in the keys of *this* dict are
    replaced by dashes; dicts nested directly inside a dict keep their
    keys, while every list element is processed with the default
    (conv_keys=True).
    """
    # Dictionaries are not ordered prior to 3.6, hence the explicit
    # OrderedDict.
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
147
def qemu_img_create(*args):
    """
    Run "qemu-img create" with *args* and return its exit code.

    If the format given after '-f' is 'luks' and the caller did not pass a
    key-secret option, the default key-secret option and the matching
    '--object' secret definition are added to the command line.
    """
    args = list(args)

    # default luks support
    fmt_idx = args.index('-f') if '-f' in args else -1
    # The bounds check keeps a trailing '-f' from raising IndexError here;
    # qemu-img then reports the malformed command line itself.
    if 0 <= fmt_idx < len(args) - 1 and args[fmt_idx + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a single comma-separated string, so the
                # default option is joined onto it (str has no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
166
def qemu_img_measure(*args):
    """Run "qemu-img measure --output json" and return the decoded JSON."""
    output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(output)
169
def qemu_img_check(*args):
    """Run "qemu-img check --output json" and return the decoded JSON."""
    output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(output)
172
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code.

    A note naming the signal and the command line is written to stderr
    when qemu-img was killed by a signal (negative exit code).
    '''
    full_args = qemu_img_args + list(args)
    exitcode = subprocess.call(full_args)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_args)))
    return exitcode
180
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return only its captured output'''
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
184
def qemu_img_log(*args):
    """
    Run qemu-img, log its output (test file names filtered) and return
    the unfiltered output.
    """
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
189
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of "qemu-img info" for *filename*.

    :param filename: Image file name, or an image options string when
                     imgopts is set.
    :param filter_path: Text replaced by TEST_IMG in the output; defaults
                        to *filename*.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before *filename*.
    """
    fmt_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmdline = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*cmdline)
    log(filter_img_info(output, filter_path or filename))
203
def qemu_io(*args):
    '''Run qemu-io with the default arguments plus *args*; return its output'''
    cmdline = qemu_io_args + list(args)
    output, _exitcode = qemu_tool_pipe_and_status('qemu-io', cmdline)
    return output
208
def qemu_io_log(*args):
    """
    Run qemu-io, log its output through the test-file and qemu-io filters
    and return the unfiltered output.
    """
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
213
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    If the caller selects the format itself ('-f' or '--image-opts' among
    *args*), the default arguments without a format option are used.
    A note is written to stderr when qemu-io is killed by a signal.
    '''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL discards stdout without opening a file object
    # here that would never be closed.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
227
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0

    Both stdout and stderr of the subprocess are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaking an open /dev/null file object;
    # stderr follows stdout and is discarded as well.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
234
class QemuIoInteractive:
    """
    Wrapper around an interactive qemu-io process.

    The process is started in the constructor with stdin/stdout connected
    to pipes (stderr merged into stdout).  Commands are sent with cmd(),
    which returns everything qemu-io printed up to the next prompt;
    close() sends the quit command.
    """

    # Prompt printed by qemu-io whenever it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with *args* and wait for its first prompt.

        :raise ValueError: if qemu-io printed something other than the
                           prompt; the message is the collected output.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to and including the next prompt.

        :return: The text printed before the prompt (prompt stripped).
        """
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        tail = ''
        # Compare the prompt against a sliding window of the last n
        # characters.  A simple "restart from 0 on mismatch" counter
        # misses the prompt when the mismatching character is itself the
        # start of the prompt (e.g. output ending in "qqemu-io> ").
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        *cmd* must be a single line and must not be the quit command
        (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
281
282
def qemu_nbd(*args):
    '''Run qemu-nbd with --fork (daemon mode); return the parent's exit code'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmdline)
286
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd with --fork (daemon mode).

    Returns a tuple of the parent's exit code and its captured stdout;
    the output part is the empty string when the exit code is 0.
    stderr is not captured.'''
    cmdline = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
294
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports.

    The output is logged with test file names and block sizes filtered;
    the unfiltered output is returned.'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output, _exitcode = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
301
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    qemu-nbd is started with --persistent and a pid file; the context is
    entered once the pid file exists.  On exit the pid file is removed
    and the server is killed.  RuntimeError is raised if qemu-nbd exits
    before creating the pid file.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Poll until the server signals readiness via its pid file
            while not os.path.exists(pid_file):
                if server.poll() is None:
                    time.sleep(0.01)
                    continue
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
330
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" reports the two images as identical

    fmt1 and fmt2 are the formats of img1 and img2 respectively.'''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
335
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector holds its own sector number as a big-endian
    32-bit integer in its first and last four bytes, with zero padding
    in between.  Whole sectors are written until *size* bytes are covered.
    '''
    sector_size = 512
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            index = offset // sector_size
            image.write(struct.pack('>l504xl', index, index))
            offset += sector_size
344
def image_size(img):
    '''Return the image's virtual size as reported by "qemu-img info"'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
349
def is_str(val):
    """Return True if *val* is a str instance."""
    result = isinstance(val, str)
    return result
352
# TEST_DIR is an arbitrary filesystem path, so escape it: unescaped regex
# metacharacters in the path ('.', '+', '(' ...) would match the wrong text
# or fail to compile.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory path by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
356
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Strip all carriage-return characters from *msg*."""
    pieces = win32_re.split(msg)
    return "".join(pieces)
360
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Replace the qemu-io statistics matched by qemu_io_re (ops count, time,
    throughput) with fixed placeholders, after stripping carriage returns.
    """
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, filter_win32(msg))
368
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric "chown <uid>:<gid>" occurrences by "chown UID:GID"."""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
372
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of *event* whose timestamp fields (if any) are replaced
    by the placeholders 'SECS' and 'USECS'.  The event passed in is not
    modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict() above is only a shallow copy: copy the nested timestamp
        # dict as well, otherwise the caller's event would be modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
380
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn is called as filter_fn(key, value), where key is the dict
    key or list index of the value; its result replaces the value.
    Nested dicts and lists are processed recursively.  Returns qmsg.'''
    # Lists are walked by index, everything else is treated like a dict
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
396
def filter_testfiles(msg):
    """
    Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" prefixes of
    this process's test files by TEST_DIR/PID- and SOCK_DIR/PID-.
    """
    pid_prefix = "%s-" % (os.getpid())
    for base, label in ((test_dir, 'TEST_DIR/PID-'),
                        (sock_dir, 'SOCK_DIR/PID-')):
        msg = msg.replace(os.path.join(base, pid_prefix), label)
    return msg
401
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
408
def filter_virtio_scsi(output: str) -> str:
    """Drop the "-ccw" / "-pci" suffix from virtio-scsi device names."""
    pattern = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return pattern.sub(r'\1', output)
411
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
418
def filter_generated_node_ids(msg):
    """Replace "#block<number>" node names in *msg* by NODE_NAME."""
    pattern = re.compile("#block[0-9]+")
    return pattern.sub("NODE_NAME", msg)
421
def filter_img_info(output, filename):
    """
    Make "qemu-img info" output reproducible.

    Lines mentioning the disk size / actual size are dropped; *filename*,
    test file prefixes and the image format are replaced by placeholders,
    as are the iters, uuid and cid values.
    """
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        filtered.append(line)
    return '\n'.join(filtered)
437
def filter_imgfmt(msg):
    """Replace every occurrence of the current image format by IMGFMT."""
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
440
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt to every string value of a QMP object."""
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
447
def filter_nbd_exports(output: str) -> str:
    """Replace the min/opt/max block size values by XXX."""
    pattern = re.compile(r'((min|opt|max) block): [0-9]+')
    return pattern.sub(r'\1: XXX', output)
450
451
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Log a message to the diff-output logger after running it through
    *filters* in order.

    Dicts and lists are serialized as JSON (pretty-printed when indent is
    given); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # An OrderedDict already carries its intended order: don't re-sort it
    do_sort = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
469
class Timeout:
    """
    Context manager that raises an Exception when its body takes longer
    than *seconds*.

    Implemented with SIGALRM and the ITIMER_REAL interval timer.  On exit
    the timer is disarmed and the SIGALRM handler that was installed
    before entering the context is restored.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        # SIGALRM handler that was active before __enter__
        self._old_handler = None
    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # Don't leave our bound method installed as the process-wide
        # handler.  signal.signal() returns None for handlers that were
        # not installed from Python; those cannot be re-installed.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
            self._old_handler = None
        return False
    def timeout(self, signum, frame):
        """SIGALRM handler: raise the timeout exception."""
        raise Exception(self.errmsg)
483
def file_pattern(name):
    """Return *name* prefixed with this process's pid and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
486
class FilePath:
    """
    Context manager that hands out file names and deletes the files again
    when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir is given; for
    sockets use iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When called with exactly one name, the context yields that single
    path rather than a one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        generated = []
        for name in names:
            generated.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = generated

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort: the file may never have been created
                continue
        return False
525
526
def try_remove(img):
    """Delete the file *img*, silently ignoring any OSError."""
    try:
        os.unlink(img)
    except OSError:
        return
532
def file_path_remover():
    """Remove all paths recorded in file_path_remover.paths, newest first."""
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
536
537
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are cleaned up at exit.

    Every generated path is recorded and removed by an atexit handler,
    which is registered on first use.  A single name yields a single
    path, several names yield a list:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
559
def remote_filename(path):
    """
    Return *path* as it has to be spelled for the current image protocol.

    'file' returns the path unchanged, 'ssh' returns an ssh:// URL for
    127.0.0.1 port 22 using $USER; any other protocol raises Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
567
568class VM(qtest.QEMUQtestMachine):
569    '''A QEMU VM'''
570
571    def __init__(self, path_suffix=''):
572        name = "qemu%s-%d" % (path_suffix, os.getpid())
573        super().__init__(qemu_prog, qemu_opts, name=name,
574                         base_temp_dir=test_dir,
575                         socket_scm_helper=socket_scm_helper,
576                         sock_dir=sock_dir)
577        self._num_drives = 0
578
579    def add_object(self, opts):
580        self._args.append('-object')
581        self._args.append(opts)
582        return self
583
584    def add_device(self, opts):
585        self._args.append('-device')
586        self._args.append(opts)
587        return self
588
589    def add_drive_raw(self, opts):
590        self._args.append('-drive')
591        self._args.append(opts)
592        return self
593
594    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
595        '''Add a virtio-blk drive to the VM'''
596        options = ['if=%s' % interface,
597                   'id=drive%d' % self._num_drives]
598
599        if path is not None:
600            options.append('file=%s' % path)
601            options.append('format=%s' % img_format)
602            options.append('cache=%s' % cachemode)
603            options.append('aio=%s' % aiomode)
604
605        if opts:
606            options.append(opts)
607
608        if img_format == 'luks' and 'key-secret' not in opts:
609            # default luks support
610            if luks_default_secret_object not in self._args:
611                self.add_object(luks_default_secret_object)
612
613            options.append(luks_default_key_secret_opt)
614
615        self._args.append('-drive')
616        self._args.append(','.join(options))
617        self._num_drives += 1
618        return self
619
620    def add_blockdev(self, opts):
621        self._args.append('-blockdev')
622        if isinstance(opts, str):
623            self._args.append(opts)
624        else:
625            self._args.append(','.join(opts))
626        return self
627
628    def add_incoming(self, addr):
629        self._args.append('-incoming')
630        self._args.append(addr)
631        return self
632
633    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
634        cmd = 'human-monitor-command'
635        kwargs: Dict[str, Any] = {'command-line': command_line}
636        if use_log:
637            return self.qmp_log(cmd, **kwargs)
638        else:
639            return self.qmp(cmd, **kwargs)
640
641    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
642        """Pause drive r/w operations"""
643        if not event:
644            self.pause_drive(drive, "read_aio")
645            self.pause_drive(drive, "write_aio")
646            return
647        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
648
649    def resume_drive(self, drive: str) -> None:
650        """Resume drive r/w operations"""
651        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
652
653    def hmp_qemu_io(self, drive: str, cmd: str,
654                    use_log: bool = False) -> QMPMessage:
655        """Write to a given drive using an HMP command"""
656        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
657
658    def flatten_qmp_object(self, obj, output=None, basestr=''):
659        if output is None:
660            output = dict()
661        if isinstance(obj, list):
662            for i, item in enumerate(obj):
663                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
664        elif isinstance(obj, dict):
665            for key in obj:
666                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
667        else:
668            output[basestr[:-1]] = obj # Strip trailing '.'
669        return output
670
671    def qmp_to_opts(self, obj):
672        obj = self.flatten_qmp_object(obj)
673        output_list = list()
674        for key in obj:
675            output_list += [key + '=' + obj[key]]
676        return ','.join(output_list)
677
678    def get_qmp_events_filtered(self, wait=60.0):
679        result = []
680        for ev in self.get_qmp_events(wait=wait):
681            result.append(filter_qmp_event(ev))
682        return result
683
684    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
685        full_cmd = OrderedDict((
686            ("execute", cmd),
687            ("arguments", ordered_qmp(kwargs))
688        ))
689        log(full_cmd, filters, indent=indent)
690        result = self.qmp(cmd, **kwargs)
691        log(result, filters, indent=indent)
692        return result
693
694    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None on success, or the job's error string as reported by
                 query-jobs if the job went through the 'aborting' state.
        """
        # BLOCK_JOB_* events are matched on data.device, the job-level
        # events on data.id.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; all
            # other events are merely logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the error now; it is returned once the job
                # reaches the 'null' state.
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback, then either
                # cancel or finalize the job.
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The only way out of the loop
                return error
748
749    # Returns None on success, and an error string on failure
750    def blockdev_create(self, options, job_id='job0', filters=None):
751        if filters is None:
752            filters = [filter_qmp_testfiles]
753        result = self.qmp_log('blockdev-create', filters=filters,
754                              job_id=job_id, options=options)
755
756        if 'return' in result:
757            assert result['return'] == {}
758            job_result = self.run_job(job_id)
759        else:
760            job_result = result['error']
761
762        log("")
763        return job_result
764
765    def enable_migration_events(self, name):
766        log('Enabling migration QMP events on %s...' % name)
767        log(self.qmp('migrate-set-capabilities', capabilities=[
768            {
769                'capability': 'events',
770                'state': True
771            }
772        ]))
773
774    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
775        while True:
776            event = self.event_wait('MIGRATION')
777            # We use the default timeout, and with a timeout, event_wait()
778            # never returns None
779            assert event
780
781            log(event, filters=[filter_qmp_event])
782            if event['data']['status'] in ('completed', 'failed'):
783                break
784
785        if event['data']['status'] == 'completed':
786            # The event may occur in finish-migrate, so wait for the expected
787            # post-migration runstate
788            runstate = None
789            while runstate != expect_runstate:
790                runstate = self.qmp('query-status')['return']['status']
791            return True
792        else:
793            return False
794
795    def node_info(self, node_name):
796        nodes = self.qmp('query-named-block-nodes')
797        for x in nodes['return']:
798            if x['node-name'] == node_name:
799                return x
800        return None
801
802    def query_bitmaps(self):
803        res = self.qmp("query-named-block-nodes")
804        return {device['node-name']: device['dirty-bitmaps']
805                for device in res['return'] if 'dirty-bitmaps' in device}
806
807    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
808        """
809        get a specific bitmap from the object returned by query_bitmaps.
810        :param recording: If specified, filter results by the specified value.
811        :param bitmaps: If specified, use it instead of call query_bitmaps()
812        """
813        if bitmaps is None:
814            bitmaps = self.query_bitmaps()
815
816        for bitmap in bitmaps[node_name]:
817            if bitmap.get('name', '') == bitmap_name:
818                if recording is None or bitmap.get('recording') == recording:
819                    return bitmap
820        return None
821
822    def check_bitmap_status(self, node_name, bitmap_name, fields):
823        ret = self.get_bitmap(node_name, bitmap_name)
824
825        return fields.items() <= ret.items()
826
    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.

        Raises AssertionError if the check fails.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        # (splitting "/a/b" yields '' as the first element)
        assert next(iter_path) == ''

        # Look up the root node by name; None if it does not exist
        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            # A previous step found no child, yet the path continues
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Find the edge from the current node with the wanted
                # child name ...
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                # ... and the node that edge points to
                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge or node: the path ends here
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
887
# Matches a path component of the form "name[index]": group 1 is the
# text before the bracket, group 2 the text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases

    Offers path-based assertions on QMP dicts and helpers that drive
    block jobs through self.vm.'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Placeholder only: the helpers below issue their QMP commands
        # through self.vm, so it must be assigned before they are used.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow a slash-separated path through nested dicts

        A component written as "name[N]" looks up "name" and then takes
        element N of the list stored there.  A step that cannot be
        followed fails the test.  Returns the object the path ends at.'''
        for component in path.split('/'):
            idx = None
            indexed = index_re.match(component)
            if indexed is not None:
                component = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if indexed is None:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Fail the test if @path can be resolved inside @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() could not follow the path: that is the
            # expected outcome.
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the object at @path inside the QMP dict @d equals
           @value.  If @value is a non-empty list, it is taken as a set
           of alternatives and a match with any one of them suffices.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if not any(actual == candidate for candidate in value):
            self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry
        matches the given node_name and/or file_name.  A criterion
        left as None, or a field the entry does not have, counts as
        matching."""
        def compatible(actual, wanted):
            if actual is None or wanted is None:
                return True
            return actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        found = any(compatible(entry.get("node-name"), node_name) and
                    compatible(entry.get("file"), file_name)
                    for entry in result["return"])
        if not found:
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON following the prefix equals @reference once both have
           gone through self.vm.flatten_qmp_object()'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flattened = self.vm.flatten_qmp_object(json.loads(payload))
        expected = self.vm.flatten_qmp_object(reference)
        self.assertEqual(flattened, expected)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        # Every batch of events is processed to its end, so the last
        # completion/cancellation event of the batch is the one returned.
        final_event = None
        while final_event is None:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        With @error left as None the event must carry no error and,
        if @check_offset is set, its offset must equal its len.
        Otherwise the event's error must match @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event of a mirror or commit job
        on @drive, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and check that
        it ended with a mirror BLOCK_JOB_COMPLETED event whose offset
        equals its len'''
        self.wait_ready(drive=drive)
        final_event = self.cancel_and_wait(drive=drive)
        self.assertEqual(final_event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final_event, 'data/type', 'mirror')
        self.assert_qmp(final_event, 'data/offset',
                        final_event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        final_event = self.wait_until_completed(drive=drive,
                                                error=completion_error)
        job_type = final_event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
        then return its entry.  The job has to be listed in every poll;
        the polling runs inside a Timeout(3, ...) context.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id),
                           None)
                if job is not None and job['paused'] and not job['busy']:
                    return job
                assert job is not None

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for @job_id.  Returns the result of
        pause_wait() if @wait is set, else the QMP reply.'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        '''Skip this test case'''
        # Record the reason via case_notrun() first, then let unittest
        # do the actual skipping.
        case_notrun(reason)
        self.skipTest(reason)
1071
1072
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the file is closed (and its content
    # flushed) before sys.exit() below, instead of leaving an open
    # handle behind.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1081
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    Appends a "[case not run]" line containing @reason to
    "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the file is closed right after the
    # line has been appended, instead of leaving an open handle behind.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1093
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Call notrun() if the current image format (imgfmt) is not suitable.

    The format is unsuitable if @supported_fmts is non-empty and does
    not contain it, or if @unsupported_fmts contains it.  For 'luks',
    verify_working_luks() is run in addition.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # similar to
            #   _supported_fmt generic
            # for bash tests
            supported_fmts = ()

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts

    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1109
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Call notrun() if the current protocol (imgproto) is not suitable.

    At most one of @supported and @unsupported may be non-empty.
    Nothing is checked if @supported contains 'generic'.
    """
    assert not supported or not unsupported

    if 'generic' not in supported:
        if supported and imgproto not in supported:
            rejected = True
        else:
            rejected = imgproto in unsupported

        if rejected:
            notrun('not suitable for this protocol: %s' % imgproto)
1120
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Call notrun() if sys.platform is not suitable.

    Entries are compared as prefixes of sys.platform: it must not
    start with any entry of @unsupported and, if @supported is
    non-empty, it must start with at least one of its entries.
    """
    platform = sys.platform

    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
1129
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """
    Call notrun() if @supported_cache_modes is non-empty and does not
    contain the current cache mode (cachemode).
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1133
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """
    Call notrun() if @supported_aio_modes is non-empty and does not
    contain the current aio mode (aiomode).
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1137
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Call notrun() if any of @required_formats is missing from the list
    returned by supported_formats().
    """
    required = set(required_formats)
    available = set(supported_formats())

    usf_list = list(required - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1142
1143
def _verify_virtio_blk() -> None:
    """
    Call notrun() if 'virtio-blk' does not show up in the output of
    "-M none -device help".
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1148
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Call notrun() if neither 'virtio-scsi-pci' nor 'virtio-scsi-ccw'
    shows up in the output of "-M none -device help".
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-scsi-pci' in device_help or 'virtio-scsi-ccw' in device_help:
        return
    notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1153
1154
def supports_quorum():
    '''Return whether 'quorum' appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1157
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1162
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A throw-away image is created in test_dir and removed again.

    :return: (True, '') if creating the image succeeded, otherwise
             (False, reason).  The reason is the text following
             "<image file>:" on the first output line that contains
             it, or the whole output if no line does.
    """

    img_file = f'{test_dir}/luks-test.luks'
    output, status = qemu_img_pipe_and_status(
        'create', '-f', 'luks',
        '--object', luks_default_secret_object,
        '-o', luks_default_key_secret_opt,
        '-o', 'iter-time=10',
        img_file, '1G')

    # Best-effort cleanup: the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines()
                   if marker in line),
                  output)
    return (False, reason)
1193
def verify_working_luks():
    """
    Skip test suite if LUKS does not work

    The reason reported by has_working_luks() is passed to notrun().
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1201
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    # Only the output is of interest here, not the exit status
    return qemu_tool_pipe_and_status('qemu', command)[0]
1211
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from the output of "-drive format=help"
       (second line for read_only, first line otherwise) and cached
       per 'read_only' value in the function attribute "formats".'''

    if not hasattr(supported_formats, "formats"):
        supported_formats.formats = {}
    cache = supported_formats.formats

    if read_only in cache:
        return cache[read_only]

    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    wanted_line = help_lines[1 if read_only else 0]
    # The format names follow the first colon, separated by whitespace
    cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1226
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a collection of format names or a
       callable that is given the test case and returns one.
       'read_only' is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats)
                    else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: formats {usf_list} are not whitelisted'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1246
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test is skipped through case_skip() if the
       current image format (imgfmt) is in 'formats'.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1262
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When os.getuid() is 0 the test is not called; the case is
       recorded through case_notrun() and None is returned.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        # args[0] is the first positional argument of the call
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1273
1274# We need to filter out the time taken from the output so that
1275# qemu-iotest can reliably diff the results against master output,
1276# and hide skipped tests from the reference output.
1277
class ReproducibleTestResult(unittest.TextTestResult):
    '''Text test result that reports a skipped test with a dot'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)

        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return

        if not self.dots:
            return

        self.stream.write(".")
        self.stream.flush()
1287
class ReproducibleStreamWrapper:
    '''Stream proxy that strips run time and skip count from writes

    Every attribute other than write() is forwarded to the wrapped
    stream.'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup failed.  'stream'
        # and '__getstate__' are never forwarded to the wrapped stream.
        if attr not in ('stream', '__getstate__'):
            return getattr(self.stream, attr)
        raise AttributeError(attr)

    def write(self, arg=None):
        # Drop the elapsed time from the "Ran N tests in X.XXXs" line
        # and remove any " (skipped=N)" note before passing the text on.
        for pattern, replacement in (
                (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
                (r' \(skipped=\d+\)', r'')):
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
1301
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''Text test runner that writes through ReproducibleStreamWrapper
    and uses ReproducibleTestResult unless told otherwise'''

    def __init__(self, stream: Optional[TextIO] = None,
             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
             **kwargs: Any) -> None:
        # Fall back to stdout when no stream was given
        target = stream if stream else sys.stdout
        wrapped = ReproducibleStreamWrapper(target)
        super().__init__(descriptions=True,        # type: ignore
                         resultclass=resultclass,
                         stream=wrapped,
                         **kwargs)
1311
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
1322
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes a '-d' argument from sys.argv if there is one, configures
    logging accordingly and runs the _verify_*() checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1352
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  Afterwards
    @test_function is called if one was given; otherwise the unittest
    runner is started with sys.argv.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1361
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without level or logger name
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1370
1371# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates logging via activate_logging(), then passes all
    arguments on to execute_setup_common().  The value returned by
    execute_setup_common() is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1376
1377# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    Activates logging via activate_logging(), then hands
    @test_function and all remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1382
1383# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are passed on to execute_test() unchanged; no
    test_function is supplied here.
    """
    execute_test(*args, **kwargs)
1387