xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 0ebfc997d29c3789b9bd41fe53fc198c2fd550da)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.aqmp.legacy import QEMUMonitorProtocol
41from qemu.machine import qtest
42from qemu.qmp import QMPMessage
43from qemu.utils import VerboseProcessError
44
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()

# The tool command lines below are assembled from environment variables.
# Option strings are split on whitespace: this will not work if
# arguments contain spaces but is necessary if we want to support the
# override options that ./check supports.
#
# str.split() without a separator is used on purpose: unlike split(' ')
# it never yields empty-string arguments for repeated blanks or for an
# empty/unset variable.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, qemu_gdb holds a gdbserver command prefix;
# otherwise it stays an empty list.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

# Truthy (the raw string from the environment) when PRINT_QEMU is set.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
89
# The following settings have no default: they must come from the
# environment.
try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Command prefix used to run QEMU under valgrind; empty unless
# VALGRIND_QEMU is "y" and NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    # Exit code 99 is what VM._post_shutdown() checks for before
    # printing the valgrind log.
    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object / option added for 'luks' images when the caller
# did not supply a key-secret itself.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# NOTE: read outside the try block above, so a missing SAMPLE_IMG_DIR
# surfaces as a plain KeyError rather than the "check" hint.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
119
120
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Context manager that temporarily overrides a logger's level.

    The logger named `logger_name` is switched to `level` for the
    duration of the `with` body and set back to the level it had on
    entry afterwards, even if the body raises.  Handy for silencing
    errors that are expected or uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level

    target.setLevel(level)
    try:
        yield
    finally:
        target.setLevel(saved_level)
137
138
def unarchive_sample_image(sample, fname):
    """
    Decompress a bzip2-compressed sample image into `fname`.

    The archive is looked up as `<sample>.bz2` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
143
144
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start `args` as a subprocess with stdout captured in text mode.

    :param args: complete command line, program first.
    :param connect_stderr: merge stderr into the captured stdout when
                           True; otherwise stderr is not redirected.

    :return: the running Popen object; the caller must reap it.
    """
    if connect_stderr:
        err_target = subprocess.STDOUT
    else:
        err_target = None

    # The caller owns the process, so it cannot be wrapped in "with" here.
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=err_target,
                            universal_newlines=True)
153
154
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, only used in the diagnostic
                 printed when the process is killed by a signal.
    :param args: complete command line, program first.
    :param connect_stderr: capture stderr together with stdout.
    :param drop_successful_output: return an empty string instead of the
                                   captured output when the exit code is 0.

    :return: a tuple (captured output, return code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code is the negated signal number.
            cmd = ' '.join(args)
            # Keep the f-string on one logical line: a backslash
            # continuation inside the literal would embed the source
            # indentation in the message.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
171
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite the argument list of a 'qemu-img create' invocation.

    Anything that is not a 'create' command is returned as an unchanged
    copy.  For 'create', the result is rebuilt in this order: 'create',
    '-f FMT' (if given), the default luks '--object' (if needed), every
    '-o' option, then all remaining arguments.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    option_strings = known.o

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Tests may want to create
    # additional images in other formats that don't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        option_strings.insert(0, imgopts)

    prepared = ['create']
    if fmt is not None:
        prepared.extend(['-f', fmt])

    # default luks support
    has_key_secret = any('key-secret' in opts for opts in option_strings)
    if fmt == 'luks' and not has_key_secret:
        prepared.extend(['--object', luks_default_secret_object])
        option_strings.append(luks_default_key_secret_opt)

    for opts in option_strings:
        prepared.extend(['-o', opts])

    prepared.extend(passthrough)
    return prepared
209
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img and return the completed process.

    QEMU_IMG_OPTIONS are always prepended, and the arguments of 'create'
    commands may be altered further (see qemu_img_create_prepare_args).

    :param args: command-line arguments to qemu-img.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). This exception has
        'stdout', 'stderr', and 'returncode' properties that may be
        inspected to show greater detail. If this exception is not
        handled, the command-line, return code, and all console output
        will be included at the bottom of the stack trace.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
    err_target = subprocess.STDOUT if combine_stdio else subprocess.PIPE

    subp = subprocess.run(
        full_args,
        stdout=subprocess.PIPE,
        stderr=err_target,
        universal_newlines=True,
        check=False
    )

    # A negative return code (killed by a signal) is always an error;
    # other non-zero codes only when the caller asked for checking.
    killed_by_signal = subp.returncode < 0
    failed_check = check and subp.returncode != 0
    if failed_check or killed_by_signal:
        raise VerboseProcessError(
            subp.returncode, full_args,
            output=subp.stdout,
            stderr=subp.stderr,
        )

    return subp
