xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 623d7e3551a6fc5693c06ea938c60fe281b52e27)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
78
79gdb_qemu_env = os.environ.get('GDB_OPTIONS')
80qemu_gdb = []
81if gdb_qemu_env:
82    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
83
84qemu_print = os.environ.get('PRINT_QEMU', False)
85
86imgfmt = os.environ.get('IMGFMT', 'raw')
87imgproto = os.environ.get('IMGPROTO', 'file')
88
89try:
90    test_dir = os.environ['TEST_DIR']
91    sock_dir = os.environ['SOCK_DIR']
92    cachemode = os.environ['CACHEMODE']
93    aiomode = os.environ['AIOMODE']
94    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
95except KeyError:
96    # We are using these variables as proxies to indicate that we're
97    # not being run via "check". There may be other things set up by
98    # "check" that individual test cases rely on.
99    sys.stderr.write('Please run this test via the "check" script\n')
100    sys.exit(os.EX_USAGE)
101
102qemu_valgrind = []
103if os.environ.get('VALGRIND_QEMU') == "y" and \
104    os.environ.get('NO_VALGRIND') != "y":
105    valgrind_logfile = "--log-file=" + test_dir
106    # %p allows to put the valgrind process PID, since
107    # we don't know it a priori (subprocess.Popen is
108    # not yet invoked)
109    valgrind_logfile += "/%p.valgrind"
110
111    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112
113luks_default_secret_object = 'secret,id=keysec0,data=' + \
114                             os.environ.get('IMGKEYSECRET', '')
115luks_default_key_secret_opt = 'key-secret=keysec0'
116
117sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
120@contextmanager
121def change_log_level(
122        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
123    """
124    Utility function for temporarily changing the log level of a logger.
125
126    This can be used to silence errors that are expected or uninteresting.
127    """
128    _logger = logging.getLogger(logger_name)
129    current_level = _logger.level
130    _logger.setLevel(level)
131
132    try:
133        yield
134    finally:
135        _logger.setLevel(current_level)
136
137
138def unarchive_sample_image(sample, fname):
139    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
140    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
141        shutil.copyfileobj(f_in, f_out)
142
143
144def qemu_tool_popen(args: Sequence[str],
145                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
146    stderr = subprocess.STDOUT if connect_stderr else None
147    # pylint: disable=consider-using-with
148    return subprocess.Popen(args,
149                            stdout=subprocess.PIPE,
150                            stderr=stderr,
151                            universal_newlines=True)
152
153
154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
155                              connect_stderr: bool = True,
156                              drop_successful_output: bool = False) \
157        -> Tuple[str, int]:
158    """
159    Run a tool and return both its output and its exit code
160    """
161    with qemu_tool_popen(args, connect_stderr) as subp:
162        output = subp.communicate()[0]
163        if subp.returncode < 0:
164            cmd = ' '.join(args)
165            sys.stderr.write(f'{tool} received signal \
166                               {-subp.returncode}: {cmd}\n')
167        if drop_successful_output and subp.returncode == 0:
168            output = ''
169        return (output, subp.returncode)
170
171def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172    if not args or args[0] != 'create':
173        return list(args)
174    args = args[1:]
175
176    p = argparse.ArgumentParser(allow_abbrev=False)
177    # -o option may be specified several times
178    p.add_argument('-o', action='append', default=[])
179    p.add_argument('-f')
180    parsed, remaining = p.parse_known_args(args)
181
182    opts_list = parsed.o
183
184    result = ['create']
185    if parsed.f is not None:
186        result += ['-f', parsed.f]
187
188    # IMGOPTS most probably contain options specific for the selected format,
189    # like extended_l2 or compression_type for qcow2. Test may want to create
190    # additional images in other formats that doesn't support these options.
191    # So, use IMGOPTS only for images created in imgfmt format.
192    imgopts = os.environ.get('IMGOPTS')
193    if imgopts and parsed.f == imgfmt:
194        opts_list.insert(0, imgopts)
195
196    # default luks support
197    if parsed.f == 'luks' and \
198            all('key-secret' not in opts for opts in opts_list):
199        result += ['--object', luks_default_secret_object]
200        opts_list.append(luks_default_key_secret_opt)
201
202    for opts in opts_list:
203        result += ['-o', opts]
204
205    result += remaining
206
207    return result
208
209
210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
211              ) -> 'subprocess.CompletedProcess[str]':
212    """
213    Run a qemu tool and return its status code and console output.
214
215    :param args: full command line to run.
216    :param check: Enforce a return code of zero.
217    :param combine_stdio: set to False to keep stdout/stderr separated.
218
219    :raise VerboseProcessError:
220        When the return code is negative, or on any non-zero exit code
221        when 'check=True' was provided (the default). This exception has
222        'stdout', 'stderr', and 'returncode' properties that may be
223        inspected to show greater detail. If this exception is not
224        handled, the command-line, return code, and all console output
225        will be included at the bottom of the stack trace.
226
227    :return:
228        a CompletedProcess. This object has args, returncode, and stdout
229        properties. If streams are not combined, it will also have a
230        stderr property.
231    """
232    subp = subprocess.run(
233        args,
234        stdout=subprocess.PIPE,
235        stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
236        universal_newlines=True,
237        check=False
238    )
239
240    if check and subp.returncode or (subp.returncode < 0):
241        raise VerboseProcessError(
242            subp.returncode, args,
243            output=subp.stdout,
244            stderr=subp.stderr,
245        )
246
247    return subp
248
249
250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
251             ) -> 'subprocess.CompletedProcess[str]':
252    """
253    Run QEMU_IMG_PROG and return its status code and console output.
254
255    This function always prepends QEMU_IMG_OPTIONS and may further alter
256    the args for 'create' commands.
257
258    See `qemu_tool()` for greater detail.
259    """
260    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
261    return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)
262
263
264def ordered_qmp(qmsg, conv_keys=True):
265    # Dictionaries are not ordered prior to 3.6, therefore:
266    if isinstance(qmsg, list):
267        return [ordered_qmp(atom) for atom in qmsg]
268    if isinstance(qmsg, dict):
269        od = OrderedDict()
270        for k, v in sorted(qmsg.items()):
271            if conv_keys:
272                k = k.replace('_', '-')
273            od[k] = ordered_qmp(v, conv_keys=False)
274        return od
275    return qmsg
276
277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
278    return qemu_img('create', *args)
279
280def qemu_img_json(*args: str) -> Any:
281    """
282    Run qemu-img and return its output as deserialized JSON.
283
284    :raise CalledProcessError:
285        When qemu-img crashes, or returns a non-zero exit code without
286        producing a valid JSON document to stdout.
287    :raise JSONDecoderError:
288        When qemu-img returns 0, but failed to produce a valid JSON document.
289
290    :return: A deserialized JSON object; probably a dict[str, Any].
291    """
292    try:
293        res = qemu_img(*args, combine_stdio=False)
294    except subprocess.CalledProcessError as exc:
295        # Terminated due to signal. Don't bother.
296        if exc.returncode < 0:
297            raise
298
299        # Commands like 'check' can return failure (exit codes 2 and 3)
300        # to indicate command completion, but with errors found. For
301        # multi-command flexibility, ignore the exact error codes and
302        # *try* to load JSON.
303        try:
304            return json.loads(exc.stdout)
305        except json.JSONDecodeError:
306            # Nope. This thing is toast. Raise the /process/ error.
307            pass
308        raise
309
310    return json.loads(res.stdout)
311
312def qemu_img_measure(*args: str) -> Any:
313    return qemu_img_json("measure", "--output", "json", *args)
314
315def qemu_img_check(*args: str) -> Any:
316    return qemu_img_json("check", "--output", "json", *args)
317
318def qemu_img_info(*args: str) -> Any:
319    return qemu_img_json('info', "--output", "json", *args)
320
321def qemu_img_map(*args: str) -> Any:
322    return qemu_img_json('map', "--output", "json", *args)
323
324def qemu_img_log(*args: str, check: bool = True
325                 ) -> 'subprocess.CompletedProcess[str]':
326    result = qemu_img(*args, check=check)
327    log(result.stdout, filters=[filter_testfiles])
328    return result
329
330def img_info_log(filename: str, filter_path: Optional[str] = None,
331                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
332                 check: bool = True, drop_child_info: bool = True,
333                 ) -> None:
334    args = ['info']
335    if use_image_opts:
336        args.append('--image-opts')
337    else:
338        args += ['-f', imgfmt]
339    args += extra_args
340    args.append(filename)
341
342    output = qemu_img(*args, check=check).stdout
343    if not filter_path:
344        filter_path = filename
345    log(filter_img_info(output, filter_path, drop_child_info))
346
347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
348    if '-f' in args or '--image-opts' in args:
349        return qemu_io_args_no_fmt + list(args)
350    else:
351        return qemu_io_args + list(args)
352
353def qemu_io_popen(*args):
354    return qemu_tool_popen(qemu_io_wrap_args(args))
355
356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
357            ) -> 'subprocess.CompletedProcess[str]':
358    """
359    Run QEMU_IO_PROG and return the status code and console output.
360
361    This function always prepends either QEMU_IO_OPTIONS or
362    QEMU_IO_OPTIONS_NO_FMT.
363    """
364    return qemu_tool(*qemu_io_wrap_args(args),
365                     check=check, combine_stdio=combine_stdio)
366
367def qemu_io_log(*args: str, check: bool = True
368                ) -> 'subprocess.CompletedProcess[str]':
369    result = qemu_io(*args, check=check)
370    log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
371    return result
372
373class QemuIoInteractive:
374    def __init__(self, *args):
375        self.args = qemu_io_wrap_args(args)
376        # We need to keep the Popen objext around, and not
377        # close it immediately. Therefore, disable the pylint check:
378        # pylint: disable=consider-using-with
379        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
380                                   stdout=subprocess.PIPE,
381                                   stderr=subprocess.STDOUT,
382                                   universal_newlines=True)
383        out = self._p.stdout.read(9)
384        if out != 'qemu-io> ':
385            # Most probably qemu-io just failed to start.
386            # Let's collect the whole output and exit.
387            out += self._p.stdout.read()
388            self._p.wait(timeout=1)
389            raise ValueError(out)
390
391    def close(self):
392        self._p.communicate('q\n')
393
394    def _read_output(self):
395        pattern = 'qemu-io> '
396        n = len(pattern)
397        pos = 0
398        s = []
399        while pos != n:
400            c = self._p.stdout.read(1)
401            # check unexpected EOF
402            assert c != ''
403            s.append(c)
404            if c == pattern[pos]:
405                pos += 1
406            else:
407                pos = 0
408
409        return ''.join(s[:-n])
410
411    def cmd(self, cmd):
412        # quit command is in close(), '\n' is added automatically
413        assert '\n' not in cmd
414        cmd = cmd.strip()
415        assert cmd not in ('q', 'quit')
416        self._p.stdin.write(cmd + '\n')
417        self._p.stdin.flush()
418        return self._read_output()
419
420
421class QemuStorageDaemon:
422    _qmp: Optional[QEMUMonitorProtocol] = None
423    _qmpsock: Optional[str] = None
424    # Python < 3.8 would complain if this type were not a string literal
425    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426    _p: 'Optional[subprocess.Popen[bytes]]' = None
427
428    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
429        assert '--pidfile' not in args
430        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
431        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
432
433        if qmp:
434            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
435            all_args += ['--chardev',
436                         f'socket,id=qmp-sock,path={self._qmpsock}',
437                         '--monitor', 'qmp-sock']
438
439            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)
440
441        # Cannot use with here, we want the subprocess to stay around
442        # pylint: disable=consider-using-with
443        self._p = subprocess.Popen(all_args)
444        if self._qmp is not None:
445            self._qmp.accept()
446        while not os.path.exists(self.pidfile):
447            if self._p.poll() is not None:
448                cmd = ' '.join(all_args)
449                raise RuntimeError(
450                    'qemu-storage-daemon terminated with exit code ' +
451                    f'{self._p.returncode}: {cmd}')
452
453            time.sleep(0.01)
454
455        with open(self.pidfile, encoding='utf-8') as f:
456            self._pid = int(f.read().strip())
457
458        assert self._pid == self._p.pid
459
460    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
461            -> QMPMessage:
462        assert self._qmp is not None
463        return self._qmp.cmd(cmd, args)
464
465    def get_qmp(self) -> QEMUMonitorProtocol:
466        assert self._qmp is not None
467        return self._qmp
468
469    def stop(self, kill_signal=15):
470        self._p.send_signal(kill_signal)
471        self._p.wait()
472        self._p = None
473
474        if self._qmp:
475            self._qmp.close()
476
477        if self._qmpsock is not None:
478            try:
479                os.remove(self._qmpsock)
480            except OSError:
481                pass
482        try:
483            os.remove(self.pidfile)
484        except OSError:
485            pass
486
487    def __del__(self):
488        if self._p is not None:
489            self.stop(kill_signal=9)
490
491
492def qemu_nbd(*args):
493    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
494    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
495
496def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
497    '''Run qemu-nbd in daemon mode and return both the parent's exit code
498       and its output in case of an error'''
499    full_args = qemu_nbd_args + ['--fork'] + list(args)
500    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
501                                                   connect_stderr=False)
502    return returncode, output if returncode else ''
503
504def qemu_nbd_list_log(*args: str) -> str:
505    '''Run qemu-nbd to list remote exports'''
506    full_args = [qemu_nbd_prog, '-L'] + list(args)
507    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
508    log(output, filters=[filter_testfiles, filter_nbd_exports])
509    return output
510
511@contextmanager
512def qemu_nbd_popen(*args):
513    '''Context manager running qemu-nbd within the context'''
514    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
515
516    assert not os.path.exists(pid_file)
517
518    cmd = list(qemu_nbd_args)
519    cmd.extend(('--persistent', '--pid-file', pid_file))
520    cmd.extend(args)
521
522    log('Start NBD server')
523    with subprocess.Popen(cmd) as p:
524        try:
525            while not os.path.exists(pid_file):
526                if p.poll() is not None:
527                    raise RuntimeError(
528                        "qemu-nbd terminated with exit code {}: {}"
529                        .format(p.returncode, ' '.join(cmd)))
530
531                time.sleep(0.01)
532            yield
533        finally:
534            if os.path.exists(pid_file):
535                os.remove(pid_file)
536            log('Kill NBD server')
537            p.kill()
538            p.wait()
539
540def compare_images(img1: str, img2: str,
541                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
542    """
543    Compare two images with QEMU_IMG; return True if they are identical.
544
545    :raise CalledProcessError:
546        when qemu-img crashes or returns a status code of anything other
547        than 0 (identical) or 1 (different).
548    """
549    try:
550        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
551        return True
552    except subprocess.CalledProcessError as exc:
553        if exc.returncode == 1:
554            return False
555        raise
556
557def create_image(name, size):
558    '''Create a fully-allocated raw image with sector markers'''
559    with open(name, 'wb') as file:
560        i = 0
561        while i < size:
562            sector = struct.pack('>l504xl', i // 512, i // 512)
563            file.write(sector)
564            i = i + 512
565
566def image_size(img: str) -> int:
567    """Return image's virtual size"""
568    value = qemu_img_info('-f', imgfmt, img)['virtual-size']
569    if not isinstance(value, int):
570        type_name = type(value).__name__
571        raise TypeError("Expected 'int' for 'virtual-size', "
572                        f"got '{value}' of type '{type_name}'")
573    return value
574
575def is_str(val):
576    return isinstance(val, str)
577
578test_dir_re = re.compile(r"%s" % test_dir)
579def filter_test_dir(msg):
580    return test_dir_re.sub("TEST_DIR", msg)
581
582win32_re = re.compile(r"\r")
583def filter_win32(msg):
584    return win32_re.sub("", msg)
585
586qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
587                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
588                        r"and [0-9\/.inf]* ops\/sec\)")
589def filter_qemu_io(msg):
590    msg = filter_win32(msg)
591    return qemu_io_re.sub("X ops; XX:XX:XX.X "
592                          "(XXX YYY/sec and XXX ops/sec)", msg)
593
594chown_re = re.compile(r"chown [0-9]+:[0-9]+")
595def filter_chown(msg):
596    return chown_re.sub("chown UID:GID", msg)
597
598def filter_qmp_event(event):
599    '''Filter a QMP event dict'''
600    event = dict(event)
601    if 'timestamp' in event:
602        event['timestamp']['seconds'] = 'SECS'
603        event['timestamp']['microseconds'] = 'USECS'
604    return event
605
606def filter_qmp(qmsg, filter_fn):
607    '''Given a string filter, filter a QMP object's values.
608    filter_fn takes a (key, value) pair.'''
609    # Iterate through either lists or dicts;
610    if isinstance(qmsg, list):
611        items = enumerate(qmsg)
612    elif isinstance(qmsg, dict):
613        items = qmsg.items()
614    else:
615        return filter_fn(None, qmsg)
616
617    for k, v in items:
618        if isinstance(v, (dict, list)):
619            qmsg[k] = filter_qmp(v, filter_fn)
620        else:
621            qmsg[k] = filter_fn(k, v)
622    return qmsg
623
624def filter_testfiles(msg):
625    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
626    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
627    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
628
629def filter_qmp_testfiles(qmsg):
630    def _filter(_key, value):
631        if is_str(value):
632            return filter_testfiles(value)
633        return value
634    return filter_qmp(qmsg, _filter)
635
636def filter_virtio_scsi(output: str) -> str:
637    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
638
639def filter_qmp_virtio_scsi(qmsg):
640    def _filter(_key, value):
641        if is_str(value):
642            return filter_virtio_scsi(value)
643        return value
644    return filter_qmp(qmsg, _filter)
645
646def filter_generated_node_ids(msg):
647    return re.sub("#block[0-9]+", "NODE_NAME", msg)
648
649def filter_img_info(output: str, filename: str,
650                    drop_child_info: bool = True) -> str:
651    lines = []
652    drop_indented = False
653    for line in output.split('\n'):
654        if 'disk size' in line or 'actual-size' in line:
655            continue
656
657        # Drop child node info
658        if drop_indented:
659            if line.startswith(' '):
660                continue
661            drop_indented = False
662        if drop_child_info and "Child node '/" in line:
663            drop_indented = True
664            continue
665
666        line = line.replace(filename, 'TEST_IMG')
667        line = filter_testfiles(line)
668        line = line.replace(imgfmt, 'IMGFMT')
669        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
670        line = re.sub('uuid: [-a-f0-9]+',
671                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
672                      line)
673        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
674        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
675                      line)
676        lines.append(line)
677    return '\n'.join(lines)
678
679def filter_imgfmt(msg):
680    return msg.replace(imgfmt, 'IMGFMT')
681
682def filter_qmp_imgfmt(qmsg):
683    def _filter(_key, value):
684        if is_str(value):
685            return filter_imgfmt(value)
686        return value
687    return filter_qmp(qmsg, _filter)
688
689def filter_nbd_exports(output: str) -> str:
690    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
691
692
693Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
694
695def log(msg: Msg,
696        filters: Iterable[Callable[[Msg], Msg]] = (),
697        indent: Optional[int] = None) -> None:
698    """
699    Logs either a string message or a JSON serializable message (like QMP).
700    If indent is provided, JSON serializable messages are pretty-printed.
701    """
702    for flt in filters:
703        msg = flt(msg)
704    if isinstance(msg, (dict, list)):
705        # Don't sort if it's already sorted
706        do_sort = not isinstance(msg, OrderedDict)
707        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
708    else:
709        test_logger.info(msg)
710
711class Timeout:
712    def __init__(self, seconds, errmsg="Timeout"):
713        self.seconds = seconds
714        self.errmsg = errmsg
715    def __enter__(self):
716        if qemu_gdb or qemu_valgrind:
717            return self
718        signal.signal(signal.SIGALRM, self.timeout)
719        signal.setitimer(signal.ITIMER_REAL, self.seconds)
720        return self
721    def __exit__(self, exc_type, value, traceback):
722        if qemu_gdb or qemu_valgrind:
723            return False
724        signal.setitimer(signal.ITIMER_REAL, 0)
725        return False
726    def timeout(self, signum, frame):
727        raise TimeoutError(self.errmsg)
728
729def file_pattern(name):
730    return "{0}-{1}".format(os.getpid(), name)
731
732class FilePath:
733    """
734    Context manager generating multiple file names. The generated files are
735    removed when exiting the context.
736
737    Example usage:
738
739        with FilePath('a.img', 'b.img') as (img_a, img_b):
740            # Use img_a and img_b here...
741
742        # a.img and b.img are automatically removed here.
743
744    By default images are created in iotests.test_dir. To create sockets use
745    iotests.sock_dir:
746
747       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
748
749    For convenience, calling with one argument yields a single file instead of
750    a tuple with one item.
751
752    """
753    def __init__(self, *names, base_dir=test_dir):
754        self.paths = [os.path.join(base_dir, file_pattern(name))
755                      for name in names]
756
757    def __enter__(self):
758        if len(self.paths) == 1:
759            return self.paths[0]
760        else:
761            return self.paths
762
763    def __exit__(self, exc_type, exc_val, exc_tb):
764        for path in self.paths:
765            try:
766                os.remove(path)
767            except OSError:
768                pass
769        return False
770
771
772def try_remove(img):
773    try:
774        os.remove(img)
775    except OSError:
776        pass
777
778def file_path_remover():
779    for path in reversed(file_path_remover.paths):
780        try_remove(path)
781
782
783def file_path(*names, base_dir=test_dir):
784    ''' Another way to get auto-generated filename that cleans itself up.
785
786    Use is as simple as:
787
788    img_a, img_b = file_path('a.img', 'b.img')
789    sock = file_path('socket')
790    '''
791
792    if not hasattr(file_path_remover, 'paths'):
793        file_path_remover.paths = []
794        atexit.register(file_path_remover)
795
796    paths = []
797    for name in names:
798        filename = file_pattern(name)
799        path = os.path.join(base_dir, filename)
800        file_path_remover.paths.append(path)
801        paths.append(path)
802
803    return paths[0] if len(paths) == 1 else paths
804
805def remote_filename(path):
806    if imgproto == 'file':
807        return path
808    elif imgproto == 'ssh':
809        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
810    else:
811        raise ValueError("Protocol %s not supported" % (imgproto))
812
813class VM(qtest.QEMUQtestMachine):
814    '''A QEMU VM'''
815
816    def __init__(self, path_suffix=''):
817        name = "qemu%s-%d" % (path_suffix, os.getpid())
818        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
819        if qemu_gdb and qemu_valgrind:
820            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
821            sys.exit(1)
822        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
823        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
824                         name=name,
825                         base_temp_dir=test_dir,
826                         sock_dir=sock_dir, qmp_timer=timer)
827        self._num_drives = 0
828
829    def _post_shutdown(self) -> None:
830        super()._post_shutdown()
831        if not qemu_valgrind or not self._popen:
832            return
833        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
834        if self.exitcode() == 99:
835            with open(valgrind_filename, encoding='utf-8') as f:
836                print(f.read())
837        else:
838            os.remove(valgrind_filename)
839
840    def _pre_launch(self) -> None:
841        super()._pre_launch()
842        if qemu_print:
843            # set QEMU binary output to stdout
844            self._close_qemu_log_file()
845
846    def add_object(self, opts):
847        self._args.append('-object')
848        self._args.append(opts)
849        return self
850
851    def add_device(self, opts):
852        self._args.append('-device')
853        self._args.append(opts)
854        return self
855
856    def add_drive_raw(self, opts):
857        self._args.append('-drive')
858        self._args.append(opts)
859        return self
860
861    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
862        '''Add a virtio-blk drive to the VM'''
863        options = ['if=%s' % interface,
864                   'id=drive%d' % self._num_drives]
865
866        if path is not None:
867            options.append('file=%s' % path)
868            options.append('format=%s' % img_format)
869            options.append('cache=%s' % cachemode)
870            options.append('aio=%s' % aiomode)
871
872        if opts:
873            options.append(opts)
874
875        if img_format == 'luks' and 'key-secret' not in opts:
876            # default luks support
877            if luks_default_secret_object not in self._args:
878                self.add_object(luks_default_secret_object)
879
880            options.append(luks_default_key_secret_opt)
881
882        self._args.append('-drive')
883        self._args.append(','.join(options))
884        self._num_drives += 1
885        return self
886
887    def add_blockdev(self, opts):
888        self._args.append('-blockdev')
889        if isinstance(opts, str):
890            self._args.append(opts)
891        else:
892            self._args.append(','.join(opts))
893        return self
894
895    def add_incoming(self, addr):
896        self._args.append('-incoming')
897        self._args.append(addr)
898        return self
899
900    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
901        cmd = 'human-monitor-command'
902        kwargs: Dict[str, Any] = {'command-line': command_line}
903        if use_log:
904            return self.qmp_log(cmd, **kwargs)
905        else:
906            return self.qmp(cmd, **kwargs)
907
908    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
909        """Pause drive r/w operations"""
910        if not event:
911            self.pause_drive(drive, "read_aio")
912            self.pause_drive(drive, "write_aio")
913            return
914        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
915
916    def resume_drive(self, drive: str) -> None:
917        """Resume drive r/w operations"""
918        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
919
920    def hmp_qemu_io(self, drive: str, cmd: str,
921                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
922        """Write to a given drive using an HMP command"""
923        d = '-d ' if qdev else ''
924        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
925
926    def flatten_qmp_object(self, obj, output=None, basestr=''):
927        if output is None:
928            output = {}
929        if isinstance(obj, list):
930            for i, item in enumerate(obj):
931                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
932        elif isinstance(obj, dict):
933            for key in obj:
934                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
935        else:
936            output[basestr[:-1]] = obj # Strip trailing '.'
937        return output
938
939    def qmp_to_opts(self, obj):
940        obj = self.flatten_qmp_object(obj)
941        output_list = []
942        for key in obj:
943            output_list += [key + '=' + obj[key]]
944        return ','.join(output_list)
945
946    def get_qmp_events_filtered(self, wait=60.0):
947        result = []
948        for ev in self.get_qmp_events(wait=wait):
949            result.append(filter_qmp_event(ev))
950        return result
951
952    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
953        full_cmd = OrderedDict((
954            ("execute", cmd),
955            ("arguments", ordered_qmp(kwargs))
956        ))
957        log(full_cmd, filters, indent=indent)
958        result = self.qmp(cmd, **kwargs)
959        log(result, filters, indent=indent)
960        return result
961
962    # Returns None on success, and an error string on failure
963    def run_job(self, job: str, auto_finalize: bool = True,
964                auto_dismiss: bool = False,
965                pre_finalize: Optional[Callable[[], None]] = None,
966                cancel: bool = False, wait: float = 60.0,
967                filters: Iterable[Callable[[Any], Any]] = (),
968                ) -> Optional[str]:
969        """
970        run_job moves a job from creation through to dismissal.
971
972        :param job: String. ID of recently-launched job
973        :param auto_finalize: Bool. True if the job was launched with
974                              auto_finalize. Defaults to True.
975        :param auto_dismiss: Bool. True if the job was launched with
976                             auto_dismiss=True. Defaults to False.
977        :param pre_finalize: Callback. A callable that takes no arguments to be
978                             invoked prior to issuing job-finalize, if any.
979        :param cancel: Bool. When true, cancels the job after the pre_finalize
980                       callback.
981        :param wait: Float. Timeout value specifying how long to wait for any
982                     event, in seconds. Defaults to 60.0.
983        """
984        match_device = {'data': {'device': job}}
985        match_id = {'data': {'id': job}}
986        events = [
987            ('BLOCK_JOB_COMPLETED', match_device),
988            ('BLOCK_JOB_CANCELLED', match_device),
989            ('BLOCK_JOB_ERROR', match_device),
990            ('BLOCK_JOB_READY', match_device),
991            ('BLOCK_JOB_PENDING', match_id),
992            ('JOB_STATUS_CHANGE', match_id)
993        ]
994        error = None
995        while True:
996            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
997            if ev['event'] != 'JOB_STATUS_CHANGE':
998                log(ev, filters=filters)
999                continue
1000            status = ev['data']['status']
1001            if status == 'aborting':
1002                result = self.qmp('query-jobs')
1003                for j in result['return']:
1004                    if j['id'] == job:
1005                        error = j['error']
1006                        log('Job failed: %s' % (j['error']), filters=filters)
1007            elif status == 'ready':
1008                self.qmp_log('job-complete', id=job, filters=filters)
1009            elif status == 'pending' and not auto_finalize:
1010                if pre_finalize:
1011                    pre_finalize()
1012                if cancel:
1013                    self.qmp_log('job-cancel', id=job, filters=filters)
1014                else:
1015                    self.qmp_log('job-finalize', id=job, filters=filters)
1016            elif status == 'concluded' and not auto_dismiss:
1017                self.qmp_log('job-dismiss', id=job, filters=filters)
1018            elif status == 'null':
1019                return error
1020
1021    # Returns None on success, and an error string on failure
1022    def blockdev_create(self, options, job_id='job0', filters=None):
1023        if filters is None:
1024            filters = [filter_qmp_testfiles]
1025        result = self.qmp_log('blockdev-create', filters=filters,
1026                              job_id=job_id, options=options)
1027
1028        if 'return' in result:
1029            assert result['return'] == {}
1030            job_result = self.run_job(job_id, filters=filters)
1031        else:
1032            job_result = result['error']
1033
1034        log("")
1035        return job_result
1036
1037    def enable_migration_events(self, name):
1038        log('Enabling migration QMP events on %s...' % name)
1039        log(self.qmp('migrate-set-capabilities', capabilities=[
1040            {
1041                'capability': 'events',
1042                'state': True
1043            }
1044        ]))
1045
1046    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1047        while True:
1048            event = self.event_wait('MIGRATION')
1049            # We use the default timeout, and with a timeout, event_wait()
1050            # never returns None
1051            assert event
1052
1053            log(event, filters=[filter_qmp_event])
1054            if event['data']['status'] in ('completed', 'failed'):
1055                break
1056
1057        if event['data']['status'] == 'completed':
1058            # The event may occur in finish-migrate, so wait for the expected
1059            # post-migration runstate
1060            runstate = None
1061            while runstate != expect_runstate:
1062                runstate = self.qmp('query-status')['return']['status']
1063            return True
1064        else:
1065            return False
1066
1067    def node_info(self, node_name):
1068        nodes = self.qmp('query-named-block-nodes')
1069        for x in nodes['return']:
1070            if x['node-name'] == node_name:
1071                return x
1072        return None
1073
1074    def query_bitmaps(self):
1075        res = self.qmp("query-named-block-nodes")
1076        return {device['node-name']: device['dirty-bitmaps']
1077                for device in res['return'] if 'dirty-bitmaps' in device}
1078
1079    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1080        """
1081        get a specific bitmap from the object returned by query_bitmaps.
1082        :param recording: If specified, filter results by the specified value.
1083        :param bitmaps: If specified, use it instead of call query_bitmaps()
1084        """
1085        if bitmaps is None:
1086            bitmaps = self.query_bitmaps()
1087
1088        for bitmap in bitmaps[node_name]:
1089            if bitmap.get('name', '') == bitmap_name:
1090                if recording is None or bitmap.get('recording') == recording:
1091                    return bitmap
1092        return None
1093
1094    def check_bitmap_status(self, node_name, bitmap_name, fields):
1095        ret = self.get_bitmap(node_name, bitmap_name)
1096
1097        return fields.items() <= ret.items()
1098
1099    def assert_block_path(self, root, path, expected_node, graph=None):
1100        """
1101        Check whether the node under the given path in the block graph
1102        is @expected_node.
1103
1104        @root is the node name of the node where the @path is rooted.
1105
1106        @path is a string that consists of child names separated by
1107        slashes.  It must begin with a slash.
1108
1109        Examples for @root + @path:
1110          - root="qcow2-node", path="/backing/file"
1111          - root="quorum-node", path="/children.2/file"
1112
1113        Hypothetically, @path could be empty, in which case it would
1114        point to @root.  However, in practice this case is not useful
1115        and hence not allowed.
1116
1117        @expected_node may be None.  (All elements of the path but the
1118        leaf must still exist.)
1119
1120        @graph may be None or the result of an x-debug-query-block-graph
1121        call that has already been performed.
1122        """
1123        if graph is None:
1124            graph = self.qmp('x-debug-query-block-graph')['return']
1125
1126        iter_path = iter(path.split('/'))
1127
1128        # Must start with a /
1129        assert next(iter_path) == ''
1130
1131        node = next((node for node in graph['nodes'] if node['name'] == root),
1132                    None)
1133
1134        # An empty @path is not allowed, so the root node must be present
1135        assert node is not None, 'Root node %s not found' % root
1136
1137        for child_name in iter_path:
1138            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1139
1140            try:
1141                node_id = next(edge['child'] for edge in graph['edges']
1142                               if (edge['parent'] == node['id'] and
1143                                   edge['name'] == child_name))
1144
1145                node = next(node for node in graph['nodes']
1146                            if node['id'] == node_id)
1147
1148            except StopIteration:
1149                node = None
1150
1151        if node is None:
1152            assert expected_node is None, \
1153                   'No node found under %s (but expected %s)' % \
1154                   (path, expected_node)
1155        else:
1156            assert node['name'] == expected_node, \
1157                   'Found node %s under %s (but expected %s)' % \
1158                   (node['name'], path, expected_node)
1159
1160index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1161
1162class QMPTestCase(unittest.TestCase):
1163    '''Abstract base class for QMP test cases'''
1164
1165    def __init__(self, *args, **kwargs):
1166        super().__init__(*args, **kwargs)
1167        # Many users of this class set a VM property we rely on heavily
1168        # in the methods below.
1169        self.vm = None
1170
1171    def dictpath(self, d, path):
1172        '''Traverse a path in a nested dict'''
1173        for component in path.split('/'):
1174            m = index_re.match(component)
1175            if m:
1176                component, idx = m.groups()
1177                idx = int(idx)
1178
1179            if not isinstance(d, dict) or component not in d:
1180                self.fail(f'failed path traversal for "{path}" in "{d}"')
1181            d = d[component]
1182
1183            if m:
1184                if not isinstance(d, list):
1185                    self.fail(f'path component "{component}" in "{path}" '
1186                              f'is not a list in "{d}"')
1187                try:
1188                    d = d[idx]
1189                except IndexError:
1190                    self.fail(f'invalid index "{idx}" in path "{path}" '
1191                              f'in "{d}"')
1192        return d
1193
1194    def assert_qmp_absent(self, d, path):
1195        try:
1196            result = self.dictpath(d, path)
1197        except AssertionError:
1198            return
1199        self.fail('path "%s" has value "%s"' % (path, str(result)))
1200
1201    def assert_qmp(self, d, path, value):
1202        '''Assert that the value for a specific path in a QMP dict
1203           matches.  When given a list of values, assert that any of
1204           them matches.'''
1205
1206        result = self.dictpath(d, path)
1207
1208        # [] makes no sense as a list of valid values, so treat it as
1209        # an actual single value.
1210        if isinstance(value, list) and value != []:
1211            for v in value:
1212                if result == v:
1213                    return
1214            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1215        else:
1216            self.assertEqual(result, value,
1217                             '"%s" is "%s", expected "%s"'
1218                             % (path, str(result), str(value)))
1219
1220    def assert_no_active_block_jobs(self):
1221        result = self.vm.qmp('query-block-jobs')
1222        self.assert_qmp(result, 'return', [])
1223
1224    def assert_has_block_node(self, node_name=None, file_name=None):
1225        """Issue a query-named-block-nodes and assert node_name and/or
1226        file_name is present in the result"""
1227        def check_equal_or_none(a, b):
1228            return a is None or b is None or a == b
1229        assert node_name or file_name
1230        result = self.vm.qmp('query-named-block-nodes')
1231        for x in result["return"]:
1232            if check_equal_or_none(x.get("node-name"), node_name) and \
1233                    check_equal_or_none(x.get("file"), file_name):
1234                return
1235        self.fail("Cannot find %s %s in result:\n%s" %
1236                  (node_name, file_name, result))
1237
1238    def assert_json_filename_equal(self, json_filename, reference):
1239        '''Asserts that the given filename is a json: filename and that its
1240           content is equal to the given reference object'''
1241        self.assertEqual(json_filename[:5], 'json:')
1242        self.assertEqual(
1243            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1244            self.vm.flatten_qmp_object(reference)
1245        )
1246
1247    def cancel_and_wait(self, drive='drive0', force=False,
1248                        resume=False, wait=60.0):
1249        '''Cancel a block job and wait for it to finish, returning the event'''
1250        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1251        self.assert_qmp(result, 'return', {})
1252
1253        if resume:
1254            self.vm.resume_drive(drive)
1255
1256        cancelled = False
1257        result = None
1258        while not cancelled:
1259            for event in self.vm.get_qmp_events(wait=wait):
1260                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1261                   event['event'] == 'BLOCK_JOB_CANCELLED':
1262                    self.assert_qmp(event, 'data/device', drive)
1263                    result = event
1264                    cancelled = True
1265                elif event['event'] == 'JOB_STATUS_CHANGE':
1266                    self.assert_qmp(event, 'data/id', drive)
1267
1268
1269        self.assert_no_active_block_jobs()
1270        return result
1271
1272    def wait_until_completed(self, drive='drive0', check_offset=True,
1273                             wait=60.0, error=None):
1274        '''Wait for a block job to finish, returning the event'''
1275        while True:
1276            for event in self.vm.get_qmp_events(wait=wait):
1277                if event['event'] == 'BLOCK_JOB_COMPLETED':
1278                    self.assert_qmp(event, 'data/device', drive)
1279                    if error is None:
1280                        self.assert_qmp_absent(event, 'data/error')
1281                        if check_offset:
1282                            self.assert_qmp(event, 'data/offset',
1283                                            event['data']['len'])
1284                    else:
1285                        self.assert_qmp(event, 'data/error', error)
1286                    self.assert_no_active_block_jobs()
1287                    return event
1288                if event['event'] == 'JOB_STATUS_CHANGE':
1289                    self.assert_qmp(event, 'data/id', drive)
1290
1291    def wait_ready(self, drive='drive0'):
1292        """Wait until a BLOCK_JOB_READY event, and return the event."""
1293        return self.vm.events_wait([
1294            ('BLOCK_JOB_READY',
1295             {'data': {'type': 'mirror', 'device': drive}}),
1296            ('BLOCK_JOB_READY',
1297             {'data': {'type': 'commit', 'device': drive}})
1298        ])
1299
1300    def wait_ready_and_cancel(self, drive='drive0'):
1301        self.wait_ready(drive=drive)
1302        event = self.cancel_and_wait(drive=drive)
1303        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1304        self.assert_qmp(event, 'data/type', 'mirror')
1305        self.assert_qmp(event, 'data/offset', event['data']['len'])
1306
1307    def complete_and_wait(self, drive='drive0', wait_ready=True,
1308                          completion_error=None):
1309        '''Complete a block job and wait for it to finish'''
1310        if wait_ready:
1311            self.wait_ready(drive=drive)
1312
1313        result = self.vm.qmp('block-job-complete', device=drive)
1314        self.assert_qmp(result, 'return', {})
1315
1316        event = self.wait_until_completed(drive=drive, error=completion_error)
1317        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1318
1319    def pause_wait(self, job_id='job0'):
1320        with Timeout(3, "Timeout waiting for job to pause"):
1321            while True:
1322                result = self.vm.qmp('query-block-jobs')
1323                found = False
1324                for job in result['return']:
1325                    if job['device'] == job_id:
1326                        found = True
1327                        if job['paused'] and not job['busy']:
1328                            return job
1329                        break
1330                assert found
1331
1332    def pause_job(self, job_id='job0', wait=True):
1333        result = self.vm.qmp('block-job-pause', device=job_id)
1334        self.assert_qmp(result, 'return', {})
1335        if wait:
1336            return self.pause_wait(job_id)
1337        return result
1338
1339    def case_skip(self, reason):
1340        '''Skip this test case'''
1341        case_notrun(reason)
1342        self.skipTest(reason)
1343
1344
1345def notrun(reason):
1346    '''Skip this test suite'''
1347    # Each test in qemu-iotests has a number ("seq")
1348    seq = os.path.basename(sys.argv[0])
1349
1350    with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
1351            as outfile:
1352        outfile.write(reason + '\n')
1353    logger.warning("%s not run: %s", seq, reason)
1354    sys.exit(0)
1355
1356def case_notrun(reason):
1357    '''Mark this test case as not having been run (without actually
1358    skipping it, that is left to the caller).  See
1359    QMPTestCase.case_skip() for a variant that actually skips the
1360    current test case.'''
1361
1362    # Each test in qemu-iotests has a number ("seq")
1363    seq = os.path.basename(sys.argv[0])
1364
1365    with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
1366            as outfile:
1367        outfile.write('    [case not run] ' + reason + '\n')
1368
1369def _verify_image_format(supported_fmts: Sequence[str] = (),
1370                         unsupported_fmts: Sequence[str] = ()) -> None:
1371    if 'generic' in supported_fmts and \
1372            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1373        # similar to
1374        #   _supported_fmt generic
1375        # for bash tests
1376        supported_fmts = ()
1377
1378    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1379    if not_sup or (imgfmt in unsupported_fmts):
1380        notrun('not suitable for this image format: %s' % imgfmt)
1381
1382    if imgfmt == 'luks':
1383        verify_working_luks()
1384
1385def _verify_protocol(supported: Sequence[str] = (),
1386                     unsupported: Sequence[str] = ()) -> None:
1387    assert not (supported and unsupported)
1388
1389    if 'generic' in supported:
1390        return
1391
1392    not_sup = supported and (imgproto not in supported)
1393    if not_sup or (imgproto in unsupported):
1394        notrun('not suitable for this protocol: %s' % imgproto)
1395
1396def _verify_platform(supported: Sequence[str] = (),
1397                     unsupported: Sequence[str] = ()) -> None:
1398    if any((sys.platform.startswith(x) for x in unsupported)):
1399        notrun('not suitable for this OS: %s' % sys.platform)
1400
1401    if supported:
1402        if not any((sys.platform.startswith(x) for x in supported)):
1403            notrun('not suitable for this OS: %s' % sys.platform)
1404
1405def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1406    if supported_cache_modes and (cachemode not in supported_cache_modes):
1407        notrun('not suitable for this cache mode: %s' % cachemode)
1408
1409def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1410    if supported_aio_modes and (aiomode not in supported_aio_modes):
1411        notrun('not suitable for this aio mode: %s' % aiomode)
1412
1413def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1414    usf_list = list(set(required_formats) - set(supported_formats()))
1415    if usf_list:
1416        notrun(f'formats {usf_list} are not whitelisted')
1417
1418
1419def _verify_virtio_blk() -> None:
1420    out = qemu_pipe('-M', 'none', '-device', 'help')
1421    if 'virtio-blk' not in out:
1422        notrun('Missing virtio-blk in QEMU binary')
1423
1424def verify_virtio_scsi_pci_or_ccw() -> None:
1425    out = qemu_pipe('-M', 'none', '-device', 'help')
1426    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1427        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1428
1429
1430def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1431    imgopts = os.environ.get('IMGOPTS')
1432    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1433    # but it supported only for bash tests. We don't have a concept of global
1434    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1435    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1436    unsup = list(unsupported) + ['$']
1437    if imgopts and any(x in imgopts for x in unsup):
1438        notrun(f'not suitable for this imgopts: {imgopts}')
1439
1440
1441def supports_quorum() -> bool:
1442    return 'quorum' in qemu_img('--help').stdout
1443
1444def verify_quorum():
1445    '''Skip test suite if quorum support is not available'''
1446    if not supports_quorum():
1447        notrun('quorum support missing')
1448
1449def has_working_luks() -> Tuple[bool, str]:
1450    """
1451    Check whether our LUKS driver can actually create images
1452    (this extends to LUKS encryption for qcow2).
1453
1454    If not, return the reason why.
1455    """
1456
1457    img_file = f'{test_dir}/luks-test.luks'
1458    res = qemu_img('create', '-f', 'luks',
1459                   '--object', luks_default_secret_object,
1460                   '-o', luks_default_key_secret_opt,
1461                   '-o', 'iter-time=10',
1462                   img_file, '1G',
1463                   check=False)
1464    try:
1465        os.remove(img_file)
1466    except OSError:
1467        pass
1468
1469    if res.returncode:
1470        reason = res.stdout
1471        for line in res.stdout.splitlines():
1472            if img_file + ':' in line:
1473                reason = line.split(img_file + ':', 1)[1].strip()
1474                break
1475
1476        return (False, reason)
1477    else:
1478        return (True, '')
1479
1480def verify_working_luks():
1481    """
1482    Skip test suite if LUKS does not work
1483    """
1484    (working, reason) = has_working_luks()
1485    if not working:
1486        notrun(reason)
1487
1488def supports_qcow2_zstd_compression() -> bool:
1489    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
1490    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1491                   img_file, '0',
1492                   check=False)
1493    try:
1494        os.remove(img_file)
1495    except OSError:
1496        pass
1497
1498    if res.returncode == 1 and \
1499            "'compression-type' does not accept value 'zstd'" in res.stdout:
1500        return False
1501    else:
1502        return True
1503
1504def verify_qcow2_zstd_compression():
1505    if not supports_qcow2_zstd_compression():
1506        notrun('zstd compression not supported')
1507
1508def qemu_pipe(*args: str) -> str:
1509    """
1510    Run qemu with an option to print something and exit (e.g. a help option).
1511
1512    :return: QEMU's stdout output.
1513    """
1514    full_args = [qemu_prog] + qemu_opts + list(args)
1515    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1516    return output
1517
1518def supported_formats(read_only=False):
1519    '''Set 'read_only' to True to check ro-whitelist
1520       Otherwise, rw-whitelist is checked'''
1521
1522    if not hasattr(supported_formats, "formats"):
1523        supported_formats.formats = {}
1524
1525    if read_only not in supported_formats.formats:
1526        format_message = qemu_pipe("-drive", "format=help")
1527        line = 1 if read_only else 0
1528        supported_formats.formats[read_only] = \
1529            format_message.splitlines()[line].split(":")[1].split()
1530
1531    return supported_formats.formats[read_only]
1532
1533def skip_if_unsupported(required_formats=(), read_only=False):
1534    '''Skip Test Decorator
1535       Runs the test if all the required formats are whitelisted'''
1536    def skip_test_decorator(func):
1537        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1538                         **kwargs: Dict[str, Any]) -> None:
1539            if callable(required_formats):
1540                fmts = required_formats(test_case)
1541            else:
1542                fmts = required_formats
1543
1544            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1545            if usf_list:
1546                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1547                test_case.case_skip(msg)
1548            else:
1549                func(test_case, *args, **kwargs)
1550        return func_wrapper
1551    return skip_test_decorator
1552
1553def skip_for_formats(formats: Sequence[str] = ()) \
1554    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1555                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1556    '''Skip Test Decorator
1557       Skips the test for the given formats'''
1558    def skip_test_decorator(func):
1559        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1560                         **kwargs: Dict[str, Any]) -> None:
1561            if imgfmt in formats:
1562                msg = f'{test_case}: Skipped for format {imgfmt}'
1563                test_case.case_skip(msg)
1564            else:
1565                func(test_case, *args, **kwargs)
1566        return func_wrapper
1567    return skip_test_decorator
1568
1569def skip_if_user_is_root(func):
1570    '''Skip Test Decorator
1571       Runs the test only without root permissions'''
1572    def func_wrapper(*args, **kwargs):
1573        if os.getuid() == 0:
1574            case_notrun('{}: cannot be run as root'.format(args[0]))
1575            return None
1576        else:
1577            return func(*args, **kwargs)
1578    return func_wrapper
1579
1580# We need to filter out the time taken from the output so that
1581# qemu-iotest can reliably diff the results against master output,
1582# and hide skipped tests from the reference output.
1583
1584class ReproducibleTestResult(unittest.TextTestResult):
1585    def addSkip(self, test, reason):
1586        # Same as TextTestResult, but print dot instead of "s"
1587        unittest.TestResult.addSkip(self, test, reason)
1588        if self.showAll:
1589            self.stream.writeln("skipped {0!r}".format(reason))
1590        elif self.dots:
1591            self.stream.write(".")
1592            self.stream.flush()
1593
1594class ReproducibleStreamWrapper:
1595    def __init__(self, stream: TextIO):
1596        self.stream = stream
1597
1598    def __getattr__(self, attr):
1599        if attr in ('stream', '__getstate__'):
1600            raise AttributeError(attr)
1601        return getattr(self.stream, attr)
1602
1603    def write(self, arg=None):
1604        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1605        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1606        self.stream.write(arg)
1607
1608class ReproducibleTestRunner(unittest.TextTestRunner):
1609    def __init__(self, stream: Optional[TextIO] = None,
1610                 resultclass: Type[unittest.TestResult] =
1611                 ReproducibleTestResult,
1612                 **kwargs: Any) -> None:
1613        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1614        super().__init__(stream=rstream,           # type: ignore
1615                         descriptions=True,
1616                         resultclass=resultclass,
1617                         **kwargs)
1618
1619def execute_unittest(argv: List[str], debug: bool = False) -> None:
1620    """Executes unittests within the calling module."""
1621
1622    # Some tests have warnings, especially ResourceWarnings for unclosed
1623    # files and sockets.  Ignore them for now to ensure reproducibility of
1624    # the test output.
1625    unittest.main(argv=argv,
1626                  testRunner=ReproducibleTestRunner,
1627                  verbosity=2 if debug else 1,
1628                  warnings=None if sys.warnoptions else 'ignore')
1629
1630def execute_setup_common(supported_fmts: Sequence[str] = (),
1631                         supported_platforms: Sequence[str] = (),
1632                         supported_cache_modes: Sequence[str] = (),
1633                         supported_aio_modes: Sequence[str] = (),
1634                         unsupported_fmts: Sequence[str] = (),
1635                         supported_protocols: Sequence[str] = (),
1636                         unsupported_protocols: Sequence[str] = (),
1637                         required_fmts: Sequence[str] = (),
1638                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1639    """
1640    Perform necessary setup for either script-style or unittest-style tests.
1641
1642    :return: Bool; Whether or not debug mode has been requested via the CLI.
1643    """
1644    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1645
1646    debug = '-d' in sys.argv
1647    if debug:
1648        sys.argv.remove('-d')
1649    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1650
1651    _verify_image_format(supported_fmts, unsupported_fmts)
1652    _verify_protocol(supported_protocols, unsupported_protocols)
1653    _verify_platform(supported=supported_platforms)
1654    _verify_cache_mode(supported_cache_modes)
1655    _verify_aio_mode(supported_aio_modes)
1656    _verify_formats(required_fmts)
1657    _verify_virtio_blk()
1658    _verify_imgopts(unsupported_imgopts)
1659
1660    return debug
1661
1662def execute_test(*args, test_function=None, **kwargs):
1663    """Run either unittest or script-style tests."""
1664
1665    debug = execute_setup_common(*args, **kwargs)
1666    if not test_function:
1667        execute_unittest(sys.argv, debug)
1668    else:
1669        test_function()
1670
1671def activate_logging():
1672    """Activate iotests.log() output to stdout for script-style tests."""
1673    handler = logging.StreamHandler(stream=sys.stdout)
1674    formatter = logging.Formatter('%(message)s')
1675    handler.setFormatter(formatter)
1676    test_logger.addHandler(handler)
1677    test_logger.setLevel(logging.INFO)
1678    test_logger.propagate = False
1679
1680# This is called from script-style iotests without a single point of entry
1681def script_initialize(*args, **kwargs):
1682    """Initialize script-style tests without running any tests."""
1683    activate_logging()
1684    execute_setup_common(*args, **kwargs)
1685
1686# This is called from script-style iotests with a single point of entry
1687def script_main(test_function, *args, **kwargs):
1688    """Run script-style tests outside of the unittest framework"""
1689    activate_logging()
1690    execute_test(*args, test_function=test_function, **kwargs)
1691
1692# This is called from unittest style iotests
1693def main(*args, **kwargs):
1694    """Run tests using the unittest framework"""
1695    execute_test(*args, **kwargs)
1696