xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision a46d410c)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
# A NullHandler emits nothing itself; it only gives this logger a handler.
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
# (log() below writes every message through this logger.)
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks if the interpreter receives a fatal signal.
faulthandler.enable()
53
# Command lines for the external tools, assembled from the environment.
# Each *_PROG variable names the binary and each *_OPTIONS variable holds
# extra arguments that are split on single spaces.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Variant used by qemu_io_wrap_args() when the caller already passes
# '-f' or '--image-opts' itself.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: ''.split(' ') returns [''], so qemu_opts is [''] (a single empty
# argument) when QEMU_OPTIONS is unset or blank.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, qemu_gdb holds a 'gdbserver ...' command prefix;
# VM.__init__ passes it to the machine as a wrapper.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
83
# NOTE: this is the raw environment string (or False when unset), so any
# non-empty value of PRINT_QEMU is truthy.
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Valgrind wrapper command; stays empty unless VALGRIND_QEMU is "y" and
# NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    # Exit code 99 is what VM._post_shutdown() checks for before printing
    # the valgrind log.
    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object and matching option used for luks images when the
# caller supplies no 'key-secret' of its own.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# NOTE: unlike the variables in the try block above, a missing
# SAMPLE_IMG_DIR raises a plain KeyError here.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Run the enclosed block with a logger temporarily set to ``level``.

    Handy for muting errors that are expected or uninteresting.

    :param logger_name: name of the logger to adjust.
    :param level: level to apply inside the context (default: CRITICAL).
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        # Put the previous level back even if the body raised.
        target.setLevel(saved_level)
136
137
def unarchive_sample_image(sample, fname):
    """
    Decompress the bz2 sample image ``sample`` into the file ``fname``.

    The archive is looked up as ``<sample>.bz2`` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start ``args`` as a subprocess with its stdout captured as text.

    :param args: full command line, program first.
    :param connect_stderr: merge stderr into the captured stdout when True;
                           otherwise stderr is not redirected.
    :return: the running Popen object; the caller must reap it.
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None
    # pylint: disable=consider-using-with
    return subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
    )
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: human-readable tool name, used only in the signal message.
    :param args: full command line, program first.
    :param connect_stderr: merge the tool's stderr into the returned output.
    :param drop_successful_output: return '' as output when the tool exits
                                   with status 0.
    :return: an (output, returncode) tuple.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means the tool died from a signal.
            cmd = ' '.join(args)
            # Use implicit string concatenation: a backslash continuation
            # inside the literal would embed the next line's indentation
            # in the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img argument list when it is a 'create' command.

    Non-'create' argument lists are returned as an unchanged copy. For
    'create', the -f and -o options are pulled out, IMGOPTS and the default
    luks secret are added where applicable, and the command is rebuilt as
    ``create [-f FMT] [--object SECRET] [-o OPTS]... <everything else>``.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    option_groups = known.o

    command = ['create']
    if fmt is not None:
        command += ['-f', fmt]

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Test may want to create
    # additional images in other formats that doesn't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        option_groups.insert(0, env_opts)

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in group for group in option_groups):
        command += ['--object', luks_default_secret_object]
        option_groups.append(luks_default_key_secret_opt)

    for group in option_groups:
        command += ['-o', group]

    return command + passthrough
208
def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
             ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img and return the completed process.

    QEMU_IMG_OPTIONS are always prepended, and the arguments of 'create'
    commands may be altered further (see qemu_img_create_prepare_args).

    :param args: command-line arguments to qemu-img.
    :param check: Enforce a return code of zero.
    :param combine_stdio: set to False to keep stdout/stderr separated.

    :raise VerboseProcessError:
        When the return code is negative, or on any non-zero exit code
        when 'check=True' was provided (the default). This exception has
        'stdout', 'stderr', and 'returncode' properties that may be
        inspected to show greater detail. If this exception is not
        handled, the command-line, return code, and all console output
        will be included at the bottom of the stack trace.

    :return:
        a CompletedProcess. This object has args, returncode, and stdout
        properties. If streams are not combined, it will also have a
        stderr property.
    """
    command = qemu_img_args + qemu_img_create_prepare_args(list(args))
    stderr_target = subprocess.STDOUT if combine_stdio else subprocess.PIPE

    completed = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        universal_newlines=True,
        check=False
    )

    status = completed.returncode
    # A negative status (death by signal) is always an error, even when the
    # caller asked not to check the exit code.
    killed_by_signal = status < 0
    failed_check = check and status != 0
    if failed_check or killed_by_signal:
        raise VerboseProcessError(
            status, command,
            output=completed.stdout,
            stderr=completed.stderr,
        )

    return completed
