xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision e2f948a8)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20import bz2
21from collections import OrderedDict
22import faulthandler
23import json
24import logging
25import os
26import re
27import shutil
28import signal
29import struct
30import subprocess
31import sys
32import time
33from typing import (Any, Callable, Dict, Iterable,
34                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35import unittest
36
37from contextlib import contextmanager
38
39# pylint: disable=import-error, wrong-import-position
40sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41from qemu.machine import qtest
42from qemu.qmp import QMPMessage
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79output_dir = os.environ.get('OUTPUT_DIR', '.')
80
81try:
82    test_dir = os.environ['TEST_DIR']
83    sock_dir = os.environ['SOCK_DIR']
84    cachemode = os.environ['CACHEMODE']
85    aiomode = os.environ['AIOMODE']
86    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
87except KeyError:
88    # We are using these variables as proxies to indicate that we're
89    # not being run via "check". There may be other things set up by
90    # "check" that individual test cases rely on.
91    sys.stderr.write('Please run this test via the "check" script\n')
92    sys.exit(os.EX_USAGE)
93
94socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
95
96luks_default_secret_object = 'secret,id=keysec0,data=' + \
97                             os.environ.get('IMGKEYSECRET', '')
98luks_default_key_secret_opt = 'key-secret=keysec0'
99
100sample_img_dir = os.environ['SAMPLE_IMG_DIR']
101
102
103def unarchive_sample_image(sample, fname):
104    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
105    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
106        shutil.copyfileobj(f_in, f_out)
107
108
109def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
110                              connect_stderr: bool = True) -> Tuple[str, int]:
111    """
112    Run a tool and return both its output and its exit code
113    """
114    stderr = subprocess.STDOUT if connect_stderr else None
115    with subprocess.Popen(args, stdout=subprocess.PIPE,
116                          stderr=stderr, universal_newlines=True) as subp:
117        output = subp.communicate()[0]
118        if subp.returncode < 0:
119            cmd = ' '.join(args)
120            sys.stderr.write(f'{tool} received signal \
121                               {-subp.returncode}: {cmd}\n')
122        return (output, subp.returncode)
123
124def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
125    """
126    Run qemu-img and return both its output and its exit code
127    """
128    full_args = qemu_img_args + list(args)
129    return qemu_tool_pipe_and_status('qemu-img', full_args)
130
131def qemu_img(*args: str) -> int:
132    '''Run qemu-img and return the exit code'''
133    return qemu_img_pipe_and_status(*args)[1]
134
135def ordered_qmp(qmsg, conv_keys=True):
136    # Dictionaries are not ordered prior to 3.6, therefore:
137    if isinstance(qmsg, list):
138        return [ordered_qmp(atom) for atom in qmsg]
139    if isinstance(qmsg, dict):
140        od = OrderedDict()
141        for k, v in sorted(qmsg.items()):
142            if conv_keys:
143                k = k.replace('_', '-')
144            od[k] = ordered_qmp(v, conv_keys=False)
145        return od
146    return qmsg
147
148def qemu_img_create(*args):
149    args = list(args)
150
151    # default luks support
152    if '-f' in args and args[args.index('-f') + 1] == 'luks':
153        if '-o' in args:
154            i = args.index('-o')
155            if 'key-secret' not in args[i + 1]:
156                args[i + 1].append(luks_default_key_secret_opt)
157                args.insert(i + 2, '--object')
158                args.insert(i + 3, luks_default_secret_object)
159        else:
160            args = ['-o', luks_default_key_secret_opt,
161                    '--object', luks_default_secret_object] + args
162
163    args.insert(0, 'create')
164
165    return qemu_img(*args)
166
167def qemu_img_measure(*args):
168    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
169
170def qemu_img_check(*args):
171    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
172
173def qemu_img_verbose(*args):
174    '''Run qemu-img without suppressing its output and return the exit code'''
175    exitcode = subprocess.call(qemu_img_args + list(args))
176    if exitcode < 0:
177        sys.stderr.write('qemu-img received signal %i: %s\n'
178                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
179    return exitcode
180
181def qemu_img_pipe(*args: str) -> str:
182    '''Run qemu-img and return its output'''
183    return qemu_img_pipe_and_status(*args)[0]
184
185def qemu_img_log(*args):
186    result = qemu_img_pipe(*args)
187    log(result, filters=[filter_testfiles])
188    return result
189
190def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
191    args = ['info']
192    if imgopts:
193        args.append('--image-opts')
194    else:
195        args += ['-f', imgfmt]
196    args += extra_args
197    args.append(filename)
198
199    output = qemu_img_pipe(*args)
200    if not filter_path:
201        filter_path = filename
202    log(filter_img_info(output, filter_path))
203
204def qemu_io(*args):
205    '''Run qemu-io and return the stdout data'''
206    args = qemu_io_args + list(args)
207    return qemu_tool_pipe_and_status('qemu-io', args)[0]
208
209def qemu_io_log(*args):
210    result = qemu_io(*args)
211    log(result, filters=[filter_testfiles, filter_qemu_io])
212    return result
213
214def qemu_io_silent(*args):
215    '''Run qemu-io and return the exit code, suppressing stdout'''
216    if '-f' in args or '--image-opts' in args:
217        default_args = qemu_io_args_no_fmt
218    else:
219        default_args = qemu_io_args
220
221    args = default_args + list(args)
222    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
223    if exitcode < 0:
224        sys.stderr.write('qemu-io received signal %i: %s\n' %
225                         (-exitcode, ' '.join(args)))
226    return exitcode
227
228def qemu_io_silent_check(*args):
229    '''Run qemu-io and return the true if subprocess returned 0'''
230    args = qemu_io_args + list(args)
231    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
232                               stderr=subprocess.STDOUT)
233    return exitcode == 0
234
235class QemuIoInteractive:
236    def __init__(self, *args):
237        self.args = qemu_io_args_no_fmt + list(args)
238        # We need to keep the Popen objext around, and not
239        # close it immediately. Therefore, disable the pylint check:
240        # pylint: disable=consider-using-with
241        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
242                                   stdout=subprocess.PIPE,
243                                   stderr=subprocess.STDOUT,
244                                   universal_newlines=True)
245        out = self._p.stdout.read(9)
246        if out != 'qemu-io> ':
247            # Most probably qemu-io just failed to start.
248            # Let's collect the whole output and exit.
249            out += self._p.stdout.read()
250            self._p.wait(timeout=1)
251            raise ValueError(out)
252
253    def close(self):
254        self._p.communicate('q\n')
255
256    def _read_output(self):
257        pattern = 'qemu-io> '
258        n = len(pattern)
259        pos = 0
260        s = []
261        while pos != n:
262            c = self._p.stdout.read(1)
263            # check unexpected EOF
264            assert c != ''
265            s.append(c)
266            if c == pattern[pos]:
267                pos += 1
268            else:
269                pos = 0
270
271        return ''.join(s[:-n])
272
273    def cmd(self, cmd):
274        # quit command is in close(), '\n' is added automatically
275        assert '\n' not in cmd
276        cmd = cmd.strip()
277        assert cmd not in ('q', 'quit')
278        self._p.stdin.write(cmd + '\n')
279        self._p.stdin.flush()
280        return self._read_output()
281
282
283def qemu_nbd(*args):
284    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
285    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
286
287def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
288    '''Run qemu-nbd in daemon mode and return both the parent's exit code
289       and its output in case of an error'''
290    full_args = qemu_nbd_args + ['--fork'] + list(args)
291    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
292                                                   connect_stderr=False)
293    return returncode, output if returncode else ''
294
295def qemu_nbd_list_log(*args: str) -> str:
296    '''Run qemu-nbd to list remote exports'''
297    full_args = [qemu_nbd_prog, '-L'] + list(args)
298    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
299    log(output, filters=[filter_testfiles, filter_nbd_exports])
300    return output
301
302@contextmanager
303def qemu_nbd_popen(*args):
304    '''Context manager running qemu-nbd within the context'''
305    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
306
307    assert not os.path.exists(pid_file)
308
309    cmd = list(qemu_nbd_args)
310    cmd.extend(('--persistent', '--pid-file', pid_file))
311    cmd.extend(args)
312
313    log('Start NBD server')
314    with subprocess.Popen(cmd) as p:
315        try:
316            while not os.path.exists(pid_file):
317                if p.poll() is not None:
318                    raise RuntimeError(
319                        "qemu-nbd terminated with exit code {}: {}"
320                        .format(p.returncode, ' '.join(cmd)))
321
322                time.sleep(0.01)
323            yield
324        finally:
325            if os.path.exists(pid_file):
326                os.remove(pid_file)
327            log('Kill NBD server')
328            p.kill()
329            p.wait()
330
331def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
332    '''Return True if two image files are identical'''
333    return qemu_img('compare', '-f', fmt1,
334                    '-F', fmt2, img1, img2) == 0
335
336def create_image(name, size):
337    '''Create a fully-allocated raw image with sector markers'''
338    with open(name, 'wb') as file:
339        i = 0
340        while i < size:
341            sector = struct.pack('>l504xl', i // 512, i // 512)
342            file.write(sector)
343            i = i + 512
344
345def image_size(img):
346    '''Return image's virtual size'''
347    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
348    return json.loads(r)['virtual-size']
349
350def is_str(val):
351    return isinstance(val, str)
352
353test_dir_re = re.compile(r"%s" % test_dir)
354def filter_test_dir(msg):
355    return test_dir_re.sub("TEST_DIR", msg)
356
357win32_re = re.compile(r"\r")
358def filter_win32(msg):
359    return win32_re.sub("", msg)
360
361qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
362                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
363                        r"and [0-9\/.inf]* ops\/sec\)")
364def filter_qemu_io(msg):
365    msg = filter_win32(msg)
366    return qemu_io_re.sub("X ops; XX:XX:XX.X "
367                          "(XXX YYY/sec and XXX ops/sec)", msg)
368
369chown_re = re.compile(r"chown [0-9]+:[0-9]+")
370def filter_chown(msg):
371    return chown_re.sub("chown UID:GID", msg)
372
373def filter_qmp_event(event):
374    '''Filter a QMP event dict'''
375    event = dict(event)
376    if 'timestamp' in event:
377        event['timestamp']['seconds'] = 'SECS'
378        event['timestamp']['microseconds'] = 'USECS'
379    return event
380
381def filter_qmp(qmsg, filter_fn):
382    '''Given a string filter, filter a QMP object's values.
383    filter_fn takes a (key, value) pair.'''
384    # Iterate through either lists or dicts;
385    if isinstance(qmsg, list):
386        items = enumerate(qmsg)
387    else:
388        items = qmsg.items()
389
390    for k, v in items:
391        if isinstance(v, (dict, list)):
392            qmsg[k] = filter_qmp(v, filter_fn)
393        else:
394            qmsg[k] = filter_fn(k, v)
395    return qmsg
396
397def filter_testfiles(msg):
398    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
399    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
400    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
401
402def filter_qmp_testfiles(qmsg):
403    def _filter(_key, value):
404        if is_str(value):
405            return filter_testfiles(value)
406        return value
407    return filter_qmp(qmsg, _filter)
408
409def filter_virtio_scsi(output: str) -> str:
410    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
411
412def filter_qmp_virtio_scsi(qmsg):
413    def _filter(_key, value):
414        if is_str(value):
415            return filter_virtio_scsi(value)
416        return value
417    return filter_qmp(qmsg, _filter)
418
419def filter_generated_node_ids(msg):
420    return re.sub("#block[0-9]+", "NODE_NAME", msg)
421
422def filter_img_info(output, filename):
423    lines = []
424    for line in output.split('\n'):
425        if 'disk size' in line or 'actual-size' in line:
426            continue
427        line = line.replace(filename, 'TEST_IMG')
428        line = filter_testfiles(line)
429        line = line.replace(imgfmt, 'IMGFMT')
430        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
431        line = re.sub('uuid: [-a-f0-9]+',
432                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
433                      line)
434        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
435        lines.append(line)
436    return '\n'.join(lines)
437
438def filter_imgfmt(msg):
439    return msg.replace(imgfmt, 'IMGFMT')
440
441def filter_qmp_imgfmt(qmsg):
442    def _filter(_key, value):
443        if is_str(value):
444            return filter_imgfmt(value)
445        return value
446    return filter_qmp(qmsg, _filter)
447
448def filter_nbd_exports(output: str) -> str:
449    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
450
451
452Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
453
454def log(msg: Msg,
455        filters: Iterable[Callable[[Msg], Msg]] = (),
456        indent: Optional[int] = None) -> None:
457    """
458    Logs either a string message or a JSON serializable message (like QMP).
459    If indent is provided, JSON serializable messages are pretty-printed.
460    """
461    for flt in filters:
462        msg = flt(msg)
463    if isinstance(msg, (dict, list)):
464        # Don't sort if it's already sorted
465        do_sort = not isinstance(msg, OrderedDict)
466        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
467    else:
468        test_logger.info(msg)
469
470class Timeout:
471    def __init__(self, seconds, errmsg="Timeout"):
472        self.seconds = seconds
473        self.errmsg = errmsg
474    def __enter__(self):
475        signal.signal(signal.SIGALRM, self.timeout)
476        signal.setitimer(signal.ITIMER_REAL, self.seconds)
477        return self
478    def __exit__(self, exc_type, value, traceback):
479        signal.setitimer(signal.ITIMER_REAL, 0)
480        return False
481    def timeout(self, signum, frame):
482        raise Exception(self.errmsg)
483
484def file_pattern(name):
485    return "{0}-{1}".format(os.getpid(), name)
486
487class FilePath:
488    """
489    Context manager generating multiple file names. The generated files are
490    removed when exiting the context.
491
492    Example usage:
493
494        with FilePath('a.img', 'b.img') as (img_a, img_b):
495            # Use img_a and img_b here...
496
497        # a.img and b.img are automatically removed here.
498
499    By default images are created in iotests.test_dir. To create sockets use
500    iotests.sock_dir:
501
502       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
503
504    For convenience, calling with one argument yields a single file instead of
505    a tuple with one item.
506
507    """
508    def __init__(self, *names, base_dir=test_dir):
509        self.paths = [os.path.join(base_dir, file_pattern(name))
510                      for name in names]
511
512    def __enter__(self):
513        if len(self.paths) == 1:
514            return self.paths[0]
515        else:
516            return self.paths
517
518    def __exit__(self, exc_type, exc_val, exc_tb):
519        for path in self.paths:
520            try:
521                os.remove(path)
522            except OSError:
523                pass
524        return False
525
526
527def try_remove(img):
528    try:
529        os.remove(img)
530    except OSError:
531        pass
532
533def file_path_remover():
534    for path in reversed(file_path_remover.paths):
535        try_remove(path)
536
537
538def file_path(*names, base_dir=test_dir):
539    ''' Another way to get auto-generated filename that cleans itself up.
540
541    Use is as simple as:
542
543    img_a, img_b = file_path('a.img', 'b.img')
544    sock = file_path('socket')
545    '''
546
547    if not hasattr(file_path_remover, 'paths'):
548        file_path_remover.paths = []
549        atexit.register(file_path_remover)
550
551    paths = []
552    for name in names:
553        filename = file_pattern(name)
554        path = os.path.join(base_dir, filename)
555        file_path_remover.paths.append(path)
556        paths.append(path)
557
558    return paths[0] if len(paths) == 1 else paths
559
560def remote_filename(path):
561    if imgproto == 'file':
562        return path
563    elif imgproto == 'ssh':
564        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
565    else:
566        raise Exception("Protocol %s not supported" % (imgproto))
567
568class VM(qtest.QEMUQtestMachine):
569    '''A QEMU VM'''
570
571    def __init__(self, path_suffix=''):
572        name = "qemu%s-%d" % (path_suffix, os.getpid())
573        timer = 15.0
574        super().__init__(qemu_prog, qemu_opts, name=name,
575                         base_temp_dir=test_dir,
576                         socket_scm_helper=socket_scm_helper,
577                         sock_dir=sock_dir, qmp_timer=timer)
578        self._num_drives = 0
579
580    def add_object(self, opts):
581        self._args.append('-object')
582        self._args.append(opts)
583        return self
584
585    def add_device(self, opts):
586        self._args.append('-device')
587        self._args.append(opts)
588        return self
589
590    def add_drive_raw(self, opts):
591        self._args.append('-drive')
592        self._args.append(opts)
593        return self
594
595    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
596        '''Add a virtio-blk drive to the VM'''
597        options = ['if=%s' % interface,
598                   'id=drive%d' % self._num_drives]
599
600        if path is not None:
601            options.append('file=%s' % path)
602            options.append('format=%s' % img_format)
603            options.append('cache=%s' % cachemode)
604            options.append('aio=%s' % aiomode)
605
606        if opts:
607            options.append(opts)
608
609        if img_format == 'luks' and 'key-secret' not in opts:
610            # default luks support
611            if luks_default_secret_object not in self._args:
612                self.add_object(luks_default_secret_object)
613
614            options.append(luks_default_key_secret_opt)
615
616        self._args.append('-drive')
617        self._args.append(','.join(options))
618        self._num_drives += 1
619        return self
620
621    def add_blockdev(self, opts):
622        self._args.append('-blockdev')
623        if isinstance(opts, str):
624            self._args.append(opts)
625        else:
626            self._args.append(','.join(opts))
627        return self
628
629    def add_incoming(self, addr):
630        self._args.append('-incoming')
631        self._args.append(addr)
632        return self
633
634    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
635        cmd = 'human-monitor-command'
636        kwargs: Dict[str, Any] = {'command-line': command_line}
637        if use_log:
638            return self.qmp_log(cmd, **kwargs)
639        else:
640            return self.qmp(cmd, **kwargs)
641
642    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
643        """Pause drive r/w operations"""
644        if not event:
645            self.pause_drive(drive, "read_aio")
646            self.pause_drive(drive, "write_aio")
647            return
648        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
649
650    def resume_drive(self, drive: str) -> None:
651        """Resume drive r/w operations"""
652        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
653
654    def hmp_qemu_io(self, drive: str, cmd: str,
655                    use_log: bool = False) -> QMPMessage:
656        """Write to a given drive using an HMP command"""
657        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
658
659    def flatten_qmp_object(self, obj, output=None, basestr=''):
660        if output is None:
661            output = dict()
662        if isinstance(obj, list):
663            for i, item in enumerate(obj):
664                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
665        elif isinstance(obj, dict):
666            for key in obj:
667                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
668        else:
669            output[basestr[:-1]] = obj # Strip trailing '.'
670        return output
671
672    def qmp_to_opts(self, obj):
673        obj = self.flatten_qmp_object(obj)
674        output_list = list()
675        for key in obj:
676            output_list += [key + '=' + obj[key]]
677        return ','.join(output_list)
678
679    def get_qmp_events_filtered(self, wait=60.0):
680        result = []
681        for ev in self.get_qmp_events(wait=wait):
682            result.append(filter_qmp_event(ev))
683        return result
684
685    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
686        full_cmd = OrderedDict((
687            ("execute", cmd),
688            ("arguments", ordered_qmp(kwargs))
689        ))
690        log(full_cmd, filters, indent=indent)
691        result = self.qmp(cmd, **kwargs)
692        log(result, filters, indent=indent)
693        return result
694
695    # Returns None on success, and an error string on failure
696    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
697                pre_finalize=None, cancel=False, wait=60.0):
698        """
699        run_job moves a job from creation through to dismissal.
700
701        :param job: String. ID of recently-launched job
702        :param auto_finalize: Bool. True if the job was launched with
703                              auto_finalize. Defaults to True.
704        :param auto_dismiss: Bool. True if the job was launched with
705                             auto_dismiss=True. Defaults to False.
706        :param pre_finalize: Callback. A callable that takes no arguments to be
707                             invoked prior to issuing job-finalize, if any.
708        :param cancel: Bool. When true, cancels the job after the pre_finalize
709                       callback.
710        :param wait: Float. Timeout value specifying how long to wait for any
711                     event, in seconds. Defaults to 60.0.
712        """
713        match_device = {'data': {'device': job}}
714        match_id = {'data': {'id': job}}
715        events = [
716            ('BLOCK_JOB_COMPLETED', match_device),
717            ('BLOCK_JOB_CANCELLED', match_device),
718            ('BLOCK_JOB_ERROR', match_device),
719            ('BLOCK_JOB_READY', match_device),
720            ('BLOCK_JOB_PENDING', match_id),
721            ('JOB_STATUS_CHANGE', match_id)
722        ]
723        error = None
724        while True:
725            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
726            if ev['event'] != 'JOB_STATUS_CHANGE':
727                log(ev)
728                continue
729            status = ev['data']['status']
730            if status == 'aborting':
731                result = self.qmp('query-jobs')
732                for j in result['return']:
733                    if j['id'] == job:
734                        error = j['error']
735                        log('Job failed: %s' % (j['error']))
736            elif status == 'ready':
737                self.qmp_log('job-complete', id=job)
738            elif status == 'pending' and not auto_finalize:
739                if pre_finalize:
740                    pre_finalize()
741                if cancel:
742                    self.qmp_log('job-cancel', id=job)
743                else:
744                    self.qmp_log('job-finalize', id=job)
745            elif status == 'concluded' and not auto_dismiss:
746                self.qmp_log('job-dismiss', id=job)
747            elif status == 'null':
748                return error
749
750    # Returns None on success, and an error string on failure
751    def blockdev_create(self, options, job_id='job0', filters=None):
752        if filters is None:
753            filters = [filter_qmp_testfiles]
754        result = self.qmp_log('blockdev-create', filters=filters,
755                              job_id=job_id, options=options)
756
757        if 'return' in result:
758            assert result['return'] == {}
759            job_result = self.run_job(job_id)
760        else:
761            job_result = result['error']
762
763        log("")
764        return job_result
765
766    def enable_migration_events(self, name):
767        log('Enabling migration QMP events on %s...' % name)
768        log(self.qmp('migrate-set-capabilities', capabilities=[
769            {
770                'capability': 'events',
771                'state': True
772            }
773        ]))
774
775    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
776        while True:
777            event = self.event_wait('MIGRATION')
778            # We use the default timeout, and with a timeout, event_wait()
779            # never returns None
780            assert event
781
782            log(event, filters=[filter_qmp_event])
783            if event['data']['status'] in ('completed', 'failed'):
784                break
785
786        if event['data']['status'] == 'completed':
787            # The event may occur in finish-migrate, so wait for the expected
788            # post-migration runstate
789            runstate = None
790            while runstate != expect_runstate:
791                runstate = self.qmp('query-status')['return']['status']
792            return True
793        else:
794            return False
795
796    def node_info(self, node_name):
797        nodes = self.qmp('query-named-block-nodes')
798        for x in nodes['return']:
799            if x['node-name'] == node_name:
800                return x
801        return None
802
803    def query_bitmaps(self):
804        res = self.qmp("query-named-block-nodes")
805        return {device['node-name']: device['dirty-bitmaps']
806                for device in res['return'] if 'dirty-bitmaps' in device}
807
808    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
809        """
810        get a specific bitmap from the object returned by query_bitmaps.
811        :param recording: If specified, filter results by the specified value.
812        :param bitmaps: If specified, use it instead of call query_bitmaps()
813        """
814        if bitmaps is None:
815            bitmaps = self.query_bitmaps()
816
817        for bitmap in bitmaps[node_name]:
818            if bitmap.get('name', '') == bitmap_name:
819                if recording is None or bitmap.get('recording') == recording:
820                    return bitmap
821        return None
822
823    def check_bitmap_status(self, node_name, bitmap_name, fields):
824        ret = self.get_bitmap(node_name, bitmap_name)
825
826        return fields.items() <= ret.items()
827
828    def assert_block_path(self, root, path, expected_node, graph=None):
829        """
830        Check whether the node under the given path in the block graph
831        is @expected_node.
832
833        @root is the node name of the node where the @path is rooted.
834
835        @path is a string that consists of child names separated by
836        slashes.  It must begin with a slash.
837
838        Examples for @root + @path:
839          - root="qcow2-node", path="/backing/file"
840          - root="quorum-node", path="/children.2/file"
841
842        Hypothetically, @path could be empty, in which case it would
843        point to @root.  However, in practice this case is not useful
844        and hence not allowed.
845
846        @expected_node may be None.  (All elements of the path but the
847        leaf must still exist.)
848
849        @graph may be None or the result of an x-debug-query-block-graph
850        call that has already been performed.
851        """
852        if graph is None:
853            graph = self.qmp('x-debug-query-block-graph')['return']
854
855        iter_path = iter(path.split('/'))
856
857        # Must start with a /
858        assert next(iter_path) == ''
859
860        node = next((node for node in graph['nodes'] if node['name'] == root),
861                    None)
862
863        # An empty @path is not allowed, so the root node must be present
864        assert node is not None, 'Root node %s not found' % root
865
866        for child_name in iter_path:
867            assert node is not None, 'Cannot follow path %s%s' % (root, path)
868
869            try:
870                node_id = next(edge['child'] for edge in graph['edges']
871                               if (edge['parent'] == node['id'] and
872                                   edge['name'] == child_name))
873
874                node = next(node for node in graph['nodes']
875                            if node['id'] == node_id)
876
877            except StopIteration:
878                node = None
879
880        if node is None:
881            assert expected_node is None, \
882                   'No node found under %s (but expected %s)' % \
883                   (path, expected_node)
884        else:
885            assert node['name'] == expected_node, \
886                   'Found node %s under %s (but expected %s)' % \
887                   (node['name'], path, expected_node)
888
889index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
890
891class QMPTestCase(unittest.TestCase):
892    '''Abstract base class for QMP test cases'''
893
894    def __init__(self, *args, **kwargs):
895        super().__init__(*args, **kwargs)
896        # Many users of this class set a VM property we rely on heavily
897        # in the methods below.
898        self.vm = None
899
900    def dictpath(self, d, path):
901        '''Traverse a path in a nested dict'''
902        for component in path.split('/'):
903            m = index_re.match(component)
904            if m:
905                component, idx = m.groups()
906                idx = int(idx)
907
908            if not isinstance(d, dict) or component not in d:
909                self.fail(f'failed path traversal for "{path}" in "{d}"')
910            d = d[component]
911
912            if m:
913                if not isinstance(d, list):
914                    self.fail(f'path component "{component}" in "{path}" '
915                              f'is not a list in "{d}"')
916                try:
917                    d = d[idx]
918                except IndexError:
919                    self.fail(f'invalid index "{idx}" in path "{path}" '
920                              f'in "{d}"')
921        return d
922
923    def assert_qmp_absent(self, d, path):
924        try:
925            result = self.dictpath(d, path)
926        except AssertionError:
927            return
928        self.fail('path "%s" has value "%s"' % (path, str(result)))
929
930    def assert_qmp(self, d, path, value):
931        '''Assert that the value for a specific path in a QMP dict
932           matches.  When given a list of values, assert that any of
933           them matches.'''
934
935        result = self.dictpath(d, path)
936
937        # [] makes no sense as a list of valid values, so treat it as
938        # an actual single value.
939        if isinstance(value, list) and value != []:
940            for v in value:
941                if result == v:
942                    return
943            self.fail('no match for "%s" in %s' % (str(result), str(value)))
944        else:
945            self.assertEqual(result, value,
946                             '"%s" is "%s", expected "%s"'
947                             % (path, str(result), str(value)))
948
949    def assert_no_active_block_jobs(self):
950        result = self.vm.qmp('query-block-jobs')
951        self.assert_qmp(result, 'return', [])
952
953    def assert_has_block_node(self, node_name=None, file_name=None):
954        """Issue a query-named-block-nodes and assert node_name and/or
955        file_name is present in the result"""
956        def check_equal_or_none(a, b):
957            return a is None or b is None or a == b
958        assert node_name or file_name
959        result = self.vm.qmp('query-named-block-nodes')
960        for x in result["return"]:
961            if check_equal_or_none(x.get("node-name"), node_name) and \
962                    check_equal_or_none(x.get("file"), file_name):
963                return
964        self.fail("Cannot find %s %s in result:\n%s" %
965                  (node_name, file_name, result))
966
967    def assert_json_filename_equal(self, json_filename, reference):
968        '''Asserts that the given filename is a json: filename and that its
969           content is equal to the given reference object'''
970        self.assertEqual(json_filename[:5], 'json:')
971        self.assertEqual(
972            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
973            self.vm.flatten_qmp_object(reference)
974        )
975
976    def cancel_and_wait(self, drive='drive0', force=False,
977                        resume=False, wait=60.0):
978        '''Cancel a block job and wait for it to finish, returning the event'''
979        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
980        self.assert_qmp(result, 'return', {})
981
982        if resume:
983            self.vm.resume_drive(drive)
984
985        cancelled = False
986        result = None
987        while not cancelled:
988            for event in self.vm.get_qmp_events(wait=wait):
989                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
990                   event['event'] == 'BLOCK_JOB_CANCELLED':
991                    self.assert_qmp(event, 'data/device', drive)
992                    result = event
993                    cancelled = True
994                elif event['event'] == 'JOB_STATUS_CHANGE':
995                    self.assert_qmp(event, 'data/id', drive)
996
997
998        self.assert_no_active_block_jobs()
999        return result
1000
1001    def wait_until_completed(self, drive='drive0', check_offset=True,
1002                             wait=60.0, error=None):
1003        '''Wait for a block job to finish, returning the event'''
1004        while True:
1005            for event in self.vm.get_qmp_events(wait=wait):
1006                if event['event'] == 'BLOCK_JOB_COMPLETED':
1007                    self.assert_qmp(event, 'data/device', drive)
1008                    if error is None:
1009                        self.assert_qmp_absent(event, 'data/error')
1010                        if check_offset:
1011                            self.assert_qmp(event, 'data/offset',
1012                                            event['data']['len'])
1013                    else:
1014                        self.assert_qmp(event, 'data/error', error)
1015                    self.assert_no_active_block_jobs()
1016                    return event
1017                if event['event'] == 'JOB_STATUS_CHANGE':
1018                    self.assert_qmp(event, 'data/id', drive)
1019
1020    def wait_ready(self, drive='drive0'):
1021        """Wait until a BLOCK_JOB_READY event, and return the event."""
1022        return self.vm.events_wait([
1023            ('BLOCK_JOB_READY',
1024             {'data': {'type': 'mirror', 'device': drive}}),
1025            ('BLOCK_JOB_READY',
1026             {'data': {'type': 'commit', 'device': drive}})
1027        ])
1028
1029    def wait_ready_and_cancel(self, drive='drive0'):
1030        self.wait_ready(drive=drive)
1031        event = self.cancel_and_wait(drive=drive)
1032        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1033        self.assert_qmp(event, 'data/type', 'mirror')
1034        self.assert_qmp(event, 'data/offset', event['data']['len'])
1035
1036    def complete_and_wait(self, drive='drive0', wait_ready=True,
1037                          completion_error=None):
1038        '''Complete a block job and wait for it to finish'''
1039        if wait_ready:
1040            self.wait_ready(drive=drive)
1041
1042        result = self.vm.qmp('block-job-complete', device=drive)
1043        self.assert_qmp(result, 'return', {})
1044
1045        event = self.wait_until_completed(drive=drive, error=completion_error)
1046        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1047
1048    def pause_wait(self, job_id='job0'):
1049        with Timeout(3, "Timeout waiting for job to pause"):
1050            while True:
1051                result = self.vm.qmp('query-block-jobs')
1052                found = False
1053                for job in result['return']:
1054                    if job['device'] == job_id:
1055                        found = True
1056                        if job['paused'] and not job['busy']:
1057                            return job
1058                        break
1059                assert found
1060
1061    def pause_job(self, job_id='job0', wait=True):
1062        result = self.vm.qmp('block-job-pause', device=job_id)
1063        self.assert_qmp(result, 'return', {})
1064        if wait:
1065            return self.pause_wait(job_id)
1066        return result
1067
1068    def case_skip(self, reason):
1069        '''Skip this test case'''
1070        case_notrun(reason)
1071        self.skipTest(reason)
1072
1073
1074def notrun(reason):
1075    '''Skip this test suite'''
1076    # Each test in qemu-iotests has a number ("seq")
1077    seq = os.path.basename(sys.argv[0])
1078
1079    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1080    logger.warning("%s not run: %s", seq, reason)
1081    sys.exit(0)
1082
1083def case_notrun(reason):
1084    '''Mark this test case as not having been run (without actually
1085    skipping it, that is left to the caller).  See
1086    QMPTestCase.case_skip() for a variant that actually skips the
1087    current test case.'''
1088
1089    # Each test in qemu-iotests has a number ("seq")
1090    seq = os.path.basename(sys.argv[0])
1091
1092    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1093        '    [case not run] ' + reason + '\n')
1094
1095def _verify_image_format(supported_fmts: Sequence[str] = (),
1096                         unsupported_fmts: Sequence[str] = ()) -> None:
1097    if 'generic' in supported_fmts and \
1098            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1099        # similar to
1100        #   _supported_fmt generic
1101        # for bash tests
1102        supported_fmts = ()
1103
1104    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1105    if not_sup or (imgfmt in unsupported_fmts):
1106        notrun('not suitable for this image format: %s' % imgfmt)
1107
1108    if imgfmt == 'luks':
1109        verify_working_luks()
1110
1111def _verify_protocol(supported: Sequence[str] = (),
1112                     unsupported: Sequence[str] = ()) -> None:
1113    assert not (supported and unsupported)
1114
1115    if 'generic' in supported:
1116        return
1117
1118    not_sup = supported and (imgproto not in supported)
1119    if not_sup or (imgproto in unsupported):
1120        notrun('not suitable for this protocol: %s' % imgproto)
1121
1122def _verify_platform(supported: Sequence[str] = (),
1123                     unsupported: Sequence[str] = ()) -> None:
1124    if any((sys.platform.startswith(x) for x in unsupported)):
1125        notrun('not suitable for this OS: %s' % sys.platform)
1126
1127    if supported:
1128        if not any((sys.platform.startswith(x) for x in supported)):
1129            notrun('not suitable for this OS: %s' % sys.platform)
1130
1131def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1132    if supported_cache_modes and (cachemode not in supported_cache_modes):
1133        notrun('not suitable for this cache mode: %s' % cachemode)
1134
1135def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1136    if supported_aio_modes and (aiomode not in supported_aio_modes):
1137        notrun('not suitable for this aio mode: %s' % aiomode)
1138
1139def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1140    usf_list = list(set(required_formats) - set(supported_formats()))
1141    if usf_list:
1142        notrun(f'formats {usf_list} are not whitelisted')
1143
1144
1145def _verify_virtio_blk() -> None:
1146    out = qemu_pipe('-M', 'none', '-device', 'help')
1147    if 'virtio-blk' not in out:
1148        notrun('Missing virtio-blk in QEMU binary')
1149
1150def _verify_virtio_scsi_pci_or_ccw() -> None:
1151    out = qemu_pipe('-M', 'none', '-device', 'help')
1152    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1153        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1154
1155
1156def supports_quorum():
1157    return 'quorum' in qemu_img_pipe('--help')
1158
1159def verify_quorum():
1160    '''Skip test suite if quorum support is not available'''
1161    if not supports_quorum():
1162        notrun('quorum support missing')
1163
1164def has_working_luks() -> Tuple[bool, str]:
1165    """
1166    Check whether our LUKS driver can actually create images
1167    (this extends to LUKS encryption for qcow2).
1168
1169    If not, return the reason why.
1170    """
1171
1172    img_file = f'{test_dir}/luks-test.luks'
1173    (output, status) = \
1174        qemu_img_pipe_and_status('create', '-f', 'luks',
1175                                 '--object', luks_default_secret_object,
1176                                 '-o', luks_default_key_secret_opt,
1177                                 '-o', 'iter-time=10',
1178                                 img_file, '1G')
1179    try:
1180        os.remove(img_file)
1181    except OSError:
1182        pass
1183
1184    if status != 0:
1185        reason = output
1186        for line in output.splitlines():
1187            if img_file + ':' in line:
1188                reason = line.split(img_file + ':', 1)[1].strip()
1189                break
1190
1191        return (False, reason)
1192    else:
1193        return (True, '')
1194
1195def verify_working_luks():
1196    """
1197    Skip test suite if LUKS does not work
1198    """
1199    (working, reason) = has_working_luks()
1200    if not working:
1201        notrun(reason)
1202
1203def qemu_pipe(*args: str) -> str:
1204    """
1205    Run qemu with an option to print something and exit (e.g. a help option).
1206
1207    :return: QEMU's stdout output.
1208    """
1209    full_args = [qemu_prog] + qemu_opts + list(args)
1210    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1211    return output
1212
1213def supported_formats(read_only=False):
1214    '''Set 'read_only' to True to check ro-whitelist
1215       Otherwise, rw-whitelist is checked'''
1216
1217    if not hasattr(supported_formats, "formats"):
1218        supported_formats.formats = {}
1219
1220    if read_only not in supported_formats.formats:
1221        format_message = qemu_pipe("-drive", "format=help")
1222        line = 1 if read_only else 0
1223        supported_formats.formats[read_only] = \
1224            format_message.splitlines()[line].split(":")[1].split()
1225
1226    return supported_formats.formats[read_only]
1227
1228def skip_if_unsupported(required_formats=(), read_only=False):
1229    '''Skip Test Decorator
1230       Runs the test if all the required formats are whitelisted'''
1231    def skip_test_decorator(func):
1232        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1233                         **kwargs: Dict[str, Any]) -> None:
1234            if callable(required_formats):
1235                fmts = required_formats(test_case)
1236            else:
1237                fmts = required_formats
1238
1239            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1240            if usf_list:
1241                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1242                test_case.case_skip(msg)
1243            else:
1244                func(test_case, *args, **kwargs)
1245        return func_wrapper
1246    return skip_test_decorator
1247
1248def skip_for_formats(formats: Sequence[str] = ()) \
1249    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1250                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1251    '''Skip Test Decorator
1252       Skips the test for the given formats'''
1253    def skip_test_decorator(func):
1254        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1255                         **kwargs: Dict[str, Any]) -> None:
1256            if imgfmt in formats:
1257                msg = f'{test_case}: Skipped for format {imgfmt}'
1258                test_case.case_skip(msg)
1259            else:
1260                func(test_case, *args, **kwargs)
1261        return func_wrapper
1262    return skip_test_decorator
1263
1264def skip_if_user_is_root(func):
1265    '''Skip Test Decorator
1266       Runs the test only without root permissions'''
1267    def func_wrapper(*args, **kwargs):
1268        if os.getuid() == 0:
1269            case_notrun('{}: cannot be run as root'.format(args[0]))
1270            return None
1271        else:
1272            return func(*args, **kwargs)
1273    return func_wrapper
1274
1275# We need to filter out the time taken from the output so that
1276# qemu-iotest can reliably diff the results against master output,
1277# and hide skipped tests from the reference output.
1278
1279class ReproducibleTestResult(unittest.TextTestResult):
1280    def addSkip(self, test, reason):
1281        # Same as TextTestResult, but print dot instead of "s"
1282        unittest.TestResult.addSkip(self, test, reason)
1283        if self.showAll:
1284            self.stream.writeln("skipped {0!r}".format(reason))
1285        elif self.dots:
1286            self.stream.write(".")
1287            self.stream.flush()
1288
1289class ReproducibleStreamWrapper:
1290    def __init__(self, stream: TextIO):
1291        self.stream = stream
1292
1293    def __getattr__(self, attr):
1294        if attr in ('stream', '__getstate__'):
1295            raise AttributeError(attr)
1296        return getattr(self.stream, attr)
1297
1298    def write(self, arg=None):
1299        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1300        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1301        self.stream.write(arg)
1302
1303class ReproducibleTestRunner(unittest.TextTestRunner):
1304    def __init__(self, stream: Optional[TextIO] = None,
1305             resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1306             **kwargs: Any) -> None:
1307        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1308        super().__init__(stream=rstream,           # type: ignore
1309                         descriptions=True,
1310                         resultclass=resultclass,
1311                         **kwargs)
1312
1313def execute_unittest(argv: List[str], debug: bool = False) -> None:
1314    """Executes unittests within the calling module."""
1315
1316    # Some tests have warnings, especially ResourceWarnings for unclosed
1317    # files and sockets.  Ignore them for now to ensure reproducibility of
1318    # the test output.
1319    unittest.main(argv=argv,
1320                  testRunner=ReproducibleTestRunner,
1321                  verbosity=2 if debug else 1,
1322                  warnings=None if sys.warnoptions else 'ignore')
1323
1324def execute_setup_common(supported_fmts: Sequence[str] = (),
1325                         supported_platforms: Sequence[str] = (),
1326                         supported_cache_modes: Sequence[str] = (),
1327                         supported_aio_modes: Sequence[str] = (),
1328                         unsupported_fmts: Sequence[str] = (),
1329                         supported_protocols: Sequence[str] = (),
1330                         unsupported_protocols: Sequence[str] = (),
1331                         required_fmts: Sequence[str] = ()) -> bool:
1332    """
1333    Perform necessary setup for either script-style or unittest-style tests.
1334
1335    :return: Bool; Whether or not debug mode has been requested via the CLI.
1336    """
1337    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1338
1339    debug = '-d' in sys.argv
1340    if debug:
1341        sys.argv.remove('-d')
1342    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1343
1344    _verify_image_format(supported_fmts, unsupported_fmts)
1345    _verify_protocol(supported_protocols, unsupported_protocols)
1346    _verify_platform(supported=supported_platforms)
1347    _verify_cache_mode(supported_cache_modes)
1348    _verify_aio_mode(supported_aio_modes)
1349    _verify_formats(required_fmts)
1350    _verify_virtio_blk()
1351
1352    return debug
1353
1354def execute_test(*args, test_function=None, **kwargs):
1355    """Run either unittest or script-style tests."""
1356
1357    debug = execute_setup_common(*args, **kwargs)
1358    if not test_function:
1359        execute_unittest(sys.argv, debug)
1360    else:
1361        test_function()
1362
1363def activate_logging():
1364    """Activate iotests.log() output to stdout for script-style tests."""
1365    handler = logging.StreamHandler(stream=sys.stdout)
1366    formatter = logging.Formatter('%(message)s')
1367    handler.setFormatter(formatter)
1368    test_logger.addHandler(handler)
1369    test_logger.setLevel(logging.INFO)
1370    test_logger.propagate = False
1371
1372# This is called from script-style iotests without a single point of entry
1373def script_initialize(*args, **kwargs):
1374    """Initialize script-style tests without running any tests."""
1375    activate_logging()
1376    execute_setup_common(*args, **kwargs)
1377
1378# This is called from script-style iotests with a single point of entry
1379def script_main(test_function, *args, **kwargs):
1380    """Run script-style tests outside of the unittest framework"""
1381    activate_logging()
1382    execute_test(*args, test_function=test_function, **kwargs)
1383
1384# This is called from unittest style iotests
1385def main(*args, **kwargs):
1386    """Run tests using the unittest framework"""
1387    execute_test(*args, **kwargs)
1388