xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 9951ba94)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp.legacy import QMPMessage, QEMUMonitorProtocol
42from qemu.utils import VerboseProcessError
43
44# Use this logger for logging messages directly from the iotests module
45logger = logging.getLogger('qemu.iotests')
46logger.addHandler(logging.NullHandler())
47
48# Use this logger for messages that ought to be used for diff output.
49test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52faulthandler.enable()
53
54# This will not work if arguments contain spaces but is necessary if we
55# want to support the override options that ./check supports.
56qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57if os.environ.get('QEMU_IMG_OPTIONS'):
58    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61if os.environ.get('QEMU_IO_OPTIONS'):
62    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66    qemu_io_args_no_fmt += \
67        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70qemu_nbd_args = [qemu_nbd_prog]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')
78
79gdb_qemu_env = os.environ.get('GDB_OPTIONS')
80qemu_gdb = []
81if gdb_qemu_env:
82    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')
83
84qemu_print = os.environ.get('PRINT_QEMU', False)
85
86imgfmt = os.environ.get('IMGFMT', 'raw')
87imgproto = os.environ.get('IMGPROTO', 'file')
88
89try:
90    test_dir = os.environ['TEST_DIR']
91    sock_dir = os.environ['SOCK_DIR']
92    cachemode = os.environ['CACHEMODE']
93    aiomode = os.environ['AIOMODE']
94    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
95except KeyError:
96    # We are using these variables as proxies to indicate that we're
97    # not being run via "check". There may be other things set up by
98    # "check" that individual test cases rely on.
99    sys.stderr.write('Please run this test via the "check" script\n')
100    sys.exit(os.EX_USAGE)
101
102qemu_valgrind = []
103if os.environ.get('VALGRIND_QEMU') == "y" and \
104    os.environ.get('NO_VALGRIND') != "y":
105    valgrind_logfile = "--log-file=" + test_dir
106    # %p allows to put the valgrind process PID, since
107    # we don't know it a priori (subprocess.Popen is
108    # not yet invoked)
109    valgrind_logfile += "/%p.valgrind"
110
111    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']
112
113luks_default_secret_object = 'secret,id=keysec0,data=' + \
114                             os.environ.get('IMGKEYSECRET', '')
115luks_default_key_secret_opt = 'key-secret=keysec0'
116
117sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
120@contextmanager
121def change_log_level(
122        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
123    """
124    Utility function for temporarily changing the log level of a logger.
125
126    This can be used to silence errors that are expected or uninteresting.
127    """
128    _logger = logging.getLogger(logger_name)
129    current_level = _logger.level
130    _logger.setLevel(level)
131
132    try:
133        yield
134    finally:
135        _logger.setLevel(current_level)
136
137
138def unarchive_sample_image(sample, fname):
139    sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
140    with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
141        shutil.copyfileobj(f_in, f_out)
142
143
144def qemu_tool_popen(args: Sequence[str],
145                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
146    stderr = subprocess.STDOUT if connect_stderr else None
147    # pylint: disable=consider-using-with
148    return subprocess.Popen(args,
149                            stdout=subprocess.PIPE,
150                            stderr=stderr,
151                            universal_newlines=True)
152
153
154def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
155                              connect_stderr: bool = True,
156                              drop_successful_output: bool = False) \
157        -> Tuple[str, int]:
158    """
159    Run a tool and return both its output and its exit code
160    """
161    with qemu_tool_popen(args, connect_stderr) as subp:
162        output = subp.communicate()[0]
163        if subp.returncode < 0:
164            cmd = ' '.join(args)
165            sys.stderr.write(f'{tool} received signal \
166                               {-subp.returncode}: {cmd}\n')
167        if drop_successful_output and subp.returncode == 0:
168            output = ''
169        return (output, subp.returncode)
170
171def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
172    if not args or args[0] != 'create':
173        return list(args)
174    args = args[1:]
175
176    p = argparse.ArgumentParser(allow_abbrev=False)
177    # -o option may be specified several times
178    p.add_argument('-o', action='append', default=[])
179    p.add_argument('-f')
180    parsed, remaining = p.parse_known_args(args)
181
182    opts_list = parsed.o
183
184    result = ['create']
185    if parsed.f is not None:
186        result += ['-f', parsed.f]
187
188    # IMGOPTS most probably contain options specific for the selected format,
189    # like extended_l2 or compression_type for qcow2. Test may want to create
190    # additional images in other formats that doesn't support these options.
191    # So, use IMGOPTS only for images created in imgfmt format.
192    imgopts = os.environ.get('IMGOPTS')
193    if imgopts and parsed.f == imgfmt:
194        opts_list.insert(0, imgopts)
195
196    # default luks support
197    if parsed.f == 'luks' and \
198            all('key-secret' not in opts for opts in opts_list):
199        result += ['--object', luks_default_secret_object]
200        opts_list.append(luks_default_key_secret_opt)
201
202    for opts in opts_list:
203        result += ['-o', opts]
204
205    result += remaining
206
207    return result
208
209
210def qemu_tool(*args: str, check: bool = True, combine_stdio: bool = True
211              ) -> 'subprocess.CompletedProcess[str]':
212    """
213    Run a qemu tool and return its status code and console output.
214
215    :param args: full command line to run.
216    :param check: Enforce a return code of zero.
217    :param combine_stdio: set to False to keep stdout/stderr separated.
218
219    :raise VerboseProcessError:
220        When the return code is negative, or on any non-zero exit code
221        when 'check=True' was provided (the default). This exception has
222        'stdout', 'stderr', and 'returncode' properties that may be
223        inspected to show greater detail. If this exception is not
224        handled, the command-line, return code, and all console output
225        will be included at the bottom of the stack trace.
226
227    :return:
228        a CompletedProcess. This object has args, returncode, and stdout
229        properties. If streams are not combined, it will also have a
230        stderr property.
231    """
232    subp = subprocess.run(
233        args,
234        stdout=subprocess.PIPE,
235        stderr=subprocess.STDOUT if combine_stdio else subprocess.PIPE,
236        universal_newlines=True,
237        check=False
238    )
239
240    if check and subp.returncode or (subp.returncode < 0):
241        raise VerboseProcessError(
242            subp.returncode, args,
243            output=subp.stdout,
244            stderr=subp.stderr,
245        )
246
247    return subp
248
249
250def qemu_img(*args: str, check: bool = True, combine_stdio: bool = True
251             ) -> 'subprocess.CompletedProcess[str]':
252    """
253    Run QEMU_IMG_PROG and return its status code and console output.
254
255    This function always prepends QEMU_IMG_OPTIONS and may further alter
256    the args for 'create' commands.
257
258    See `qemu_tool()` for greater detail.
259    """
260    full_args = qemu_img_args + qemu_img_create_prepare_args(list(args))
261    return qemu_tool(*full_args, check=check, combine_stdio=combine_stdio)
262
263
264def ordered_qmp(qmsg, conv_keys=True):
265    # Dictionaries are not ordered prior to 3.6, therefore:
266    if isinstance(qmsg, list):
267        return [ordered_qmp(atom) for atom in qmsg]
268    if isinstance(qmsg, dict):
269        od = OrderedDict()
270        for k, v in sorted(qmsg.items()):
271            if conv_keys:
272                k = k.replace('_', '-')
273            od[k] = ordered_qmp(v, conv_keys=False)
274        return od
275    return qmsg
276
277def qemu_img_create(*args: str) -> 'subprocess.CompletedProcess[str]':
278    return qemu_img('create', *args)
279
280def qemu_img_json(*args: str) -> Any:
281    """
282    Run qemu-img and return its output as deserialized JSON.
283
284    :raise CalledProcessError:
285        When qemu-img crashes, or returns a non-zero exit code without
286        producing a valid JSON document to stdout.
287    :raise JSONDecoderError:
288        When qemu-img returns 0, but failed to produce a valid JSON document.
289
290    :return: A deserialized JSON object; probably a dict[str, Any].
291    """
292    try:
293        res = qemu_img(*args, combine_stdio=False)
294    except subprocess.CalledProcessError as exc:
295        # Terminated due to signal. Don't bother.
296        if exc.returncode < 0:
297            raise
298
299        # Commands like 'check' can return failure (exit codes 2 and 3)
300        # to indicate command completion, but with errors found. For
301        # multi-command flexibility, ignore the exact error codes and
302        # *try* to load JSON.
303        try:
304            return json.loads(exc.stdout)
305        except json.JSONDecodeError:
306            # Nope. This thing is toast. Raise the /process/ error.
307            pass
308        raise
309
310    return json.loads(res.stdout)
311
312def qemu_img_measure(*args: str) -> Any:
313    return qemu_img_json("measure", "--output", "json", *args)
314
315def qemu_img_check(*args: str) -> Any:
316    return qemu_img_json("check", "--output", "json", *args)
317
318def qemu_img_info(*args: str) -> Any:
319    return qemu_img_json('info', "--output", "json", *args)
320
321def qemu_img_map(*args: str) -> Any:
322    return qemu_img_json('map', "--output", "json", *args)
323
324def qemu_img_log(*args: str, check: bool = True
325                 ) -> 'subprocess.CompletedProcess[str]':
326    result = qemu_img(*args, check=check)
327    log(result.stdout, filters=[filter_testfiles])
328    return result
329
330def img_info_log(filename: str, filter_path: Optional[str] = None,
331                 use_image_opts: bool = False, extra_args: Sequence[str] = (),
332                 check: bool = True,
333                 ) -> None:
334    args = ['info']
335    if use_image_opts:
336        args.append('--image-opts')
337    else:
338        args += ['-f', imgfmt]
339    args += extra_args
340    args.append(filename)
341
342    output = qemu_img(*args, check=check).stdout
343    if not filter_path:
344        filter_path = filename
345    log(filter_img_info(output, filter_path))
346
347def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
348    if '-f' in args or '--image-opts' in args:
349        return qemu_io_args_no_fmt + list(args)
350    else:
351        return qemu_io_args + list(args)
352
353def qemu_io_popen(*args):
354    return qemu_tool_popen(qemu_io_wrap_args(args))
355
356def qemu_io(*args: str, check: bool = True, combine_stdio: bool = True
357            ) -> 'subprocess.CompletedProcess[str]':
358    """
359    Run QEMU_IO_PROG and return the status code and console output.
360
361    This function always prepends either QEMU_IO_OPTIONS or
362    QEMU_IO_OPTIONS_NO_FMT.
363    """
364    return qemu_tool(*qemu_io_wrap_args(args),
365                     check=check, combine_stdio=combine_stdio)
366
367def qemu_io_log(*args: str, check: bool = True
368                ) -> 'subprocess.CompletedProcess[str]':
369    result = qemu_io(*args, check=check)
370    log(result.stdout, filters=[filter_testfiles, filter_qemu_io])
371    return result
372
373class QemuIoInteractive:
374    def __init__(self, *args):
375        self.args = qemu_io_wrap_args(args)
376        # We need to keep the Popen objext around, and not
377        # close it immediately. Therefore, disable the pylint check:
378        # pylint: disable=consider-using-with
379        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
380                                   stdout=subprocess.PIPE,
381                                   stderr=subprocess.STDOUT,
382                                   universal_newlines=True)
383        out = self._p.stdout.read(9)
384        if out != 'qemu-io> ':
385            # Most probably qemu-io just failed to start.
386            # Let's collect the whole output and exit.
387            out += self._p.stdout.read()
388            self._p.wait(timeout=1)
389            raise ValueError(out)
390
391    def close(self):
392        self._p.communicate('q\n')
393
394    def _read_output(self):
395        pattern = 'qemu-io> '
396        n = len(pattern)
397        pos = 0
398        s = []
399        while pos != n:
400            c = self._p.stdout.read(1)
401            # check unexpected EOF
402            assert c != ''
403            s.append(c)
404            if c == pattern[pos]:
405                pos += 1
406            else:
407                pos = 0
408
409        return ''.join(s[:-n])
410
411    def cmd(self, cmd):
412        # quit command is in close(), '\n' is added automatically
413        assert '\n' not in cmd
414        cmd = cmd.strip()
415        assert cmd not in ('q', 'quit')
416        self._p.stdin.write(cmd + '\n')
417        self._p.stdin.flush()
418        return self._read_output()
419
420
421class QemuStorageDaemon:
422    _qmp: Optional[QEMUMonitorProtocol] = None
423    _qmpsock: Optional[str] = None
424    # Python < 3.8 would complain if this type were not a string literal
425    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426    _p: 'Optional[subprocess.Popen[bytes]]' = None
427
428    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
429        assert '--pidfile' not in args
430        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
431        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]
432
433        if qmp:
434            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
435            all_args += ['--chardev',
436                         f'socket,id=qmp-sock,path={self._qmpsock}',
437                         '--monitor', 'qmp-sock']
438
439            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)
440
441        # Cannot use with here, we want the subprocess to stay around
442        # pylint: disable=consider-using-with
443        self._p = subprocess.Popen(all_args)
444        if self._qmp is not None:
445            self._qmp.accept()
446        while not os.path.exists(self.pidfile):
447            if self._p.poll() is not None:
448                cmd = ' '.join(all_args)
449                raise RuntimeError(
450                    'qemu-storage-daemon terminated with exit code ' +
451                    f'{self._p.returncode}: {cmd}')
452
453            time.sleep(0.01)
454
455        with open(self.pidfile, encoding='utf-8') as f:
456            self._pid = int(f.read().strip())
457
458        assert self._pid == self._p.pid
459
460    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
461            -> QMPMessage:
462        assert self._qmp is not None
463        return self._qmp.cmd(cmd, args)
464
465    def stop(self, kill_signal=15):
466        self._p.send_signal(kill_signal)
467        self._p.wait()
468        self._p = None
469
470        if self._qmp:
471            self._qmp.close()
472
473        if self._qmpsock is not None:
474            try:
475                os.remove(self._qmpsock)
476            except OSError:
477                pass
478        try:
479            os.remove(self.pidfile)
480        except OSError:
481            pass
482
483    def __del__(self):
484        if self._p is not None:
485            self.stop(kill_signal=9)
486
487
488def qemu_nbd(*args):
489    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
490    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
491
492def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
493    '''Run qemu-nbd in daemon mode and return both the parent's exit code
494       and its output in case of an error'''
495    full_args = qemu_nbd_args + ['--fork'] + list(args)
496    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
497                                                   connect_stderr=False)
498    return returncode, output if returncode else ''
499
500def qemu_nbd_list_log(*args: str) -> str:
501    '''Run qemu-nbd to list remote exports'''
502    full_args = [qemu_nbd_prog, '-L'] + list(args)
503    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
504    log(output, filters=[filter_testfiles, filter_nbd_exports])
505    return output
506
507@contextmanager
508def qemu_nbd_popen(*args):
509    '''Context manager running qemu-nbd within the context'''
510    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
511
512    assert not os.path.exists(pid_file)
513
514    cmd = list(qemu_nbd_args)
515    cmd.extend(('--persistent', '--pid-file', pid_file))
516    cmd.extend(args)
517
518    log('Start NBD server')
519    with subprocess.Popen(cmd) as p:
520        try:
521            while not os.path.exists(pid_file):
522                if p.poll() is not None:
523                    raise RuntimeError(
524                        "qemu-nbd terminated with exit code {}: {}"
525                        .format(p.returncode, ' '.join(cmd)))
526
527                time.sleep(0.01)
528            yield
529        finally:
530            if os.path.exists(pid_file):
531                os.remove(pid_file)
532            log('Kill NBD server')
533            p.kill()
534            p.wait()
535
536def compare_images(img1: str, img2: str,
537                   fmt1: str = imgfmt, fmt2: str = imgfmt) -> bool:
538    """
539    Compare two images with QEMU_IMG; return True if they are identical.
540
541    :raise CalledProcessError:
542        when qemu-img crashes or returns a status code of anything other
543        than 0 (identical) or 1 (different).
544    """
545    try:
546        qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
547        return True
548    except subprocess.CalledProcessError as exc:
549        if exc.returncode == 1:
550            return False
551        raise
552
553def create_image(name, size):
554    '''Create a fully-allocated raw image with sector markers'''
555    with open(name, 'wb') as file:
556        i = 0
557        while i < size:
558            sector = struct.pack('>l504xl', i // 512, i // 512)
559            file.write(sector)
560            i = i + 512
561
562def image_size(img: str) -> int:
563    """Return image's virtual size"""
564    value = qemu_img_info('-f', imgfmt, img)['virtual-size']
565    if not isinstance(value, int):
566        type_name = type(value).__name__
567        raise TypeError("Expected 'int' for 'virtual-size', "
568                        f"got '{value}' of type '{type_name}'")
569    return value
570
571def is_str(val):
572    return isinstance(val, str)
573
574test_dir_re = re.compile(r"%s" % test_dir)
575def filter_test_dir(msg):
576    return test_dir_re.sub("TEST_DIR", msg)
577
578win32_re = re.compile(r"\r")
579def filter_win32(msg):
580    return win32_re.sub("", msg)
581
582qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
583                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
584                        r"and [0-9\/.inf]* ops\/sec\)")
585def filter_qemu_io(msg):
586    msg = filter_win32(msg)
587    return qemu_io_re.sub("X ops; XX:XX:XX.X "
588                          "(XXX YYY/sec and XXX ops/sec)", msg)
589
590chown_re = re.compile(r"chown [0-9]+:[0-9]+")
591def filter_chown(msg):
592    return chown_re.sub("chown UID:GID", msg)
593
594def filter_qmp_event(event):
595    '''Filter a QMP event dict'''
596    event = dict(event)
597    if 'timestamp' in event:
598        event['timestamp']['seconds'] = 'SECS'
599        event['timestamp']['microseconds'] = 'USECS'
600    return event
601
602def filter_qmp(qmsg, filter_fn):
603    '''Given a string filter, filter a QMP object's values.
604    filter_fn takes a (key, value) pair.'''
605    # Iterate through either lists or dicts;
606    if isinstance(qmsg, list):
607        items = enumerate(qmsg)
608    elif isinstance(qmsg, dict):
609        items = qmsg.items()
610    else:
611        return filter_fn(None, qmsg)
612
613    for k, v in items:
614        if isinstance(v, (dict, list)):
615            qmsg[k] = filter_qmp(v, filter_fn)
616        else:
617            qmsg[k] = filter_fn(k, v)
618    return qmsg
619
620def filter_testfiles(msg):
621    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
622    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
623    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
624
625def filter_qmp_testfiles(qmsg):
626    def _filter(_key, value):
627        if is_str(value):
628            return filter_testfiles(value)
629        return value
630    return filter_qmp(qmsg, _filter)
631
632def filter_virtio_scsi(output: str) -> str:
633    return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
634
635def filter_qmp_virtio_scsi(qmsg):
636    def _filter(_key, value):
637        if is_str(value):
638            return filter_virtio_scsi(value)
639        return value
640    return filter_qmp(qmsg, _filter)
641
642def filter_generated_node_ids(msg):
643    return re.sub("#block[0-9]+", "NODE_NAME", msg)
644
645def filter_img_info(output, filename):
646    lines = []
647    for line in output.split('\n'):
648        if 'disk size' in line or 'actual-size' in line:
649            continue
650        line = line.replace(filename, 'TEST_IMG')
651        line = filter_testfiles(line)
652        line = line.replace(imgfmt, 'IMGFMT')
653        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
654        line = re.sub('uuid: [-a-f0-9]+',
655                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
656                      line)
657        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
658        line = re.sub('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE',
659                      line)
660        lines.append(line)
661    return '\n'.join(lines)
662
663def filter_imgfmt(msg):
664    return msg.replace(imgfmt, 'IMGFMT')
665
666def filter_qmp_imgfmt(qmsg):
667    def _filter(_key, value):
668        if is_str(value):
669            return filter_imgfmt(value)
670        return value
671    return filter_qmp(qmsg, _filter)
672
673def filter_nbd_exports(output: str) -> str:
674    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
675
676
677Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
678
679def log(msg: Msg,
680        filters: Iterable[Callable[[Msg], Msg]] = (),
681        indent: Optional[int] = None) -> None:
682    """
683    Logs either a string message or a JSON serializable message (like QMP).
684    If indent is provided, JSON serializable messages are pretty-printed.
685    """
686    for flt in filters:
687        msg = flt(msg)
688    if isinstance(msg, (dict, list)):
689        # Don't sort if it's already sorted
690        do_sort = not isinstance(msg, OrderedDict)
691        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
692    else:
693        test_logger.info(msg)
694
695class Timeout:
696    def __init__(self, seconds, errmsg="Timeout"):
697        self.seconds = seconds
698        self.errmsg = errmsg
699    def __enter__(self):
700        if qemu_gdb or qemu_valgrind:
701            return self
702        signal.signal(signal.SIGALRM, self.timeout)
703        signal.setitimer(signal.ITIMER_REAL, self.seconds)
704        return self
705    def __exit__(self, exc_type, value, traceback):
706        if qemu_gdb or qemu_valgrind:
707            return False
708        signal.setitimer(signal.ITIMER_REAL, 0)
709        return False
710    def timeout(self, signum, frame):
711        raise Exception(self.errmsg)
712
713def file_pattern(name):
714    return "{0}-{1}".format(os.getpid(), name)
715
716class FilePath:
717    """
718    Context manager generating multiple file names. The generated files are
719    removed when exiting the context.
720
721    Example usage:
722
723        with FilePath('a.img', 'b.img') as (img_a, img_b):
724            # Use img_a and img_b here...
725
726        # a.img and b.img are automatically removed here.
727
728    By default images are created in iotests.test_dir. To create sockets use
729    iotests.sock_dir:
730
731       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
732
733    For convenience, calling with one argument yields a single file instead of
734    a tuple with one item.
735
736    """
737    def __init__(self, *names, base_dir=test_dir):
738        self.paths = [os.path.join(base_dir, file_pattern(name))
739                      for name in names]
740
741    def __enter__(self):
742        if len(self.paths) == 1:
743            return self.paths[0]
744        else:
745            return self.paths
746
747    def __exit__(self, exc_type, exc_val, exc_tb):
748        for path in self.paths:
749            try:
750                os.remove(path)
751            except OSError:
752                pass
753        return False
754
755
756def try_remove(img):
757    try:
758        os.remove(img)
759    except OSError:
760        pass
761
762def file_path_remover():
763    for path in reversed(file_path_remover.paths):
764        try_remove(path)
765
766
767def file_path(*names, base_dir=test_dir):
768    ''' Another way to get auto-generated filename that cleans itself up.
769
770    Use is as simple as:
771
772    img_a, img_b = file_path('a.img', 'b.img')
773    sock = file_path('socket')
774    '''
775
776    if not hasattr(file_path_remover, 'paths'):
777        file_path_remover.paths = []
778        atexit.register(file_path_remover)
779
780    paths = []
781    for name in names:
782        filename = file_pattern(name)
783        path = os.path.join(base_dir, filename)
784        file_path_remover.paths.append(path)
785        paths.append(path)
786
787    return paths[0] if len(paths) == 1 else paths
788
789def remote_filename(path):
790    if imgproto == 'file':
791        return path
792    elif imgproto == 'ssh':
793        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
794    else:
795        raise Exception("Protocol %s not supported" % (imgproto))
796
797class VM(qtest.QEMUQtestMachine):
798    '''A QEMU VM'''
799
800    def __init__(self, path_suffix=''):
801        name = "qemu%s-%d" % (path_suffix, os.getpid())
802        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
803        if qemu_gdb and qemu_valgrind:
804            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
805            sys.exit(1)
806        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
807        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
808                         name=name,
809                         base_temp_dir=test_dir,
810                         sock_dir=sock_dir, qmp_timer=timer)
811        self._num_drives = 0
812
813    def _post_shutdown(self) -> None:
814        super()._post_shutdown()
815        if not qemu_valgrind or not self._popen:
816            return
817        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
818        if self.exitcode() == 99:
819            with open(valgrind_filename, encoding='utf-8') as f:
820                print(f.read())
821        else:
822            os.remove(valgrind_filename)
823
824    def _pre_launch(self) -> None:
825        super()._pre_launch()
826        if qemu_print:
827            # set QEMU binary output to stdout
828            self._close_qemu_log_file()
829
830    def add_object(self, opts):
831        self._args.append('-object')
832        self._args.append(opts)
833        return self
834
835    def add_device(self, opts):
836        self._args.append('-device')
837        self._args.append(opts)
838        return self
839
840    def add_drive_raw(self, opts):
841        self._args.append('-drive')
842        self._args.append(opts)
843        return self
844
845    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
846        '''Add a virtio-blk drive to the VM'''
847        options = ['if=%s' % interface,
848                   'id=drive%d' % self._num_drives]
849
850        if path is not None:
851            options.append('file=%s' % path)
852            options.append('format=%s' % img_format)
853            options.append('cache=%s' % cachemode)
854            options.append('aio=%s' % aiomode)
855
856        if opts:
857            options.append(opts)
858
859        if img_format == 'luks' and 'key-secret' not in opts:
860            # default luks support
861            if luks_default_secret_object not in self._args:
862                self.add_object(luks_default_secret_object)
863
864            options.append(luks_default_key_secret_opt)
865
866        self._args.append('-drive')
867        self._args.append(','.join(options))
868        self._num_drives += 1
869        return self
870
871    def add_blockdev(self, opts):
872        self._args.append('-blockdev')
873        if isinstance(opts, str):
874            self._args.append(opts)
875        else:
876            self._args.append(','.join(opts))
877        return self
878
879    def add_incoming(self, addr):
880        self._args.append('-incoming')
881        self._args.append(addr)
882        return self
883
884    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
885        cmd = 'human-monitor-command'
886        kwargs: Dict[str, Any] = {'command-line': command_line}
887        if use_log:
888            return self.qmp_log(cmd, **kwargs)
889        else:
890            return self.qmp(cmd, **kwargs)
891
892    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
893        """Pause drive r/w operations"""
894        if not event:
895            self.pause_drive(drive, "read_aio")
896            self.pause_drive(drive, "write_aio")
897            return
898        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
899
900    def resume_drive(self, drive: str) -> None:
901        """Resume drive r/w operations"""
902        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
903
904    def hmp_qemu_io(self, drive: str, cmd: str,
905                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
906        """Write to a given drive using an HMP command"""
907        d = '-d ' if qdev else ''
908        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
909
910    def flatten_qmp_object(self, obj, output=None, basestr=''):
911        if output is None:
912            output = {}
913        if isinstance(obj, list):
914            for i, item in enumerate(obj):
915                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
916        elif isinstance(obj, dict):
917            for key in obj:
918                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
919        else:
920            output[basestr[:-1]] = obj # Strip trailing '.'
921        return output
922
923    def qmp_to_opts(self, obj):
924        obj = self.flatten_qmp_object(obj)
925        output_list = []
926        for key in obj:
927            output_list += [key + '=' + obj[key]]
928        return ','.join(output_list)
929
930    def get_qmp_events_filtered(self, wait=60.0):
931        result = []
932        for ev in self.get_qmp_events(wait=wait):
933            result.append(filter_qmp_event(ev))
934        return result
935
936    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
937        full_cmd = OrderedDict((
938            ("execute", cmd),
939            ("arguments", ordered_qmp(kwargs))
940        ))
941        log(full_cmd, filters, indent=indent)
942        result = self.qmp(cmd, **kwargs)
943        log(result, filters, indent=indent)
944        return result
945
946    # Returns None on success, and an error string on failure
947    def run_job(self, job: str, auto_finalize: bool = True,
948                auto_dismiss: bool = False,
949                pre_finalize: Optional[Callable[[], None]] = None,
950                cancel: bool = False, wait: float = 60.0,
951                filters: Iterable[Callable[[Any], Any]] = (),
952                ) -> Optional[str]:
953        """
954        run_job moves a job from creation through to dismissal.
955
956        :param job: String. ID of recently-launched job
957        :param auto_finalize: Bool. True if the job was launched with
958                              auto_finalize. Defaults to True.
959        :param auto_dismiss: Bool. True if the job was launched with
960                             auto_dismiss=True. Defaults to False.
961        :param pre_finalize: Callback. A callable that takes no arguments to be
962                             invoked prior to issuing job-finalize, if any.
963        :param cancel: Bool. When true, cancels the job after the pre_finalize
964                       callback.
965        :param wait: Float. Timeout value specifying how long to wait for any
966                     event, in seconds. Defaults to 60.0.
967        """
968        match_device = {'data': {'device': job}}
969        match_id = {'data': {'id': job}}
970        events = [
971            ('BLOCK_JOB_COMPLETED', match_device),
972            ('BLOCK_JOB_CANCELLED', match_device),
973            ('BLOCK_JOB_ERROR', match_device),
974            ('BLOCK_JOB_READY', match_device),
975            ('BLOCK_JOB_PENDING', match_id),
976            ('JOB_STATUS_CHANGE', match_id)
977        ]
978        error = None
979        while True:
980            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
981            if ev['event'] != 'JOB_STATUS_CHANGE':
982                log(ev, filters=filters)
983                continue
984            status = ev['data']['status']
985            if status == 'aborting':
986                result = self.qmp('query-jobs')
987                for j in result['return']:
988                    if j['id'] == job:
989                        error = j['error']
990                        log('Job failed: %s' % (j['error']), filters=filters)
991            elif status == 'ready':
992                self.qmp_log('job-complete', id=job, filters=filters)
993            elif status == 'pending' and not auto_finalize:
994                if pre_finalize:
995                    pre_finalize()
996                if cancel:
997                    self.qmp_log('job-cancel', id=job, filters=filters)
998                else:
999                    self.qmp_log('job-finalize', id=job, filters=filters)
1000            elif status == 'concluded' and not auto_dismiss:
1001                self.qmp_log('job-dismiss', id=job, filters=filters)
1002            elif status == 'null':
1003                return error
1004
1005    # Returns None on success, and an error string on failure
1006    def blockdev_create(self, options, job_id='job0', filters=None):
1007        if filters is None:
1008            filters = [filter_qmp_testfiles]
1009        result = self.qmp_log('blockdev-create', filters=filters,
1010                              job_id=job_id, options=options)
1011
1012        if 'return' in result:
1013            assert result['return'] == {}
1014            job_result = self.run_job(job_id, filters=filters)
1015        else:
1016            job_result = result['error']
1017
1018        log("")
1019        return job_result
1020
1021    def enable_migration_events(self, name):
1022        log('Enabling migration QMP events on %s...' % name)
1023        log(self.qmp('migrate-set-capabilities', capabilities=[
1024            {
1025                'capability': 'events',
1026                'state': True
1027            }
1028        ]))
1029
1030    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
1031        while True:
1032            event = self.event_wait('MIGRATION')
1033            # We use the default timeout, and with a timeout, event_wait()
1034            # never returns None
1035            assert event
1036
1037            log(event, filters=[filter_qmp_event])
1038            if event['data']['status'] in ('completed', 'failed'):
1039                break
1040
1041        if event['data']['status'] == 'completed':
1042            # The event may occur in finish-migrate, so wait for the expected
1043            # post-migration runstate
1044            runstate = None
1045            while runstate != expect_runstate:
1046                runstate = self.qmp('query-status')['return']['status']
1047            return True
1048        else:
1049            return False
1050
1051    def node_info(self, node_name):
1052        nodes = self.qmp('query-named-block-nodes')
1053        for x in nodes['return']:
1054            if x['node-name'] == node_name:
1055                return x
1056        return None
1057
1058    def query_bitmaps(self):
1059        res = self.qmp("query-named-block-nodes")
1060        return {device['node-name']: device['dirty-bitmaps']
1061                for device in res['return'] if 'dirty-bitmaps' in device}
1062
1063    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
1064        """
1065        get a specific bitmap from the object returned by query_bitmaps.
1066        :param recording: If specified, filter results by the specified value.
1067        :param bitmaps: If specified, use it instead of call query_bitmaps()
1068        """
1069        if bitmaps is None:
1070            bitmaps = self.query_bitmaps()
1071
1072        for bitmap in bitmaps[node_name]:
1073            if bitmap.get('name', '') == bitmap_name:
1074                if recording is None or bitmap.get('recording') == recording:
1075                    return bitmap
1076        return None
1077
1078    def check_bitmap_status(self, node_name, bitmap_name, fields):
1079        ret = self.get_bitmap(node_name, bitmap_name)
1080
1081        return fields.items() <= ret.items()
1082
1083    def assert_block_path(self, root, path, expected_node, graph=None):
1084        """
1085        Check whether the node under the given path in the block graph
1086        is @expected_node.
1087
1088        @root is the node name of the node where the @path is rooted.
1089
1090        @path is a string that consists of child names separated by
1091        slashes.  It must begin with a slash.
1092
1093        Examples for @root + @path:
1094          - root="qcow2-node", path="/backing/file"
1095          - root="quorum-node", path="/children.2/file"
1096
1097        Hypothetically, @path could be empty, in which case it would
1098        point to @root.  However, in practice this case is not useful
1099        and hence not allowed.
1100
1101        @expected_node may be None.  (All elements of the path but the
1102        leaf must still exist.)
1103
1104        @graph may be None or the result of an x-debug-query-block-graph
1105        call that has already been performed.
1106        """
1107        if graph is None:
1108            graph = self.qmp('x-debug-query-block-graph')['return']
1109
1110        iter_path = iter(path.split('/'))
1111
1112        # Must start with a /
1113        assert next(iter_path) == ''
1114
1115        node = next((node for node in graph['nodes'] if node['name'] == root),
1116                    None)
1117
1118        # An empty @path is not allowed, so the root node must be present
1119        assert node is not None, 'Root node %s not found' % root
1120
1121        for child_name in iter_path:
1122            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1123
1124            try:
1125                node_id = next(edge['child'] for edge in graph['edges']
1126                               if (edge['parent'] == node['id'] and
1127                                   edge['name'] == child_name))
1128
1129                node = next(node for node in graph['nodes']
1130                            if node['id'] == node_id)
1131
1132            except StopIteration:
1133                node = None
1134
1135        if node is None:
1136            assert expected_node is None, \
1137                   'No node found under %s (but expected %s)' % \
1138                   (path, expected_node)
1139        else:
1140            assert node['name'] == expected_node, \
1141                   'Found node %s under %s (but expected %s)' % \
1142                   (node['name'], path, expected_node)
1143
1144index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1145
1146class QMPTestCase(unittest.TestCase):
1147    '''Abstract base class for QMP test cases'''
1148
1149    def __init__(self, *args, **kwargs):
1150        super().__init__(*args, **kwargs)
1151        # Many users of this class set a VM property we rely on heavily
1152        # in the methods below.
1153        self.vm = None
1154
1155    def dictpath(self, d, path):
1156        '''Traverse a path in a nested dict'''
1157        for component in path.split('/'):
1158            m = index_re.match(component)
1159            if m:
1160                component, idx = m.groups()
1161                idx = int(idx)
1162
1163            if not isinstance(d, dict) or component not in d:
1164                self.fail(f'failed path traversal for "{path}" in "{d}"')
1165            d = d[component]
1166
1167            if m:
1168                if not isinstance(d, list):
1169                    self.fail(f'path component "{component}" in "{path}" '
1170                              f'is not a list in "{d}"')
1171                try:
1172                    d = d[idx]
1173                except IndexError:
1174                    self.fail(f'invalid index "{idx}" in path "{path}" '
1175                              f'in "{d}"')
1176        return d
1177
1178    def assert_qmp_absent(self, d, path):
1179        try:
1180            result = self.dictpath(d, path)
1181        except AssertionError:
1182            return
1183        self.fail('path "%s" has value "%s"' % (path, str(result)))
1184
1185    def assert_qmp(self, d, path, value):
1186        '''Assert that the value for a specific path in a QMP dict
1187           matches.  When given a list of values, assert that any of
1188           them matches.'''
1189
1190        result = self.dictpath(d, path)
1191
1192        # [] makes no sense as a list of valid values, so treat it as
1193        # an actual single value.
1194        if isinstance(value, list) and value != []:
1195            for v in value:
1196                if result == v:
1197                    return
1198            self.fail('no match for "%s" in %s' % (str(result), str(value)))
1199        else:
1200            self.assertEqual(result, value,
1201                             '"%s" is "%s", expected "%s"'
1202                             % (path, str(result), str(value)))
1203
1204    def assert_no_active_block_jobs(self):
1205        result = self.vm.qmp('query-block-jobs')
1206        self.assert_qmp(result, 'return', [])
1207
1208    def assert_has_block_node(self, node_name=None, file_name=None):
1209        """Issue a query-named-block-nodes and assert node_name and/or
1210        file_name is present in the result"""
1211        def check_equal_or_none(a, b):
1212            return a is None or b is None or a == b
1213        assert node_name or file_name
1214        result = self.vm.qmp('query-named-block-nodes')
1215        for x in result["return"]:
1216            if check_equal_or_none(x.get("node-name"), node_name) and \
1217                    check_equal_or_none(x.get("file"), file_name):
1218                return
1219        self.fail("Cannot find %s %s in result:\n%s" %
1220                  (node_name, file_name, result))
1221
1222    def assert_json_filename_equal(self, json_filename, reference):
1223        '''Asserts that the given filename is a json: filename and that its
1224           content is equal to the given reference object'''
1225        self.assertEqual(json_filename[:5], 'json:')
1226        self.assertEqual(
1227            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
1228            self.vm.flatten_qmp_object(reference)
1229        )
1230
1231    def cancel_and_wait(self, drive='drive0', force=False,
1232                        resume=False, wait=60.0):
1233        '''Cancel a block job and wait for it to finish, returning the event'''
1234        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
1235        self.assert_qmp(result, 'return', {})
1236
1237        if resume:
1238            self.vm.resume_drive(drive)
1239
1240        cancelled = False
1241        result = None
1242        while not cancelled:
1243            for event in self.vm.get_qmp_events(wait=wait):
1244                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
1245                   event['event'] == 'BLOCK_JOB_CANCELLED':
1246                    self.assert_qmp(event, 'data/device', drive)
1247                    result = event
1248                    cancelled = True
1249                elif event['event'] == 'JOB_STATUS_CHANGE':
1250                    self.assert_qmp(event, 'data/id', drive)
1251
1252
1253        self.assert_no_active_block_jobs()
1254        return result
1255
1256    def wait_until_completed(self, drive='drive0', check_offset=True,
1257                             wait=60.0, error=None):
1258        '''Wait for a block job to finish, returning the event'''
1259        while True:
1260            for event in self.vm.get_qmp_events(wait=wait):
1261                if event['event'] == 'BLOCK_JOB_COMPLETED':
1262                    self.assert_qmp(event, 'data/device', drive)
1263                    if error is None:
1264                        self.assert_qmp_absent(event, 'data/error')
1265                        if check_offset:
1266                            self.assert_qmp(event, 'data/offset',
1267                                            event['data']['len'])
1268                    else:
1269                        self.assert_qmp(event, 'data/error', error)
1270                    self.assert_no_active_block_jobs()
1271                    return event
1272                if event['event'] == 'JOB_STATUS_CHANGE':
1273                    self.assert_qmp(event, 'data/id', drive)
1274
1275    def wait_ready(self, drive='drive0'):
1276        """Wait until a BLOCK_JOB_READY event, and return the event."""
1277        return self.vm.events_wait([
1278            ('BLOCK_JOB_READY',
1279             {'data': {'type': 'mirror', 'device': drive}}),
1280            ('BLOCK_JOB_READY',
1281             {'data': {'type': 'commit', 'device': drive}})
1282        ])
1283
1284    def wait_ready_and_cancel(self, drive='drive0'):
1285        self.wait_ready(drive=drive)
1286        event = self.cancel_and_wait(drive=drive)
1287        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1288        self.assert_qmp(event, 'data/type', 'mirror')
1289        self.assert_qmp(event, 'data/offset', event['data']['len'])
1290
1291    def complete_and_wait(self, drive='drive0', wait_ready=True,
1292                          completion_error=None):
1293        '''Complete a block job and wait for it to finish'''
1294        if wait_ready:
1295            self.wait_ready(drive=drive)
1296
1297        result = self.vm.qmp('block-job-complete', device=drive)
1298        self.assert_qmp(result, 'return', {})
1299
1300        event = self.wait_until_completed(drive=drive, error=completion_error)
1301        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1302
1303    def pause_wait(self, job_id='job0'):
1304        with Timeout(3, "Timeout waiting for job to pause"):
1305            while True:
1306                result = self.vm.qmp('query-block-jobs')
1307                found = False
1308                for job in result['return']:
1309                    if job['device'] == job_id:
1310                        found = True
1311                        if job['paused'] and not job['busy']:
1312                            return job
1313                        break
1314                assert found
1315
1316    def pause_job(self, job_id='job0', wait=True):
1317        result = self.vm.qmp('block-job-pause', device=job_id)
1318        self.assert_qmp(result, 'return', {})
1319        if wait:
1320            return self.pause_wait(job_id)
1321        return result
1322
1323    def case_skip(self, reason):
1324        '''Skip this test case'''
1325        case_notrun(reason)
1326        self.skipTest(reason)
1327
1328
1329def notrun(reason):
1330    '''Skip this test suite'''
1331    # Each test in qemu-iotests has a number ("seq")
1332    seq = os.path.basename(sys.argv[0])
1333
1334    with open('%s/%s.notrun' % (test_dir, seq), 'w', encoding='utf-8') \
1335            as outfile:
1336        outfile.write(reason + '\n')
1337    logger.warning("%s not run: %s", seq, reason)
1338    sys.exit(0)
1339
1340def case_notrun(reason):
1341    '''Mark this test case as not having been run (without actually
1342    skipping it, that is left to the caller).  See
1343    QMPTestCase.case_skip() for a variant that actually skips the
1344    current test case.'''
1345
1346    # Each test in qemu-iotests has a number ("seq")
1347    seq = os.path.basename(sys.argv[0])
1348
1349    with open('%s/%s.casenotrun' % (test_dir, seq), 'a', encoding='utf-8') \
1350            as outfile:
1351        outfile.write('    [case not run] ' + reason + '\n')
1352
1353def _verify_image_format(supported_fmts: Sequence[str] = (),
1354                         unsupported_fmts: Sequence[str] = ()) -> None:
1355    if 'generic' in supported_fmts and \
1356            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1357        # similar to
1358        #   _supported_fmt generic
1359        # for bash tests
1360        supported_fmts = ()
1361
1362    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1363    if not_sup or (imgfmt in unsupported_fmts):
1364        notrun('not suitable for this image format: %s' % imgfmt)
1365
1366    if imgfmt == 'luks':
1367        verify_working_luks()
1368
1369def _verify_protocol(supported: Sequence[str] = (),
1370                     unsupported: Sequence[str] = ()) -> None:
1371    assert not (supported and unsupported)
1372
1373    if 'generic' in supported:
1374        return
1375
1376    not_sup = supported and (imgproto not in supported)
1377    if not_sup or (imgproto in unsupported):
1378        notrun('not suitable for this protocol: %s' % imgproto)
1379
1380def _verify_platform(supported: Sequence[str] = (),
1381                     unsupported: Sequence[str] = ()) -> None:
1382    if any((sys.platform.startswith(x) for x in unsupported)):
1383        notrun('not suitable for this OS: %s' % sys.platform)
1384
1385    if supported:
1386        if not any((sys.platform.startswith(x) for x in supported)):
1387            notrun('not suitable for this OS: %s' % sys.platform)
1388
1389def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1390    if supported_cache_modes and (cachemode not in supported_cache_modes):
1391        notrun('not suitable for this cache mode: %s' % cachemode)
1392
1393def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1394    if supported_aio_modes and (aiomode not in supported_aio_modes):
1395        notrun('not suitable for this aio mode: %s' % aiomode)
1396
1397def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1398    usf_list = list(set(required_formats) - set(supported_formats()))
1399    if usf_list:
1400        notrun(f'formats {usf_list} are not whitelisted')
1401
1402
1403def _verify_virtio_blk() -> None:
1404    out = qemu_pipe('-M', 'none', '-device', 'help')
1405    if 'virtio-blk' not in out:
1406        notrun('Missing virtio-blk in QEMU binary')
1407
1408def _verify_virtio_scsi_pci_or_ccw() -> None:
1409    out = qemu_pipe('-M', 'none', '-device', 'help')
1410    if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1411        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1412
1413
1414def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1415    imgopts = os.environ.get('IMGOPTS')
1416    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1417    # but it supported only for bash tests. We don't have a concept of global
1418    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1419    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1420    unsup = list(unsupported) + ['$']
1421    if imgopts and any(x in imgopts for x in unsup):
1422        notrun(f'not suitable for this imgopts: {imgopts}')
1423
1424
1425def supports_quorum() -> bool:
1426    return 'quorum' in qemu_img('--help').stdout
1427
1428def verify_quorum():
1429    '''Skip test suite if quorum support is not available'''
1430    if not supports_quorum():
1431        notrun('quorum support missing')
1432
1433def has_working_luks() -> Tuple[bool, str]:
1434    """
1435    Check whether our LUKS driver can actually create images
1436    (this extends to LUKS encryption for qcow2).
1437
1438    If not, return the reason why.
1439    """
1440
1441    img_file = f'{test_dir}/luks-test.luks'
1442    res = qemu_img('create', '-f', 'luks',
1443                   '--object', luks_default_secret_object,
1444                   '-o', luks_default_key_secret_opt,
1445                   '-o', 'iter-time=10',
1446                   img_file, '1G',
1447                   check=False)
1448    try:
1449        os.remove(img_file)
1450    except OSError:
1451        pass
1452
1453    if res.returncode:
1454        reason = res.stdout
1455        for line in res.stdout.splitlines():
1456            if img_file + ':' in line:
1457                reason = line.split(img_file + ':', 1)[1].strip()
1458                break
1459
1460        return (False, reason)
1461    else:
1462        return (True, '')
1463
1464def verify_working_luks():
1465    """
1466    Skip test suite if LUKS does not work
1467    """
1468    (working, reason) = has_working_luks()
1469    if not working:
1470        notrun(reason)
1471
1472def supports_qcow2_zstd_compression() -> bool:
1473    img_file = f'{test_dir}/qcow2-zstd-test.qcow2'
1474    res = qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1475                   img_file, '0',
1476                   check=False)
1477    try:
1478        os.remove(img_file)
1479    except OSError:
1480        pass
1481
1482    if res.returncode == 1 and \
1483            "'compression-type' does not accept value 'zstd'" in res.stdout:
1484        return False
1485    else:
1486        return True
1487
1488def verify_qcow2_zstd_compression():
1489    if not supports_qcow2_zstd_compression():
1490        notrun('zstd compression not supported')
1491
1492def qemu_pipe(*args: str) -> str:
1493    """
1494    Run qemu with an option to print something and exit (e.g. a help option).
1495
1496    :return: QEMU's stdout output.
1497    """
1498    full_args = [qemu_prog] + qemu_opts + list(args)
1499    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1500    return output
1501
1502def supported_formats(read_only=False):
1503    '''Set 'read_only' to True to check ro-whitelist
1504       Otherwise, rw-whitelist is checked'''
1505
1506    if not hasattr(supported_formats, "formats"):
1507        supported_formats.formats = {}
1508
1509    if read_only not in supported_formats.formats:
1510        format_message = qemu_pipe("-drive", "format=help")
1511        line = 1 if read_only else 0
1512        supported_formats.formats[read_only] = \
1513            format_message.splitlines()[line].split(":")[1].split()
1514
1515    return supported_formats.formats[read_only]
1516
1517def skip_if_unsupported(required_formats=(), read_only=False):
1518    '''Skip Test Decorator
1519       Runs the test if all the required formats are whitelisted'''
1520    def skip_test_decorator(func):
1521        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1522                         **kwargs: Dict[str, Any]) -> None:
1523            if callable(required_formats):
1524                fmts = required_formats(test_case)
1525            else:
1526                fmts = required_formats
1527
1528            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1529            if usf_list:
1530                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1531                test_case.case_skip(msg)
1532            else:
1533                func(test_case, *args, **kwargs)
1534        return func_wrapper
1535    return skip_test_decorator
1536
1537def skip_for_formats(formats: Sequence[str] = ()) \
1538    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1539                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1540    '''Skip Test Decorator
1541       Skips the test for the given formats'''
1542    def skip_test_decorator(func):
1543        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1544                         **kwargs: Dict[str, Any]) -> None:
1545            if imgfmt in formats:
1546                msg = f'{test_case}: Skipped for format {imgfmt}'
1547                test_case.case_skip(msg)
1548            else:
1549                func(test_case, *args, **kwargs)
1550        return func_wrapper
1551    return skip_test_decorator
1552
1553def skip_if_user_is_root(func):
1554    '''Skip Test Decorator
1555       Runs the test only without root permissions'''
1556    def func_wrapper(*args, **kwargs):
1557        if os.getuid() == 0:
1558            case_notrun('{}: cannot be run as root'.format(args[0]))
1559            return None
1560        else:
1561            return func(*args, **kwargs)
1562    return func_wrapper
1563
1564# We need to filter out the time taken from the output so that
1565# qemu-iotest can reliably diff the results against master output,
1566# and hide skipped tests from the reference output.
1567
1568class ReproducibleTestResult(unittest.TextTestResult):
1569    def addSkip(self, test, reason):
1570        # Same as TextTestResult, but print dot instead of "s"
1571        unittest.TestResult.addSkip(self, test, reason)
1572        if self.showAll:
1573            self.stream.writeln("skipped {0!r}".format(reason))
1574        elif self.dots:
1575            self.stream.write(".")
1576            self.stream.flush()
1577
1578class ReproducibleStreamWrapper:
1579    def __init__(self, stream: TextIO):
1580        self.stream = stream
1581
1582    def __getattr__(self, attr):
1583        if attr in ('stream', '__getstate__'):
1584            raise AttributeError(attr)
1585        return getattr(self.stream, attr)
1586
1587    def write(self, arg=None):
1588        arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1589        arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1590        self.stream.write(arg)
1591
1592class ReproducibleTestRunner(unittest.TextTestRunner):
1593    def __init__(self, stream: Optional[TextIO] = None,
1594                 resultclass: Type[unittest.TestResult] =
1595                 ReproducibleTestResult,
1596                 **kwargs: Any) -> None:
1597        rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1598        super().__init__(stream=rstream,           # type: ignore
1599                         descriptions=True,
1600                         resultclass=resultclass,
1601                         **kwargs)
1602
1603def execute_unittest(argv: List[str], debug: bool = False) -> None:
1604    """Executes unittests within the calling module."""
1605
1606    # Some tests have warnings, especially ResourceWarnings for unclosed
1607    # files and sockets.  Ignore them for now to ensure reproducibility of
1608    # the test output.
1609    unittest.main(argv=argv,
1610                  testRunner=ReproducibleTestRunner,
1611                  verbosity=2 if debug else 1,
1612                  warnings=None if sys.warnoptions else 'ignore')
1613
1614def execute_setup_common(supported_fmts: Sequence[str] = (),
1615                         supported_platforms: Sequence[str] = (),
1616                         supported_cache_modes: Sequence[str] = (),
1617                         supported_aio_modes: Sequence[str] = (),
1618                         unsupported_fmts: Sequence[str] = (),
1619                         supported_protocols: Sequence[str] = (),
1620                         unsupported_protocols: Sequence[str] = (),
1621                         required_fmts: Sequence[str] = (),
1622                         unsupported_imgopts: Sequence[str] = ()) -> bool:
1623    """
1624    Perform necessary setup for either script-style or unittest-style tests.
1625
1626    :return: Bool; Whether or not debug mode has been requested via the CLI.
1627    """
1628    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1629
1630    debug = '-d' in sys.argv
1631    if debug:
1632        sys.argv.remove('-d')
1633    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1634
1635    _verify_image_format(supported_fmts, unsupported_fmts)
1636    _verify_protocol(supported_protocols, unsupported_protocols)
1637    _verify_platform(supported=supported_platforms)
1638    _verify_cache_mode(supported_cache_modes)
1639    _verify_aio_mode(supported_aio_modes)
1640    _verify_formats(required_fmts)
1641    _verify_virtio_blk()
1642    _verify_imgopts(unsupported_imgopts)
1643
1644    return debug
1645
1646def execute_test(*args, test_function=None, **kwargs):
1647    """Run either unittest or script-style tests."""
1648
1649    debug = execute_setup_common(*args, **kwargs)
1650    if not test_function:
1651        execute_unittest(sys.argv, debug)
1652    else:
1653        test_function()
1654
1655def activate_logging():
1656    """Activate iotests.log() output to stdout for script-style tests."""
1657    handler = logging.StreamHandler(stream=sys.stdout)
1658    formatter = logging.Formatter('%(message)s')
1659    handler.setFormatter(formatter)
1660    test_logger.addHandler(handler)
1661    test_logger.setLevel(logging.INFO)
1662    test_logger.propagate = False
1663
1664# This is called from script-style iotests without a single point of entry
1665def script_initialize(*args, **kwargs):
1666    """Initialize script-style tests without running any tests."""
1667    activate_logging()
1668    execute_setup_common(*args, **kwargs)
1669
1670# This is called from script-style iotests with a single point of entry
1671def script_main(test_function, *args, **kwargs):
1672    """Run script-style tests outside of the unittest framework"""
1673    activate_logging()
1674    execute_test(*args, test_function=test_function, **kwargs)
1675
1676# This is called from unittest style iotests
1677def main(*args, **kwargs):
1678    """Run tests using the unittest framework"""
1679    execute_test(*args, **kwargs)
1680