252
253
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message with dict keys in sorted order.

    Lists are processed element by element (each element with the default
    conv_keys), dicts become OrderedDicts, and anything else is returned
    unchanged. With conv_keys set, '_' in the keys of this dict is replaced
    by '-'; dicts nested directly inside it are not converted.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(element) for element in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key, value in sorted(qmsg.items()):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(value, conv_keys=False)
    return ordered
266
def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
    """Run 'qemu-img create' with ``args`` via qemu_img()."""
    create_args = ('create',) + args
    return qemu_img(*create_args)
269
def qemu_img_json(*args: str) -> Any:
    """
    Run qemu-img and return its output as deserialized JSON.

    :raise CalledProcessError:
        When qemu-img crashes, or returns a non-zero exit code without
        producing a valid JSON document to stdout.
    :raise JSONDecodeError:
        When qemu-img returns 0, but failed to produce a valid JSON document.

    :return: A deserialized JSON object; probably a dict[str, Any].
    """
    try:
        completed = qemu_img(*args, combine_stdio=False)
    except subprocess.CalledProcessError as exc:
        # A negative return code means termination by signal: nothing to
        # salvage, so go straight to re-raising.
        if exc.returncode >= 0:
            # Commands like 'check' can return failure (exit codes 2 and 3)
            # to indicate command completion, but with errors found. For
            # multi-command flexibility, ignore the exact error codes and
            # *try* to load JSON.
            try:
                return json.loads(exc.stdout)
            except json.JSONDecodeError:
                # Nope. This thing is toast. Raise the /process/ error.
                pass
        raise

    return json.loads(completed.stdout)
301
def qemu_img_measure(*args: str) -> Any:
    """Run 'qemu-img measure --output json' and return the parsed JSON."""
    command = ("measure", "--output", "json") + args
    return qemu_img_json(*command)
304
def qemu_img_check(*args: str) -> Any:
    """Run 'qemu-img check --output json' and return the parsed JSON."""
    command = ("check", "--output", "json") + args
    return qemu_img_json(*command)
307
def qemu_img_info(*args: str) -> Any:
    """Run 'qemu-img info --output json' and return the parsed JSON."""
    command = ('info', "--output", "json") + args
    return qemu_img_json(*command)
310
def qemu_img_map(*args: str) -> Any:
    """Run 'qemu-img map --output json' and return the parsed JSON."""
    command = ('map', "--output", "json") + args
    return qemu_img_json(*command)
313
def qemu_img_log(*args: str, check: bool = True
                 ) -> 'subprocess.CompletedProcess[str]':
    """
    Run qemu-img, log its output through filter_testfiles, and return the
    completed process.

    :param args: command-line arguments to qemu-img.
    :param check: forwarded to qemu_img().
    """
    completed = qemu_img(*args, check=check)
    log(completed.stdout, filters=[filter_testfiles])
    return completed
319
def img_info_log(filename: str, filter_path: Optional[str] = None,
                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
                 check: bool = True,
                 ) -> None:
    """
    Run 'qemu-img info' on ``filename`` and log the filtered output.

    :param filename: image (or image-opts string) to inspect.
    :param filter_path: path replaced by TEST_IMG in the output; defaults
                        to ``filename``.
    :param use_image_opts: pass --image-opts instead of '-f <imgfmt>'.
    :param extra_args: additional arguments placed before the filename.
    :param check: forwarded to qemu_img().
    """
    format_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    command = ['info', *format_args, *extra_args, filename]

    output = qemu_img(*command, check=check).stdout
    log(filter_img_info(output, filter_path or filename))
