xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision d65173f924ef0a170693fc59692a542d38f31879)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39# pylint: disable=import-error, wrong-import-position
40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41from qemu import qtest
42from qemu.qmp import QMPMessage
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79output_dir = os.environ.get('OUTPUT_DIR', '.')
80
81try:
82    test_dir = os.environ['TEST_DIR']
83    sock_dir = os.environ['SOCK_DIR']
84    cachemode = os.environ['CACHEMODE']
85    aiomode = os.environ['AIOMODE']
86    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
87except KeyError:
88    # We are using these variables as proxies to indicate that we're
89    # not being run via "check". There may be other things set up by
90    # "check" that individual test cases rely on.
91    sys.stderr.write('Please run this test via the "check" script\n')
92    sys.exit(os.EX_USAGE)
93
94socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
95
96luks_default_secret_object = 'secret,id=keysec0,data=' + \
97                             os.environ.get('IMGKEYSECRET', '')
98luks_default_key_secret_opt = 'key-secret=keysec0'
99
100sample_img_dir = os.environ['SAMPLE_IMG_DIR']
101
102
103def unarchive_sample_image(sample, fname):
104    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
105    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
106        shutil.copyfileobj(f_in, f_out)
107
108
109def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
110                              connect_stderr: bool = True) -> Tuple[str, int]:
111    """
112    Run a tool and return both its output and its exit code
113    """
114    stderr = subprocess.STDOUT if connect_stderr else None
115    subp = subprocess.Popen(args,
116                            stdout=subprocess.PIPE,
117                            stderr=stderr,
118                            universal_newlines=True)
119    output = subp.communicate()[0]
120    if subp.returncode < 0:
121        cmd = ' '.join(args)
122        sys.stderr.write(f'{tool} received signal {-subp.returncode}: {cmd}\n')
123    return (output, subp.returncode)
124
125def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
126    """
127    Run qemu-img and return both its output and its exit code
128    """
129    full_args = qemu_img_args + list(args)
130    return qemu_tool_pipe_and_status('qemu-img', full_args)
131
132def qemu_img(*args: str) -> int:
133    '''Run qemu-img and return the exit code'''
134    return qemu_img_pipe_and_status(*args)[1]
135
136def ordered_qmp(qmsg, conv_keys=True):
137    # Dictionaries are not ordered prior to 3.6, therefore:
138    if isinstance(qmsg, list):
139        return [ordered_qmp(atom) for atom in qmsg]
140    if isinstance(qmsg, dict):
141        od = OrderedDict()
142        for k, v in sorted(qmsg.items()):
143            if conv_keys:
144                k = k.replace('_', '-')
145            od[k] = ordered_qmp(v, conv_keys=False)
146        return od
147    return qmsg
148
149def qemu_img_create(*args):
150    args = list(args)
151
152    # default luks support
153    if '-f' in args and args[args.index('-f') + 1] == 'luks':
154        if '-o' in args:
155            i = args.index('-o')
156            if 'key-secret' not in args[i + 1]:
157                args[i + 1].append(luks_default_key_secret_opt)
158                args.insert(i + 2, '--object')
159                args.insert(i + 3, luks_default_secret_object)
160        else:
161            args = ['-o', luks_default_key_secret_opt,
162                    '--object', luks_default_secret_object] + args
163
164    args.insert(0, 'create')
165
166    return qemu_img(*args)
167
168def qemu_img_measure(*args):
169    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
170
171def qemu_img_check(*args):
172    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
173
174def qemu_img_verbose(*args):
175    '''Run qemu-img without suppressing its output and return the exit code'''
176    exitcode = subprocess.call(qemu_img_args + list(args))
177    if exitcode < 0:
178        sys.stderr.write('qemu-img received signal %i: %s\n'
179                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
180    return exitcode
181
182def qemu_img_pipe(*args: str) -> str:
183    '''Run qemu-img and return its output'''
184    return qemu_img_pipe_and_status(*args)[0]
185
186def qemu_img_log(*args):
187    result = qemu_img_pipe(*args)
188    log(result, filters=[filter_testfiles])
189    return result
190
191def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
192    args = ['info']
193    if imgopts:
194        args.append('--image-opts')
195    else:
196        args += ['-f', imgfmt]
197    args += extra_args
198    args.append(filename)
199
200    output = qemu_img_pipe(*args)
201    if not filter_path:
202        filter_path = filename
203    log(filter_img_info(output, filter_path))
204
205def qemu_io(*args):
206    '''Run qemu-io and return the stdout data'''
207    args = qemu_io_args + list(args)
208    return qemu_tool_pipe_and_status('qemu-io', args)[0]
209
210def qemu_io_log(*args):
211    result = qemu_io(*args)
212    log(result, filters=[filter_testfiles, filter_qemu_io])
213    return result
214
215def qemu_io_silent(*args):
216    '''Run qemu-io and return the exit code, suppressing stdout'''
217    if '-f' in args or '--image-opts' in args:
218        default_args = qemu_io_args_no_fmt
219    else:
220        default_args = qemu_io_args
221
222    args = default_args + list(args)
223    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
224    if exitcode < 0:
225        sys.stderr.write('qemu-io received signal %i: %s\n' %
226                         (-exitcode, ' '.join(args)))
227    return exitcode
228
229def qemu_io_silent_check(*args):
230    '''Run qemu-io and return the true if subprocess returned 0'''
231    args = qemu_io_args + list(args)
232    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
233                               stderr=subprocess.STDOUT)
234    return exitcode == 0
235
236class QemuIoInteractive:
237    def __init__(self, *args):
238        self.args = qemu_io_args_no_fmt + list(args)
239        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
240                                   stdout=subprocess.PIPE,
241                                   stderr=subprocess.STDOUT,
242                                   universal_newlines=True)
243        out = self._p.stdout.read(9)
244        if out != 'qemu-io> ':
245            # Most probably qemu-io just failed to start.
246            # Let's collect the whole output and exit.
247            out += self._p.stdout.read()
248            self._p.wait(timeout=1)
249            raise ValueError(out)
250
251    def close(self):
252        self._p.communicate('q\n')
253
254    def _read_output(self):
255        pattern = 'qemu-io> '
256        n = len(pattern)
257        pos = 0
258        s = []
259        while pos != n:
260            c = self._p.stdout.read(1)
261            # check unexpected EOF
262            assert c != ''
263            s.append(c)
264            if c == pattern[pos]:
265                pos += 1
266            else:
267                pos = 0
268
269        return ''.join(s[:-n])
270
271    def cmd(self, cmd):
272        # quit command is in close(), '\n' is added automatically
273        assert '\n' not in cmd
274        cmd = cmd.strip()
275        assert cmd not in ('q', 'quit')
276        self._p.stdin.write(cmd + '\n')
277        self._p.stdin.flush()
278        return self._read_output()
279
280
281def qemu_nbd(*args):
282    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
283    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
284
285def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
286    '''Run qemu-nbd in daemon mode and return both the parent's exit code
287       and its output in case of an error'''
288    full_args = qemu_nbd_args + ['--fork'] + list(args)
289    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
290                                                   connect_stderr=False)
291    return returncode, output if returncode else ''
292
293def qemu_nbd_list_log(*args: str) -> str:
294    '''Run qemu-nbd to list remote exports'''
295    full_args = [qemu_nbd_prog, '-L'] + list(args)
296    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
297    log(output, filters=[filter_testfiles, filter_nbd_exports])
298    return output
299
300@contextmanager
301def qemu_nbd_popen(*args):
302    '''Context manager running qemu-nbd within the context'''
303    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
304
305    assert not os.path.exists(pid_file)
306
307    cmd = list(qemu_nbd_args)
308    cmd.extend(('--persistent', '--pid-file', pid_file))
309    cmd.extend(args)
310
311    log('Start NBD server')
312    p = subprocess.Popen(cmd)
313    try:
314        while not os.path.exists(pid_file):
315            if p.poll() is not None:
316                raise RuntimeError(
317                    "qemu-nbd terminated with exit code {}: {}"
318                    .format(p.returncode, ' '.join(cmd)))
319
320            time.sleep(0.01)
321        yield
322    finally:
323        if os.path.exists(pid_file):
324            os.remove(pid_file)
325        log('Kill NBD server')
326        p.kill()
327        p.wait()
328
329def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
330    '''Return True if two image files are identical'''
331    return qemu_img('compare', '-f', fmt1,
332                    '-F', fmt2, img1, img2) == 0
333
334def create_image(name, size):
335    '''Create a fully-allocated raw image with sector markers'''
336    file = open(name, 'wb')
337    i = 0
338    while i < size:
339        sector = struct.pack('>l504xl', i // 512, i // 512)
340        file.write(sector)
341        i = i + 512
342    file.close()
343
344def image_size(img):
345    '''Return image's virtual size'''
346    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
347    return json.loads(r)['virtual-size']
348
349def is_str(val):
350    return isinstance(val, str)
351
352test_dir_re = re.compile(r"%s" % test_dir)
353def filter_test_dir(msg):
354    return test_dir_re.sub("TEST_DIR", msg)
355
356win32_re = re.compile(r"\r")
357def filter_win32(msg):
358    return win32_re.sub("", msg)
359
360qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
361                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
362                        r"and [0-9\/.inf]* ops\/sec\)")
363def filter_qemu_io(msg):
364    msg = filter_win32(msg)
365    return qemu_io_re.sub("X ops; XX:XX:XX.X "
366                          "(XXX YYY/sec and XXX ops/sec)", msg)
367
368chown_re = re.compile(r"chown [0-9]+:[0-9]+")
369def filter_chown(msg):
370    return chown_re.sub("chown UID:GID", msg)
371
372def filter_qmp_event(event):
373    '''Filter a QMP event dict'''
374    event = dict(event)
375    if 'timestamp' in event:
376        event['timestamp']['seconds'] = 'SECS'
377        event['timestamp']['microseconds'] = 'USECS'
378    return event
379
380def filter_qmp(qmsg, filter_fn):
381    '''Given a string filter, filter a QMP object's values.
382    filter_fn takes a (key, value) pair.'''
383    # Iterate through either lists or dicts;
384    if isinstance(qmsg, list):
385        items = enumerate(qmsg)
386    else:
387        items = qmsg.items()
388
389    for k, v in items:
390        if isinstance(v, (dict, list)):
391            qmsg[k] = filter_qmp(v, filter_fn)
392        else:
393            qmsg[k] = filter_fn(k, v)
394    return qmsg
395
396def filter_testfiles(msg):
397    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
398    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
399    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
400
401def filter_qmp_testfiles(qmsg):
402    def _filter(_key, value):
403        if is_str(value):
404            return filter_testfiles(value)
405        return value
406    return filter_qmp(qmsg, _filter)
407
408def filter_virtio_scsi(output: str) -> str:
409    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
410
411def filter_qmp_virtio_scsi(qmsg):
412    def _filter(_key, value):
413        if is_str(value):
414            return filter_virtio_scsi(value)
415        return value
416    return filter_qmp(qmsg, _filter)
417
418def filter_generated_node_ids(msg):
419    return re.sub("#block[0-9]+", "NODE_NAME", msg)
420
421def filter_img_info(output, filename):
422    lines = []
423    for line in output.split('\n'):
424        if 'disk size' in line or 'actual-size' in line:
425            continue
426        line = line.replace(filename, 'TEST_IMG')
427        line = filter_testfiles(line)
428        line = line.replace(imgfmt, 'IMGFMT')
429        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
430        line = re.sub('uuid: [-a-f0-9]+',
431                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
432                      line)
433        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
434        lines.append(line)
435    return '\n'.join(lines)
436
437def filter_imgfmt(msg):
438    return msg.replace(imgfmt, 'IMGFMT')
439
440def filter_qmp_imgfmt(qmsg):
441    def _filter(_key, value):
442        if is_str(value):
443            return filter_imgfmt(value)
444        return value
445    return filter_qmp(qmsg, _filter)
446
447def filter_nbd_exports(output: str) -> str:
448    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
449
450
451Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
452
453def log(msg: Msg,
454        filters: Iterable[Callable[[Msg], Msg]] = (),
455        indent: Optional[int] = None) -> None:
456    """
457    Logs either a string message or a JSON serializable message (like QMP).
458    If indent is provided, JSON serializable messages are pretty-printed.
459    """
460    for flt in filters:
461        msg = flt(msg)
462    if isinstance(msg, (dict, list)):
463        # Don't sort if it's already sorted
464        do_sort = not isinstance(msg, OrderedDict)
465        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
466    else:
467        test_logger.info(msg)
468
469class Timeout:
470    def __init__(self, seconds, errmsg="Timeout"):
471        self.seconds = seconds
472        self.errmsg = errmsg
473    def __enter__(self):
474        signal.signal(signal.SIGALRM, self.timeout)
475        signal.setitimer(signal.ITIMER_REAL, self.seconds)
476        return self
477    def __exit__(self, exc_type, value, traceback):
478        signal.setitimer(signal.ITIMER_REAL, 0)
479        return False
480    def timeout(self, signum, frame):
481        raise Exception(self.errmsg)
482
483def file_pattern(name):
484    return "{0}-{1}".format(os.getpid(), name)
485
486class FilePath:
487    """
488    Context manager generating multiple file names. The generated files are
489    removed when exiting the context.
490
491    Example usage:
492
493        with FilePath('a.img', 'b.img') as (img_a, img_b):
494            # Use img_a and img_b here...
495
496        # a.img and b.img are automatically removed here.
497
498    By default images are created in iotests.test_dir. To create sockets use
499    iotests.sock_dir:
500
501       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
502
503    For convenience, calling with one argument yields a single file instead of
504    a tuple with one item.
505
506    """
507    def __init__(self, *names, base_dir=test_dir):
508        self.paths = [os.path.join(base_dir, file_pattern(name))
509                      for name in names]
510
511    def __enter__(self):
512        if len(self.paths) == 1:
513            return self.paths[0]
514        else:
515            return self.paths
516
517    def __exit__(self, exc_type, exc_val, exc_tb):
518        for path in self.paths:
519            try:
520                os.remove(path)
521            except OSError:
522                pass
523        return False
524
525
526def try_remove(img):
527    try:
528        os.remove(img)
529    except OSError:
530        pass
531
532def file_path_remover():
533    for path in reversed(file_path_remover.paths):
534        try_remove(path)
535
536
537def file_path(*names, base_dir=test_dir):
538    ''' Another way to get auto-generated filename that cleans itself up.
539
540    Use is as simple as:
541
542    img_a, img_b = file_path('a.img', 'b.img')
543    sock = file_path('socket')
544    '''
545
546    if not hasattr(file_path_remover, 'paths'):
547        file_path_remover.paths = []
548        atexit.register(file_path_remover)
549
550    paths = []
551    for name in names:
552        filename = file_pattern(name)
553        path = os.path.join(base_dir, filename)
554        file_path_remover.paths.append(path)
555        paths.append(path)
556
557    return paths[0] if len(paths) == 1 else paths
558
559def remote_filename(path):
560    if imgproto == 'file':
561        return path
562    elif imgproto == 'ssh':
563        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
564    else:
565        raise Exception("Protocol %s not supported" % (imgproto))
566
567class VM(qtest.QEMUQtestMachine):
568    '''A QEMU VM'''
569
570    def __init__(self, path_suffix=''):
571        name = "qemu%s-%d" % (path_suffix, os.getpid())
572        super().__init__(qemu_prog, qemu_opts, name=name,
573                         test_dir=test_dir,
574                         socket_scm_helper=socket_scm_helper,
575                         sock_dir=sock_dir)
576        self._num_drives = 0
577
578    def add_object(self, opts):
579        self._args.append('-object')
580        self._args.append(opts)
581        return self
582
583    def add_device(self, opts):
584        self._args.append('-device')
585        self._args.append(opts)
586        return self
587
588    def add_drive_raw(self, opts):
589        self._args.append('-drive')
590        self._args.append(opts)
591        return self
592
593    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
594        '''Add a virtio-blk drive to the VM'''
595        options = ['if=%s' % interface,
596                   'id=drive%d' % self._num_drives]
597
598        if path is not None:
599            options.append('file=%s' % path)
600            options.append('format=%s' % img_format)
601            options.append('cache=%s' % cachemode)
602            options.append('aio=%s' % aiomode)
603
604        if opts:
605            options.append(opts)
606
607        if img_format == 'luks' and 'key-secret' not in opts:
608            # default luks support
609            if luks_default_secret_object not in self._args:
610                self.add_object(luks_default_secret_object)
611
612            options.append(luks_default_key_secret_opt)
613
614        self._args.append('-drive')
615        self._args.append(','.join(options))
616        self._num_drives += 1
617        return self
618
619    def add_blockdev(self, opts):
620        self._args.append('-blockdev')
621        if isinstance(opts, str):
622            self._args.append(opts)
623        else:
624            self._args.append(','.join(opts))
625        return self
626
627    def add_incoming(self, addr):
628        self._args.append('-incoming')
629        self._args.append(addr)
630        return self
631
632    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
633        cmd = 'human-monitor-command'
634        kwargs: Dict[str, Any] = {'command-line': command_line}
635        if use_log:
636            return self.qmp_log(cmd, **kwargs)
637        else:
638            return self.qmp(cmd, **kwargs)
639
640    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
641        """Pause drive r/w operations"""
642        if not event:
643            self.pause_drive(drive, "read_aio")
644            self.pause_drive(drive, "write_aio")
645            return
646        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
647
648    def resume_drive(self, drive: str) -> None:
649        """Resume drive r/w operations"""
650        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
651
652    def hmp_qemu_io(self, drive: str, cmd: str,
653                    use_log: bool = False) -> QMPMessage:
654        """Write to a given drive using an HMP command"""
655        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
656
657    def flatten_qmp_object(self, obj, output=None, basestr=''):
658        if output is None:
659            output = dict()
660        if isinstance(obj, list):
661            for i, item in enumerate(obj):
662                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
663        elif isinstance(obj, dict):
664            for key in obj:
665                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
666        else:
667            output[basestr[:-1]] = obj # Strip trailing '.'
668        return output
669
670    def qmp_to_opts(self, obj):
671        obj = self.flatten_qmp_object(obj)
672        output_list = list()
673        for key in obj:
674            output_list += [key + '=' + obj[key]]
675        return ','.join(output_list)
676
677    def get_qmp_events_filtered(self, wait=60.0):
678        result = []
679        for ev in self.get_qmp_events(wait=wait):
680            result.append(filter_qmp_event(ev))
681        return result
682
683    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
684        full_cmd = OrderedDict((
685            ("execute", cmd),
686            ("arguments", ordered_qmp(kwargs))
687        ))
688        log(full_cmd, filters, indent=indent)
689        result = self.qmp(cmd, **kwargs)
690        log(result, filters, indent=indent)
691        return result
692
693    # Returns None on success, and an error string on failure
694    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
695                pre_finalize=None, cancel=False, wait=60.0):
696        """
697        run_job moves a job from creation through to dismissal.
698
699        :param job: String. ID of recently-launched job
700        :param auto_finalize: Bool. True if the job was launched with
701                              auto_finalize. Defaults to True.
702        :param auto_dismiss: Bool. True if the job was launched with
703                             auto_dismiss=True. Defaults to False.
704        :param pre_finalize: Callback. A callable that takes no arguments to be
705                             invoked prior to issuing job-finalize, if any.
706        :param cancel: Bool. When true, cancels the job after the pre_finalize
707                       callback.
708        :param wait: Float. Timeout value specifying how long to wait for any
709                     event, in seconds. Defaults to 60.0.
710        """
711        match_device = {'data': {'device': job}}
712        match_id = {'data': {'id': job}}
713        events = [
714            ('BLOCK_JOB_COMPLETED', match_device),
715            ('BLOCK_JOB_CANCELLED', match_device),
716            ('BLOCK_JOB_ERROR', match_device),
717            ('BLOCK_JOB_READY', match_device),
718            ('BLOCK_JOB_PENDING', match_id),
719            ('JOB_STATUS_CHANGE', match_id)
720        ]
721        error = None
722        while True:
723            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
724            if ev['event'] != 'JOB_STATUS_CHANGE':
725                log(ev)
726                continue
727            status = ev['data']['status']
728            if status == 'aborting':
729                result = self.qmp('query-jobs')
730                for j in result['return']:
731                    if j['id'] == job:
732                        error = j['error']
733                        log('Job failed: %s' % (j['error']))
734            elif status == 'ready':
735                self.qmp_log('job-complete', id=job)
736            elif status == 'pending' and not auto_finalize:
737                if pre_finalize:
738                    pre_finalize()
739                if cancel:
740                    self.qmp_log('job-cancel', id=job)
741                else:
742                    self.qmp_log('job-finalize', id=job)
743            elif status == 'concluded' and not auto_dismiss:
744                self.qmp_log('job-dismiss', id=job)
745            elif status == 'null':
746                return error
747
748    # Returns None on success, and an error string on failure
749    def blockdev_create(self, options, job_id='job0', filters=None):
750        if filters is None:
751            filters = [filter_qmp_testfiles]
752        result = self.qmp_log('blockdev-create', filters=filters,
753                              job_id=job_id, options=options)
754
755        if 'return' in result:
756            assert result['return'] == {}
757            job_result = self.run_job(job_id)
758        else:
759            job_result = result['error']
760
761        log("")
762        return job_result
763
764    def enable_migration_events(self, name):
765        log('Enabling migration QMP events on %s...' % name)
766        log(self.qmp('migrate-set-capabilities', capabilities=[
767            {
768                'capability': 'events',
769                'state': True
770            }
771        ]))
772
773    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
774        while True:
775            event = self.event_wait('MIGRATION')
776            # We use the default timeout, and with a timeout, event_wait()
777            # never returns None
778            assert event
779
780            log(event, filters=[filter_qmp_event])
781            if event['data']['status'] in ('completed', 'failed'):
782                break
783
784        if event['data']['status'] == 'completed':
785            # The event may occur in finish-migrate, so wait for the expected
786            # post-migration runstate
787            runstate = None
788            while runstate != expect_runstate:
789                runstate = self.qmp('query-status')['return']['status']
790            return True
791        else:
792            return False
793
794    def node_info(self, node_name):
795        nodes = self.qmp('query-named-block-nodes')
796        for x in nodes['return']:
797            if x['node-name'] == node_name:
798                return x
799        return None
800
801    def query_bitmaps(self):
802        res = self.qmp("query-named-block-nodes")
803        return {device['node-name']: device['dirty-bitmaps']
804                for device in res['return'] if 'dirty-bitmaps' in device}
805
806    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
807        """
808        get a specific bitmap from the object returned by query_bitmaps.
809        :param recording: If specified, filter results by the specified value.
810        :param bitmaps: If specified, use it instead of call query_bitmaps()
811        """
812        if bitmaps is None:
813            bitmaps = self.query_bitmaps()
814
815        for bitmap in bitmaps[node_name]:
816            if bitmap.get('name', '') == bitmap_name:
817                if recording is None or bitmap.get('recording') == recording:
818                    return bitmap
819        return None
820
821    def check_bitmap_status(self, node_name, bitmap_name, fields):
822        ret = self.get_bitmap(node_name, bitmap_name)
823
824        return fields.items() <= ret.items()
825
826    def assert_block_path(self, root, path, expected_node, graph=None):
827        """
828        Check whether the node under the given path in the block graph
829        is @expected_node.
830
831        @root is the node name of the node where the @path is rooted.
832
833        @path is a string that consists of child names separated by
834        slashes.  It must begin with a slash.
835
836        Examples for @root + @path:
837          - root="qcow2-node", path="/backing/file"
838          - root="quorum-node", path="/children.2/file"
839
840        Hypothetically, @path could be empty, in which case it would
841        point to @root.  However, in practice this case is not useful
842        and hence not allowed.
843
844        @expected_node may be None.  (All elements of the path but the
845        leaf must still exist.)
846
847        @graph may be None or the result of an x-debug-query-block-graph
848        call that has already been performed.
849        """
850        if graph is None:
851            graph = self.qmp('x-debug-query-block-graph')['return']
852
853        iter_path = iter(path.split('/'))
854
855        # Must start with a /
856        assert next(iter_path) == ''
857
858        node = next((node for node in graph['nodes'] if node['name'] == root),
859                    None)
860
861        # An empty @path is not allowed, so the root node must be present
862        assert node is not None, 'Root node %s not found' % root
863
864        for child_name in iter_path:
865            assert node is not None, 'Cannot follow path %s%s' % (root, path)
866
867            try:
868                node_id = next(edge['child'] for edge in graph['edges']
869                               if (edge['parent'] == node['id'] and
870                                   edge['name'] == child_name))
871
872                node = next(node for node in graph['nodes']
873                            if node['id'] == node_id)
874
875            except StopIteration:
876                node = None
877
878        if node is None:
879            assert expected_node is None, \
880                   'No node found under %s (but expected %s)' % \
881                   (path, expected_node)
882        else:
883            assert node['name'] == expected_node, \
884                   'Found node %s under %s (but expected %s)' % \
885                   (node['name'], path, expected_node)
886
887index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
888
889class QMPTestCase(unittest.TestCase):
890    '''Abstract base class for QMP test cases'''
891
892    def __init__(self, *args, **kwargs):
893        super().__init__(*args, **kwargs)
894        # Many users of this class set a VM property we rely on heavily
895        # in the methods below.
896        self.vm = None
897
898    def dictpath(self, d, path):
899        '''Traverse a path in a nested dict'''
900        for component in path.split('/'):
901            m = index_re.match(component)
902            if m:
903                component, idx = m.groups()
904                idx = int(idx)
905
906            if not isinstance(d, dict) or component not in d:
907                self.fail(f'failed path traversal for "{path}" in "{d}"')
908            d = d[component]
909
910            if m:
911                if not isinstance(d, list):
912                    self.fail(f'path component "{component}" in "{path}" '
913                              f'is not a list in "{d}"')
914                try:
915                    d = d[idx]
916                except IndexError:
917                    self.fail(f'invalid index "{idx}" in path "{path}" '
918                              f'in "{d}"')
919        return d
920
921    def assert_qmp_absent(self, d, path):
922        try:
923            result = self.dictpath(d, path)
924        except AssertionError:
925            return
926        self.fail('path "%s" has value "%s"' % (path, str(result)))
927
928    def assert_qmp(self, d, path, value):
929        '''Assert that the value for a specific path in a QMP dict
930           matches.  When given a list of values, assert that any of
931           them matches.'''
932
933        result = self.dictpath(d, path)
934
935        # [] makes no sense as a list of valid values, so treat it as
936        # an actual single value.
937        if isinstance(value, list) and value != []:
938            for v in value:
939                if result == v:
940                    return
941            self.fail('no match for "%s" in %s' % (str(result), str(value)))
942        else:
943            self.assertEqual(result, value,
944                             '"%s" is "%s", expected "%s"'
945                             % (path, str(result), str(value)))
946
947    def assert_no_active_block_jobs(self):
948        result = self.vm.qmp('query-block-jobs')
949        self.assert_qmp(result, 'return', [])
950
951    def assert_has_block_node(self, node_name=None, file_name=None):
952        """Issue a query-named-block-nodes and assert node_name and/or
953        file_name is present in the result"""
954        def check_equal_or_none(a, b):
955            return a is None or b is None or a == b
956        assert node_name or file_name
957        result = self.vm.qmp('query-named-block-nodes')
958        for x in result["return"]:
959            if check_equal_or_none(x.get("node-name"), node_name) and \
960                    check_equal_or_none(x.get("file"), file_name):
961                return
962        self.fail("Cannot find %s %s in result:\n%s" %
963                  (node_name, file_name, result))
964
965    def assert_json_filename_equal(self, json_filename, reference):
966        '''Asserts that the given filename is a json: filename and that its
967           content is equal to the given reference object'''
968        self.assertEqual(json_filename[:5], 'json:')
969        self.assertEqual(
970            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
971            self.vm.flatten_qmp_object(reference)
972        )
973
974    def cancel_and_wait(self, drive='drive0', force=False,
975                        resume=False, wait=60.0):
976        '''Cancel a block job and wait for it to finish, returning the event'''
977        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
978        self.assert_qmp(result, 'return', {})
979
980        if resume:
981            self.vm.resume_drive(drive)
982
983        cancelled = False
984        result = None
985        while not cancelled:
986            for event in self.vm.get_qmp_events(wait=wait):
987                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
988                   event['event'] == 'BLOCK_JOB_CANCELLED':
989                    self.assert_qmp(event, 'data/device', drive)
990                    result = event
991                    cancelled = True
992                elif event['event'] == 'JOB_STATUS_CHANGE':
993                    self.assert_qmp(event, 'data/id', drive)
994
995
996        self.assert_no_active_block_jobs()
997        return result
998
999    def wait_until_completed(self, drive='drive0', check_offset=True,
1000                             wait=60.0, error=None):
1001        '''Wait for a block job to finish, returning the event'''
1002        while True:
1003            for event in self.vm.get_qmp_events(wait=wait):
1004                if event['event'] == 'BLOCK_JOB_COMPLETED':
1005                    self.assert_qmp(event, 'data/device', drive)
1006                    if error is None:
1007                        self.assert_qmp_absent(event, 'data/error')
1008                        if check_offset:
1009                            self.assert_qmp(event, 'data/offset',
1010                                            event['data']['len'])
1011                    else:
1012                        self.assert_qmp(event, 'data/error', error)
1013                    self.assert_no_active_block_jobs()
1014                    return event
1015                if event['event'] == 'JOB_STATUS_CHANGE':
1016                    self.assert_qmp(event, 'data/id', drive)
1017
1018    def wait_ready(self, drive='drive0'):
1019        """Wait until a BLOCK_JOB_READY event, and return the event."""
1020        return self.vm.events_wait([
1021            ('BLOCK_JOB_READY',
1022             {'data': {'type': 'mirror', 'device': drive}}),
1023            ('BLOCK_JOB_READY',
1024             {'data': {'type': 'commit', 'device': drive}})
1025        ])
1026
1027    def wait_ready_and_cancel(self, drive='drive0'):
1028        self.wait_ready(drive=drive)
1029        event = self.cancel_and_wait(drive=drive)
1030        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1031        self.assert_qmp(event, 'data/type', 'mirror')
1032        self.assert_qmp(event, 'data/offset', event['data']['len'])
1033
1034    def complete_and_wait(self, drive='drive0', wait_ready=True,
1035                          completion_error=None):
1036        '''Complete a block job and wait for it to finish'''
1037        if wait_ready:
1038            self.wait_ready(drive=drive)
1039
1040        result = self.vm.qmp('block-job-complete', device=drive)
1041        self.assert_qmp(result, 'return', {})
1042
1043        event = self.wait_until_completed(drive=drive, error=completion_error)
1044        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1045
1046    def pause_wait(self, job_id='job0'):
1047        with Timeout(3, "Timeout waiting for job to pause"):
1048            while True:
1049                result = self.vm.qmp('query-block-jobs')
1050                found = False
1051                for job in result['return']:
1052                    if job['device'] == job_id:
1053                        found = True
1054                        if job['paused'] and not job['busy']:
1055                            return job
1056                        break
1057                assert found
1058
1059    def pause_job(self, job_id='job0', wait=True):
1060        result = self.vm.qmp('block-job-pause', device=job_id)
1061        self.assert_qmp(result, 'return', {})
1062        if wait:
1063            return self.pause_wait(job_id)
1064        return result
1065
1066    def case_skip(self, reason):
1067        '''Skip this test case'''
1068        case_notrun(reason)
1069        self.skipTest(reason)
1070
1071
1072def notrun(reason):
1073    '''Skip this test suite'''
1074    # Each test in qemu-iotests has a number ("seq")
1075    seq = os.path.basename(sys.argv[0])
1076
1077    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1078    logger.warning("%s not run: %s", seq, reason)
1079    sys.exit(0)
1080
1081def case_notrun(reason):
1082    '''Mark this test case as not having been run (without actually
1083    skipping it, that is left to the caller).  See
1084    QMPTestCase.case_skip() for a variant that actually skips the
1085    current test case.'''
1086
1087    # Each test in qemu-iotests has a number ("seq")
1088    seq = os.path.basename(sys.argv[0])
1089
1090    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1091        '    [case not run] ' + reason + '\n')
1092
1093def _verify_image_format(supported_fmts: Sequence[str] = (),
1094                         unsupported_fmts: Sequence[str] = ()) -> None:
1095    if 'generic' in supported_fmts and \
1096            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1097        # similar to
1098        #   _supported_fmt generic
1099        # for bash tests
1100        supported_fmts = ()
1101
1102    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1103    if not_sup or (imgfmt in unsupported_fmts):
1104        notrun('not suitable for this image format: %s' % imgfmt)
1105
1106    if imgfmt == 'luks':
1107        verify_working_luks()
1108
1109def _verify_protocol(supported: Sequence[str] = (),
1110                     unsupported: Sequence[str] = ()) -> None:
1111    assert not (supported and unsupported)
1112
1113    if 'generic' in supported:
1114        return
1115
1116    not_sup = supported and (imgproto not in supported)
1117    if not_sup or (imgproto in unsupported):
1118        notrun('not suitable for this protocol: %s' % imgproto)
1119
1120def _verify_platform(supported: Sequence[str] = (),
1121                     unsupported: Sequence[str] = ()) -> None:
1122    if any((sys.platform.startswith(x) for x in unsupported)):
1123        notrun('not suitable for this OS: %s' % sys.platform)
1124
1125    if supported:
1126        if not any((sys.platform.startswith(x) for x in supported)):
1127            notrun('not suitable for this OS: %s' % sys.platform)
1128
1129def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1130    if supported_cache_modes and (cachemode not in supported_cache_modes):
1131        notrun('not suitable for this cache mode: %s' % cachemode)
1132
1133def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1134    if supported_aio_modes and (aiomode not in supported_aio_modes):
1135        notrun('not suitable for this aio mode: %s' % aiomode)
1136
1137def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1138    usf_list = list(set(required_formats) - set(supported_formats()))
1139    if usf_list:
1140        notrun(f'formats {usf_list} are not whitelisted')
1141
1142
1143def _verify_virtio_blk() -> None:
1144    out = qemu_pipe('-M', 'none', '-device', 'help')
1145    if 'virtio-blk' not in out:
1146        notrun('Missing virtio-blk in QEMU binary')
1147
1148def _verify_virtio_scsi_pci_or_ccw() -> None:
1149    out = qemu_pipe('-M', 'none', '-device', 'help')
1150    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1151        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1152
1153
1154def supports_quorum():
1155    return 'quorum' in qemu_img_pipe('--help')
1156
1157def verify_quorum():
1158    '''Skip test suite if quorum support is not available'''
1159    if not supports_quorum():
1160        notrun('quorum support missing')
1161
1162def has_working_luks() -> Tuple[bool, str]:
1163    """
1164    Check whether our LUKS driver can actually create images
1165    (this extends to LUKS encryption for qcow2).
1166
1167    If not, return the reason why.
1168    """
1169
1170    img_file = f'{test_dir}/luks-test.luks'
1171    (output, status) = \
1172        qemu_img_pipe_and_status('create', '-f', 'luks',
1173                                 '--object', luks_default_secret_object,
1174                                 '-o', luks_default_key_secret_opt,
1175                                 '-o', 'iter-time=10',
1176                                 img_file, '1G')
1177    try:
1178        os.remove(img_file)
1179    except OSError:
1180        pass
1181
1182    if status != 0:
1183        reason = output
1184        for line in output.splitlines():
1185            if img_file + ':' in line:
1186                reason = line.split(img_file + ':', 1)[1].strip()
1187                break
1188
1189        return (False, reason)
1190    else:
1191        return (True, '')
1192
1193def verify_working_luks():
1194    """
1195    Skip test suite if LUKS does not work
1196    """
1197    (working, reason) = has_working_luks()
1198    if not working:
1199        notrun(reason)
1200
1201def qemu_pipe(*args: str) -> str:
1202    """
1203    Run qemu with an option to print something and exit (e.g. a help option).
1204
1205    :return: QEMU's stdout output.
1206    """
1207    full_args = [qemu_prog] + qemu_opts + list(args)
1208    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1209    return output
1210
1211def supported_formats(read_only=False):
1212    '''Set 'read_only' to True to check ro-whitelist
1213       Otherwise, rw-whitelist is checked'''
1214
1215    if not hasattr(supported_formats, "formats"):
1216        supported_formats.formats = {}
1217
1218    if read_only not in supported_formats.formats:
1219        format_message = qemu_pipe("-drive", "format=help")
1220        line = 1 if read_only else 0
1221        supported_formats.formats[read_only] = \
1222            format_message.splitlines()[line].split(":")[1].split()
1223
1224    return supported_formats.formats[read_only]
1225
1226def skip_if_unsupported(required_formats=(), read_only=False):
1227    '''Skip Test Decorator
1228       Runs the test if all the required formats are whitelisted'''
1229    def skip_test_decorator(func):
1230        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1231                         **kwargs: Dict[str, Any]) -> None:
1232            if callable(required_formats):
1233                fmts = required_formats(test_case)
1234            else:
1235                fmts = required_formats
1236
1237            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1238            if usf_list:
1239                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1240                test_case.case_skip(msg)
1241            else:
1242                func(test_case, *args, **kwargs)
1243        return func_wrapper
1244    return skip_test_decorator
1245
1246def skip_for_formats(formats: Sequence[str] = ()) \
1247    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1248                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1249    '''Skip Test Decorator
1250       Skips the test for the given formats'''
1251    def skip_test_decorator(func):
1252        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1253                         **kwargs: Dict[str, Any]) -> None:
1254            if imgfmt in formats:
1255                msg = f'{test_case}: Skipped for format {imgfmt}'
1256                test_case.case_skip(msg)
1257            else:
1258                func(test_case, *args, **kwargs)
1259        return func_wrapper
1260    return skip_test_decorator
1261
1262def skip_if_user_is_root(func):
1263    '''Skip Test Decorator
1264       Runs the test only without root permissions'''
1265    def func_wrapper(*args, **kwargs):
1266        if os.getuid() == 0:
1267            case_notrun('{}: cannot be run as root'.format(args[0]))
1268            return None
1269        else:
1270            return func(*args, **kwargs)
1271    return func_wrapper
1272
1273# We need to filter out the time taken from the output so that
1274# qemu-iotest can reliably diff the results against master output,
1275# and hide skipped tests from the reference output.
1276
1277class ReproducibleTestResult(unittest.TextTestResult):
1278    def addSkip(self, test, reason):
1279        # Same as TextTestResult, but print dot instead of "s"
1280        unittest.TestResult.addSkip(self, test, reason)
1281        if self.showAll:
1282            self.stream.writeln("skipped {0!r}".format(reason))
1283        elif self.dots:
1284            self.stream.write(".")
1285            self.stream.flush()
1286
1287class ReproducibleStreamWrapper:
1288    def __init__(self, stream: TextIO):
1289        self.stream = stream
1290
1291    def __getattr__(self, attr):
1292        if attr in ('stream', '__getstate__'):
1293            raise AttributeError(attr)
1294        return getattr(self.stream, attr)
1295
1296    def write(self, arg=None):
1297        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1298        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1299        self.stream.write(arg)
1300
1301class ReproducibleTestRunner(unittest.TextTestRunner):
1302    def __init__(self, stream: Optional[TextIO] = None,
1303             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1304             **kwargs: Any) -> None:
1305        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1306        super().__init__(stream=rstream,           # type: ignore
1307                         descriptions=True,
1308                         resultclass=resultclass,
1309                         **kwargs)
1310
1311def execute_unittest(argv: List[str], debug: bool = False) -> None:
1312    """Executes unittests within the calling module."""
1313
1314    # Some tests have warnings, especially ResourceWarnings for unclosed
1315    # files and sockets.  Ignore them for now to ensure reproducibility of
1316    # the test output.
1317    unittest.main(argv=argv,
1318                  testRunner=ReproducibleTestRunner,
1319                  verbosity=2 if debug else 1,
1320                  warnings=None if sys.warnoptions else 'ignore')
1321
1322def execute_setup_common(supported_fmts: Sequence[str] = (),
1323                         supported_platforms: Sequence[str] = (),
1324                         supported_cache_modes: Sequence[str] = (),
1325                         supported_aio_modes: Sequence[str] = (),
1326                         unsupported_fmts: Sequence[str] = (),
1327                         supported_protocols: Sequence[str] = (),
1328                         unsupported_protocols: Sequence[str] = (),
1329                         required_fmts: Sequence[str] = ()) -> bool:
1330    """
1331    Perform necessary setup for either script-style or unittest-style tests.
1332
1333    :return: Bool; Whether or not debug mode has been requested via the CLI.
1334    """
1335    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1336
1337    debug = '-d' in sys.argv
1338    if debug:
1339        sys.argv.remove('-d')
1340    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1341
1342    _verify_image_format(supported_fmts, unsupported_fmts)
1343    _verify_protocol(supported_protocols, unsupported_protocols)
1344    _verify_platform(supported=supported_platforms)
1345    _verify_cache_mode(supported_cache_modes)
1346    _verify_aio_mode(supported_aio_modes)
1347    _verify_formats(required_fmts)
1348    _verify_virtio_blk()
1349
1350    return debug
1351
1352def execute_test(*args, test_function=None, **kwargs):
1353    """Run either unittest or script-style tests."""
1354
1355    debug = execute_setup_common(*args, **kwargs)
1356    if not test_function:
1357        execute_unittest(sys.argv, debug)
1358    else:
1359        test_function()
1360
1361def activate_logging():
1362    """Activate iotests.log() output to stdout for script-style tests."""
1363    handler = logging.StreamHandler(stream=sys.stdout)
1364    formatter = logging.Formatter('%(message)s')
1365    handler.setFormatter(formatter)
1366    test_logger.addHandler(handler)
1367    test_logger.setLevel(logging.INFO)
1368    test_logger.propagate = False
1369
1370# This is called from script-style iotests without a single point of entry
1371def script_initialize(*args, **kwargs):
1372    """Initialize script-style tests without running any tests."""
1373    activate_logging()
1374    execute_setup_common(*args, **kwargs)
1375
1376# This is called from script-style iotests with a single point of entry
1377def script_main(test_function, *args, **kwargs):
1378    """Run script-style tests outside of the unittest framework"""
1379    activate_logging()
1380    execute_test(*args, test_function=test_function, **kwargs)
1381
1382# This is called from unittest style iotests
1383def main(*args, **kwargs):
1384    """Run tests using the unittest framework"""
1385    execute_test(*args, **kwargs)
1386