253
254
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with dict keys in sorted order.

    Dicts become OrderedDicts (dictionaries are not ordered prior to
    3.6).  With conv_keys, '_' in the keys of *this* dict is replaced by
    '-'; dicts nested directly inside it keep their keys unchanged.
    Lists are processed element by element, anything else is returned
    as is.
    """
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        # NOTE(review): elements are processed with the default
        # conv_keys=True, whatever value this call received.
        return [ordered_qmp(element) for element in qmsg]
    return qmsg
267
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run 'qemu-img create' with `args` through qemu_img()."""
    create_args = ('create',) + args
    return qemu_img(*create_args)
270
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its output as deserialized JSON.

    :raise CalledProcessError:
        When qemu-img crashes, or returns a non-zero exit code without
        producing a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        res = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative return code means termination by a signal: don't
        # bother looking at the output in that case.
        if exc.returncode >= 0:
            # Commands like 'check' can return failure (exit codes 2 and 3)
            # to indicate command completion, but with errors found. For
            # multi-command flexibility, ignore the exact error codes and
            # *try* to load JSON.
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(res.stdout)
302
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    subcommand = ("measure", "--output", "json")
    return qemu_img_json(*subcommand, *args)
305
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    subcommand = ("check", "--output", "json")
    return qemu_img_json(*subcommand, *args)
308
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    subcommand = ('info', "--output", "json")
    return qemu_img_json(*subcommand, *args)
311
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    subcommand = ('map', "--output", "json")
    return qemu_img_json(*subcommand, *args)
314
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img via qemu_img() and log its stdout.

    The output is passed through filter_testfiles before logging; the
    CompletedProcess is returned unmodified.
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
320
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True,
                 ) -> None:
    """
    Log the filtered output of 'qemu-img info' for `filename`.

    :param filename: image file name (or image options string).
    :param filter_path: path replaced by the filter in the output;
                        defaults to `filename`.
    :param use_image_opts: pass '--image-opts' instead of '-f <imgfmt>'.
    :param extra_args: additional arguments placed before the file name.
    :param check: forwarded to qemu_img().
    """
    if use_image_opts:
        format_args = ['--image-opts']
    else:
        format_args = ['-f', imgfmt]
    args = ['info', *format_args, *extra_args, filename]

    output = qemu_img(*args, check=check).stdout
    log(filter_img_info(output, filter_path or filename))
337
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prepend the qemu-io program and default options to `args`.

    When `args` already selects a format ('-f') or uses '--image-opts',
    the option set without a format (qemu_io_args_no_fmt) is used.
    """
    explicit_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if explicit_format else qemu_io_args
    return prefix + list(args)
343
def qemu_io_popen(*args):
    '''Start qemu-io with the given arguments and return the Popen object'''
    full_args = qemu_io_wrap_args(args)
    return qemu_tool_popen(full_args)
346
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    full_args = qemu_io_wrap_args(args)
    output, _ = qemu_tool_pipe_and_status('qemu-io', full_args)
    return output
350
def qemu_io_pipe_and_status(*args):
    '''Run qemu-io and return a tuple of its output and its exit code'''
    full_args = qemu_io_wrap_args(args)
    return qemu_tool_pipe_and_status('qemu-io', full_args)
353
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
358
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    full_args = qemu_io_wrap_args(args)
    returncode = subprocess.run(full_args, stdout=subprocess.DEVNULL,
                                check=False).returncode
    if returncode < 0:
        # A negative return code is the negated signal number.
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-returncode, ' '.join(full_args)))
    return returncode
367
def qemu_io_silent_check(*args):
    '''Run qemu-io silently and return True if the subprocess returned 0'''
    full_args = qemu_io_wrap_args(args)
    # stderr follows stdout into DEVNULL, so nothing is printed at all.
    completed = subprocess.run(full_args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
374
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its prompt.

    Commands are written to the process' stdin; the reply to each
    command is everything printed up to the next 'qemu-io> ' prompt.
    """

    # Prompt printed by qemu-io whenever it is ready for a command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` and wait for its first prompt.

        :raise ValueError: if the first output is not the prompt; the
                           message is everything qemu-io printed.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to the next prompt and return the text before it.

        The last characters read are compared against the prompt after
        every character.  Restarting a character-by-character match from
        scratch on each mismatch would miss a prompt that directly
        follows a partial prompt (for example output ending in 'q').
        """
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        tail = ''
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        `cmd` must be a single line and must not be the quit command
        (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
421
422
class QemuStorageDaemon:
    """
    Run a qemu-storage-daemon process for the duration of a test.

    The daemon is started by the constructor and terminated by stop()
    (or forcibly when the object is finalized).  A QMP monitor
    connection is only set up when requested with qmp=True.
    """
    # Monitor connection and its socket path; both stay None without qmp.
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Launch the daemon and wait until it has written its pidfile.

        :param args: extra command-line arguments; must not contain
                     '--pidfile', which is added here.
        :param instance_id: used in the pidfile and QMP socket names, so
                            that several daemons can be told apart.
        :param qmp: add a QMP monitor on a UNIX socket and connect to it.

        :raise RuntimeError: if the process exits before the pidfile
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            # Created in server mode before the daemon is launched;
            # presumably so the socket already exists when the daemon
            # connects -- TODO confirm against QEMUMonitorProtocol.
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the process dies first.
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        # The pid in the file must be the pid of the process we spawned.
        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """
        Send a QMP command to the daemon and return the monitor's reply.

        Only usable when the object was created with qmp=True.
        """
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """
        Signal the daemon, wait for it to exit and clean up.

        :param kill_signal: signal number to send (default 15).

        The QMP connection is closed and the QMP socket and pidfile are
        removed on a best-effort basis (a failing removal is ignored).
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        # Marks the daemon as stopped, see __del__().
        self._p = None

        if self._qmp:
            self._qmp.close()

        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        # Last resort: a daemon that was never stopped is sent signal 9.
        if self._p is not None:
            self.stop(kill_signal=9)