336
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix ``args`` with the qemu-io command line.

    If the caller already selects a format ('-f' or '--image-opts'), the
    base command without format options is used.
    """
    caller_sets_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if caller_sets_format else qemu_io_args
    return base + list(args)
342
def qemu_io_popen(*args):
    '''Start qemu-io with args and return the Popen object'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
345
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
349
def qemu_io_pipe_and_status(*args):
    '''Run qemu-io and return an (output, exit code) tuple'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_pipe_and_status('qemu-io', command)
352
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output, and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
357
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    command = qemu_io_wrap_args(args)
    status = subprocess.run(command, stdout=subprocess.DEVNULL,
                            check=False).returncode
    # A negative status means qemu-io was terminated by a signal.
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
366
def qemu_io_silent_check(*args):
    '''Run qemu-io silently and return True if it exited with status 0'''
    command = qemu_io_wrap_args(args)
    # stdout is discarded and stderr is redirected into it.
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
373
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout pipes.

    The constructor starts qemu-io and waits for its first prompt; cmd()
    sends one command and returns everything printed before the next
    prompt; close() sends the quit command.
    """

    # Prompt that qemu-io prints when it is ready for the next command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with ``args`` and consume its first prompt.

        :raise ValueError: carrying qemu-io's whole output when the first
                           characters read are not the prompt.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read qemu-io's stdout up to the next prompt.

        :return: everything read before the prompt (prompt excluded).
        """
        prompt = self._PROMPT
        n = len(prompt)
        chars = []
        # Sliding window over the last n characters read. Comparing the
        # window (rather than tracking a match position that resets to 0)
        # also finds a prompt that directly follows a 'q' in the output.
        tail = ''
        while tail != prompt:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Run one qemu-io command and return its output.

        ``cmd`` must be a single line and must not be the quit command.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
420
421
class QemuStorageDaemon:
    """
    Launch and manage a qemu-storage-daemon (QSD) process.

    The daemon is started in the constructor with a pidfile in test_dir
    and, if requested, a QMP monitor on a socket in sock_dir. stop()
    terminates it and removes the socket and pidfile.
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Start the daemon and wait until it has written its pidfile.

        :param args: extra command-line arguments; must not contain
                     '--pidfile'.
        :param instance_id: used in the pidfile and socket file names.
        :param qmp: also set up a QMP monitor socket and accept the
                    daemon's connection on it.

        :raise RuntimeError: if the process exits before the pidfile exists.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            # Created before the daemon is spawned, with server=True; the
            # connection is accepted below, after the process has started.
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the process has already exited.
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        # The PID written by the daemon must be the PID of the process we
        # spawned.
        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send a QMP command; requires the instance to be created with
        qmp=True."""
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """
        Signal the daemon, wait for it to exit, and clean up its files.

        :param kill_signal: signal number to send (default 15).

        NOTE(review): self._p is set to None here, so a second stop() call
        would fail on None.send_signal.
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Best-effort removal: a missing socket or pidfile is not an error.
        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        # Last-resort cleanup: kill the daemon (signal 9) if it was never
        # stopped explicitly.
        if self._p is not None:
            self.stop(kill_signal=9)
487
488
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
492
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, status = qemu_tool_pipe_and_status('qemu-nbd', command,
                                               connect_stderr=False)
    # The output is only reported for a non-zero exit code.
    if status:
        return status, output
    return status, ''
500
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports'''
    command = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    # Log the filtered listing; the unfiltered text is returned.
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
507
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the pid file to appear, unless the server has
            # already exited.
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
536
def compare_images(img1: str, img2: str,
                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
    """
    Compare two images with QEMU_IMG; return True if they are identical.

    :param img1: first image.
    :param img2: second image.
    :param fmt1: format of the first image (passed as -f).
    :param fmt2: format of the second image (passed as -F).

    :raise CalledProcessError:
        when qemu-img crashes or returns a status code of anything other
        than 0 (identical) or 1 (different).
    """
    try:
        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    except subprocess.CalledProcessError as exc:
        # Exit code 1 means "images differ"; anything else is a failure.
        if exc.returncode != 1:
            raise
        return False
    return True
553
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers'''
    sector_size = 512
    with open(name, 'wb') as image:
        sector_no = 0
        # Each 512-byte sector carries its own index as a big-endian 32-bit
        # integer at both its start and its end, with zero padding between.
        while sector_no * sector_size < size:
            image.write(struct.pack('>l504xl', sector_no, sector_no))
            sector_no += 1
562
def image_size(img: str) -> int:
    """
    Return image's virtual size

    :raise TypeError: if the 'virtual-size' reported by qemu-img info is
                      not an int.
    """
    info = qemu_img_info('-f', imgfmt, img)
    virtual_size = info['virtual-size']
    if isinstance(virtual_size, int):
        return virtual_size
    type_name = type(virtual_size).__name__
    raise TypeError("Expected 'int' for 'virtual-size', "
                    f"got '{virtual_size}' of type '{type_name}'")
571
def is_str(val):
    '''Return True if val is a str'''
    result = isinstance(val, str)
    return result
574
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the directory name (e.g. '+', '(' or '.') only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
578
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove every carriage return from msg'''
    without_cr = win32_re.sub("", msg)
    return without_cr
582
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace qemu-io's ops/timing statistics line by a fixed placeholder'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    # Carriage returns are stripped first.
    cleaned = filter_win32(msg)
    return qemu_io_re.sub(placeholder, cleaned)
590
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown UID:GID" pairs in msg by a placeholder'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
594
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Return a copy of the event whose timestamp seconds/microseconds are
    replaced by the placeholders 'SECS' and 'USECS'. The event passed in,
    including its nested timestamp dict, is left unmodified.'''
    filtered = dict(event)
    if 'timestamp' in filtered:
        # dict(event) is only a shallow copy: build a new timestamp dict
        # instead of writing into the one still owned by the caller.
        filtered['timestamp'] = dict(filtered['timestamp'],
                                     seconds='SECS',
                                     microseconds='USECS')
    return filtered
