xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 03ff4f8d)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import argparse
20import atexit
21import bz2
22from collections import OrderedDict
23import faulthandler
24import json
25import logging
26import os
27import re
28import shutil
29import signal
30import struct
31import subprocess
32import sys
33import time
34from typing import (Any, Callable, Dict, Iterable, Iterator,
35                    List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36import unittest
37
38from contextlib import contextmanager
39
40from qemu.machine import qtest
41from qemu.qmp import QMPMessage
42from qemu.aqmp.legacy import QEMUMonitorProtocol
43
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump a traceback if the interpreter receives a fatal signal
faulthandler.enable()

# Tool command lines are assembled from environment variables.  The
# *_OPTIONS values are split on whitespace, so this will not work if a
# single argument contains spaces, but it is necessary if we want to
# support the override options that ./check supports.
# split() without a separator is used on purpose: unlike split(' ') it
# never produces empty-string arguments for repeated blanks or for an
# empty value.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].split()

# qemu-io command line for callers that pass their own -f / --image-opts
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += os.environ['QEMU_IO_OPTIONS_NO_FMT'].split()

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset or blank QEMU_OPTIONS yields an empty list, not ['']
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

# When GDB_OPTIONS is set, QEMU is wrapped in "gdbserver <options>"
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

# Any non-empty PRINT_QEMU value is truthy
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# When valgrind is requested (and not vetoed by NO_VALGRIND), QEMU is
# wrapped in a valgrind command that exits with code 99 on errors.
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
    os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p is expanded by valgrind to the PID of the valgrind process,
    # which we cannot know a priori (subprocess.Popen has not been
    # invoked yet)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

sample_img_dir = os.environ['SAMPLE_IMG_DIR']
119
120
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily override the level of the logger called `logger_name`.

    The logger is switched to `level` (CRITICAL unless specified) while the
    ``with`` body runs, and its previous level is put back afterwards, even
    if the body raises.  Useful for muting errors that are expected or
    uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        # Always restore, whether or not the body raised
        target.setLevel(saved_level)
137
138
def unarchive_sample_image(sample, fname):
    """
    Decompress a bz2-compressed sample image into the file `fname`.

    The archive is looked up as ``<sample>.bz2`` inside sample_img_dir.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
143
144
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start a tool with its stdout captured through a pipe in text mode.

    :param args: Full command line, program name first.
    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise stderr is inherited.
    :return: The running Popen object; the caller is responsible for
             waiting on it.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # pylint: disable=consider-using-with
    return subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr,
                            universal_newlines=True)


def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Human-readable tool name, only used in the diagnostic
                 written when the process is killed by a signal.
    :param args: Full command line, program name first.
    :param connect_stderr: Passed through to qemu_tool_popen().
    :param drop_successful_output: When true, the output is replaced by
                                   an empty string if the exit code is 0.
    :return: (captured output, exit code).  A negative exit code means
             the process was terminated by that signal number.
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Keep the message on one logical line: a backslash
            # continuation inside the literal would embed the source
            # indentation in the text.
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
171
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite a qemu-img 'create' command line with the test defaults.

    Command lines for any other subcommand are returned as a plain copy.
    For 'create', the -f and -o options are pulled out and re-emitted
    right after 'create', followed by all remaining arguments in their
    original order.  IMGOPTS from the environment is prepended to the -o
    list when the format equals imgfmt, and luks images without an
    explicit key-secret get the default secret object and option.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    known, leftover = parser.parse_known_args(args[1:])

    fmt = known.f
    option_strings = known.o

    prepared = ['create']
    if fmt is not None:
        prepared.extend(('-f', fmt))

    # IMGOPTS most probably contain options specific for the selected format,
    # like extended_l2 or compression_type for qcow2. Test may want to create
    # additional images in other formats that don't support these options.
    # So, use IMGOPTS only for images created in imgfmt format.
    env_opts = os.environ.get('IMGOPTS')
    if env_opts and fmt == imgfmt:
        option_strings.insert(0, env_opts)

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in opts for opts in option_strings):
        prepared.extend(('--object', luks_default_secret_object))
        option_strings.append(luks_default_key_secret_opt)

    for opts in option_strings:
        prepared.extend(('-o', opts))

    prepared.extend(leftover)
    return prepared
209
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with `args`; return its (output, exit code) pair.

    The arguments are passed through qemu_img_create_prepare_args(), and
    for the 'create' subcommand the output is dropped when the command
    succeeds.
    """
    creating = bool(args) and args[0] == 'create'
    command = qemu_img_args + qemu_img_create_prepare_args(list(args))
    return qemu_tool_pipe_and_status('qemu-img', command,
                                     drop_successful_output=creating)
218
def qemu_img(*args: str) -> int:
    '''Run qemu-img with the given arguments and return only its exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
222
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of `qmsg` in which every dict is a key-sorted OrderedDict.

    Lists are rebuilt element by element (each element is processed with
    the default conv_keys), other values are returned untouched.  When
    conv_keys is true, '_' is replaced by '-' in the keys of the dict
    being processed; dicts nested directly inside it keep their keys.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for name, value in sorted(qmsg.items()):
        target = name.replace('_', '-') if conv_keys else name
        ordered[target] = ordered_qmp(value, conv_keys=False)
    return ordered
235
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments; return the exit code'''
    status = qemu_img('create', *args)
    return status
238
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the decoded JSON'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
241
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the decoded JSON'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
244
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img with the given arguments and return only its output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
248
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, and return
       the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
253
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    '''Log the filtered output of "qemu-img info" for `filename`.

    The image is opened with --image-opts when use_image_opts is true and
    with "-f imgfmt" otherwise; extra_args are inserted before the file
    name.  filter_path is the string replaced by TEST_IMG in the output
    and falls back to `filename` when it is not given.
    '''
    fmt_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    command = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*command)
    log(filter_img_info(output, filter_path or filename))
268
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    '''Prefix `args` with the qemu-io program and its default options.

    The format-less option set is used when the caller already selects a
    format via -f or --image-opts.
    '''
    picks_format = '-f' in args or '--image-opts' in args
    prefix = qemu_io_args_no_fmt if picks_format else qemu_io_args
    return prefix + list(args)
274
def qemu_io_popen(*args):
    '''Start qemu-io with the given arguments and return the Popen object'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
277
def qemu_io(*args):
    '''Run qemu-io and return its captured output'''
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
281
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
       filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
286
def qemu_io_silent(*args):
    '''Run qemu-io with stdout discarded and return the exit code.

    A note is written to stderr when qemu-io was killed by a signal.
    '''
    command = qemu_io_wrap_args(args)
    status = subprocess.run(command, stdout=subprocess.DEVNULL,
                            check=False).returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
295
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded; return True if it exited
       with status 0'''
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
302
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin/stdout pipes.

    The constructor starts qemu-io and waits for the first prompt; cmd()
    sends one command and returns everything printed up to the next
    prompt; close() sends the quit command.
    """

    # Prompt printed by qemu-io whenever it is ready for a command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with `args` (wrapped by qemu_io_wrap_args).

        :raises ValueError: if the first output is not the prompt; the
                            exception carries the complete output.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout one character at a time until the prompt shows up.

        :return: The text that preceded the prompt.
        :raises AssertionError: on EOF before a prompt was seen.
        """
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # The mismatching character may itself start the prompt
                # (e.g. output ending in 'q' right before 'qemu-io> ').
                # Restarting at 0 or 1 is sufficient because the first
                # prompt character does not occur again in the prompt.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        `cmd` must be a single line and must not be the quit command,
        which is reserved for close().
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
349
350
class QemuStorageDaemon:
    """
    Handle on a qemu-storage-daemon child process.

    The constructor spawns the daemon and returns once its pidfile has
    appeared; stop() signals it, waits for it and removes the pidfile and
    the QMP socket.  With qmp=True a QMP monitor socket is set up and
    commands can be sent through qmp().
    """
    _qmp: Optional[QEMUMonitorProtocol] = None
    _qmpsock: Optional[str] = None
    # Python < 3.8 would complain if this type were not a string literal
    # (importing `annotations` from `__future__` would work; but not on <= 3.6)
    _p: 'Optional[subprocess.Popen[bytes]]' = None

    def __init__(self, *args: str, instance_id: str = 'a', qmp: bool = False):
        """
        Launch the daemon.

        :param args: Extra daemon arguments; must not contain --pidfile.
        :param instance_id: Distinguishes the pidfile and socket names of
                            several daemons.
        :param qmp: Whether to attach a QMP monitor.
        :raises RuntimeError: if the daemon exits before its pidfile
                              exists.
        """
        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog, *args, '--pidfile', self.pidfile]

        if qmp:
            self._qmpsock = os.path.join(sock_dir, f'qsd-{instance_id}.sock')
            all_args.extend(('--chardev',
                             f'socket,id=qmp-sock,path={self._qmpsock}',
                             '--monitor', 'qmp-sock'))

            # The monitor object is created in server mode before the
            # daemon is spawned, and accepts the connection afterwards
            self._qmp = QEMUMonitorProtocol(self._qmpsock, server=True)

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        if self._qmp is not None:
            self._qmp.accept()

        # Poll for the pidfile, bailing out if the daemon dies first
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as pid_stream:
            self._pid = int(pid_stream.read().strip())

        assert self._pid == self._p.pid

    def qmp(self, cmd: str, args: Optional[Dict[str, object]] = None) \
            -> QMPMessage:
        """Send a QMP command (requires qmp=True) and return the reply."""
        assert self._qmp is not None
        return self._qmp.cmd(cmd, args)

    def stop(self, kill_signal=15):
        """Signal the daemon, wait for it to exit and clean up its files."""
        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        if self._qmp:
            self._qmp.close()

        # Best-effort removal: socket first (if any), then the pidfile
        for leftover in (self._qmpsock, self.pidfile):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass

    def __del__(self):
        # A daemon that was never stopped explicitly is killed
        if self._p is not None:
            self.stop(kill_signal=9)
416
417
def qemu_nbd(*args):
    '''Run qemu-nbd with --fork (daemon mode); return the parent's exit code'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(command)
421
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd with --fork (daemon mode).

    Returns (exit code of the parent, output); the output is only
    reported for a non-zero exit code and is '' otherwise.
    '''
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    if returncode:
        return returncode, output
    return returncode, ''
429
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports.

    The output is logged through filter_testfiles and filter_nbd_exports
    and returned unfiltered.
    '''
    command = [qemu_nbd_prog, '-L', *args]
    listing, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
436
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running for its body.

    The server is started with --persistent and a pid file; the body is
    entered once the pid file exists.  On exit the pid file is removed
    and the server is killed.  RuntimeError is raised if qemu-nbd exits
    before creating the pid file.
    '''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # Wait for the server to announce itself through the pid file
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
465
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
470
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers.

    One 512-byte sector is written for every multiple of 512 below
    `size`; each sector starts and ends with its own big-endian 32-bit
    sector number and is zero in between.
    '''
    offset = 0
    with open(name, 'wb') as image:
        while offset < size:
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            offset += 512
479
def image_size(img):
    '''Return the image's virtual size as reported by "qemu-img info"'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
484
def is_str(val):
    '''Return True if `val` is a str instance'''
    result = isinstance(val, str)
    return result
487
# test_dir is a file system path, not a regular expression: escape it so
# that characters such as '.', '+' or '(' in the path match literally.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in `msg`
       with the literal TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
491
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip all carriage returns from `msg`'''
    without_cr = win32_re.sub("", msg)
    return without_cr
495
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Normalise qemu-io output: carriage returns are removed and the
       timing/throughput summary is replaced by fixed placeholders'''
    cleaned = filter_win32(msg)
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, cleaned)
503
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences by "chown UID:GID"'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
507
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of `event` whose timestamp (if present) has its
    seconds and microseconds replaced by the placeholders 'SECS' and
    'USECS'.  The event passed in, including its nested timestamp dict,
    is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # as well so the caller's event keeps its real time values.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
515
def filter_qmp(qmsg, filter_fn):
    '''Apply `filter_fn` to every leaf value of a QMP object, in place.

    `qmsg` is a dict or a list; nested dicts and lists are descended
    into.  filter_fn is called as filter_fn(key, value), where key is the
    dict key or list index, and its result replaces the value.  The
    (modified) `qmsg` itself is returned.
    '''
    # Lists are walked by index, everything else through items()
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
531
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
       prefixes of this process by TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid = os.getpid()
    test_prefix = os.path.join(test_dir, "%s-" % pid)
    sock_prefix = os.path.join(sock_dir, "%s-" % pid)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
536
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
543
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw / -pci suffix from virtio-scsi device names'''
    generic = re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
    return generic
546
def filter_qmp_virtio_scsi(qmsg):
    '''Run filter_virtio_scsi over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
553
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names by NODE_NAME'''
    anonymised = re.sub("#block[0-9]+", "NODE_NAME", msg)
    return anonymised
556
def filter_img_info(output, filename):
    '''Make "qemu-img info" output reproducible.

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines `filename` becomes TEST_IMG, test file prefixes are
    filtered, imgfmt becomes IMGFMT, and iteration counts, UUIDs, CIDs
    and the compression type are replaced by placeholders.
    '''
    # (pattern, replacement) pairs, applied in this order
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        kept.append(line)
    return '\n'.join(kept)
574
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    generic = msg.replace(imgfmt, 'IMGFMT')
    return generic
577
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt over every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
584
def filter_nbd_exports(output: str) -> str:
    '''Replace the min/opt/max block sizes in an export listing by XXX'''
    masked = re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
    return masked
587
588
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Emit a message on the diff-output logger.

    The filters are applied in order first.  A dict or list is then
    serialised as JSON (pretty-printed when indent is given, with keys
    sorted unless it is an OrderedDict); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sorting = not isinstance(msg, OrderedDict)
    serialised = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialised)
606
class Timeout:
    """
    Context manager that raises Exception(errmsg) from a SIGALRM handler
    when its body runs for longer than `seconds`.

    No timer is armed when QEMU runs under gdb or valgrind.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler: report the timeout as an exception."""
        raise Exception(self.errmsg)
624
def file_pattern(name):
    '''Return `name` prefixed with the current process ID and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
627
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a list with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Removal is best effort: files that were never created are fine
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                continue
        return False
666
667
def try_remove(img):
    '''Remove the file `img`, silently ignoring any OSError'''
    try:
        os.remove(img)
    except OSError:
        return
673
def file_path_remover():
    '''Remove all paths recorded in file_path_remover.paths, most recently
       added first'''
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
677
678
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    Every generated path is recorded for removal at interpreter exit.
    A single name yields a single path, several names yield a list.
    '''

    # First use: create the registry and hook the cleanup into atexit
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
700
def remote_filename(path):
    '''Return the name under which `path` is reached via imgproto.

    'file' yields the path itself, 'ssh' an ssh:// URL to 127.0.0.1 for
    the current USER; any other protocol raises an Exception.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
708
709class VM(qtest.QEMUQtestMachine):
710    '''A QEMU VM'''
711
def __init__(self, path_suffix=''):
    """
    Set up a VM named "qemu<path_suffix>-<pid>".

    QEMU is wrapped in gdbserver or valgrind when either is configured
    (configuring both terminates the process), and the QMP timer is only
    set when neither is in use.
    """
    if qemu_gdb and qemu_valgrind:
        sys.stderr.write('gdb and valgrind are mutually exclusive\n')
        sys.exit(1)

    debugging = bool(qemu_gdb or qemu_valgrind)
    super().__init__(qemu_prog, qemu_opts,
                     wrapper=qemu_gdb or qemu_valgrind,
                     name="qemu%s-%d" % (path_suffix, os.getpid()),
                     base_temp_dir=test_dir,
                     sock_dir=sock_dir,
                     qmp_timer=None if debugging else 15.0)
    self._num_drives = 0
724
def _post_shutdown(self) -> None:
    """
    After the regular shutdown handling, deal with the valgrind log:
    print it if QEMU exited with valgrind's error code 99, delete it
    otherwise.  Does nothing more without valgrind or without a process.
    """
    super()._post_shutdown()
    if qemu_valgrind and self._popen:
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as report:
                print(report.read())
        else:
            os.remove(valgrind_filename)
735
def _pre_launch(self) -> None:
    """Run the regular pre-launch steps; with PRINT_QEMU set, close the
    QEMU log file afterwards."""
    super()._pre_launch()
    if not qemu_print:
        return
    # set QEMU binary output to stdout
    self._close_qemu_log_file()
741
def add_object(self, opts):
    '''Append "-object <opts>" to the command line; returns self'''
    self._args.extend(('-object', opts))
    return self
746
def add_device(self, opts):
    '''Append "-device <opts>" to the command line; returns self'''
    self._args.extend(('-device', opts))
    return self
751
def add_drive_raw(self, opts):
    '''Append "-drive <opts>" verbatim to the command line; returns self'''
    self._args.extend(('-drive', opts))
    return self
756
def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
    '''Add a drive (virtio interface by default) to the VM.

    The drive gets the id "drive<N>", N counting the drives added so far.
    When `path` is given, file, format, cache and aio options are added;
    `opts` is appended verbatim.  luks drives without an explicit
    key-secret use the default secret.  Returns self.
    '''
    options = ['if=%s' % interface,
               'id=drive%d' % self._num_drives]

    if path is not None:
        options += ['file=%s' % path,
                    'format=%s' % img_format,
                    'cache=%s' % cachemode,
                    'aio=%s' % aiomode]

    if opts:
        options.append(opts)

    needs_default_secret = img_format == 'luks' and 'key-secret' not in opts
    if needs_default_secret:
        # default luks support
        if luks_default_secret_object not in self._args:
            self.add_object(luks_default_secret_object)

        options.append(luks_default_key_secret_opt)

    self._args.append('-drive')
    self._args.append(','.join(options))
    self._num_drives += 1
    return self
782
def add_blockdev(self, opts):
    '''Append "-blockdev <opts>"; `opts` is either a ready-made string or
       an iterable of strings that is joined with commas.  Returns self'''
    self._args.append('-blockdev')
    self._args.append(opts if isinstance(opts, str) else ','.join(opts))
    return self
790
def add_incoming(self, addr):
    '''Append "-incoming <addr>" to the command line; returns self'''
    self._args.extend(('-incoming', addr))
    return self
795
def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
    """Run an HMP command through QMP's human-monitor-command; with
    use_log the exchange goes through qmp_log() instead of qmp()."""
    kwargs: Dict[str, Any] = {'command-line': command_line}
    send = self.qmp_log if use_log else self.qmp
    return send('human-monitor-command', **kwargs)
803
def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
    """Pause drive r/w operations

    A breakpoint named bp_<drive> is set on `event`; without an event,
    both read_aio and write_aio are covered.
    """
    if event:
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
        return
    for default_event in ("read_aio", "write_aio"):
        self.pause_drive(drive, default_event)
811
def resume_drive(self, drive: str) -> None:
    """Resume drive r/w operations by removing the bp_<drive> breakpoint"""
    command = f'qemu-io {drive} "remove_break bp_{drive}"'
    self.hmp(command)
815
def hmp_qemu_io(self, drive: str, cmd: str,
                use_log: bool = False, qdev: bool = False) -> QMPMessage:
    """Run a qemu-io command on a drive through HMP; with qdev, '-d' is
    inserted before the drive name."""
    qdev_flag = '-d ' if qdev else ''
    command = f'qemu-io {qdev_flag}{drive} "{cmd}"'
    return self.hmp(command, use_log=use_log)
821
def flatten_qmp_object(self, obj, output=None, basestr=''):
    '''Flatten nested dicts/lists into one dict with dotted keys.

    List indices and dict keys along the way are joined with '.' to form
    the key of each leaf value.  Results are added to `output` (a new
    dict if omitted), which is also returned.
    '''
    if output is None:
        output = {}

    if isinstance(obj, list):
        for index, item in enumerate(obj):
            self.flatten_qmp_object(item, output, basestr + str(index) + '.')
        return output

    if isinstance(obj, dict):
        for key, value in obj.items():
            self.flatten_qmp_object(value, output, basestr + key + '.')
        return output

    # Leaf: basestr ends with the separator, strip that trailing '.'
    output[basestr[:-1]] = obj
    return output
834
def qmp_to_opts(self, obj):
    '''Convert a QMP object into a "key=value,key=value" option string
       using the dotted keys produced by flatten_qmp_object()'''
    flat = self.flatten_qmp_object(obj)
    return ','.join(key + '=' + value for key, value in flat.items())
841
def get_qmp_events_filtered(self, wait=60.0):
    '''Return the pending QMP events, each passed through
       filter_qmp_event()'''
    return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]
847
def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
    '''Execute a QMP command, logging both the command and its result.

    The logged command lists "execute" before "arguments", with the
    arguments normalised by ordered_qmp().  Returns the QMP result.
    '''
    full_cmd = OrderedDict()
    full_cmd["execute"] = cmd
    full_cmd["arguments"] = ordered_qmp(kwargs)
    log(full_cmd, filters, indent=indent)

    result = self.qmp(cmd, **kwargs)
    log(result, filters, indent=indent)
    return result
857
858    # Returns None on success, and an error string on failure
def run_job(self, job, auto_finalize=True, auto_dismiss=False,
            pre_finalize=None, cancel=False, wait=60.0):
    """
    Drive a job from creation through to dismissal.

    Job related events are consumed in a loop: block job events are
    logged, and each JOB_STATUS_CHANGE triggers the QMP command needed
    to move the job forward, until the job reaches the 'null' state.

    :param job: String. ID of recently-launched job
    :param auto_finalize: Bool. True if the job was launched with
                          auto_finalize. Defaults to True.
    :param auto_dismiss: Bool. True if the job was launched with
                         auto_dismiss=True. Defaults to False.
    :param pre_finalize: Callback. A callable that takes no arguments to be
                         invoked prior to issuing job-finalize, if any.
    :param cancel: Bool. When true, cancels the job after the pre_finalize
                   callback.
    :param wait: Float. Timeout value specifying how long to wait for any
                 event, in seconds. Defaults to 60.0.
    :return: None on success, the job's error string on failure.
    """
    by_device = {'data': {'device': job}}
    by_id = {'data': {'id': job}}
    watched = [
        ('BLOCK_JOB_COMPLETED', by_device),
        ('BLOCK_JOB_CANCELLED', by_device),
        ('BLOCK_JOB_ERROR', by_device),
        ('BLOCK_JOB_READY', by_device),
        ('BLOCK_JOB_PENDING', by_id),
        ('JOB_STATUS_CHANGE', by_id)
    ]
    error = None
    while True:
        ev = filter_qmp_event(self.events_wait(watched, timeout=wait))
        if ev['event'] != 'JOB_STATUS_CHANGE':
            # Block job events are only recorded in the log
            log(ev)
            continue

        status = ev['data']['status']
        if status == 'aborting':
            # Remember the error reported by query-jobs for this job
            for info in self.qmp('query-jobs')['return']:
                if info['id'] == job:
                    error = info['error']
                    log('Job failed: %s' % (info['error']))
        elif status == 'ready':
            self.qmp_log('job-complete', id=job)
        elif status == 'pending' and not auto_finalize:
            if pre_finalize:
                pre_finalize()
            next_command = 'job-cancel' if cancel else 'job-finalize'
            self.qmp_log(next_command, id=job)
        elif status == 'concluded' and not auto_dismiss:
            self.qmp_log('job-dismiss', id=job)
        elif status == 'null':
            return error
912
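    # Returns None on success.  On failure, returns either the value returned
    # by run_job() or, if the blockdev-create command itself was rejected, the
    # 'error' member of its QMP reply.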
914    def blockdev_create(self, options, job_id='job0', filters=None):
915        if filters is None:
916            filters = [filter_qmp_testfiles]
917        result = self.qmp_log('blockdev-create', filters=filters,
918                              job_id=job_id, options=options)
919
920        if 'return' in result:
921            assert result['return'] == {}
922            job_result = self.run_job(job_id)
923        else:
924            job_result = result['error']
925
926        log("")
927        return job_result
928
929    def enable_migration_events(self, name):
930        log('Enabling migration QMP events on %s...' % name)
931        log(self.qmp('migrate-set-capabilities', capabilities=[
932            {
933                'capability': 'events',
934                'state': True
935            }
936        ]))
937
938    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
939        while True:
940            event = self.event_wait('MIGRATION')
941            # We use the default timeout, and with a timeout, event_wait()
942            # never returns None
943            assert event
944
945            log(event, filters=[filter_qmp_event])
946            if event['data']['status'] in ('completed', 'failed'):
947                break
948
949        if event['data']['status'] == 'completed':
950            # The event may occur in finish-migrate, so wait for the expected
951            # post-migration runstate
952            runstate = None
953            while runstate != expect_runstate:
954                runstate = self.qmp('query-status')['return']['status']
955            return True
956        else:
957            return False
958
959    def node_info(self, node_name):
960        nodes = self.qmp('query-named-block-nodes')
961        for x in nodes['return']:
962            if x['node-name'] == node_name:
963                return x
964        return None
965
966    def query_bitmaps(self):
967        res = self.qmp("query-named-block-nodes")
968        return {device['node-name']: device['dirty-bitmaps']
969                for device in res['return'] if 'dirty-bitmaps' in device}
970
971    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
972        """
973        get a specific bitmap from the object returned by query_bitmaps.
974        :param recording: If specified, filter results by the specified value.
975        :param bitmaps: If specified, use it instead of call query_bitmaps()
976        """
977        if bitmaps is None:
978            bitmaps = self.query_bitmaps()
979
980        for bitmap in bitmaps[node_name]:
981            if bitmap.get('name', '') == bitmap_name:
982                if recording is None or bitmap.get('recording') == recording:
983                    return bitmap
984        return None
985
986    def check_bitmap_status(self, node_name, bitmap_name, fields):
987        ret = self.get_bitmap(node_name, bitmap_name)
988
989        return fields.items() <= ret.items()
990
991    def assert_block_path(self, root, path, expected_node, graph=None):
992        """
993        Check whether the node under the given path in the block graph
994        is @expected_node.
995
996        @root is the node name of the node where the @path is rooted.
997
998        @path is a string that consists of child names separated by
999        slashes.  It must begin with a slash.
1000
1001        Examples for @root + @path:
1002          - root="qcow2-node", path="/backing/file"
1003          - root="quorum-node", path="/children.2/file"
1004
1005        Hypothetically, @path could be empty, in which case it would
1006        point to @root.  However, in practice this case is not useful
1007        and hence not allowed.
1008
1009        @expected_node may be None.  (All elements of the path but the
1010        leaf must still exist.)
1011
1012        @graph may be None or the result of an x-debug-query-block-graph
1013        call that has already been performed.
1014        """
1015        if graph is None:
1016            graph = self.qmp('x-debug-query-block-graph')['return']
1017
1018        iter_path = iter(path.split('/'))
1019
1020        # Must start with a /
1021        assert next(iter_path) == ''
1022
1023        node = next((node for node in graph['nodes'] if node['name'] == root),
1024                    None)
1025
1026        # An empty @path is not allowed, so the root node must be present
1027        assert node is not None, 'Root node %s not found' % root
1028
1029        for child_name in iter_path:
1030            assert node is not None, 'Cannot follow path %s%s' % (root, path)
1031
1032            try:
1033                node_id = next(edge['child'] for edge in graph['edges']
1034                               if (edge['parent'] == node['id'] and
1035                                   edge['name'] == child_name))
1036
1037                node = next(node for node in graph['nodes']
1038                            if node['id'] == node_id)
1039
1040            except StopIteration:
1041                node = None
1042
1043        if node is None:
1044            assert expected_node is None, \
1045                   'No node found under %s (but expected %s)' % \
1046                   (path, expected_node)
1047        else:
1048            assert node['name'] == expected_node, \
1049                   'Found node %s under %s (but expected %s)' % \
1050                   (node['name'], path, expected_node)
1051
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Subclasses usually assign their VM here; most helpers below
        # talk to it through self.vm.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow a slash-separated path through nested dicts.

        A component of the form "key[N]" selects element N of the list
        stored under "key".  The test fails if the path cannot be
        followed.  Returns the value found at the end of the path.'''
        cursor = d
        for part in path.split('/'):
            indexed = index_re.match(part)
            key = part
            position = None
            if indexed:
                key = indexed.group(1)
                position = int(indexed.group(2))

            if not (isinstance(cursor, dict) and key in cursor):
                self.fail(f'failed path traversal for "{path}" in "{cursor}"')
            cursor = cursor[key]

            if not indexed:
                continue

            if not isinstance(cursor, list):
                self.fail(f'path component "{key}" in "{path}" '
                          f'is not a list in "{cursor}"')
            try:
                cursor = cursor[position]
            except IndexError:
                self.fail(f'invalid index "{position}" in path "{path}" '
                          f'in "{cursor}"')
        return cursor

    def assert_qmp_absent(self, d, path):
        '''Assert that the given path cannot be followed in a QMP dict.'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, i.e. the path is absent
            pass
        else:
            self.fail(f'path "{path!s}" has value "{found!s}"')

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict
           matches.  When given a list of values, assert that any of
           them matches.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list.'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def matches(actual, wanted):
            # A None on either side counts as a match
            return actual is None or wanted is None or actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        found = any(matches(entry.get("node-name"), node_name) and
                    matches(entry.get("file"), file_name)
                    for entry in result["return"])
        if not found:
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flatten = self.vm.flatten_qmp_object
        self.assertEqual(flatten(json.loads(payload)), flatten(reference))

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job and wait for it to finish, returning the event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # Every event of a batch is checked, even after the final
            # event has been seen
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait until a BLOCK_JOB_READY event, and return the event."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for a job to become ready, cancel it, and check that it
           ended as a fully completed mirror job.'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
           then return its entry.  The job must be listed on every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                ours = next((job for job in jobs
                             if job['device'] == job_id), None)
                assert ours is not None
                if ours['paused'] and not ours['busy']:
                    return ours

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job.  Returns the paused job entry when waiting
           for the pause, otherwise the block-job-pause reply.'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        if not wait:
            return reply
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1235
1236
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<seq>.notrun" in output_dir, logs a warning and
    exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_path = f'{output_dir}/{seq}.notrun'

    with open(notrun_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1247
def case_notrun(reason):
    '''Record that a single test case was not run.

    The reason is appended to "<seq>.casenotrun" in output_dir.  The
    test case itself is not skipped here; that is left to the caller.
    See QMPTestCase.case_skip() for a variant that actually skips the
    current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = f'{output_dir}/{seq}.casenotrun'

    with open(casenotrun_path, 'a', encoding='utf-8') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1260
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not suitable.

    :param supported_fmts: If non-empty, the only formats the test runs
                           with.  A 'generic' entry lifts this restriction
                           unless IMGFMT_GENERIC is set to something other
                           than 'true'.
    :param unsupported_fmts: Formats the test must not run with.
    """
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_allowed and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    not_listed = bool(supported_fmts) and imgfmt not in supported_fmts
    if not_listed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    # LUKS additionally has to be able to create images on this host
    if imgfmt == 'luks':
        verify_working_luks()
1276
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current protocol is not suitable.

    At most one of @supported and @unsupported may be non-empty.  A
    'generic' entry in @supported accepts every protocol.
    """
    assert not supported or not unsupported

    if 'generic' in supported:
        return

    not_listed = bool(supported) and imgproto not in supported
    if not_listed or imgproto in unsupported:
        notrun('not suitable for this protocol: %s' % imgproto)
1287
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if sys.platform is not suitable.

    Entries are matched as prefixes of sys.platform.  The platform must
    not match any entry of @unsupported and, if @supported is non-empty,
    must match at least one of its entries.
    """
    platform = sys.platform

    if platform.startswith(tuple(unsupported)):
        notrun('not suitable for this OS: %s' % sys.platform)

    if supported and not platform.startswith(tuple(supported)):
        notrun('not suitable for this OS: %s' % sys.platform)
1296
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the cache mode is not among the supported
    ones; an empty sequence accepts every cache mode."""
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
1300
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the test suite if the aio mode is not among the supported
    ones; an empty sequence accepts every aio mode."""
    if not supported_aio_modes:
        return
    if aiomode not in supported_aio_modes:
        notrun('not suitable for this aio mode: %s' % aiomode)
1304
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """Skip the test suite unless every required format is listed by
    supported_formats()."""
    missing = set(required_formats) - set(supported_formats())
    usf_list = list(missing)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1309
1310
def _verify_virtio_blk() -> None:
    """Skip the test suite if QEMU's device help does not mention
    virtio-blk."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1315
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """Skip the test suite if QEMU's device help mentions neither
    virtio-scsi-pci nor virtio-scsi-ccw."""
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-scsi-pci' in device_help or 'virtio-scsi-ccw' in device_help:
        return
    notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1320
1321
def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if IMGOPTS contains an unsupported substring.

    :param unsupported: Substrings that must not occur in the IMGOPTS
                        environment variable.  '$' is always rejected.
    """
    imgopts = os.environ.get('IMGOPTS')
    if not imgopts:
        return

    # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
    # but it is supported only for bash tests. We don't have a concept of
    # global TEST_IMG in iotests.py, let alone parsing of $variables.
    # So, for simplicity let's just not support any IMGOPTS with '$' inside.
    rejected = [*unsupported, '$']
    if any(fragment in imgopts for fragment in rejected):
        notrun(f'not suitable for this imgopts: {imgopts}')
1331
1332
def supports_quorum():
    """Tell whether the qemu-img help output mentions 'quorum'."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1335
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1340
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A small LUKS image is created with qemu-img in test_dir and removed
    again.

    :return: (True, '') if creation succeeded, otherwise (False, reason).
             The reason is the text following "<image file>:" in the
             first output line containing it, or the whole output if
             there is no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1371
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks().
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1379
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    @args.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1389
def supported_formats(read_only=False):
    '''Return the formats listed by "-drive format=help".

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the "formats"
       attribute of this function, so QEMU is queried at most once for
       each of the two lists.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The first line is used for the rw-whitelist, the second one
        # for the ro-whitelist; formats follow the first colon
        wanted_line = help_lines[1 if read_only else 0]
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1404
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       :param required_formats: Either a sequence of format names, or a
                                callable that takes the test case and
                                returns such a sequence.
       :param read_only: Passed to supported_formats() to select which
                         whitelist is checked.
       :return: A decorator for test methods.  The decorated method
                skips the test case through case_skip() if any required
                format is missing, and runs the test otherwise.'''
    # Imported here so that this function does not rely on a module-level
    # import of functools.
    from functools import wraps

    def skip_test_decorator(func):
        # wraps() keeps the test method's name and docstring on the wrapper
        @wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1424
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       :param formats: Image formats for which the decorated test case
                       is skipped through case_skip().
       :return: A decorator for test methods.'''
    # Imported here so that this function does not rely on a module-level
    # import of functools.
    from functools import wraps

    def skip_test_decorator(func):
        # wraps() keeps the test method's name and docstring on the wrapper
        @wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1440
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When the process runs with UID 0, the test is recorded through
       case_notrun() (its first argument is used in the message) and the
       wrapper returns None without calling it.  Otherwise the wrapper
       returns whatever the test returns.'''
    # Imported here so that this function does not rely on a module-level
    # import of functools.
    from functools import wraps

    # wraps() keeps the test method's name and docstring on the wrapper
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1451
1452# We need to filter out the time taken from the output so that
1453# qemu-iotest can reliably diff the results against master output,
1454# and hide skipped tests from the reference output.
1455
class ReproducibleTestResult(unittest.TextTestResult):
    """Text test result that reports skipped tests like passed ones."""

    def addSkip(self, test, reason):
        """Record the skip, but print a dot instead of "s" in dots mode."""
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1465
class ReproducibleStreamWrapper:
    """Stream proxy that strips run time and skip counts on write()."""

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails.  Never forward these two
        # names to the wrapped stream.
        if attr in ('stream', '__getstate__'):
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        """Write @arg to the wrapped stream without the time taken and
        without the " (skipped=N)" note."""
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        cleaned = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(cleaned)
1479
class ReproducibleTestRunner(unittest.TextTestRunner):
    """Text test runner whose output goes through
    ReproducibleStreamWrapper and, by default, ReproducibleTestResult."""

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given
        target = stream or sys.stdout
        wrapped = ReproducibleStreamWrapper(target)
        super().__init__(stream=wrapped,           # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1490
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """
    Executes unittests within the calling module.

    :param argv: Argument list handed to unittest.main().
    :param debug: Selects verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warnings_mode = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warnings_mode)
1501
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    A '-d' argument is removed from sys.argv and selects DEBUG logging.
    Afterwards the environment is checked against the given
    requirements by the _verify_*() helpers.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1533
def execute_test(*args, test_function=None, **kwargs):
    """
    Run either unittest or script-style tests.

    Positional and remaining keyword arguments go to
    execute_setup_common().  If @test_function is given it is called,
    otherwise the unittest runner is started.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1542
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1551
1552# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates logging to stdout, then passes all arguments on to
    execute_setup_common(); its return value is discarded.
    """
    activate_logging()
    _ = execute_setup_common(*args, **kwargs)
1557
1558# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework

    Activates logging to stdout, then hands @test_function and all other
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1563
1564# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework

    All arguments are passed on to execute_test().
    """
    execute_test(*args, **kwargs)
1568