488
489
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    full_args = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(full_args)
493
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    full_args = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
                                                   connect_stderr=False)
    # The output is only reported for a non-zero return code.
    if returncode:
        return returncode, output
    return returncode, ''
501
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    # Uses the bare program name, without the default qemu-nbd options.
    full_args = [qemu_nbd_prog, '-L', *args]
    output = qemu_tool_pipe_and_status('qemu-nbd', full_args)[0]
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
508
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # The server counts as started once its pid file shows up.
            while True:
                if os.path.exists(pid_file):
                    break
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            # Runs on normal exit, on a failed start and when the body
            # of the "with" statement raises.
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()
537
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Compare two images with QEMU_IMG; return True if they are identical.

    :param img1: first image.
    :param img2: second image.
    :param fmt1: format of the first image (passed as -f).
    :param fmt2: format of the second image (passed as -F).

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        if exc.returncode != 1:
            raise
        return False
    return True
554
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Every 512-byte sector starts and ends with its own sector number as
    a big-endian 32-bit integer; the bytes in between are zero.  Whole
    sectors are written until at least `size` bytes have been produced.
    '''
    sector_size = 512
    layout = struct.Struct('>l504xl')
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_no = offset // sector_size
            image.write(layout.pack(sector_no, sector_no))
            offset += sector_size
563
def image_size(img: str) -> int:
    """
    Return image's virtual size

    :raise TypeError: if 'virtual-size' reported by qemu-img info is not
                      an int.
    """
    info = qemu_img_info('-f', imgfmt, img)
    value = info['virtual-size']
    if isinstance(value, int):
        return value

    type_name = type(value).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{value}' of type '{type_name}'")
572
def is_str(val):
    """Return True if `val` is a str instance."""
    result = isinstance(val, str)
    return result
575
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name (".", "+", "(", ...) only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in `msg` by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
579
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Remove all carriage returns from `msg`."""
    without_cr = win32_re.sub("", msg)
    return without_cr
583
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Replace the statistics line of qemu-io output by a fixed text.

    Carriage returns are stripped first (see filter_win32).
    """
    cleaned = filter_win32(msg)
    replacement = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(replacement, cleaned)
591
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace the numeric ids in 'chown <uid>:<gid>' by UID:GID."""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
595
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of `event` in which the 'seconds' and 'microseconds'
    members of 'timestamp' (if present) are replaced by the placeholders
    'SECS' and 'USECS'.  The event passed in is left untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # as well, otherwise the caller's event would be modified too.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
603
def filter_qmp(qmsg, filter_fn):
    '''Apply a filter function to every leaf value of a QMP object.

    filter_fn takes a (key, value) pair and returns the new value; the
    key is a dict key, a list index, or None when qmsg itself is a leaf.
    Lists and dicts are modified in place (recursively) and returned.
    '''
    # Iterate through either lists or dicts;
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    elif isinstance(qmsg, dict):
        entries = qmsg.items()
    else:
        return filter_fn(None, qmsg)

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        if is_container:
            qmsg[key] = filter_qmp(value, filter_fn)
        else:
            qmsg[key] = filter_fn(key, value)
    return qmsg
621
def filter_testfiles(msg):
    """
    Hide per-process file name prefixes in `msg`.

    '<test_dir>/<pid>-' becomes 'TEST_DIR/PID-' and '<sock_dir>/<pid>-'
    becomes 'SOCK_DIR/PID-', where <pid> is the current process id.
    """
    pid_prefix = "%s-" % (os.getpid())
    substitutions = (
        (os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-'),
    )
    for needle, placeholder in substitutions:
        msg = msg.replace(needle, placeholder)
    return msg
626
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
633
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' device names."""
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
636
def filter_qmp_virtio_scsi(qmsg):
    """Apply filter_virtio_scsi to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
643
def filter_generated_node_ids(msg):
    """Replace every '#block<number>' in `msg` by NODE_NAME."""
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
646
def filter_img_info(output, filename):
    """
    Make 'qemu-img info' output reproducible.

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines `filename` becomes TEST_IMG, test file prefixes are
    hidden, the image format becomes IMGFMT, and the iters, uuid, cid
    and compression type values are replaced by placeholders.
    """
    def _scrub(line):
        # The order of the substitutions is significant.
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
        return re.sub('(compression type: )(zlib|zstd)',
                      r'\1COMPRESSION_TYPE', line)

    kept = (line for line in output.split('\n')
            if not ('disk size' in line or 'actual-size' in line))
    return '\n'.join(_scrub(line) for line in kept)
664
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name by IMGFMT."""
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
667
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
674
def filter_nbd_exports(output: str) -> str:
    """Replace the values of 'min/opt/max block: <number>' by XXX."""
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
677
678
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Log a string or a JSON serializable message (like QMP) to test_logger.

    :param msg: the message; dicts and lists are serialized as JSON.
    :param filters: callables applied to the message in order, each
                    receiving the result of the previous one.
    :param indent: if provided, JSON serializable messages are
                   pretty-printed with this indentation.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not already_ordered,
                            indent=indent)
    test_logger.info(serialized)
696
class Timeout:
    """
    Context manager raising an exception when its body takes too long.

    On entry a SIGALRM handler is installed and a real-time interval
    timer of `seconds` is armed; on exit the timer is disarmed.  Nothing
    is armed at all while running under gdb or valgrind (qemu_gdb or
    qemu_valgrind set).
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # A zero interval disarms the timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
        # Never suppress an exception raised in the body.
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise an Exception carrying `errmsg`."""
        raise Exception(self.errmsg)
714
def file_pattern(name):
    """Return `name` prefixed with the current process id and a dash."""
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
717
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        # Only the names are generated here; no file is created.
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: files that cannot be removed are ignored.
        for path in self.paths:
            try_remove(path)
        return False
756
757
def try_remove(img):
    """Remove the file `img`, silently ignoring any OSError."""
    try:
        os.remove(img)
    except OSError:
        return
763
def file_path_remover():
    """Remove the files registered by file_path(), newest first."""
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
767
768
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    The generated paths are removed (if they exist) at interpreter exit.
    '''

    # The list of paths lives on the remover function itself; the atexit
    # hook is registered together with it on first use.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