602
def filter_qmp(qmsg, filter_fn):
    '''Given a string filter, filter a QMP object's values.
    filter_fn takes a (key, value) pair.

    Lists and dicts are updated in place (recursively) and returned; for
    any other value the result of filter_fn(None, qmsg) is returned.'''
    if not isinstance(qmsg, (list, dict)):
        return filter_fn(None, qmsg)

    # Iterate through either lists or dicts;
    pairs = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in pairs:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
620
def filter_testfiles(msg):
    '''Replace "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" prefixes in msg
    by TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_part = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_part)
    sock_prefix = os.path.join(sock_dir, pid_part)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
625
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _mask_paths(_key, value):
        # Non-string values pass through untouched.
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _mask_paths)
632
def filter_virtio_scsi(output: str) -> str:
    """Strip the '-ccw' / '-pci' suffix from 'virtio-scsi' in output."""
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
635
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _strip_suffix(_key, value):
        # Non-string values pass through untouched.
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _strip_suffix)
642
def filter_generated_node_ids(msg):
    '''Replace "#block<digits>" node names in msg by NODE_NAME'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
645
def filter_img_info(output, filename):
    '''Filter 'qemu-img info' output for logging

    Lines containing 'disk size' or 'actual-size' are dropped. In the
    remaining lines, filename becomes TEST_IMG, test file prefixes and the
    image format are replaced, and iters / uuid / cid / compression type
    values are masked.'''
    masks = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )
    kept = []
    for raw in output.split('\n'):
        if 'disk size' in raw or 'actual-size' in raw:
            continue
        text = filter_testfiles(raw.replace(filename, 'TEST_IMG'))
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in masks:
            text = re.sub(pattern, replacement, text)
        kept.append(text)
    return '\n'.join(kept)
