xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 215caca6)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp import QMPMessage
42from qemu.aqmp.legacy import QEMUMonitorProtocol
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Dump the Python traceback if the interpreter receives a fatal signal.
faulthandler.enable()

# The *_OPTIONS variables are split on whitespace.  This will not work if
# arguments contain spaces but is necessary if we want to support the
# override options that ./check supports.  str.split() is called without
# a separator on purpose: an unset or blank variable then contributes no
# arguments at all, and repeated blanks never turn into '' arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# Empty list (not ['']) when QEMU_OPTIONS is unset or blank.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, QEMU is wrapped in "gdbserver <options>".
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Valgrind wrapper for QEMU; stays empty unless VALGRIND_QEMU=y and
# NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

# Default secret object / option used for LUKS images when the caller
# does not provide a key-secret of its own.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Context manager that runs its body with a logger set to another level.

    The logger called `logger_name` is switched to `level` on entry and
    put back to the level it had before on exit, even if the body raises.
    Handy for hiding errors that are expected or uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level

    target.setLevel(level)
    try:
        yield
    finally:
        # Always restore, whatever happened inside the context.
        target.setLevel(saved_level)
136
137
def unarchive_sample_image(sample, fname):
    """
    Decompress the sample image `sample` into the file `fname`.

    The archive is looked up as ``<sample>.bz2`` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    # Open the archive first so a missing sample never creates `fname`.
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start `args` as a subprocess with its stdout captured as text.

    :param args: Command line to execute.
    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise stderr is left untouched.
    :return: The running Popen object; the caller owns it.
    """
    popen_kwargs = {
        'stdout': subprocess.PIPE,
        'stderr': subprocess.STDOUT if connect_stderr else None,
        'universal_newlines': True,
    }
    # pylint: disable=consider-using-with
    return subprocess.Popen(args, **popen_kwargs)
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name, only used in the diagnostic
                 written when the process is killed by a signal.
    :param args: Full command line to execute.
    :param connect_stderr: Forwarded to qemu_tool_popen().
    :param drop_successful_output: When true, the output is replaced by
                                   an empty string if the exit code is 0.
    :return: Tuple of (captured output, exit code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means the child died from a signal.
            cmd = ' '.join(args)
            # Keep the message on a single logical string: a backslash
            # continuation inside the literal would embed the next
            # line's indentation into the message.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img argument list so 'create' picks up test defaults.

    Anything that is not a 'create' invocation is returned as a plain
    copy.  For 'create', the -f and -o options are pulled out and
    re-emitted first, followed by all remaining arguments in their
    original order.
    """
    if not (args and args[0] == 'create'):
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument('-f')
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    known, passthrough = parser.parse_known_args(args[1:])

    fmt = known.f
    extra_opts = list(known.o)

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. A test may
    # want to create additional images in other formats that don't support
    # these options, so IMGOPTS is only used for images in imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        extra_opts = [env_opts] + extra_opts

    cmdline = ['create']
    if fmt is not None:
        cmdline.extend(['-f', fmt])

    # default luks support
    if fmt == 'luks' and not any('key-secret' in opt for opt in extra_opts):
        cmdline.extend(['--object', luks_default_secret_object])
        extra_opts.append(luks_default_key_secret_opt)

    for opt in extra_opts:
        cmdline.extend(['-o', opt])

    cmdline.extend(passthrough)
    return cmdline
208
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with `args`; return (output, exit code).

    For a 'create' invocation the output is dropped when it succeeds.
    """
    cmdline = qemu_img_args + qemu_img_create_prepare_args(list(args))
    creating = bool(args) and args[0] == 'create'
    return qemu_tool_pipe_and_status('qemu-img', cmdline,
                                     drop_successful_output=creating)
217
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img with `args` and hand back only its exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
221
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object with dictionaries sorted by key.

    Dictionaries become OrderedDicts (dicts are not ordered prior to
    Python 3.6).  When `conv_keys` is true, underscores in the keys of
    this dictionary are turned into dashes; values nested directly under
    a dictionary are processed with conv_keys=False, while list elements
    are processed with the default.  Other values are returned as-is.
    """
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
234
def qemu_img_create(*args):
    '''Run "qemu-img create" with `args` and return the exit code'''
    create_args = ('create',) + args
    return qemu_img(*create_args)
237
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    raw_json = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw_json)
240
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    raw_json = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw_json)
243
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img with `args` and hand back only its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
247
def qemu_img_log(*args):
    '''Run qemu-img, log its output (test file names filtered), return it

    The returned string is the unfiltered output.'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
252
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''Log the filtered output of "qemu-img info" for `filename`

    :param filename: Image (or image options string) to inspect.
    :param filter_path: Path to replace in the output; falls back to
                        `filename` when empty.
    :param use_image_opts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before the filename.
    '''
    fmt_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    cmd = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*cmd)
    log(filter_img_info(output, filter_path or filename))
267
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''Prefix `args` with the qemu-io program and its default options

    When the caller already selects a format ('-f' or '--image-opts'),
    the option set without a format is used.'''
    has_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if has_format else qemu_io_args
    return base + list(args)
273
def qemu_io_popen(*args):
    '''Start qemu-io with `args` and return the Popen object'''
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_popen(cmdline)
276
def qemu_io(*args):
    '''Invoke qemu-io with `args` and return what it printed'''
    cmdline = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', cmdline)
    return output
280
def qemu_io_pipe_and_status(*args):
    '''Invoke qemu-io with `args`; return (output, exit code)'''
    cmdline = qemu_io_wrap_args(args)
    return qemu_tool_pipe_and_status('qemu-io', cmdline)
283
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output

    Test file names and qemu-io timing statistics are filtered from the
    logged text only.'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
288
def qemu_io_silent(*args):
    '''Invoke qemu-io with stdout discarded; return its exit code

    A note is written to stderr when qemu-io was killed by a signal.'''
    cmdline = qemu_io_wrap_args(args)
    status = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                            check=False).returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(cmdline)))
    return status
297
def qemu_io_silent_check(*args):
    '''Run qemu-io quietly and return True if it exited with status 0

    stdout is discarded and stderr is redirected into it.'''
    cmdline = qemu_io_wrap_args(args)
    completed = subprocess.run(cmdline, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
304
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through pipes.

    The constructor starts qemu-io and waits for its first prompt;
    cmd() sends one command and returns everything printed before the
    next prompt; close() sends the quit command.
    """

    # Prompt printed by qemu-io whenever it is ready for a command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` and wait for the initial prompt.

        :raise ValueError: if the first bytes of output are not the
                           prompt; the message is the collected output.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout up to the next prompt and return the text before it.

        Asserts if the stream ends before a prompt is seen.
        """
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # All characters of the prompt are distinct, so after a
                # mismatch the only partial match that can survive is
                # the current character starting a new prompt.  Without
                # this, output ending in 'q' right before the prompt
                # would never be recognised.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        `cmd` must be a single line and must not be the quit command
        (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
351
352
class QemuStorageDaemon:
    """
    Wrapper around a running qemu-storage-daemon process.

    The constructor launches the daemon and blocks until it has written
    its pidfile; stop() terminates it and removes the pidfile and the
    QMP socket (if any).
    """
    # QMP connection and the path of its socket; both stay None unless
    # the instance was created with qmp=True.
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Launch the storage daemon.

        :param args: Extra command line arguments; must not contain
                     '--pidfile', which is added here.
        :param instance_id: Used to build the pidfile and QMP socket
                            names, so several daemons can coexist.
        :param qmp: When true, add a QMP monitor on a UNIX socket in
                    sock_dir and accept the daemon's connection.
        :raise RuntimeError: if the process exits before the pidfile
                             appears.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args += ['--chardev',
                         f'socket,id=qmp-sock,path={self._qmpsock}',
                         '--monitor', 'qmp-sock']

            # We act as the server side; this object is created before
            # the daemon is spawned and accept() is called afterwards.
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()
        # Poll for the pidfile, bailing out if the process has exited.
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        # The pidfile must name the process we spawned ourselves.
        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """
        Send the QMP command `cmd` with optional `args`; return the reply.

        Only valid when the instance was created with qmp=True.
        """
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """
        Terminate the daemon and clean up its files.

        :param kill_signal: Signal number sent to the process
                            (15 by default; presumably SIGTERM on the
                            platforms this runs on).
        """
        self._p.send_signal(kill_signal)
        self._p.wait()
        # Cleared so that __del__ does not try to stop the process again.
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Removal is best-effort: the files may already be gone.
        if self._qmpsock is not None:
            try:
                os.remove(self._qmpsock)
            except OSError:
                pass
        try:
            os.remove(self.pidfile)
        except OSError:
            pass

    def __del__(self):
        """Stop a still-running daemon with signal 9 when the object dies."""
        if self._p is not None:
            self.stop(kill_signal=9)
418
419
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the parent'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(cmdline)
423
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork; return (exit code, output)

    The output (stdout only) is reported only when the exit code is
    non-zero; on success an empty string is returned instead.'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
431
def qemu_nbd_list_log(*args: str) -> str:
    '''List remote exports with "qemu-nbd -L", log and return the output

    The logged text has test file names and block sizes filtered; the
    returned string is unfiltered.'''
    cmdline = [qemu_nbd_prog, '-L', *args]
    output, _status = qemu_tool_pipe_and_status('qemu-nbd', cmdline)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
438
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running in its body

    The server is started with --persistent and a pid file; the body is
    entered once the pid file exists.  On exit the pid file is removed
    and the server is killed.

    Raises RuntimeError if qemu-nbd exits before creating the pid file.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the pid file, watching for an early exit.
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
467
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    `fmt1` and `fmt2` are the formats of `img1` and `img2`.'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
472
def create_image(name, size):
    '''Write a fully-allocated raw image made of marked 512-byte sectors

    Every sector starts and ends with its own index as a big-endian
    32-bit integer, with 504 zero bytes in between.  Sectors are written
    for every offset 0, 512, 1024, ... that is below `size`.'''
    with open(name, 'wb') as image:
        sector_no = 0
        while sector_no * 512 < size:
            image.write(struct.pack('>l504xl', sector_no, sector_no))
            sector_no += 1
481
def image_size(img):
    '''Return the 'virtual-size' reported by "qemu-img info" for `img`'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
486
def is_str(val):
    '''Return True if `val` is a str instance'''
    result = isinstance(val, str)
    return result
489
# test_dir is a filesystem path, not a regular expression: escape it so
# that characters such as '.', '+' or '(' in the path are matched
# literally instead of being interpreted (or failing to compile).
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in `msg` by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
493
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return `msg` with every carriage return character removed'''
    without_cr = win32_re.sub("", msg)
    return without_cr
497
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace qemu-io timing/throughput statistics by fixed placeholders

    Carriage returns are stripped first.'''
    cleaned = filter_win32(msg)
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, cleaned)
505
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences by "chown UID:GID"'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
509
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of `event` whose timestamp (if present) has its
    'seconds' and 'microseconds' replaced by the placeholders 'SECS'
    and 'USECS'.  The event passed in is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: the nested timestamp dict
        # must be copied too, or the caller's event would be modified.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
517
def filter_qmp(qmsg, filter_fn):
    '''Apply `filter_fn` to every leaf value of a QMP object, in place

    `filter_fn` is called as filter_fn(key, value), where the key is the
    dict key or the list index of the value.  Nested dicts and lists are
    processed recursively.  The (modified) `qmsg` itself is returned.'''
    # Lists are walked by index, anything else is treated like a dict.
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
533
def filter_testfiles(msg):
    '''Replace per-process test file prefixes in `msg` by placeholders

    "<test_dir>/<pid>-" becomes 'TEST_DIR/PID-' and "<sock_dir>/<pid>-"
    becomes 'SOCK_DIR/PID-', where <pid> is the current process ID.'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
538
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
545
def filter_virtio_scsi(output: str) -> str:
    '''Strip the "-ccw" / "-pci" suffix from "virtio-scsi" in `output`'''
    bus_suffix = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return bus_suffix.sub(r'\1', output)
548
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
555
def filter_generated_node_ids(msg):
    '''Replace "#block<digits>" node names in `msg` by NODE_NAME'''
    node_id = re.compile("#block[0-9]+")
    return node_id.sub("NODE_NAME", msg)
558
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible

    Lines mentioning the disk size are dropped.  In the remaining lines
    `filename` becomes TEST_IMG, test file prefixes are filtered, the
    image format becomes IMGFMT and variable values (iters, uuid, cid,
    compression type) are replaced by placeholders.'''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        filtered.append(line)
    return '\n'.join(filtered)
576
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
579
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _scrub(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _scrub)
586
def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes in `output` by XXX'''
    block_size = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size.sub(r'\1: XXX', output)
589
590
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Emit a message on the diff-output logger after filtering it.

    `msg` is passed through each of `filters` in order.  A dict or list
    result is serialized as JSON (pretty-printed when `indent` is
    given); anything else is logged unchanged.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    sort_keys = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=sort_keys, indent=indent)
    test_logger.info(serialized)
608
class Timeout:
    """
    Context manager raising an exception if its body runs too long.

    A SIGALRM handler and a real-time interval timer of `seconds` are
    armed on entry and the timer is disarmed on exit.  Nothing is armed
    when QEMU runs under gdb or valgrind.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer; exceptions from the body propagate.
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise an Exception carrying `errmsg`."""
        raise Exception(self.errmsg)
626
def file_pattern(name):
    '''Return `name` prefixed with the current process ID and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
629
class FilePath:
    """
    Context manager that hands out file names and removes the files
    again when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless another directory is
    given, e.g. iotests.sock_dir for sockets:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When called with exactly one name, the context yields that single
    path instead of a list containing it.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best-effort cleanup: files that were never created are fine.
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
668
669
def try_remove(img):
    '''Remove the file `img`, silently ignoring any OSError'''
    try:
        os.remove(img)
    except OSError:
        # E.g. the file does not exist; nothing to do.
        return
675
def file_path_remover():
    '''Remove all paths recorded on file_path_remover.paths, newest first'''
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
679
680
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are removed at exit.

    Typical use:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # Register the cleanup hook the first time a path is requested.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    return paths[0] if len(paths) == 1 else paths
702
def remote_filename(path):
    '''Return `path` in the form required by the image protocol in use

    'file' returns the path unchanged, 'ssh' builds an ssh:// URL to
    127.0.0.1 for the current $USER; any other protocol raises.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
710
711class VM(qtest.QEMUQtestMachine):
712    '''A QEMU VM'''
713
714    def __init__(self, path_suffix=''):
715        name = "qemu%s-%d" % (path_suffix, os.getpid())
716        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
717        if qemu_gdb and qemu_valgrind:
718            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
719            sys.exit(1)
720        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
721        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
722                         name=name,
723                         base_temp_dir=test_dir,
724                         sock_dir=sock_dir, qmp_timer=timer)
725        self._num_drives = 0
726
727    def _post_shutdown(self) -> None:
728        super()._post_shutdown()
729        if not qemu_valgrind or not self._popen:
730            return
731        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
732        if self.exitcode() == 99:
733            with open(valgrind_filename, encoding='utf-8') as f:
734                print(f.read())
735        else:
736            os.remove(valgrind_filename)
737
738    def _pre_launch(self) -> None:
739        super()._pre_launch()
740        if qemu_print:
741            # set QEMU binary output to stdout
742            self._close_qemu_log_file()
743
744    def add_object(self, opts):
745        self._args.append('-object')
746        self._args.append(opts)
747        return self
748
749    def add_device(self, opts):
750        self._args.append('-device')
751        self._args.append(opts)
752        return self
753
754    def add_drive_raw(self, opts):
755        self._args.append('-drive')
756        self._args.append(opts)
757        return self
758
759    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
760        '''Add a virtio-blk drive to the VM'''
761        options = ['if=%s' % interface,
762                   'id=drive%d' % self._num_drives]
763
764        if path is not None:
765            options.append('file=%s' % path)
766            options.append('format=%s' % img_format)
767            options.append('cache=%s' % cachemode)
768            options.append('aio=%s' % aiomode)
769
770        if opts:
771            options.append(opts)
772
773        if img_format == 'luks' and 'key-secret' not in opts:
774            # default luks support
775            if luks_default_secret_object not in self._args:
776                self.add_object(luks_default_secret_object)
777
778            options.append(luks_default_key_secret_opt)
779
780        self._args.append('-drive')
781        self._args.append(','.join(options))
782        self._num_drives += 1
783        return self
784
785    def add_blockdev(self, opts):
786        self._args.append('-blockdev')
787        if isinstance(opts, str):
788            self._args.append(opts)
789        else:
790            self._args.append(','.join(opts))
791        return self
792
793    def add_incoming(self, addr):
794        self._args.append('-incoming')
795        self._args.append(addr)
796        return self
797
798    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
799        cmd = 'human-monitor-command'
800        kwargs: Dict[str, Any] = {'command-line': command_line}
801        if use_log:
802            return self.qmp_log(cmd, **kwargs)
803        else:
804            return self.qmp(cmd, **kwargs)
805
806    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
807        """Pause drive r/w operations"""
808        if not event:
809            self.pause_drive(drive, "read_aio")
810            self.pause_drive(drive, "write_aio")
811            return
812        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
813
814    def resume_drive(self, drive: str) -> None:
815        """Resume drive r/w operations"""
816        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
817
818    def hmp_qemu_io(self, drive: str, cmd: str,
819                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
820        """Write to a given drive using an HMP command"""
821        d = '-d ' if qdev else ''
822        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)
823
824    def flatten_qmp_object(self, obj, output=None, basestr=''):
825        if output is None:
826            output = {}
827        if isinstance(obj, list):
828            for i, item in enumerate(obj):
829                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
830        elif isinstance(obj, dict):
831            for key in obj:
832                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
833        else:
834            output[basestr[:-1]] = obj # Strip trailing '.'
835        return output
836
837    def qmp_to_opts(self, obj):
838        obj = self.flatten_qmp_object(obj)
839        output_list = []
840        for key in obj:
841            output_list += [key + '=' + obj[key]]
842        return ','.join(output_list)
843
844    def get_qmp_events_filtered(self, wait=60.0):
845        result = []
846        for ev in self.get_qmp_events(wait=wait):
847            result.append(filter_qmp_event(ev))
848        return result
849
850    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
851        full_cmd = OrderedDict((
852            ("execute", cmd),
853            ("arguments", ordered_qmp(kwargs))
854        ))
855        log(full_cmd, filters, indent=indent)
856        result = self.qmp(cmd, **kwargs)
857        log(result, filters, indent=indent)
858        return result
859
860    # Returns None on success, and an error string on failure
861    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
862                pre_finalize=None, cancel=False, wait=60.0):
863        """
864        run_job moves a job from creation through to dismissal.
865
866        :param job: String. ID of recently-launched job
867        :param auto_finalize: Bool. True if the job was launched with
868                              auto_finalize. Defaults to True.
869        :param auto_dismiss: Bool. True if the job was launched with
870                             auto_dismiss=True. Defaults to False.
871        :param pre_finalize: Callback. A callable that takes no arguments to be
872                             invoked prior to issuing job-finalize, if any.
873        :param cancel: Bool. When true, cancels the job after the pre_finalize
874                       callback.
875        :param wait: Float. Timeout value specifying how long to wait for any
876                     event, in seconds. Defaults to 60.0.
877        """
878        match_device = {'data': {'device': job}}
879        match_id = {'data': {'id': job}}
880        events = [
881            ('BLOCK_JOB_COMPLETED', match_device),
882            ('BLOCK_JOB_CANCELLED', match_device),
883            ('BLOCK_JOB_ERROR', match_device),
884            ('BLOCK_JOB_READY', match_device),
885            ('BLOCK_JOB_PENDING', match_id),
886            ('JOB_STATUS_CHANGE', match_id)
887        ]
888        error = None
889        while True:
890            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
891            if ev['event'] != 'JOB_STATUS_CHANGE':
892                log(ev)
893                continue
894            status = ev['data']['status']
895            if status == 'aborting':
896                result = self.qmp('query-jobs')
897                for j in result['return']:
898                    if j['id'] == job:
899                        error = j['error']
900                        log('Job failed: %s' % (j['error']))
901            elif status == 'ready':
902                self.qmp_log('job-complete', id=job)
903            elif status == 'pending' and not auto_finalize:
904                if pre_finalize:
905                    pre_finalize()
906                if cancel:
907                    self.qmp_log('job-cancel', id=job)
908                else:
909                    self.qmp_log('job-finalize', id=job)
910            elif status == 'concluded' and not auto_dismiss:
911                self.qmp_log('job-dismiss', id=job)
912            elif status == 'null':
913                return error
914
915    # Returns None on success, and an error string on failure
916    def blockdev_create(self, options, job_id='job0', filters=None):
917        if filters is None:
918            filters = [filter_qmp_testfiles]
919        result = self.qmp_log('blockdev-create', filters=filters,
920                              job_id=job_id, options=options)
921
922        if 'return' in result:
923            assert result['return'] == {}
924            job_result = self.run_job(job_id)
925        else:
926            job_result = result['error']
927
928        log("")
929        return job_result
930
931    def enable_migration_events(self, name):
932        log('Enabling migration QMP events on %s...' % name)
933        log(self.qmp('migrate-set-capabilities', capabilities=[
934            {
935                'capability': 'events',
936                'state': True
937            }
938        ]))
939
940    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
941        while True:
942            event = self.event_wait('MIGRATION')
943            # We use the default timeout, and with a timeout, event_wait()
944            # never returns None
945            assert event
946
947            log(event, filters=[filter_qmp_event])
948            if event['data']['status'] in ('completed', 'failed'):
949                break
950
951        if event['data']['status'] == 'completed':
952            # The event may occur in finish-migrate, so wait for the expected
953            # post-migration runstate
954            runstate = None
955            while runstate != expect_runstate:
956                runstate = self.qmp('query-status')['return']['status']
957            return True
958        else:
959            return False
960
961    def node_info(self, node_name):
962        nodes = self.qmp('query-named-block-nodes')
963        for x in nodes['return']:
964            if x['node-name'] == node_name:
965                return x
966        return None
967
968    def query_bitmaps(self):
969        res = self.qmp("query-named-block-nodes")
970        return {device['node-name']: device['dirty-bitmaps']
971                for device in res['return'] if 'dirty-bitmaps' in device}
972
973    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
974        """
975        get a specific bitmap from the object returned by query_bitmaps.
976        :param recording: If specified, filter results by the specified value.
977        :param bitmaps: If specified, use it instead of call query_bitmaps()
978        """
979        if bitmaps is None:
980            bitmaps = self.query_bitmaps()
981
982        for bitmap in bitmaps[node_name]:
983            if bitmap.get('name', '') == bitmap_name:
984                if recording is None or bitmap.get('recording') == recording:
985                    return bitmap
986        return None
987
988    def check_bitmap_status(self, node_name, bitmap_name, fields):
989        ret = self.get_bitmap(node_name, bitmap_name)
990
991        return fields.items() <= ret.items()
992
993    def assert_block_path(self, root, path, expected_node, graph=None):
994        """
995        Check whether the node under the given path in the block graph
996        is @expected_node.
997
998        @root is the node name of the node where the @path is rooted.
999
1000        @path is a string that consists of child names separated by
1001        slashes.  It must begin with a slash.
1002
1003        Examples for @root + @path:
1004          - root="qcow2-node", path="/backing/file"
1005          - root="quorum-node", path="/children.2/file"
1006
1007        Hypothetically, @path could be empty, in which case it would
1008        point to @root.  However, in practice this case is not useful
1009        and hence not allowed.
1010
1011        @expected_node may be None.  (All elements of the path but the
1012        leaf must still exist.)
1013
1014        @graph may be None or the result of an x-debug-query-block-graph
1015        call that has already been performed.
1016        """
1017        if graph is None:
1018            graph = self.qmp('x-debug-query-block-graph')['return']
1019
1020        iter_path = iter(path.split('/'))
1021
1022        # Must start with a /
1023        assert next(iter_path) == ''
1024
1025        node = next((node for node in graph['nodes'] if node['name'] == root),
1026                    None)
1027
1028        # An empty @path is not allowed, so the root node must be present
1029        assert node is not None, 'Root node %s not found' % root
1030
1031        for child_name in iter_path:
1032            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1033
1034            try:
1035                node_id = next(edge['child'] for edge in graph['edges']
1036                               if (edge['parent'] == node['id'] and
1037                                   edge['name'] == child_name))
1038
1039                node = next(node for node in graph['nodes']
1040                            if node['id'] == node_id)
1041
1042            except StopIteration:
1043                node = None
1044
1045        if node is None:
1046            assert expected_node is None, \
1047                   'No node found under %s (but expected %s)' % \
1048                   (path, expected_node)
1049        else:
1050            assert node['name'] == expected_node, \
1051                   'Found node %s under %s (but expected %s)' % \
1052                   (node['name'], path, expected_node)
1053
# Matches a path component of the form "name[index]" as accepted by
# QMPTestCase.dictpath(): group 1 is the name, group 2 the index text.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
1055
class QMPTestCase(unittest.TestCase):
    """Abstract base class for QMP test cases.

    The helper methods below talk to the VM stored in ``self.vm``, which
    concrete test cases are expected to assign.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Many users of this class set a VM property we rely on heavily
        # in the methods below.
        self.vm = None

    def dictpath(self, d, path):
        """Walk @d along the '/'-separated @path and return what is there.

        A component of the form ``name[N]`` selects element N of the list
        stored under ``name``.  The test fails if a component cannot be
        followed.
        """
        for part in path.split('/'):
            indexed = index_re.match(part)
            if indexed:
                part, idx = indexed.groups()
                idx = int(idx)

            if not (isinstance(d, dict) and part in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[part]

            if not indexed:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{part}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        """Assert that @path cannot be followed in the QMP dict @d."""
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, which is what we want
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        """Assert that the value found under @path in the QMP dict @d
        matches @value.  If @value is a non-empty list, it is enough for
        any one of its elements to match."""

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        any_of = isinstance(value, list) and value != []
        if not any_of:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        """Assert that query-block-jobs returns an empty list."""
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry matches
        the given node_name and/or file_name."""
        assert node_name or file_name

        def matches(actual, wanted):
            # A None on either side acts as a wildcard
            return actual is None or wanted is None or actual == wanted

        result = self.vm.qmp('query-named-block-nodes')
        present = any(matches(entry.get("node-name"), node_name) and
                      matches(entry.get("file"), file_name)
                      for entry in result["return"])
        if not present:
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        """Assert that @json_filename starts with 'json:' and that the
        JSON following the prefix equals @reference once both have been
        flattened."""
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flatten = self.vm.flatten_qmp_object
        actual = flatten(json.loads(payload))
        expected = flatten(reference)
        self.assertEqual(actual, expected)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        """Cancel the block job on @drive, wait until it has finished and
        return the final event.

        :param force: Passed to block-job-cancel.
        :param resume: If true, call resume_drive() on the VM right after
                       issuing the cancel.
        :param wait: Passed to get_qmp_events().
        """
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # Every event of a batch is inspected, even after the final
            # one has been seen.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        """Wait for the BLOCK_JOB_COMPLETED event of the job on @drive
        and return it.

        :param check_offset: On success, also assert that the event's
                             offset equals its len.
        :param wait: Passed to get_qmp_events().
        :param error: Expected 'error' of the event; None means the
                      event must not carry an error.
        """
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        ready_filters = [
            ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ]
        return self.vm.events_wait(ready_filters)

    def wait_ready_and_cancel(self, drive='drive0'):
        """Wait for the job on @drive to become ready, cancel it, and
        assert that it ended as a fully completed mirror job."""
        self.wait_ready(drive=drive)
        final_event = self.cancel_and_wait(drive=drive)
        self.assertEqual(final_event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final_event, 'data/type', 'mirror')
        self.assert_qmp(final_event, 'data/offset',
                        final_event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        """Complete the block job on @drive and wait for it to finish.

        :param wait_ready: Wait for the job to become ready first.
        :param completion_error: Passed to wait_until_completed() as the
                                 expected error.
        """
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        """Poll query-block-jobs until the job @job_id is paused and not
        busy, then return its entry.  The job has to be listed in every
        reply."""
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                found = job is not None
                assert found
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        """Pause the block job @job_id.

        :return: The job entry from pause_wait() if @wait is true,
                 otherwise the reply to block-job-pause.
        """
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        """Skip this test case, recording @reason via case_notrun()."""
        case_notrun(reason)
        self.skipTest(reason)
1237
1238
def notrun(reason):
    """Skip this test suite.

    Writes @reason to the '<seq>.notrun' file in test_dir, logs a
    warning, and exits the process with status 0.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_file = '%s/%s.notrun' % (test_dir, seq)

    with open(notrun_file, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1249
def case_notrun(reason):
    """Record that a single test case was not run.

    This only appends @reason to the '<seq>.casenotrun' file in
    test_dir; actually skipping the case is left to the caller.  See
    QMPTestCase.case_skip() for a variant that also skips the current
    test case.
    """
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_file = '%s/%s.casenotrun' % (test_dir, seq)

    with open(casenotrun_file, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1262
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """Skip the test suite if it does not apply to the image format.

    :param supported_fmts: If non-empty, imgfmt must be one of these.
                           'generic' lifts this restriction unless the
                           IMGFMT_GENERIC environment variable is set
                           to something other than 'true'.
    :param unsupported_fmts: Formats the test must not be run with.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    restricted_out = bool(supported_fmts) and imgfmt not in supported_fmts
    if restricted_out or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    # LUKS images additionally need a LUKS driver that really works
    if imgfmt == 'luks':
        verify_working_luks()
1278
1279def _verify_protocol(supported: Sequence[str] = (),
1280                     unsupported: Sequence[str] = ()) -> None:
1281    assert not (supported and unsupported)
1282
1283    if 'generic' in supported:
1284        return
1285
1286    not_sup = supported and (imgproto not in supported)
1287    if not_sup or (imgproto in unsupported):
1288        notrun('not suitable for this protocol: %s' % imgproto)
1289
1290def _verify_platform(supported: Sequence[str] = (),
1291                     unsupported: Sequence[str] = ()) -> None:
1292    if any((sys.platform.startswith(x) for x in unsupported)):
1293        notrun('not suitable for this OS: %s' % sys.platform)
1294
1295    if supported:
1296        if not any((sys.platform.startswith(x) for x in supported)):
1297            notrun('not suitable for this OS: %s' % sys.platform)
1298
1299def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1300    if supported_cache_modes and (cachemode not in supported_cache_modes):
1301        notrun('not suitable for this cache mode: %s' % cachemode)
1302
1303def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1304    if supported_aio_modes and (aiomode not in supported_aio_modes):
1305        notrun('not suitable for this aio mode: %s' % aiomode)
1306
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite if any of @required_formats is missing from
    the formats reported by supported_formats()."""
    missing = set(required_formats) - set(supported_formats())
    usf_list = list(missing)
    if usf_list:
        notrun(f'formats {usf_list} are not whitelisted')
1311
1312
def _verify_virtio_blk() -> None:
    """Skip the test suite if QEMU's device help does not mention
    virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1317
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if QEMU's device help mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1322
1323
1324def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1325    imgopts = os.environ.get('IMGOPTS')
1326    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1327    # but it supported only for bash tests. We don't have a concept of global
1328    # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1329    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1330    unsup = list(unsupported) + ['$']
1331    if imgopts and any(x in imgopts for x in unsup):
1332        notrun(f'not suitable for this imgopts: {imgopts}')
1333
1334
def supports_quorum():
    """Return whether 'quorum' shows up in the qemu-img help text."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1337
def verify_quorum():
    """Skip the test suite when supports_quorum() reports no quorum
    support."""
    if supports_quorum():
        return
    notrun('quorum support missing')
1342
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    The check creates a scratch LUKS image in test_dir and removes it
    again.

    :return: (True, '') if creation worked; otherwise (False, reason).
             The reason is the text following '<image file>:' on the
             first output line containing it, or the whole output if no
             line does.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image may not have been created at all
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    reason = next((line.split(marker, 1)[1].strip()
                   for line in output.splitlines() if marker in line),
                  output)
    return (False, reason)
1373
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks()
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1381
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line consists of qemu_prog, qemu_opts and then @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    stdout, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout
1391
def supported_formats(read_only=False):
    """Return the list of whitelisted formats.

    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    The answers are cached in the 'formats' attribute of this function,
    so qemu is asked at most once per whitelist.
    """
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second line is parsed for read_only, the first otherwise;
        # the formats are the words after the colon.
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
1406
def skip_if_unsupported(required_formats=(), read_only=False):
    """Skip Test Decorator

    Runs the test if all the required formats are whitelisted,
    otherwise skips the test case.

    :param required_formats: The formats needed by the test, or a
                             callable that is given the test case and
                             returns them.
    :param read_only: Passed to supported_formats().
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: formats {usf_list} are not whitelisted'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1426
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    """Skip Test Decorator

    Skips the test case if imgfmt is one of the given formats,
    otherwise runs it.
    """
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1442
def skip_if_user_is_root(func):
    """Skip Test Decorator

    Runs the test only without root permissions.  As root, the case is
    recorded with case_notrun() and the wrapper returns None without
    calling the test.
    """
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)

        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1453
1454# We need to filter out the time taken from the output so that
1455# qemu-iotest can reliably diff the results against master output,
1456# and hide skipped tests from the reference output.
1457
class ReproducibleTestResult(unittest.TextTestResult):
    """TextTestResult that reports a skipped test as a dot."""

    def addSkip(self, test, reason):
        """Record the skip, printing a dot where TextTestResult would
        print an "s"."""
        # Call the base TestResult directly so that TextTestResult's own
        # output for skips is not produced.
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1467
class ReproducibleStreamWrapper:
    """Stream proxy that strips run times and skip counts from what is
    written; every other attribute comes from the wrapped stream."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached for attributes not found on the wrapper itself.
        # 'stream' and '__getstate__' are never looked up on the
        # wrapped stream.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write @arg to the wrapped stream without the run time in
        "Ran N tests in X.XXXs" and without " (skipped=N)"."""
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        for pattern, replacement in substitutions:
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
1481
class ReproducibleTestRunner(unittest.TextTestRunner):
    """TextTestRunner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult unless told otherwise."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Without an explicit stream, the output goes to sys.stdout
        wrapped = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(stream=wrapped,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1492
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Command line handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings)
1503
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument, if present, is removed from sys.argv and selects
    debug logging.  Afterwards the test's requirements are verified one
    after the other.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1535
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    The positional and remaining keyword arguments go to
    execute_setup_common().  If @test_function is given it is called
    afterwards; otherwise the unittest runner is started.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1544
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    # Keep these messages away from handlers of parent loggers
    test_logger.propagate = False
1553
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Turns on log output to stdout; all arguments are handed on to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1559
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Turns on log output to stdout, then has execute_test() call
    @test_function; the remaining arguments are handed on as well.
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1565
# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are handed on to execute_test() unchanged.
    """
    execute_test(*args, **kwargs)
1570