790
def remote_filename(path):
    """
    Return the name under which `path` is reached via the test protocol.

    For 'file' this is the path itself; for 'ssh' an ssh:// URL to
    127.0.0.1 port 22 as the user from the environment.

    :raise Exception: for any other protocol.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
798
799class VM(qtest.QEMUQtestMachine):
800    '''A QEMU VM'''
801
802    def __init__(self, path_suffix=''):
803        name = "qemu%s-%d" % (path_suffix, os.getpid())
804        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
805        if qemu_gdb and qemu_valgrind:
806            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
807            sys.exit(1)
808        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
809        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
810                         name=name,
811                         base_temp_dir=test_dir,
812                         sock_dir=sock_dir, qmp_timer=timer)
813        self._num_drives = 0
814
815    def _post_shutdown(self) -> None:
816        super()._post_shutdown()
817        if not qemu_valgrind or not self._popen:
818            return
819        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
820        if self.exitcode() == 99:
821            with open(valgrind_filename, encoding='utf-8') as f:
822                print(f.read())
823        else:
824            os.remove(valgrind_filename)
825
826    def _pre_launch(self) -> None:
827        super()._pre_launch()
828        if qemu_print:
829            # set QEMU binary output to stdout
830            self._close_qemu_log_file()
831
832    def add_object(self, opts):
833        self._args.append('-object')
834        self._args.append(opts)
835        return self
836
837    def add_device(self, opts):
838        self._args.append('-device')
839        self._args.append(opts)
840        return self
841
842    def add_drive_raw(self, opts):
843        self._args.append('-drive')
844        self._args.append(opts)
845        return self
846
847    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
848        '''Add a virtio-blk drive to the VM'''
849        options = ['if=%s' % interface,
850                   'id=drive%d' % self._num_drives]
851
852        if path is not None:
853            options.append('file=%s' % path)
854            options.append('format=%s' % img_format)
855            options.append('cache=%s' % cachemode)
856            options.append('aio=%s' % aiomode)
857
858        if opts:
859            options.append(opts)
860
861        if img_format == 'luks' and 'key-secret' not in opts:
862            # default luks support
863            if luks_default_secret_object not in self._args:
864                self.add_object(luks_default_secret_object)
865
866            options.append(luks_default_key_secret_opt)
867
868        self._args.append('-drive')
869        self._args.append(','.join(options))
870        self._num_drives += 1
871        return self
872
873    def add_blockdev(self, opts):
874        self._args.append('-blockdev')
875        if isinstance(opts, str):
876            self._args.append(opts)
877        else:
878            self._args.append(','.join(opts))
879        return self
880
881    def add_incoming(self, addr):
882        self._args.append('-incoming')
883        self._args.append(addr)
884        return self
885
886    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
887        cmd = 'human-monitor-command'
888        kwargs: Dict[str, Any] = {'command-line': command_line}
889        if use_log:
890            return self.qmp_log(cmd, **kwargs)
891        else:
892            return self.qmp(cmd, **kwargs)
893
894    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
895        """Pause drive r/w operations"""
896        if not event:
897            self.pause_drive(drive, "read_aio")
898            self.pause_drive(drive, "write_aio")
899            return
900        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
901
902    def resume_drive(self, drive: str) -> None:
903        """Resume drive r/w operations"""
904        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
905
906    def hmp_qemu_io(self, drive: str, cmd: str,
907                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
908        """Write to a given drive using an HMP command"""
909        d = '-d ' if qdev else ''
910        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
911
912    def flatten_qmp_object(self, obj, output=None, basestr=''):
913        if output is None:
914            output = {}
915        if isinstance(obj, list):
916            for i, item in enumerate(obj):
917                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
918        elif isinstance(obj, dict):
919            for key in obj:
920                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
921        else:
922            output[basestr[:-1]] = obj # Strip trailing '.'
923        return output
924
925    def qmp_to_opts(self, obj):
926        obj = self.flatten_qmp_object(obj)
927        output_list = []
928        for key in obj:
929            output_list += [key + '=' + obj[key]]
930        return ','.join(output_list)
931
932    def get_qmp_events_filtered(self, wait=60.0):
933        result = []
934        for ev in self.get_qmp_events(wait=wait):
935            result.append(filter_qmp_event(ev))
936        return result
937
938    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
939        full_cmd = OrderedDict((
940            ("execute", cmd),
941            ("arguments", ordered_qmp(kwargs))
942        ))
943        log(full_cmd, filters, indent=indent)
944        result = self.qmp(cmd, **kwargs)
945        log(result, filters, indent=indent)
946        return result
947
948    # Returns None on success, and an error string on failure
    def run_job(self, job: str, auto_finalize: bool = True,
                auto_dismiss: bool = False,
                pre_finalize: Optional[Callable[[], None]] = None,
                cancel: bool = False, wait: float = 60.0,
                filters: Iterable[Callable[[Any], Any]] = (),
                ) -> Optional[str]:
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :param filters: Iterable of filter callables. Passed on to every
                        log() and qmp_log() call made here.
        :return: None if the job never reported an error; otherwise the
                 'error' value that query-jobs reported for it while the
                 job was in the 'aborting' state.
        """
        # Legacy BLOCK_JOB_* events identify the job by 'device', the
        # newer ones by 'id'.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; all
            # other matching events are merely logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev, filters=filters)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Fetch and remember the failure reason of our job
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']), filters=filters)
            elif status == 'ready':
                self.qmp_log('job-complete', id=job, filters=filters)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback first, then either
                # cancel or finalize the job
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job, filters=filters)
                else:
                    self.qmp_log('job-finalize', id=job, filters=filters)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job, filters=filters)
            elif status == 'null':
                # The job is gone; report what we have seen
                return error
