xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 21063bce)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
78
79gdb_qemu_env = os.environ.get('GDB_OPTIONS')
80qemu_gdb = []
81if gdb_qemu_env:
82    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
83
84qemu_print = os.environ.get('PRINT_QEMU', False)
85
86imgfmt = os.environ.get('IMGFMT', 'raw')
87imgproto = os.environ.get('IMGPROTO', 'file')
88
89try:
90    test_dir = os.environ['TEST_DIR']
91    sock_dir = os.environ['SOCK_DIR']
92    cachemode = os.environ['CACHEMODE']
93    aiomode = os.environ['AIOMODE']
94    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
95except KeyError:
96    # We are using these variables as proxies to indicate that we're
97    # not being run via "check". There may be other things set up by
98    # "check" that individual test cases rely on.
99    sys.stderr.write('Please run this test via the "check" script\n')
100    sys.exit(os.EX_USAGE)
101
102qemu_valgrind = []
103if os.environ.get('VALGRIND_QEMU') == "y" and \
104    os.environ.get('NO_VALGRIND') != "y":
105    valgrind_logfile = "--log-file=" + test_dir
106    # %p allows to put the valgrind process PID, since
107    # we don't know it a priori (subprocess.Popen is
108    # not yet invoked)
109    valgrind_logfile += "/%p.valgrind"
110
111    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112
113luks_default_secret_object = 'secret,id=keysec0,data=' + \
114                             os.environ.get('IMGKEYSECRET', '')
115luks_default_key_secret_opt = 'key-secret=keysec0'
116
117sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
120@contextmanager
121def change_log_level(
122        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
123    """
124    Utility function for temporarily changing the log level of a logger.
125
126    This can be used to silence errors that are expected or uninteresting.
127    """
128    _logger = logging.getLogger(logger_name)
129    current_level = _logger.level
130    _logger.setLevel(level)
131
132    try:
133        yield
134    finally:
135        _logger.setLevel(current_level)
136
137
138def unarchive_sample_image(sample, fname):
139    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
140    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
141        shutil.copyfileobj(f_in, f_out)
142
143
144def qemu_tool_popen(args: Sequence[str],
145                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
146    stderr = subprocess.STDOUT if connect_stderr else None
147    # pylint: disable=consider-using-with
148    return subprocess.Popen(args,
149                            stdout=subprocess.PIPE,
150                            stderr=stderr,
151                            universal_newlines=True)
152
153
154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
155                              connect_stderr: bool = True,
156                              drop_successful_output: bool = False) \
157        -> Tuple[str, int]:
158    """
159    Run a tool and return both its output and its exit code
160    """
161    with qemu_tool_popen(args, connect_stderr) as subp:
162        output = subp.communicate()[0]
163        if subp.returncode < 0:
164            cmd = ' '.join(args)
165            sys.stderr.write(f'{tool} received signal \
166                               {-subp.returncode}: {cmd}\n')
167        if drop_successful_output and subp.returncode == 0:
168            output = ''
169        return (output, subp.returncode)
170
171def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172    if not args or args[0] != 'create':
173        return list(args)
174    args = args[1:]
175
176    p = argparse.ArgumentParser(allow_abbrev=False)
177    # -o option may be specified several times
178    p.add_argument('-o', action='append', default=[])
179    p.add_argument('-f')
180    parsed, remaining = p.parse_known_args(args)
181
182    opts_list = parsed.o
183
184    result = ['create']
185    if parsed.f is not None:
186        result += ['-f', parsed.f]
187
188    # IMGOPTS most probably contain options specific for the selected format,
189    # like extended_l2 or compression_type for qcow2. Test may want to create
190    # additional images in other formats that doesn't support these options.
191    # So, use IMGOPTS only for images created in imgfmt format.
192    imgopts = os.environ.get('IMGOPTS')
193    if imgopts and parsed.f == imgfmt:
194        opts_list.insert(0, imgopts)
195
196    # default luks support
197    if parsed.f == 'luks' and \
198            all('key-secret' not in opts for opts in opts_list):
199        result += ['--object', luks_default_secret_object]
200        opts_list.append(luks_default_key_secret_opt)
201
202    for opts in opts_list:
203        result += ['-o', opts]
204
205    result += remaining
206
207    return result
208
209
210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
211              ) -> 'subprocess.CompletedProcess[str]':
212    """
213    Run a qemu tool and return its status code and console output.
214
215    :param args: full command line to run.
216    :param check: Enforce a return code of zero.
217    :param combine_stdio: set to False to keep stdout/stderr separated.
218
219    :raise VerboseProcessError:
220        When the return code is negative, or on any non-zero exit code
221        when 'check=True' was provided (the default). This exception has
222        'stdout', 'stderr', and 'returncode' properties that may be
223        inspected to show greater detail. If this exception is not
224        handled, the command-line, return code, and all console output
225        will be included at the bottom of the stack trace.
226
227    :return:
228        a CompletedProcess. This object has args, returncode, and stdout
229        properties. If streams are not combined, it will also have a
230        stderr property.
231    """
232    subp = subprocess.run(
233        args,
234        stdout=subprocess.PIPE,
235        stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
236        universal_newlines=True,
237        check=False
238    )
239
240    if check and subp.returncode or (subp.returncode < 0):
241        raise VerboseProcessError(
242            subp.returncode, args,
243            output=subp.stdout,
244            stderr=subp.stderr,
245        )
246
247    return subp
248
249
250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
251             ) -> 'subprocess.CompletedProcess[str]':
252    """
253    Run QEMU_IMG_PROG and return its status code and console output.
254
255    This function always prepends QEMU_IMG_OPTIONS and may further alter
256    the args for 'create' commands.
257
258    See `qemu_tool()` for greater detail.
259    """
260    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
261    return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)
262
263
264def ordered_qmp(qmsg, conv_keys=True):
265    # Dictionaries are not ordered prior to 3.6, therefore:
266    if isinstance(qmsg, list):
267        return [ordered_qmp(atom) for atom in qmsg]
268    if isinstance(qmsg, dict):
269        od = OrderedDict()
270        for k, v in sorted(qmsg.items()):
271            if conv_keys:
272                k = k.replace('_', '-')
273            od[k] = ordered_qmp(v, conv_keys=False)
274        return od
275    return qmsg
276
277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
278    return qemu_img('create', *args)
279
280def qemu_img_json(*args: str) -> Any:
281    """
282    Run qemu-img and return its output as deserialized JSON.
283
284    :raise CalledProcessError:
285        When qemu-img crashes, or returns a non-zero exit code without
286        producing a valid JSON document to stdout.
287    :raise JSONDecoderError:
288        When qemu-img returns 0, but failed to produce a valid JSON document.
289
290    :return: A deserialized JSON object; probably a dict[str, Any].
291    """
292    try:
293        res = qemu_img(*args, combine_stdio=False)
294    except subprocess.CalledProcessError as exc:
295        # Terminated due to signal. Don't bother.
296        if exc.returncode < 0:
297            raise
298
299        # Commands like 'check' can return failure (exit codes 2 and 3)
300        # to indicate command completion, but with errors found. For
301        # multi-command flexibility, ignore the exact error codes and
302        # *try* to load JSON.
303        try:
304            return json.loads(exc.stdout)
305        except json.JSONDecodeError:
306            # Nope. This thing is toast. Raise the /process/ error.
307            pass
308        raise
309
310    return json.loads(res.stdout)
311
312def qemu_img_measure(*args: str) -> Any:
313    return qemu_img_json("measure", "--output", "json", *args)
314
315def qemu_img_check(*args: str) -> Any:
316    return qemu_img_json("check", "--output", "json", *args)
317
318def qemu_img_info(*args: str) -> Any:
319    return qemu_img_json('info', "--output", "json", *args)
320
321def qemu_img_map(*args: str) -> Any:
322    return qemu_img_json('map', "--output", "json", *args)
323
324def qemu_img_log(*args: str, check: bool = True
325                 ) -> 'subprocess.CompletedProcess[str]':
326    result = qemu_img(*args, check=check)
327    log(result.stdout, filters=[filter_testfiles])
328    return result
329
330def img_info_log(filename: str, filter_path: Optional[str] = None,
331                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
332                 check: bool = True, drop_child_info: bool = True,
333                 ) -> None:
334    args = ['info']
335    if use_image_opts:
336        args.append('--image-opts')
337    else:
338        args += ['-f', imgfmt]
339    args += extra_args
340    args.append(filename)
341
342    output = qemu_img(*args, check=check).stdout
343    if not filter_path:
344        filter_path = filename
345    log(filter_img_info(output, filter_path, drop_child_info))
346
347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
348    if '-f' in args or '--image-opts' in args:
349        return qemu_io_args_no_fmt + list(args)
350    else:
351        return qemu_io_args + list(args)
352
353def qemu_io_popen(*args):
354    return qemu_tool_popen(qemu_io_wrap_args(args))
355
356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
357            ) -> 'subprocess.CompletedProcess[str]':
358    """
359    Run QEMU_IO_PROG and return the status code and console output.
360
361    This function always prepends either QEMU_IO_OPTIONS or
362    QEMU_IO_OPTIONS_NO_FMT.
363    """
364    return qemu_tool(*qemu_io_wrap_args(args),
365                     check=check, combine_stdio=combine_stdio)
366
367def qemu_io_log(*args: str, check: bool = True
368                ) -> 'subprocess.CompletedProcess[str]':
369    result = qemu_io(*args, check=check)
370    log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
371    return result
372
373class QemuIoInteractive:
374    def __init__(self, *args):
375        self.args = qemu_io_wrap_args(args)
376        # We need to keep the Popen objext around, and not
377        # close it immediately. Therefore, disable the pylint check:
378        # pylint: disable=consider-using-with
379        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
380                                   stdout=subprocess.PIPE,
381                                   stderr=subprocess.STDOUT,
382                                   universal_newlines=True)
383        out = self._p.stdout.read(9)
384        if out != 'qemu-io> ':
385            # Most probably qemu-io just failed to start.
386            # Let's collect the whole output and exit.
387            out += self._p.stdout.read()
388            self._p.wait(timeout=1)
389            raise ValueError(out)
390
391    def close(self):
392        self._p.communicate('q\n')
393
394    def _read_output(self):
395        pattern = 'qemu-io> '
396        n = len(pattern)
397        pos = 0
398        s = []
399        while pos != n:
400            c = self._p.stdout.read(1)
401            # check unexpected EOF
402            assert c != ''
403            s.append(c)
404            if c == pattern[pos]:
405                pos += 1
406            else:
407                pos = 0
408
409        return ''.join(s[:-n])
410
411    def cmd(self, cmd):
412        # quit command is in close(), '\n' is added automatically
413        assert '\n' not in cmd
414        cmd = cmd.strip()
415        assert cmd not in ('q', 'quit')
416        self._p.stdin.write(cmd + '\n')
417        self._p.stdin.flush()
418        return self._read_output()
419
420
421class QemuStorageDaemon:
422    _qmp: Optional[QEMUMonitorProtocol] = None
423    _qmpsock: Optional[str] = None
424    # Python < 3.8 would complain if this type were not a string literal
425    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426    _p: 'Optional[subprocess.Popen[bytes]]' = None
427
428    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
429        assert '--pidfile' not in args
430        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
431        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
432
433        if qmp:
434            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
435            all_args += ['--chardev',
436                         f'socket,id=qmp-sock,path={self._qmpsock}',
437                         '--monitor', 'qmp-sock']
438
439            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)
440
441        # Cannot use with here, we want the subprocess to stay around
442        # pylint: disable=consider-using-with
443        self._p = subprocess.Popen(all_args)
444        if self._qmp is not None:
445            self._qmp.accept()
446        while not os.path.exists(self.pidfile):
447            if self._p.poll() is not None:
448                cmd = ' '.join(all_args)
449                raise RuntimeError(
450                    'qemu-storage-daemon terminated with exit code ' +
451                    f'{self._p.returncode}: {cmd}')
452
453            time.sleep(0.01)
454
455        with open(self.pidfile, encoding='utf-8') as f:
456            self._pid = int(f.read().strip())
457
458        assert self._pid == self._p.pid
459
460    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
461            -> QMPMessage:
462        assert self._qmp is not None
463        return self._qmp.cmd(cmd, args)
464
465    def stop(self, kill_signal=15):
466        self._p.send_signal(kill_signal)
467        self._p.wait()
468        self._p = None
469
470        if self._qmp:
471            self._qmp.close()
472
473        if self._qmpsock is not None:
474            try:
475                os.remove(self._qmpsock)
476            except OSError:
477                pass
478        try:
479            os.remove(self.pidfile)
480        except OSError:
481            pass
482
483    def __del__(self):
484        if self._p is not None:
485            self.stop(kill_signal=9)
486
487
488def qemu_nbd(*args):
489    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
490    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
491
492def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
493    '''Run qemu-nbd in daemon mode and return both the parent's exit code
494       and its output in case of an error'''
495    full_args = qemu_nbd_args + ['--fork'] + list(args)
496    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
497                                                   connect_stderr=False)
498    return returncode, output if returncode else ''
499
500def qemu_nbd_list_log(*args: str) -> str:
501    '''Run qemu-nbd to list remote exports'''
502    full_args = [qemu_nbd_prog, '-L'] + list(args)
503    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
504    log(output, filters=[filter_testfiles, filter_nbd_exports])
505    return output
506
507@contextmanager
508def qemu_nbd_popen(*args):
509    '''Context manager running qemu-nbd within the context'''
510    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
511
512    assert not os.path.exists(pid_file)
513
514    cmd = list(qemu_nbd_args)
515    cmd.extend(('--persistent', '--pid-file', pid_file))
516    cmd.extend(args)
517
518    log('Start NBD server')
519    with subprocess.Popen(cmd) as p:
520        try:
521            while not os.path.exists(pid_file):
522                if p.poll() is not None:
523                    raise RuntimeError(
524                        "qemu-nbd terminated with exit code {}: {}"
525                        .format(p.returncode, ' '.join(cmd)))
526
527                time.sleep(0.01)
528            yield
529        finally:
530            if os.path.exists(pid_file):
531                os.remove(pid_file)
532            log('Kill NBD server')
533            p.kill()
534            p.wait()
535
536def compare_images(img1: str, img2: str,
537                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
538    """
539    Compare two images with QEMU_IMG; return True if they are identical.
540
541    :raise CalledProcessError:
542        when qemu-img crashes or returns a status code of anything other
543        than 0 (identical) or 1 (different).
544    """
545    try:
546        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
547        return True
548    except subprocess.CalledProcessError as exc:
549        if exc.returncode == 1:
550            return False
551        raise
552
553def create_image(name, size):
554    '''Create a fully-allocated raw image with sector markers'''
555    with open(name, 'wb') as file:
556        i = 0
557        while i < size:
558            sector = struct.pack('>l504xl', i // 512, i // 512)
559            file.write(sector)
560            i = i + 512
561
562def image_size(img: str) -> int:
563    """Return image's virtual size"""
564    value = qemu_img_info('-f', imgfmt, img)['virtual-size']
565    if not isinstance(value, int):
566        type_name = type(value).__name__
567        raise TypeError("Expected 'int' for 'virtual-size', "
568                        f"got '{value}' of type '{type_name}'")
569    return value
570
571def is_str(val):
572    return isinstance(val, str)
573
574test_dir_re = re.compile(r"%s" % test_dir)
575def filter_test_dir(msg):
576    return test_dir_re.sub("TEST_DIR", msg)
577
578win32_re = re.compile(r"\r")
579def filter_win32(msg):
580    return win32_re.sub("", msg)
581
582qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
583                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
584                        r"and [0-9\/.inf]* ops\/sec\)")
585def filter_qemu_io(msg):
586    msg = filter_win32(msg)
587    return qemu_io_re.sub("X ops; XX:XX:XX.X "
588                          "(XXX YYY/sec and XXX ops/sec)", msg)
589
590chown_re = re.compile(r"chown [0-9]+:[0-9]+")
591def filter_chown(msg):
592    return chown_re.sub("chown UID:GID", msg)
593
594def filter_qmp_event(event):
595    '''Filter a QMP event dict'''
596    event = dict(event)
597    if 'timestamp' in event:
598        event['timestamp']['seconds'] = 'SECS'
599        event['timestamp']['microseconds'] = 'USECS'
600    return event
601
602def filter_qmp(qmsg, filter_fn):
603    '''Given a string filter, filter a QMP object's values.
604    filter_fn takes a (key, value) pair.'''
605    # Iterate through either lists or dicts;
606    if isinstance(qmsg, list):
607        items = enumerate(qmsg)
608    elif isinstance(qmsg, dict):
609        items = qmsg.items()
610    else:
611        return filter_fn(None, qmsg)
612
613    for k, v in items:
614        if isinstance(v, (dict, list)):
615            qmsg[k] = filter_qmp(v, filter_fn)
616        else:
617            qmsg[k] = filter_fn(k, v)
618    return qmsg
619
620def filter_testfiles(msg):
621    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
622    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
623    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
624
625def filter_qmp_testfiles(qmsg):
626    def _filter(_key, value):
627        if is_str(value):
628            return filter_testfiles(value)
629        return value
630    return filter_qmp(qmsg, _filter)
631
632def filter_virtio_scsi(output: str) -> str:
633    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
634
635def filter_qmp_virtio_scsi(qmsg):
636    def _filter(_key, value):
637        if is_str(value):
638            return filter_virtio_scsi(value)
639        return value
640    return filter_qmp(qmsg, _filter)
641
642def filter_generated_node_ids(msg):
643    return re.sub("#block[0-9]+", "NODE_NAME", msg)
644
645def filter_img_info(output: str, filename: str,
646                    drop_child_info: bool = True) -> str:
647    lines = []
648    drop_indented = False
649    for line in output.split('\n'):
650        if 'disk size' in line or 'actual-size' in line:
651            continue
652
653        # Drop child node info
654        if drop_indented:
655            if line.startswith(' '):
656                continue
657            drop_indented = False
658        if drop_child_info and "Child node '/" in line:
659            drop_indented = True
660            continue
661
662        line = line.replace(filename, 'TEST_IMG')
663        line = filter_testfiles(line)
664        line = line.replace(imgfmt, 'IMGFMT')
665        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
666        line = re.sub('uuid: [-a-f0-9]+',
667                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
668                      line)
669        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
670        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
671                      line)
672        lines.append(line)
673    return '\n'.join(lines)
674
675def filter_imgfmt(msg):
676    return msg.replace(imgfmt, 'IMGFMT')
677
678def filter_qmp_imgfmt(qmsg):
679    def _filter(_key, value):
680        if is_str(value):
681            return filter_imgfmt(value)
682        return value
683    return filter_qmp(qmsg, _filter)
684
685def filter_nbd_exports(output: str) -> str:
686    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
687
688
689Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
690
691def log(msg: Msg,
692        filters: Iterable[Callable[[Msg], Msg]] = (),
693        indent: Optional[int] = None) -> None:
694    """
695    Logs either a string message or a JSON serializable message (like QMP).
696    If indent is provided, JSON serializable messages are pretty-printed.
697    """
698    for flt in filters:
699        msg = flt(msg)
700    if isinstance(msg, (dict, list)):
701        # Don't sort if it's already sorted
702        do_sort = not isinstance(msg, OrderedDict)
703        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
704    else:
705        test_logger.info(msg)
706
707class Timeout:
708    def __init__(self, seconds, errmsg="Timeout"):
709        self.seconds = seconds
710        self.errmsg = errmsg
711    def __enter__(self):
712        if qemu_gdb or qemu_valgrind:
713            return self
714        signal.signal(signal.SIGALRM, self.timeout)
715        signal.setitimer(signal.ITIMER_REAL, self.seconds)
716        return self
717    def __exit__(self, exc_type, value, traceback):
718        if qemu_gdb or qemu_valgrind:
719            return False
720        signal.setitimer(signal.ITIMER_REAL, 0)
721        return False
722    def timeout(self, signum, frame):
723        raise TimeoutError(self.errmsg)
724
725def file_pattern(name):
726    return "{0}-{1}".format(os.getpid(), name)
727
728class FilePath:
729    """
730    Context manager generating multiple file names. The generated files are
731    removed when exiting the context.
732
733    Example usage:
734
735        with FilePath('a.img', 'b.img') as (img_a, img_b):
736            # Use img_a and img_b here...
737
738        # a.img and b.img are automatically removed here.
739
740    By default images are created in iotests.test_dir. To create sockets use
741    iotests.sock_dir:
742
743       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
744
745    For convenience, calling with one argument yields a single file instead of
746    a tuple with one item.
747
748    """
749    def __init__(self, *names, base_dir=test_dir):
750        self.paths = [os.path.join(base_dir, file_pattern(name))
751                      for name in names]
752
753    def __enter__(self):
754        if len(self.paths) == 1:
755            return self.paths[0]
756        else:
757            return self.paths
758
759    def __exit__(self, exc_type, exc_val, exc_tb):
760        for path in self.paths:
761            try:
762                os.remove(path)
763            except OSError:
764                pass
765        return False
766
767
768def try_remove(img):
769    try:
770        os.remove(img)
771    except OSError:
772        pass
773
774def file_path_remover():
775    for path in reversed(file_path_remover.paths):
776        try_remove(path)
777
778
779def file_path(*names, base_dir=test_dir):
780    ''' Another way to get auto-generated filename that cleans itself up.
781
782    Use is as simple as:
783
784    img_a, img_b = file_path('a.img', 'b.img')
785    sock = file_path('socket')
786    '''
787
788    if not hasattr(file_path_remover, 'paths'):
789        file_path_remover.paths = []
790        atexit.register(file_path_remover)
791
792    paths = []
793    for name in names:
794        filename = file_pattern(name)
795        path = os.path.join(base_dir, filename)
796        file_path_remover.paths.append(path)
797        paths.append(path)
798
799    return paths[0] if len(paths) == 1 else paths
800
801def remote_filename(path):
802    if imgproto == 'file':
803        return path
804    elif imgproto == 'ssh':
805        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
806    else:
807        raise ValueError("Protocol %s not supported" % (imgproto))
808
809class VM(qtest.QEMUQtestMachine):
810    '''A QEMU VM'''
811
812    def __init__(self, path_suffix=''):
813        name = "qemu%s-%d" % (path_suffix, os.getpid())
814        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
815        if qemu_gdb and qemu_valgrind:
816            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
817            sys.exit(1)
818        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
819        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
820                         name=name,
821                         base_temp_dir=test_dir,
822                         sock_dir=sock_dir, qmp_timer=timer)
823        self._num_drives = 0
824
825    def _post_shutdown(self) -> None:
826        super()._post_shutdown()
827        if not qemu_valgrind or not self._popen:
828            return
829        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
830        if self.exitcode() == 99:
831            with open(valgrind_filename, encoding='utf-8') as f:
832                print(f.read())
833        else:
834            os.remove(valgrind_filename)
835
836    def _pre_launch(self) -> None:
837        super()._pre_launch()
838        if qemu_print:
839            # set QEMU binary output to stdout
840            self._close_qemu_log_file()
841
842    def add_object(self, opts):
843        self._args.append('-object')
844        self._args.append(opts)
845        return self
846
847    def add_device(self, opts):
848        self._args.append('-device')
849        self._args.append(opts)
850        return self
851
852    def add_drive_raw(self, opts):
853        self._args.append('-drive')
854        self._args.append(opts)
855        return self
856
857    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
858        '''Add a virtio-blk drive to the VM'''
859        options = ['if=%s' % interface,
860                   'id=drive%d' % self._num_drives]
861
862        if path is not None:
863            options.append('file=%s' % path)
864            options.append('format=%s' % img_format)
865            options.append('cache=%s' % cachemode)
866            options.append('aio=%s' % aiomode)
867
868        if opts:
869            options.append(opts)
870
871        if img_format == 'luks' and 'key-secret' not in opts:
872            # default luks support
873            if luks_default_secret_object not in self._args:
874                self.add_object(luks_default_secret_object)
875
876            options.append(luks_default_key_secret_opt)
877
878        self._args.append('-drive')
879        self._args.append(','.join(options))
880        self._num_drives += 1
881        return self
882
883    def add_blockdev(self, opts):
884        self._args.append('-blockdev')
885        if isinstance(opts, str):
886            self._args.append(opts)
887        else:
888            self._args.append(','.join(opts))
889        return self
890
891    def add_incoming(self, addr):
892        self._args.append('-incoming')
893        self._args.append(addr)
894        return self
895
896    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
897        cmd = 'human-monitor-command'
898        kwargs: Dict[str, Any] = {'command-line': command_line}
899        if use_log:
900            return self.qmp_log(cmd, **kwargs)
901        else:
902            return self.qmp(cmd, **kwargs)
903
904    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
905        """Pause drive r/w operations"""
906        if not event:
907            self.pause_drive(drive, "read_aio")
908            self.pause_drive(drive, "write_aio")
909            return
910        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
911
912    def resume_drive(self, drive: str) -> None:
913        """Resume drive r/w operations"""
914        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
915
916    def hmp_qemu_io(self, drive: str, cmd: str,
917                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
918        """Write to a given drive using an HMP command"""
919        d = '-d ' if qdev else ''
920        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
921
922    def flatten_qmp_object(self, obj, output=None, basestr=''):
923        if output is None:
924            output = {}
925        if isinstance(obj, list):
926            for i, item in enumerate(obj):
927                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
928        elif isinstance(obj, dict):
929            for key in obj:
930                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
931        else:
932            output[basestr[:-1]] = obj # Strip trailing '.'
933        return output
934
935    def qmp_to_opts(self, obj):
936        obj = self.flatten_qmp_object(obj)
937        output_list = []
938        for key in obj:
939            output_list += [key + '=' + obj[key]]
940        return ','.join(output_list)
941
942    def get_qmp_events_filtered(self, wait=60.0):
943        result = []
944        for ev in self.get_qmp_events(wait=wait):
945            result.append(filter_qmp_event(ev))
946        return result
947
948    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
949        full_cmd = OrderedDict((
950            ("execute", cmd),
951            ("arguments", ordered_qmp(kwargs))
952        ))
953        log(full_cmd, filters, indent=indent)
954        result = self.qmp(cmd, **kwargs)
955        log(result, filters, indent=indent)
956        return result
957
958    # Returns None on success, and an error string on failure
959    def run_job(self, job: str, auto_finalize: bool = True,
960                auto_dismiss: bool = False,
961                pre_finalize: Optional[Callable[[], None]] = None,
962                cancel: bool = False, wait: float = 60.0,
963                filters: Iterable[Callable[[Any], Any]] = (),
964                ) -> Optional[str]:
965        """
966        run_job moves a job from creation through to dismissal.
967
968        :param job: String. ID of recently-launched job
969        :param auto_finalize: Bool. True if the job was launched with
970                              auto_finalize. Defaults to True.
971        :param auto_dismiss: Bool. True if the job was launched with
972                             auto_dismiss=True. Defaults to False.
973        :param pre_finalize: Callback. A callable that takes no arguments to be
974                             invoked prior to issuing job-finalize, if any.
975        :param cancel: Bool. When true, cancels the job after the pre_finalize
976                       callback.
977        :param wait: Float. Timeout value specifying how long to wait for any
978                     event, in seconds. Defaults to 60.0.
979        """
980        match_device = {'data': {'device': job}}
981        match_id = {'data': {'id': job}}
982        events = [
983            ('BLOCK_JOB_COMPLETED', match_device),
984            ('BLOCK_JOB_CANCELLED', match_device),
985            ('BLOCK_JOB_ERROR', match_device),
986            ('BLOCK_JOB_READY', match_device),
987            ('BLOCK_JOB_PENDING', match_id),
988            ('JOB_STATUS_CHANGE', match_id)
989        ]
990        error = None
991        while True:
992            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
993            if ev['event'] != 'JOB_STATUS_CHANGE':
994                log(ev, filters=filters)
995                continue
996            status = ev['data']['status']
997            if status == 'aborting':
998                result = self.qmp('query-jobs')
999                for j in result['return']:
1000                    if j['id'] == job:
1001                        error = j['error']
1002                        log('Job failed: %s' % (j['error']), filters=filters)
1003            elif status == 'ready':
1004                self.qmp_log('job-complete', id=job, filters=filters)
1005            elif status == 'pending' and not auto_finalize:
1006                if pre_finalize:
1007                    pre_finalize()
1008                if cancel:
1009                    self.qmp_log('job-cancel', id=job, filters=filters)
1010                else:
1011                    self.qmp_log('job-finalize', id=job, filters=filters)
1012            elif status == 'concluded' and not auto_dismiss:
1013                self.qmp_log('job-dismiss', id=job, filters=filters)
1014            elif status == 'null':
1015                return error
1016
1017    # Returns None on success, and an error string on failure
1018    def blockdev_create(self, options, job_id='job0', filters=None):
1019        if filters is None:
1020            filters = [filter_qmp_testfiles]
1021        result = self.qmp_log('blockdev-create', filters=filters,
1022                              job_id=job_id, options=options)
1023
1024        if 'return' in result:
1025            assert result['return'] == {}
1026            job_result = self.run_job(job_id, filters=filters)
1027        else:
1028            job_result = result['error']
1029
1030        log("")
1031        return job_result
1032
1033    def enable_migration_events(self, name):
1034        log('Enabling migration QMP events on %s...' % name)
1035        log(self.qmp('migrate-set-capabilities', capabilities=[
1036            {
1037                'capability': 'events',
1038                'state': True
1039            }
1040        ]))
1041
1042    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1043        while True:
1044            event = self.event_wait('MIGRATION')
1045            # We use the default timeout, and with a timeout, event_wait()
1046            # never returns None
1047            assert event
1048
1049            log(event, filters=[filter_qmp_event])
1050            if event['data']['status'] in ('completed', 'failed'):
1051                break
1052
1053        if event['data']['status'] == 'completed':
1054            # The event may occur in finish-migrate, so wait for the expected
1055            # post-migration runstate
1056            runstate = None
1057            while runstate != expect_runstate:
1058                runstate = self.qmp('query-status')['return']['status']
1059            return True
1060        else:
1061            return False
1062
1063    def node_info(self, node_name):
1064        nodes = self.qmp('query-named-block-nodes')
1065        for x in nodes['return']:
1066            if x['node-name'] == node_name:
1067                return x
1068        return None
1069
1070    def query_bitmaps(self):
1071        res = self.qmp("query-named-block-nodes")
1072        return {device['node-name']: device['dirty-bitmaps']
1073                for device in res['return'] if 'dirty-bitmaps' in device}
1074
1075    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1076        """
1077        get a specific bitmap from the object returned by query_bitmaps.
1078        :param recording: If specified, filter results by the specified value.
1079        :param bitmaps: If specified, use it instead of call query_bitmaps()
1080        """
1081        if bitmaps is None:
1082            bitmaps = self.query_bitmaps()
1083
1084        for bitmap in bitmaps[node_name]:
1085            if bitmap.get('name', '') == bitmap_name:
1086                if recording is None or bitmap.get('recording') == recording:
1087                    return bitmap
1088        return None
1089
1090    def check_bitmap_status(self, node_name, bitmap_name, fields):
1091        ret = self.get_bitmap(node_name, bitmap_name)
1092
1093        return fields.items() <= ret.items()
1094
1095    def assert_block_path(self, root, path, expected_node, graph=None):
1096        """
1097        Check whether the node under the given path in the block graph
1098        is @expected_node.
1099
1100        @root is the node name of the node where the @path is rooted.
1101
1102        @path is a string that consists of child names separated by
1103        slashes.  It must begin with a slash.
1104
1105        Examples for @root + @path:
1106          - root="qcow2-node", path="/backing/file"
1107          - root="quorum-node", path="/children.2/file"
1108
1109        Hypothetically, @path could be empty, in which case it would
1110        point to @root.  However, in practice this case is not useful
1111        and hence not allowed.
1112
1113        @expected_node may be None.  (All elements of the path but the
1114        leaf must still exist.)
1115
1116        @graph may be None or the result of an x-debug-query-block-graph
1117        call that has already been performed.
1118        """
1119        if graph is None:
1120            graph = self.qmp('x-debug-query-block-graph')['return']
1121
1122        iter_path = iter(path.split('/'))
1123
1124        # Must start with a /
1125        assert next(iter_path) == ''
1126
1127        node = next((node for node in graph['nodes'] if node['name'] == root),
1128                    None)
1129
1130        # An empty @path is not allowed, so the root node must be present
1131        assert node is not None, 'Root node %s not found' % root
1132
1133        for child_name in iter_path:
1134            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1135
1136            try:
1137                node_id = next(edge['child'] for edge in graph['edges']
1138                               if (edge['parent'] == node['id'] and
1139                                   edge['name'] == child_name))
1140
1141                node = next(node for node in graph['nodes']
1142                            if node['id'] == node_id)
1143
1144            except StopIteration:
1145                node = None
1146
1147        if node is None:
1148            assert expected_node is None, \
1149                   'No node found under %s (but expected %s)' % \
1150                   (path, expected_node)
1151        else:
1152            assert node['name'] == expected_node, \
1153                   'Found node %s under %s (but expected %s)' % \
1154                   (node['name'], path, expected_node)
1155
1156index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1157
1158class QMPTestCase(unittest.TestCase):
1159    '''Abstract base class for QMP test cases'''
1160
1161    def __init__(self, *args, **kwargs):
1162        super().__init__(*args, **kwargs)
1163        # Many users of this class set a VM property we rely on heavily
1164        # in the methods below.
1165        self.vm = None
1166
1167    def dictpath(self, d, path):
1168        '''Traverse a path in a nested dict'''
1169        for component in path.split('/'):
1170            m = index_re.match(component)
1171            if m:
1172                component, idx = m.groups()
1173                idx = int(idx)
1174
1175            if not isinstance(d, dict) or component not in d:
1176                self.fail(f'failed path traversal for "{path}" in "{d}"')
1177            d = d[component]
1178
1179            if m:
1180                if not isinstance(d, list):
1181                    self.fail(f'path component "{component}" in "{path}" '
1182                              f'is not a list in "{d}"')
1183                try:
1184                    d = d[idx]
1185                except IndexError:
1186                    self.fail(f'invalid index "{idx}" in path "{path}" '
1187                              f'in "{d}"')
1188        return d
1189
1190    def assert_qmp_absent(self, d, path):
1191        try:
1192            result = self.dictpath(d, path)
1193        except AssertionError:
1194            return
1195        self.fail('path "%s" has value "%s"' % (path, str(result)))
1196
1197    def assert_qmp(self, d, path, value):
1198        '''Assert that the value for a specific path in a QMP dict
1199           matches.  When given a list of values, assert that any of
1200           them matches.'''
1201
1202        result = self.dictpath(d, path)
1203
1204        # [] makes no sense as a list of valid values, so treat it as
1205        # an actual single value.
1206        if isinstance(value, list) and value != []:
1207            for v in value:
1208                if result == v:
1209                    return
1210            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1211        else:
1212            self.assertEqual(result, value,
1213                             '"%s" is "%s", expected "%s"'
1214                             % (path, str(result), str(value)))
1215
1216    def assert_no_active_block_jobs(self):
1217        result = self.vm.qmp('query-block-jobs')
1218        self.assert_qmp(result, 'return', [])
1219
1220    def assert_has_block_node(self, node_name=None, file_name=None):
1221        """Issue a query-named-block-nodes and assert node_name and/or
1222        file_name is present in the result"""
1223        def check_equal_or_none(a, b):
1224            return a is None or b is None or a == b
1225        assert node_name or file_name
1226        result = self.vm.qmp('query-named-block-nodes')
1227        for x in result["return"]:
1228            if check_equal_or_none(x.get("node-name"), node_name) and \
1229                    check_equal_or_none(x.get("file"), file_name):
1230                return
1231        self.fail("Cannot find %s %s in result:\n%s" %
1232                  (node_name, file_name, result))
1233
1234    def assert_json_filename_equal(self, json_filename, reference):
1235        '''Asserts that the given filename is a json: filename and that its
1236           content is equal to the given reference object'''
1237        self.assertEqual(json_filename[:5], 'json:')
1238        self.assertEqual(
1239            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1240            self.vm.flatten_qmp_object(reference)
1241        )
1242
1243    def cancel_and_wait(self, drive='drive0', force=False,
1244                        resume=False, wait=60.0):
1245        '''Cancel a block job and wait for it to finish, returning the event'''
1246        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1247        self.assert_qmp(result, 'return', {})
1248
1249        if resume:
1250            self.vm.resume_drive(drive)
1251
1252        cancelled = False
1253        result = None
1254        while not cancelled:
1255            for event in self.vm.get_qmp_events(wait=wait):
1256                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1257                   event['event'] == 'BLOCK_JOB_CANCELLED':
1258                    self.assert_qmp(event, 'data/device', drive)
1259                    result = event
1260                    cancelled = True
1261                elif event['event'] == 'JOB_STATUS_CHANGE':
1262                    self.assert_qmp(event, 'data/id', drive)
1263
1264
1265        self.assert_no_active_block_jobs()
1266        return result
1267
1268    def wait_until_completed(self, drive='drive0', check_offset=True,
1269                             wait=60.0, error=None):
1270        '''Wait for a block job to finish, returning the event'''
1271        while True:
1272            for event in self.vm.get_qmp_events(wait=wait):
1273                if event['event'] == 'BLOCK_JOB_COMPLETED':
1274                    self.assert_qmp(event, 'data/device', drive)
1275                    if error is None:
1276                        self.assert_qmp_absent(event, 'data/error')
1277                        if check_offset:
1278                            self.assert_qmp(event, 'data/offset',
1279                                            event['data']['len'])
1280                    else:
1281                        self.assert_qmp(event, 'data/error', error)
1282                    self.assert_no_active_block_jobs()
1283                    return event
1284                if event['event'] == 'JOB_STATUS_CHANGE':
1285                    self.assert_qmp(event, 'data/id', drive)
1286
1287    def wait_ready(self, drive='drive0'):
1288        """Wait until a BLOCK_JOB_READY event, and return the event."""
1289        return self.vm.events_wait([
1290            ('BLOCK_JOB_READY',
1291             {'data': {'type': 'mirror', 'device': drive}}),
1292            ('BLOCK_JOB_READY',
1293             {'data': {'type': 'commit', 'device': drive}})
1294        ])
1295
1296    def wait_ready_and_cancel(self, drive='drive0'):
1297        self.wait_ready(drive=drive)
1298        event = self.cancel_and_wait(drive=drive)
1299        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1300        self.assert_qmp(event, 'data/type', 'mirror')
1301        self.assert_qmp(event, 'data/offset', event['data']['len'])
1302
1303    def complete_and_wait(self, drive='drive0', wait_ready=True,
1304                          completion_error=None):
1305        '''Complete a block job and wait for it to finish'''
1306        if wait_ready:
1307            self.wait_ready(drive=drive)
1308
1309        result = self.vm.qmp('block-job-complete', device=drive)
1310        self.assert_qmp(result, 'return', {})
1311
1312        event = self.wait_until_completed(drive=drive, error=completion_error)
1313        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1314
1315    def pause_wait(self, job_id='job0'):
1316        with Timeout(3, "Timeout waiting for job to pause"):
1317            while True:
1318                result = self.vm.qmp('query-block-jobs')
1319                found = False
1320                for job in result['return']:
1321                    if job['device'] == job_id:
1322                        found = True
1323                        if job['paused'] and not job['busy']:
1324                            return job
1325                        break
1326                assert found
1327
1328    def pause_job(self, job_id='job0', wait=True):
1329        result = self.vm.qmp('block-job-pause', device=job_id)
1330        self.assert_qmp(result, 'return', {})
1331        if wait:
1332            return self.pause_wait(job_id)
1333        return result
1334
1335    def case_skip(self, reason):
1336        '''Skip this test case'''
1337        case_notrun(reason)
1338        self.skipTest(reason)
1339
1340
1341def notrun(reason):
1342    '''Skip this test suite'''
1343    # Each test in qemu-iotests has a number ("seq")
1344    seq = os.path.basename(sys.argv[0])
1345
1346    with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
1347            as outfile:
1348        outfile.write(reason + '\n')
1349    logger.warning("%s not run: %s", seq, reason)
1350    sys.exit(0)
1351
1352def case_notrun(reason):
1353    '''Mark this test case as not having been run (without actually
1354    skipping it, that is left to the caller).  See
1355    QMPTestCase.case_skip() for a variant that actually skips the
1356    current test case.'''
1357
1358    # Each test in qemu-iotests has a number ("seq")
1359    seq = os.path.basename(sys.argv[0])
1360
1361    with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
1362            as outfile:
1363        outfile.write('    [case not run] ' + reason + '\n')
1364
1365def _verify_image_format(supported_fmts: Sequence[str] = (),
1366                         unsupported_fmts: Sequence[str] = ()) -> None:
1367    if 'generic' in supported_fmts and \
1368            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1369        # similar to
1370        #   _supported_fmt generic
1371        # for bash tests
1372        supported_fmts = ()
1373
1374    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1375    if not_sup or (imgfmt in unsupported_fmts):
1376        notrun('not suitable for this image format: %s' % imgfmt)
1377
1378    if imgfmt == 'luks':
1379        verify_working_luks()
1380
1381def _verify_protocol(supported: Sequence[str] = (),
1382                     unsupported: Sequence[str] = ()) -> None:
1383    assert not (supported and unsupported)
1384
1385    if 'generic' in supported:
1386        return
1387
1388    not_sup = supported and (imgproto not in supported)
1389    if not_sup or (imgproto in unsupported):
1390        notrun('not suitable for this protocol: %s' % imgproto)
1391
1392def _verify_platform(supported: Sequence[str] = (),
1393                     unsupported: Sequence[str] = ()) -> None:
1394    if any((sys.platform.startswith(x) for x in unsupported)):
1395        notrun('not suitable for this OS: %s' % sys.platform)
1396
1397    if supported:
1398        if not any((sys.platform.startswith(x) for x in supported)):
1399            notrun('not suitable for this OS: %s' % sys.platform)
1400
1401def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1402    if supported_cache_modes and (cachemode not in supported_cache_modes):
1403        notrun('not suitable for this cache mode: %s' % cachemode)
1404
1405def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1406    if supported_aio_modes and (aiomode not in supported_aio_modes):
1407        notrun('not suitable for this aio mode: %s' % aiomode)
1408
1409def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1410    usf_list = list(set(required_formats) - set(supported_formats()))
1411    if usf_list:
1412        notrun(f'formats {usf_list} are not whitelisted')
1413
1414
1415def _verify_virtio_blk() -> None:
1416    out = qemu_pipe('-M', 'none', '-device', 'help')
1417    if 'virtio-blk' not in out:
1418        notrun('Missing virtio-blk in QEMU binary')
1419
1420def _verify_virtio_scsi_pci_or_ccw() -> None:
1421    out = qemu_pipe('-M', 'none', '-device', 'help')
1422    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1423        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1424
1425
1426def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1427    imgopts = os.environ.get('IMGOPTS')
1428    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1429    # but it supported only for bash tests. We don't have a concept of global
1430    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1431    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1432    unsup = list(unsupported) + ['$']
1433    if imgopts and any(x in imgopts for x in unsup):
1434        notrun(f'not suitable for this imgopts: {imgopts}')
1435
1436
1437def supports_quorum() -> bool:
1438    return 'quorum' in qemu_img('--help').stdout
1439
1440def verify_quorum():
1441    '''Skip test suite if quorum support is not available'''
1442    if not supports_quorum():
1443        notrun('quorum support missing')
1444
1445def has_working_luks() -> Tuple[bool, str]:
1446    """
1447    Check whether our LUKS driver can actually create images
1448    (this extends to LUKS encryption for qcow2).
1449
1450    If not, return the reason why.
1451    """
1452
1453    img_file = f'{test_dir}/luks-test.luks'
1454    res = qemu_img('create', '-f', 'luks',
1455                   '--object', luks_default_secret_object,
1456                   '-o', luks_default_key_secret_opt,
1457                   '-o', 'iter-time=10',
1458                   img_file, '1G',
1459                   check=False)
1460    try:
1461        os.remove(img_file)
1462    except OSError:
1463        pass
1464
1465    if res.returncode:
1466        reason = res.stdout
1467        for line in res.stdout.splitlines():
1468            if img_file + ':' in line:
1469                reason = line.split(img_file + ':', 1)[1].strip()
1470                break
1471
1472        return (False, reason)
1473    else:
1474        return (True, '')
1475
1476def verify_working_luks():
1477    """
1478    Skip test suite if LUKS does not work
1479    """
1480    (working, reason) = has_working_luks()
1481    if not working:
1482        notrun(reason)
1483
1484def supports_qcow2_zstd_compression() -> bool:
1485    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
1486    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1487                   img_file, '0',
1488                   check=False)
1489    try:
1490        os.remove(img_file)
1491    except OSError:
1492        pass
1493
1494    if res.returncode == 1 and \
1495            "'compression-type' does not accept value 'zstd'" in res.stdout:
1496        return False
1497    else:
1498        return True
1499
1500def verify_qcow2_zstd_compression():
1501    if not supports_qcow2_zstd_compression():
1502        notrun('zstd compression not supported')
1503
1504def qemu_pipe(*args: str) -> str:
1505    """
1506    Run qemu with an option to print something and exit (e.g. a help option).
1507
1508    :return: QEMU's stdout output.
1509    """
1510    full_args = [qemu_prog] + qemu_opts + list(args)
1511    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1512    return output
1513
1514def supported_formats(read_only=False):
1515    '''Set 'read_only' to True to check ro-whitelist
1516       Otherwise, rw-whitelist is checked'''
1517
1518    if not hasattr(supported_formats, "formats"):
1519        supported_formats.formats = {}
1520
1521    if read_only not in supported_formats.formats:
1522        format_message = qemu_pipe("-drive", "format=help")
1523        line = 1 if read_only else 0
1524        supported_formats.formats[read_only] = \
1525            format_message.splitlines()[line].split(":")[1].split()
1526
1527    return supported_formats.formats[read_only]
1528
1529def skip_if_unsupported(required_formats=(), read_only=False):
1530    '''Skip Test Decorator
1531       Runs the test if all the required formats are whitelisted'''
1532    def skip_test_decorator(func):
1533        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1534                         **kwargs: Dict[str, Any]) -> None:
1535            if callable(required_formats):
1536                fmts = required_formats(test_case)
1537            else:
1538                fmts = required_formats
1539
1540            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1541            if usf_list:
1542                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1543                test_case.case_skip(msg)
1544            else:
1545                func(test_case, *args, **kwargs)
1546        return func_wrapper
1547    return skip_test_decorator
1548
1549def skip_for_formats(formats: Sequence[str] = ()) \
1550    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1551                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1552    '''Skip Test Decorator
1553       Skips the test for the given formats'''
1554    def skip_test_decorator(func):
1555        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1556                         **kwargs: Dict[str, Any]) -> None:
1557            if imgfmt in formats:
1558                msg = f'{test_case}: Skipped for format {imgfmt}'
1559                test_case.case_skip(msg)
1560            else:
1561                func(test_case, *args, **kwargs)
1562        return func_wrapper
1563    return skip_test_decorator
1564
1565def skip_if_user_is_root(func):
1566    '''Skip Test Decorator
1567       Runs the test only without root permissions'''
1568    def func_wrapper(*args, **kwargs):
1569        if os.getuid() == 0:
1570            case_notrun('{}: cannot be run as root'.format(args[0]))
1571            return None
1572        else:
1573            return func(*args, **kwargs)
1574    return func_wrapper
1575
1576# We need to filter out the time taken from the output so that
1577# qemu-iotest can reliably diff the results against master output,
1578# and hide skipped tests from the reference output.
1579
1580class ReproducibleTestResult(unittest.TextTestResult):
1581    def addSkip(self, test, reason):
1582        # Same as TextTestResult, but print dot instead of "s"
1583        unittest.TestResult.addSkip(self, test, reason)
1584        if self.showAll:
1585            self.stream.writeln("skipped {0!r}".format(reason))
1586        elif self.dots:
1587            self.stream.write(".")
1588            self.stream.flush()
1589
1590class ReproducibleStreamWrapper:
1591    def __init__(self, stream: TextIO):
1592        self.stream = stream
1593
1594    def __getattr__(self, attr):
1595        if attr in ('stream', '__getstate__'):
1596            raise AttributeError(attr)
1597        return getattr(self.stream, attr)
1598
1599    def write(self, arg=None):
1600        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1601        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1602        self.stream.write(arg)
1603
1604class ReproducibleTestRunner(unittest.TextTestRunner):
1605    def __init__(self, stream: Optional[TextIO] = None,
1606                 resultclass: Type[unittest.TestResult] =
1607                 ReproducibleTestResult,
1608                 **kwargs: Any) -> None:
1609        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1610        super().__init__(stream=rstream,           # type: ignore
1611                         descriptions=True,
1612                         resultclass=resultclass,
1613                         **kwargs)
1614
1615def execute_unittest(argv: List[str], debug: bool = False) -> None:
1616    """Executes unittests within the calling module."""
1617
1618    # Some tests have warnings, especially ResourceWarnings for unclosed
1619    # files and sockets.  Ignore them for now to ensure reproducibility of
1620    # the test output.
1621    unittest.main(argv=argv,
1622                  testRunner=ReproducibleTestRunner,
1623                  verbosity=2 if debug else 1,
1624                  warnings=None if sys.warnoptions else 'ignore')
1625
1626def execute_setup_common(supported_fmts: Sequence[str] = (),
1627                         supported_platforms: Sequence[str] = (),
1628                         supported_cache_modes: Sequence[str] = (),
1629                         supported_aio_modes: Sequence[str] = (),
1630                         unsupported_fmts: Sequence[str] = (),
1631                         supported_protocols: Sequence[str] = (),
1632                         unsupported_protocols: Sequence[str] = (),
1633                         required_fmts: Sequence[str] = (),
1634                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1635    """
1636    Perform necessary setup for either script-style or unittest-style tests.
1637
1638    :return: Bool; Whether or not debug mode has been requested via the CLI.
1639    """
1640    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1641
1642    debug = '-d' in sys.argv
1643    if debug:
1644        sys.argv.remove('-d')
1645    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1646
1647    _verify_image_format(supported_fmts, unsupported_fmts)
1648    _verify_protocol(supported_protocols, unsupported_protocols)
1649    _verify_platform(supported=supported_platforms)
1650    _verify_cache_mode(supported_cache_modes)
1651    _verify_aio_mode(supported_aio_modes)
1652    _verify_formats(required_fmts)
1653    _verify_virtio_blk()
1654    _verify_imgopts(unsupported_imgopts)
1655
1656    return debug
1657
1658def execute_test(*args, test_function=None, **kwargs):
1659    """Run either unittest or script-style tests."""
1660
1661    debug = execute_setup_common(*args, **kwargs)
1662    if not test_function:
1663        execute_unittest(sys.argv, debug)
1664    else:
1665        test_function()
1666
1667def activate_logging():
1668    """Activate iotests.log() output to stdout for script-style tests."""
1669    handler = logging.StreamHandler(stream=sys.stdout)
1670    formatter = logging.Formatter('%(message)s')
1671    handler.setFormatter(formatter)
1672    test_logger.addHandler(handler)
1673    test_logger.setLevel(logging.INFO)
1674    test_logger.propagate = False
1675
1676# This is called from script-style iotests without a single point of entry
1677def script_initialize(*args, **kwargs):
1678    """Initialize script-style tests without running any tests."""
1679    activate_logging()
1680    execute_setup_common(*args, **kwargs)
1681
1682# This is called from script-style iotests with a single point of entry
1683def script_main(test_function, *args, **kwargs):
1684    """Run script-style tests outside of the unittest framework"""
1685    activate_logging()
1686    execute_test(*args, test_function=test_function, **kwargs)
1687
1688# This is called from unittest style iotests
1689def main(*args, **kwargs):
1690    """Run tests using the unittest framework"""
1691    execute_test(*args, **kwargs)
1692