663
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in msg by IMGFMT'''
    masked = msg.replace(imgfmt, 'IMGFMT')
    return masked
666
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _mask_format(_key, value):
        # Non-string values pass through untouched.
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _mask_format)
673
def filter_nbd_exports(output: str) -> str:
    """Mask the numeric min/opt/max block sizes in output with XXX."""
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
676
677
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    :param msg: the message; dicts and lists are dumped as JSON.
    :param filters: callables applied to the message, in order, first.
    :param indent: passed to json.dumps for dict/list messages.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(text)
695
class Timeout:
    """
    Context manager that raises an Exception when the enclosed block runs
    longer than ``seconds``.

    The timer is not armed when qemu_gdb or qemu_valgrind is set.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer; exceptions are never suppressed.
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False
    def timeout(self, signum, frame):
        # SIGALRM handler installed by __enter__.
        raise Exception(self.errmsg)
713
def file_pattern(name):
    '''Return name prefixed with the current process ID ("<pid>-<name>")'''
    pid = os.getpid()
    return f"{pid}-{name}"
716
class FilePath:
    """
    Context manager that hands out file names and deletes the files when
    the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir by default. For sockets, pass
    iotests.sock_dir instead:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    As a convenience, a single name yields the path itself rather than a
    one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            # Best effort: the file may never have been created.
            try:
                os.remove(generated)
            except OSError:
                continue
        return False
755
756
def try_remove(img):
    '''Remove the file img, ignoring any OSError (e.g. a missing file)'''
    try:
        os.remove(img)
    except OSError:
        return
762
def file_path_remover():
    '''Remove the files registered by file_path(), newest first'''
    registered = file_path_remover.paths
    for index in range(len(registered) - 1, -1, -1):
        try_remove(registered[index])
766
767
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The generated paths are removed by file_path_remover(), which is
    registered with atexit on first use.
    '''

    if not hasattr(file_path_remover, 'paths'):
        # First call: create the registry and hook up the atexit cleanup.
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = []
    for name in names:
        full_path = os.path.join(base_dir, file_pattern(name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
789
def remote_filename(path):
    '''Return path in the form needed for the configured image protocol

    'file' returns the path unchanged and 'ssh' builds an ssh:// URL for
    127.0.0.1; any other protocol raises an Exception.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
797
798class VM(qtest.QEMUQtestMachine):
799    '''A QEMU VM'''
800
801    def __init__(self, path_suffix=''):
802        name = "qemu%s-%d" % (path_suffix, os.getpid())
803        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
804        if qemu_gdb and qemu_valgrind:
805            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
806            sys.exit(1)
807        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
808        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
809                         name=name,
810                         base_temp_dir=test_dir,
811                         sock_dir=sock_dir, qmp_timer=timer)
812        self._num_drives = 0
813
814    def _post_shutdown(self) -> None:
815        super()._post_shutdown()
816        if not qemu_valgrind or not self._popen:
817            return
818        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
819        if self.exitcode() == 99:
820            with open(valgrind_filename, encoding='utf-8') as f:
821                print(f.read())
822        else:
823            os.remove(valgrind_filename)
824
825    def _pre_launch(self) -> None:
826        super()._pre_launch()
827        if qemu_print:
828            # set QEMU binary output to stdout
829            self._close_qemu_log_file()
830
831    def add_object(self, opts):
832        self._args.append('-object')
833        self._args.append(opts)
834        return self
835
836    def add_device(self, opts):
837        self._args.append('-device')
838        self._args.append(opts)
839        return self
840
841    def add_drive_raw(self, opts):
842        self._args.append('-drive')
843        self._args.append(opts)
844        return self
845
846    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
847        '''Add a virtio-blk drive to the VM'''
848        options = ['if=%s' % interface,
849                   'id=drive%d' % self._num_drives]
850
851        if path is not None:
852            options.append('file=%s' % path)
853            options.append('format=%s' % img_format)
854            options.append('cache=%s' % cachemode)
855            options.append('aio=%s' % aiomode)
856
857        if opts:
858            options.append(opts)
859
860        if img_format == 'luks' and 'key-secret' not in opts:
861            # default luks support
862            if luks_default_secret_object not in self._args:
863                self.add_object(luks_default_secret_object)
864
865            options.append(luks_default_key_secret_opt)
866
867        self._args.append('-drive')
868        self._args.append(','.join(options))
869        self._num_drives += 1
870        return self
871
872    def add_blockdev(self, opts):
873        self._args.append('-blockdev')
874        if isinstance(opts, str):
875            self._args.append(opts)
876        else:
877            self._args.append(','.join(opts))
878        return self
879
880    def add_incoming(self, addr):
881        self._args.append('-incoming')
882        self._args.append(addr)
883        return self
884
885    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
886        cmd = 'human-monitor-command'
887        kwargs: Dict[str, Any] = {'command-line': command_line}
888        if use_log:
889            return self.qmp_log(cmd, **kwargs)
890        else:
891            return self.qmp(cmd, **kwargs)
892
893    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
894        """Pause drive r/w operations"""
895        if not event:
896            self.pause_drive(drive, "read_aio")
897            self.pause_drive(drive, "write_aio")
898            return
899        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
900
901    def resume_drive(self, drive: str) -> None:
902        """Resume drive r/w operations"""
903        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
904
905    def hmp_qemu_io(self, drive: str, cmd: str,
906                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
907        """Write to a given drive using an HMP command"""
908        d = '-d ' if qdev else ''
909        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
910
911    def flatten_qmp_object(self, obj, output=None, basestr=''):
912        if output is None:
913            output = {}
914        if isinstance(obj, list):
915            for i, item in enumerate(obj):
916                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
917        elif isinstance(obj, dict):
918            for key in obj:
919                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
920        else:
921            output[basestr[:-1]] = obj # Strip trailing '.'
922        return output
923
924    def qmp_to_opts(self, obj):
925        obj = self.flatten_qmp_object(obj)
926        output_list = []
927        for key in obj:
928            output_list += [key + '=' + obj[key]]
929        return ','.join(output_list)
930
931    def get_qmp_events_filtered(self, wait=60.0):
932        result = []
933        for ev in self.get_qmp_events(wait=wait):
934            result.append(filter_qmp_event(ev))
935        return result
936
937    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
938        full_cmd = OrderedDict((
939            ("execute", cmd),
940            ("arguments", ordered_qmp(kwargs))
941        ))
942        log(full_cmd, filters, indent=indent)
943        result = self.qmp(cmd, **kwargs)
944        log(result, filters, indent=indent)
945        return result
946
947    # Returns None on success, and an error string on failure
    def run_job(self, job: str, auto_finalize: bool = True,
                auto_dismiss: bool = False,
                pre_finalize: Optional[Callable[[], None]] = None,
                cancel: bool = False, wait: float = 60.0,
                filters: Iterable[Callable[[Any], Any]] = (),
                ) -> Optional[str]:
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :param filters: Iterable of filter callables, passed on to log() and
                        qmp_log() for everything this method logs.
        :return: None, or the job's error as reported by query-jobs while
                 the job was in the 'aborting' state.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; every
            # other matched event is merely logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev, filters=filters)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the error reported by query-jobs so that it can
                # be returned once the job is gone
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']), filters=filters)
            elif status == 'ready':
                self.qmp_log('job-complete', id=job, filters=filters)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback (if any), then
                # either cancel or finalize the job
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job, filters=filters)
                else:
                    self.qmp_log('job-finalize', id=job, filters=filters)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job, filters=filters)
            elif status == 'null':
                # The 'null' status ends the loop
                return error
1005
1006    # Returns None on success, and an error string on failure
1007    def blockdev_create(self, options, job_id='job0', filters=None):
1008        if filters is None:
1009            filters = [filter_qmp_testfiles]
1010        result = self.qmp_log('blockdev-create', filters=filters,
1011                              job_id=job_id, options=options)
1012
1013        if 'return' in result:
1014            assert result['return'] == {}
1015            job_result = self.run_job(job_id, filters=filters)
1016        else:
1017            job_result = result['error']
1018
1019        log("")
1020        return job_result
1021
1022    def enable_migration_events(self, name):
1023        log('Enabling migration QMP events on %s...' % name)
1024        log(self.qmp('migrate-set-capabilities', capabilities=[
1025            {
1026                'capability': 'events',
1027                'state': True
1028            }
1029        ]))
1030
1031    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1032        while True:
1033            event = self.event_wait('MIGRATION')
1034            # We use the default timeout, and with a timeout, event_wait()
1035            # never returns None
1036            assert event
1037
1038            log(event, filters=[filter_qmp_event])
1039            if event['data']['status'] in ('completed', 'failed'):
1040                break
1041
1042        if event['data']['status'] == 'completed':
1043            # The event may occur in finish-migrate, so wait for the expected
1044            # post-migration runstate
1045            runstate = None
1046            while runstate != expect_runstate:
1047                runstate = self.qmp('query-status')['return']['status']
1048            return True
1049        else:
1050            return False
1051
1052    def node_info(self, node_name):
1053        nodes = self.qmp('query-named-block-nodes')
1054        for x in nodes['return']:
1055            if x['node-name'] == node_name:
1056                return x
1057        return None
1058
1059    def query_bitmaps(self):
1060        res = self.qmp("query-named-block-nodes")
1061        return {device['node-name']: device['dirty-bitmaps']
1062                for device in res['return'] if 'dirty-bitmaps' in device}
1063
1064    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1065        """
1066        get a specific bitmap from the object returned by query_bitmaps.
1067        :param recording: If specified, filter results by the specified value.
1068        :param bitmaps: If specified, use it instead of call query_bitmaps()
1069        """
1070        if bitmaps is None:
1071            bitmaps = self.query_bitmaps()
1072
1073        for bitmap in bitmaps[node_name]:
1074            if bitmap.get('name', '') == bitmap_name:
1075                if recording is None or bitmap.get('recording') == recording:
1076                    return bitmap
1077        return None
1078
1079    def check_bitmap_status(self, node_name, bitmap_name, fields):
1080        ret = self.get_bitmap(node_name, bitmap_name)
1081
1082        return fields.items() <= ret.items()
1083
1084    def assert_block_path(self, root, path, expected_node, graph=None):
1085        """
1086        Check whether the node under the given path in the block graph
1087        is @expected_node.
1088
1089        @root is the node name of the node where the @path is rooted.
1090
1091        @path is a string that consists of child names separated by
1092        slashes.  It must begin with a slash.
1093
1094        Examples for @root + @path:
1095          - root="qcow2-node", path="/backing/file"
1096          - root="quorum-node", path="/children.2/file"
1097
1098        Hypothetically, @path could be empty, in which case it would
1099        point to @root.  However, in practice this case is not useful
1100        and hence not allowed.
1101
1102        @expected_node may be None.  (All elements of the path but the
1103        leaf must still exist.)
1104
1105        @graph may be None or the result of an x-debug-query-block-graph
1106        call that has already been performed.
1107        """
1108        if graph is None:
1109            graph = self.qmp('x-debug-query-block-graph')['return']
1110
1111        iter_path = iter(path.split('/'))
1112
1113        # Must start with a /
1114        assert next(iter_path) == ''
1115
1116        node = next((node for node in graph['nodes'] if node['name'] == root),
1117                    None)
1118
1119        # An empty @path is not allowed, so the root node must be present
1120        assert node is not None, 'Root node %s not found' % root
1121
1122        for child_name in iter_path:
1123            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1124
1125            try:
1126                node_id = next(edge['child'] for edge in graph['edges']
1127                               if (edge['parent'] == node['id'] and
1128                                   edge['name'] == child_name))
1129
1130                node = next(node for node in graph['nodes']
1131                            if node['id'] == node_id)
1132
1133            except StopIteration:
1134                node = None
1135
1136        if node is None:
1137            assert expected_node is None, \
1138                   'No node found under %s (but expected %s)' % \
1139                   (path, expected_node)
1140        else:
1141            assert node['name'] == expected_node, \
1142                   'Found node %s under %s (but expected %s)' % \
1143                   (node['name'], path, expected_node)
1144
# Matches a path component of the form "name[index]", capturing the part
# before the brackets and the bracketed part as two separate groups.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1146
class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Most of the helpers below talk to self.vm; users of this class
        # are expected to assign it.
        self.vm = None

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        @path consists of dict keys separated by slashes; a component
        may carry a list index, as in "return[0]/device".  The test
        fails if the path cannot be followed.'''
        node = d
        for segment in path.split('/'):
            indexed = index_re.match(segment)
            key = segment
            position = None
            if indexed is not None:
                key, raw_index = indexed.groups()
                position = int(raw_index)

            if not (isinstance(node, dict) and key in node):
                self.fail(f'failed path traversal for "{path}" in "{node}"')
            node = node[key]

            if indexed is None:
                continue

            if not isinstance(node, list):
                self.fail(f'path component "{key}" in "{path}" '
                          f'is not a list in "{node}"')
            try:
                node = node[position]
            except IndexError:
                self.fail(f'invalid index "{position}" in path "{path}" '
                          f'in "{node}"')
        return node

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be traversed in the QMP dict @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed, i.e. the path is indeed absent
            return
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        single_value = not isinstance(value, list) or value == []
        if single_value:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def is_wanted(entry):
            # A None on either side of a comparison counts as a match
            name = entry.get("node-name")
            if not (name is None or node_name is None or name == node_name):
                return False
            filename = entry.get("file")
            return (filename is None or file_name is None
                    or filename == file_name)

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        if any(is_wanted(entry) for entry in result["return"]):
            return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        actual = self.vm.flatten_qmp_object(json.loads(payload))
        expected = self.vm.flatten_qmp_object(reference)
        self.assertEqual(actual, expected)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        terminal = ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED')
        final_event = None
        while final_event is None:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in terminal:
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event

        With @error set, the completion event must carry that error;
        otherwise it must carry none, and (if @check_offset) its offset
        must equal its length.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        matches = [('BLOCK_JOB_READY',
                    {'data': {'type': job_type, 'device': drive}})
                   for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(matches)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it, and assert that
        it completed as a mirror job with offset equal to length'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
        and return its entry.  The job must exist.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                reply = self.vm.qmp('query-block-jobs')
                job = next((entry for entry in reply['return']
                            if entry['device'] == job_id), None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job

        Returns the job's query-block-jobs entry once it is paused if
        @wait is true, and the block-job-pause reply otherwise.'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        if not wait:
            return reply
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        # Record the reason in the .casenotrun file before skipping
        case_notrun(reason)
        self.skipTest(reason)
1328
1329
def notrun(reason):
    '''Skip this test suite

    Writes @reason to <test_dir>/<seq>.notrun, logs a warning and exits
    the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_path = f'{test_dir}/{seq}.notrun'

    with open(notrun_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1340
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to <test_dir>/<seq>.casenotrun.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = f'{test_dir}/{seq}.casenotrun'

    with open(casenotrun_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1353
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) unless the current image format
    is acceptable; for luks, additionally require a working LUKS driver.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_enabled:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    if (supported_fmts and imgfmt not in supported_fmts) \
            or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1369
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) unless the current protocol is
    acceptable.  At most one of the two lists may be non-empty, and
    'generic' in @supported accepts every protocol.
    """
    assert not supported or not unsupported

    if 'generic' in supported:
        return

    if (supported and imgproto not in supported) or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
1380
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if sys.platform starts with any
    prefix in @unsupported, or if @supported is non-empty and
    sys.platform starts with none of its prefixes.
    """
    for prefix in unsupported:
        if sys.platform.startswith(prefix):
            notrun('not suitable for this OS: %s' % sys.platform)
            break

    if not supported:
        return
    for prefix in supported:
        if sys.platform.startswith(prefix):
            return
    notrun('not suitable for this OS: %s' % sys.platform)
1389
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if a non-empty list of cache
    modes is given and the current cache mode is not in it.
    """
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1393
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if a non-empty list of AIO modes
    is given and the current AIO mode is not in it.
    """
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1397
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if any of the required formats
    is missing from supported_formats().
    """
    wanted = set(required_formats)
    available = set(supported_formats())
    usf_list = list(wanted - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1402
1403
def _verify_virtio_blk() -> None:
    """
    Skip the test suite (via notrun()) if QEMU's "-device help" output
    does not mention virtio-blk.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1408
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Skip the test suite (via notrun()) if QEMU's "-device help" output
    mentions neither virtio-scsi-pci nor virtio-scsi-ccw.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-scsi-pci' in device_help or 'virtio-scsi-ccw' in device_help:
        return
    notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1413
1414
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if $IMGOPTS is set and contains
    any of the @unsupported substrings, or a '$'.
    """
    current = os.environ.get('IMGOPTS')
    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
    # but it is supported only for bash tests. We don't have a concept of
    # a global TEST_IMG in iotests.py, let alone a way of parsing $variables.
    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
    rejected = [*unsupported, '$']
    if not current:
        return
    for needle in rejected:
        if needle in current:
            notrun(f'not suitable for this imgopts: {current}')
            break
1424
1425
def supports_quorum() -> bool:
    """Tell whether 'quorum' appears in the output of qemu-img --help."""
    help_output = qemu_img('--help').stdout
    return 'quorum' in help_output
1428
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1433
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if creating a LUKS test image works, otherwise
             (False, reason), where the reason is taken from qemu-img's
             output.
    """

    img_file = f'{test_dir}/luks-test.luks'
    res = qemu_img('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G',
                   check=False)
    # Best-effort cleanup: the image may not have been created at all
    try:
        os.remove(img_file)
    except OSError:
        pass

    if not res.returncode:
        return (True, '')

    # Prefer the text following "<img_file>:" on the first line that has
    # it; fall back to the whole output.
    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in res.stdout.splitlines()
                   if marker in line),
                  res.stdout)
    return (False, reason)
1464
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported by
    has_working_luks()
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1472
def supports_qcow2_zstd_compression() -> bool:
    """
    Probe for zstd support by creating a qcow2 test image with
    compression_type=zstd.

    :return: False only if qemu-img exits with status 1 and reports that
             'compression-type' does not accept 'zstd'; True otherwise.
    """
    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
                   img_file, '0',
                   check=False)
    # Best-effort cleanup: the image may not have been created at all
    try:
        os.remove(img_file)
    except OSError:
        pass

    zstd_rejected = (
        res.returncode == 1 and
        "'compression-type' does not accept value 'zstd'" in res.stdout
    )
    return not zstd_rejected
1488
def verify_qcow2_zstd_compression():
    '''Skip test suite if qcow2 zstd compression is not supported'''
    if supports_qcow2_zstd_compression():
        return
    notrun('zstd compression not supported')
1492
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    @args.

    :return: The output part of qemu_tool_pipe_and_status()'s result.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    stdout, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout
1502
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The lists are parsed from "-drive format=help" output and cached
       on the function object, one entry per value of read_only.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_text = qemu_pipe("-drive", "format=help")
        # Line 0 is used for the rw-whitelist, line 1 for the ro-whitelist
        wanted_line = help_text.splitlines()[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1517
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a collection of format names or a
       callable that takes the test case and returns such a collection.
       @read_only selects the whitelist, see supported_formats().'''
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the test's name and docstring on the wrapper
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1537
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test is skipped (via case_skip()) when the current
       image format is one of @formats, and run otherwise.'''
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the test's name and docstring on the wrapper
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1553
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root (uid 0), the case is recorded with
       case_notrun() and the wrapper returns None without calling
       the test.'''
    import functools  # pylint: disable=import-outside-toplevel

    # wraps() keeps the test's name and docstring on the wrapper
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1564
1565# We need to filter out the time taken from the output so that
1566# qemu-iotest can reliably diff the results against master output,
1567# and hide skipped tests from the reference output.
1568
class ReproducibleTestResult(unittest.TextTestResult):
    '''TextTestResult that reports skipped tests like passed ones in
    dots mode'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1578
class ReproducibleStreamWrapper:
    '''Stream proxy that removes the elapsed time and the skip count
    from what is written; everything else goes to the wrapped stream'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Never delegate these two; report them as missing instead
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        self.stream.write(re.sub(r' \(skipped=\d+\)', r'', without_time))
1592
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given
        target = stream or sys.stdout
        super().__init__(stream=ReproducibleStreamWrapper(target),  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1603
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'
    verbosity = 2 if debug else 1
    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
1614
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Consumes a '-d' from sys.argv (if present), configures logging
    accordingly, and then runs the _verify_*() checks for the current
    test environment.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
        debug = True
    except ValueError:
        # No '-d' on the command line
        debug = False
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1646
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    All arguments but @test_function go to execute_setup_common().
    With a @test_function, that function is called; without one, the
    unittests are executed.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1655
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Print the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1664
1665# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates logging; all arguments are passed on to
    execute_setup_common(), whose return value is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1670
1671# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    Activates logging, then hands @test_function and all remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1676
1677# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are passed on to execute_test(); unlike script_main(),
    this does not activate logging.
    """
    execute_test(*args, **kwargs)
1681