1006
1007    # Returns None on success, and an error string on failure
1008    def blockdev_create(self, options, job_id='job0', filters=None):
1009        if filters is None:
1010            filters = [filter_qmp_testfiles]
1011        result = self.qmp_log('blockdev-create', filters=filters,
1012                              job_id=job_id, options=options)
1013
1014        if 'return' in result:
1015            assert result['return'] == {}
1016            job_result = self.run_job(job_id, filters=filters)
1017        else:
1018            job_result = result['error']
1019
1020        log("")
1021        return job_result
1022
1023    def enable_migration_events(self, name):
1024        log('Enabling migration QMP events on %s...' % name)
1025        log(self.qmp('migrate-set-capabilities', capabilities=[
1026            {
1027                'capability': 'events',
1028                'state': True
1029            }
1030        ]))
1031
1032    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1033        while True:
1034            event = self.event_wait('MIGRATION')
1035            # We use the default timeout, and with a timeout, event_wait()
1036            # never returns None
1037            assert event
1038
1039            log(event, filters=[filter_qmp_event])
1040            if event['data']['status'] in ('completed', 'failed'):
1041                break
1042
1043        if event['data']['status'] == 'completed':
1044            # The event may occur in finish-migrate, so wait for the expected
1045            # post-migration runstate
1046            runstate = None
1047            while runstate != expect_runstate:
1048                runstate = self.qmp('query-status')['return']['status']
1049            return True
1050        else:
1051            return False
1052
1053    def node_info(self, node_name):
1054        nodes = self.qmp('query-named-block-nodes')
1055        for x in nodes['return']:
1056            if x['node-name'] == node_name:
1057                return x
1058        return None
1059
1060    def query_bitmaps(self):
1061        res = self.qmp("query-named-block-nodes")
1062        return {device['node-name']: device['dirty-bitmaps']
1063                for device in res['return'] if 'dirty-bitmaps' in device}
1064
1065    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1066        """
1067        get a specific bitmap from the object returned by query_bitmaps.
1068        :param recording: If specified, filter results by the specified value.
1069        :param bitmaps: If specified, use it instead of call query_bitmaps()
1070        """
1071        if bitmaps is None:
1072            bitmaps = self.query_bitmaps()
1073
1074        for bitmap in bitmaps[node_name]:
1075            if bitmap.get('name', '') == bitmap_name:
1076                if recording is None or bitmap.get('recording') == recording:
1077                    return bitmap
1078        return None
1079
1080    def check_bitmap_status(self, node_name, bitmap_name, fields):
1081        ret = self.get_bitmap(node_name, bitmap_name)
1082
1083        return fields.items() <= ret.items()
1084
1085    def assert_block_path(self, root, path, expected_node, graph=None):
1086        """
1087        Check whether the node under the given path in the block graph
1088        is @expected_node.
1089
1090        @root is the node name of the node where the @path is rooted.
1091
1092        @path is a string that consists of child names separated by
1093        slashes.  It must begin with a slash.
1094
1095        Examples for @root + @path:
1096          - root="qcow2-node", path="/backing/file"
1097          - root="quorum-node", path="/children.2/file"
1098
1099        Hypothetically, @path could be empty, in which case it would
1100        point to @root.  However, in practice this case is not useful
1101        and hence not allowed.
1102
1103        @expected_node may be None.  (All elements of the path but the
1104        leaf must still exist.)
1105
1106        @graph may be None or the result of an x-debug-query-block-graph
1107        call that has already been performed.
1108        """
1109        if graph is None:
1110            graph = self.qmp('x-debug-query-block-graph')['return']
1111
1112        iter_path = iter(path.split('/'))
1113
1114        # Must start with a /
1115        assert next(iter_path) == ''
1116
1117        node = next((node for node in graph['nodes'] if node['name'] == root),
1118                    None)
1119
1120        # An empty @path is not allowed, so the root node must be present
1121        assert node is not None, 'Root node %s not found' % root
1122
1123        for child_name in iter_path:
1124            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1125
1126            try:
1127                node_id = next(edge['child'] for edge in graph['edges']
1128                               if (edge['parent'] == node['id'] and
1129                                   edge['name'] == child_name))
1130
1131                node = next(node for node in graph['nodes']
1132                            if node['id'] == node_id)
1133
1134            except StopIteration:
1135                node = None
1136
1137        if node is None:
1138            assert expected_node is None, \
1139                   'No node found under %s (but expected %s)' % \
1140                   (path, expected_node)
1141        else:
1142            assert node['name'] == expected_node, \
1143                   'Found node %s under %s (but expected %s)' % \
1144                   (node['name'], path, expected_node)
1145
# Matches a dictpath() component of the form "name[index]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow the "/"-separated @path through the nested dict @d and
        return the value found.  A component of the form "name[N]"
        additionally picks element N of the list stored under "name".
        Any failure to follow the path fails the test.'''
        for component in path.split('/'):
            indexed = index_re.match(component)
            if indexed:
                component, raw_idx = indexed.groups()
                idx = int(raw_idx)

            if not isinstance(d, dict) or component not in d:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not indexed:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be followed in the QMP dict @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            return
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value found under @path in the QMP dict @d
           equals @value.  If @value is a non-empty list, it is
           enough for any one of its elements to match.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if isinstance(value, list) and value != []:
            if any(actual == candidate for candidate in value):
                return
            self.fail('no match for "%s" in %s' % (str(actual), str(value)))
        else:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        jobs = self.vm.qmp('query-block-jobs')
        self.assert_qmp(jobs, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Query the named block nodes and assert that one of them
        matches @node_name and/or @file_name (whichever are given)"""
        def matches(actual, wanted):
            return actual is None or wanted is None or actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        if any(matches(entry.get("node-name"), node_name) and
               matches(entry.get("file"), file_name)
               for entry in result["return"]):
            return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON that follows is equal to @reference once both have been
           flattened'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')
        self.assertEqual(
            self.vm.flatten_qmp_object(json.loads(payload)),
            self.vm.flatten_qmp_object(reference)
        )

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel the block job on @drive and wait until it is gone,
        returning the completion/cancellation event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        done = False
        while not done:
            # Every event of a batch is inspected, even after the final
            # event has been seen
            for event in self.vm.get_qmp_events(wait=wait):
                if event['event'] in ('BLOCK_JOB_COMPLETED',
                                      'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                    done = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for the block job on @drive to emit BLOCK_JOB_COMPLETED
        and return that event.  With @error, the event must carry that
        error; without, it must carry none (and, if @check_offset, its
        offset must equal its len).'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for a mirror or commit BLOCK_JOB_READY event on @drive,
        and return the event."""
        ready_events = [
            ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ]
        return self.vm.events_wait(ready_events)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job on @drive to become ready, cancel it, and
        check that it ended as a fully completed mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete for the job on @drive (after waiting
        for it to be ready unless @wait_ready is False) and wait for it
        to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until job @job_id is paused and not
        busy, and return its entry.  The job must exist.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((entry for entry in jobs
                            if entry['device'] == job_id),
                           None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause block job @job_id.  Returns the paused job's entry when
        @wait is set, the block-job-pause response otherwise.'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        if not wait:
            return reply
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1329
1330
def notrun(reason):
    '''Skip this test suite: record @reason in <test_dir>/<seq>.notrun,
    log a warning and exit with status 0'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.notrun' % (test_dir, seq)

    with open(marker_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1341
def case_notrun(reason):
    '''Record that a test case was not run by appending @reason to
    <test_dir>/<seq>.casenotrun.  This does not skip anything by
    itself; that is left to the caller.  See QMPTestCase.case_skip()
    for a variant that actually skips the current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = '%s/%s.casenotrun' % (test_dir, seq)

    with open(marker_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1354
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not acceptable.

    A non-empty @supported_fmts acts as an allow list, @unsupported_fmts
    as a deny list.  For luks, a working LUKS driver is required too.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # similar to
            #   _supported_fmt generic
            # for bash tests
            supported_fmts = ()

    not_allowed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_allowed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1370
1371def _verify_protocol(supported: Sequence[str] = (),
1372                     unsupported: Sequence[str] = ()) -> None:
1373    assert not (supported and unsupported)
1374
1375    if 'generic' in supported:
1376        return
1377
1378    not_sup = supported and (imgproto not in supported)
1379    if not_sup or (imgproto in unsupported):
1380        notrun('not suitable for this protocol: %s' % imgproto)
1381
1382def _verify_platform(supported: Sequence[str] = (),
1383                     unsupported: Sequence[str] = ()) -> None:
1384    if any((sys.platform.startswith(x) for x in unsupported)):
1385        notrun('not suitable for this OS: %s' % sys.platform)
1386
1387    if supported:
1388        if not any((sys.platform.startswith(x) for x in supported)):
1389            notrun('not suitable for this OS: %s' % sys.platform)
1390
1391def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1392    if supported_cache_modes and (cachemode not in supported_cache_modes):
1393        notrun('not suitable for this cache mode: %s' % cachemode)
1394
1395def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1396    if supported_aio_modes and (aiomode not in supported_aio_modes):
1397        notrun('not suitable for this aio mode: %s' % aiomode)
1398
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite unless every format in @required_formats is
    listed by supported_formats()."""
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
1403
1404
def _verify_virtio_blk() -> None:
    """Skip the test suite if QEMU's device help does not mention
    virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1409
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if QEMU's device help mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    has_scsi = ('virtio-scsi-pci' in device_help or
                'virtio-scsi-ccw' in device_help)
    if not has_scsi:
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1414
1415
1416def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1417    imgopts = os.environ.get('IMGOPTS')
1418    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1419    # but it supported only for bash tests. We don't have a concept of global
1420    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1421    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1422    unsup = list(unsupported) + ['$']
1423    if imgopts and any(x in imgopts for x in unsup):
1424        notrun(f'not suitable for this imgopts: {imgopts}')
1425
1426
def supports_quorum() -> bool:
    """Tell whether 'quorum' shows up in the output of qemu-img --help."""
    help_text = qemu_img('--help').stdout
    return 'quorum' in help_text
1429
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1434
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A throwaway image is created in the test directory and removed
    again.

    :return: (True, '') on success; otherwise (False, reason), where
             reason is the text after "<image file>:" in the first
             output line containing it, or the whole output if there
             is no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    res = qemu_img('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G',
                   check=False)
    try:
        os.remove(img_file)
    except OSError:
        # Nothing to clean up if the image was never created
        pass

    if not res.returncode:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in res.stdout.splitlines()
                   if marker in line),
                  res.stdout)
    return (False, reason)
1465
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks()
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1473
def supports_qcow2_zstd_compression() -> bool:
    """
    Probe for zstd support by creating (and removing) a throwaway qcow2
    image with compression_type=zstd.

    :return: False only if qemu-img exits with status 1 and reports
             that 'compression-type' does not accept 'zstd'.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)
    try:
        os.remove(img_file)
    except OSError:
        # Nothing to clean up if the image was never created
        pass

    rejected = (res.returncode == 1 and
                "'compression-type' does not accept value 'zstd'"
                in res.stdout)
    return not rejected
1489
def verify_qcow2_zstd_compression():
    '''Skip test suite if qcow2 zstd compression is not supported'''
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1493
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts and @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1503
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats.

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function
       attribute supported_formats.formats.'''

    cache = getattr(supported_formats, "formats", None)
    if cache is None:
        cache = supported_formats.formats = {}

    if read_only in cache:
        return cache[read_only]

    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    # The second output line is used for read_only, the first otherwise
    line_index = 1 if read_only else 0
    cache[read_only] = help_lines[line_index].split(":")[1].split()
    return cache[read_only]
1518
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       'required_formats' is either a sequence of format names or a
       callable that takes the test case and returns such a sequence.
       'read_only' selects the whitelist, see supported_formats().

       The wrapper keeps the name and docstring of the decorated test
       (functools.wraps).'''
    # Local import: functools is not among this module's imports
    import functools

    def skip_test_decorator(func):
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1538
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The test case is skipped via case_skip() when the current image
       format is in 'formats'.  The wrapper keeps the name and
       docstring of the decorated test (functools.wraps).'''
    # Local import: functools is not among this module's imports
    import functools

    def skip_test_decorator(func):
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1554
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root, the case is recorded via case_notrun()
       and the wrapper returns None without calling the test.  The
       wrapper keeps the name and docstring of the decorated test
       (functools.wraps).'''
    # Local import: functools is not among this module's imports
    import functools

    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper
1565
1566# We need to filter out the time taken from the output so that
1567# qemu-iotest can reliably diff the results against master output,
1568# and hide skipped tests from the reference output.
1569
class ReproducibleTestResult(unittest.TextTestResult):
    """Text test result whose dot output shows skipped tests as "."."""

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1579
class ReproducibleStreamWrapper:
    """Stream proxy that strips the run time and the skip count from
    everything written to it; other attributes come from the wrapped
    stream."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Never delegate these two names to the wrapped stream
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        without_skips = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(without_skips)
1593
class ReproducibleTestRunner(unittest.TextTestRunner):
    """Text test runner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given
        target = stream or sys.stdout
        super().__init__(stream=ReproducibleStreamWrapper(target),  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1604
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Run the unittests of the calling module via unittest.main().

    :param argv: Argument list handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
1615
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv and turns on debug logging.
    Afterwards all _verify_*() checks are run with the given parameters.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1647
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All arguments except @test_function go to execute_setup_common().
    Afterwards @test_function is called if given; otherwise the
    unittests are run.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1656
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Print the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1665
1666# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates logging first; all arguments are then passed on to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1671
1672# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates logging first; @test_function and all remaining arguments
    are then passed on to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1677
1678# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)
1682