xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 740b1759)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43# Use this logger for logging messages directly from the iotests module
44logger = logging.getLogger('qemu.iotests')
45logger.addHandler(logging.NullHandler())
46
47# Use this logger for messages that ought to be used for diff output.
48test_logger = logging.getLogger('qemu.iotests.diff_io')
49
50
51faulthandler.enable()
52
53# This will not work if arguments contain spaces but is necessary if we
54# want to support the override options that ./check supports.
55qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
56if os.environ.get('QEMU_IMG_OPTIONS'):
57    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
58
59qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
60if os.environ.get('QEMU_IO_OPTIONS'):
61    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
62
63qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
64if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
65    qemu_io_args_no_fmt += \
66        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
67
68qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
69qemu_nbd_args = [qemu_nbd_prog]
70if os.environ.get('QEMU_NBD_OPTIONS'):
71    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
72
73qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
74qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
75
76imgfmt = os.environ.get('IMGFMT', 'raw')
77imgproto = os.environ.get('IMGPROTO', 'file')
78test_dir = os.environ.get('TEST_DIR')
79sock_dir = os.environ.get('SOCK_DIR')
80output_dir = os.environ.get('OUTPUT_DIR', '.')
81cachemode = os.environ.get('CACHEMODE')
82aiomode = os.environ.get('AIOMODE')
83qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
84
85socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
86
87luks_default_secret_object = 'secret,id=keysec0,data=' + \
88                             os.environ.get('IMGKEYSECRET', '')
89luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
92def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
93                              connect_stderr: bool = True) -> Tuple[str, int]:
94    """
95    Run a tool and return both its output and its exit code
96    """
97    stderr = subprocess.STDOUT if connect_stderr else None
98    subp = subprocess.Popen(args,
99                            stdout=subprocess.PIPE,
100                            stderr=stderr,
101                            universal_newlines=True)
102    output = subp.communicate()[0]
103    if subp.returncode < 0:
104        sys.stderr.write('%s received signal %i: %s\n'
105                         % (tool, -subp.returncode,
106                            ' '.join(qemu_img_args + list(args))))
107    return (output, subp.returncode)
108
109def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
110    """
111    Run qemu-img and return both its output and its exit code
112    """
113    full_args = qemu_img_args + list(args)
114    return qemu_tool_pipe_and_status('qemu-img', full_args)
115
116def qemu_img(*args: str) -> int:
117    '''Run qemu-img and return the exit code'''
118    return qemu_img_pipe_and_status(*args)[1]
119
120def ordered_qmp(qmsg, conv_keys=True):
121    # Dictionaries are not ordered prior to 3.6, therefore:
122    if isinstance(qmsg, list):
123        return [ordered_qmp(atom) for atom in qmsg]
124    if isinstance(qmsg, dict):
125        od = OrderedDict()
126        for k, v in sorted(qmsg.items()):
127            if conv_keys:
128                k = k.replace('_', '-')
129            od[k] = ordered_qmp(v, conv_keys=False)
130        return od
131    return qmsg
132
133def qemu_img_create(*args):
134    args = list(args)
135
136    # default luks support
137    if '-f' in args and args[args.index('-f') + 1] == 'luks':
138        if '-o' in args:
139            i = args.index('-o')
140            if 'key-secret' not in args[i + 1]:
141                args[i + 1].append(luks_default_key_secret_opt)
142                args.insert(i + 2, '--object')
143                args.insert(i + 3, luks_default_secret_object)
144        else:
145            args = ['-o', luks_default_key_secret_opt,
146                    '--object', luks_default_secret_object] + args
147
148    args.insert(0, 'create')
149
150    return qemu_img(*args)
151
152def qemu_img_measure(*args):
153    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
154
155def qemu_img_check(*args):
156    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
157
158def qemu_img_verbose(*args):
159    '''Run qemu-img without suppressing its output and return the exit code'''
160    exitcode = subprocess.call(qemu_img_args + list(args))
161    if exitcode < 0:
162        sys.stderr.write('qemu-img received signal %i: %s\n'
163                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
164    return exitcode
165
166def qemu_img_pipe(*args: str) -> str:
167    '''Run qemu-img and return its output'''
168    return qemu_img_pipe_and_status(*args)[0]
169
170def qemu_img_log(*args):
171    result = qemu_img_pipe(*args)
172    log(result, filters=[filter_testfiles])
173    return result
174
175def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
176    args = ['info']
177    if imgopts:
178        args.append('--image-opts')
179    else:
180        args += ['-f', imgfmt]
181    args += extra_args
182    args.append(filename)
183
184    output = qemu_img_pipe(*args)
185    if not filter_path:
186        filter_path = filename
187    log(filter_img_info(output, filter_path))
188
189def qemu_io(*args):
190    '''Run qemu-io and return the stdout data'''
191    args = qemu_io_args + list(args)
192    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
193                            stderr=subprocess.STDOUT,
194                            universal_newlines=True)
195    output = subp.communicate()[0]
196    if subp.returncode < 0:
197        sys.stderr.write('qemu-io received signal %i: %s\n'
198                         % (-subp.returncode, ' '.join(args)))
199    return output
200
201def qemu_io_log(*args):
202    result = qemu_io(*args)
203    log(result, filters=[filter_testfiles, filter_qemu_io])
204    return result
205
206def qemu_io_silent(*args):
207    '''Run qemu-io and return the exit code, suppressing stdout'''
208    args = qemu_io_args + list(args)
209    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
210    if exitcode < 0:
211        sys.stderr.write('qemu-io received signal %i: %s\n' %
212                         (-exitcode, ' '.join(args)))
213    return exitcode
214
215def qemu_io_silent_check(*args):
216    '''Run qemu-io and return the true if subprocess returned 0'''
217    args = qemu_io_args + list(args)
218    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
219                               stderr=subprocess.STDOUT)
220    return exitcode == 0
221
222def get_virtio_scsi_device():
223    if qemu_default_machine == 's390-ccw-virtio':
224        return 'virtio-scsi-ccw'
225    return 'virtio-scsi-pci'
226
227class QemuIoInteractive:
228    def __init__(self, *args):
229        self.args = qemu_io_args_no_fmt + list(args)
230        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
231                                   stdout=subprocess.PIPE,
232                                   stderr=subprocess.STDOUT,
233                                   universal_newlines=True)
234        out = self._p.stdout.read(9)
235        if out != 'qemu-io> ':
236            # Most probably qemu-io just failed to start.
237            # Let's collect the whole output and exit.
238            out += self._p.stdout.read()
239            self._p.wait(timeout=1)
240            raise ValueError(out)
241
242    def close(self):
243        self._p.communicate('q\n')
244
245    def _read_output(self):
246        pattern = 'qemu-io> '
247        n = len(pattern)
248        pos = 0
249        s = []
250        while pos != n:
251            c = self._p.stdout.read(1)
252            # check unexpected EOF
253            assert c != ''
254            s.append(c)
255            if c == pattern[pos]:
256                pos += 1
257            else:
258                pos = 0
259
260        return ''.join(s[:-n])
261
262    def cmd(self, cmd):
263        # quit command is in close(), '\n' is added automatically
264        assert '\n' not in cmd
265        cmd = cmd.strip()
266        assert cmd not in ('q', 'quit')
267        self._p.stdin.write(cmd + '\n')
268        self._p.stdin.flush()
269        return self._read_output()
270
271
272def qemu_nbd(*args):
273    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
274    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
275
276def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
277    '''Run qemu-nbd in daemon mode and return both the parent's exit code
278       and its output in case of an error'''
279    full_args = qemu_nbd_args + ['--fork'] + list(args)
280    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
281                                                   connect_stderr=False)
282    return returncode, output if returncode else ''
283
284def qemu_nbd_list_log(*args: str) -> str:
285    '''Run qemu-nbd to list remote exports'''
286    full_args = [qemu_nbd_prog, '-L'] + list(args)
287    output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
288    log(output, filters=[filter_testfiles, filter_nbd_exports])
289    return output
290
291@contextmanager
292def qemu_nbd_popen(*args):
293    '''Context manager running qemu-nbd within the context'''
294    pid_file = file_path("pid")
295
296    cmd = list(qemu_nbd_args)
297    cmd.extend(('--persistent', '--pid-file', pid_file))
298    cmd.extend(args)
299
300    log('Start NBD server')
301    p = subprocess.Popen(cmd)
302    try:
303        while not os.path.exists(pid_file):
304            if p.poll() is not None:
305                raise RuntimeError(
306                    "qemu-nbd terminated with exit code {}: {}"
307                    .format(p.returncode, ' '.join(cmd)))
308
309            time.sleep(0.01)
310        yield
311    finally:
312        log('Kill NBD server')
313        p.kill()
314        p.wait()
315
316def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
317    '''Return True if two image files are identical'''
318    return qemu_img('compare', '-f', fmt1,
319                    '-F', fmt2, img1, img2) == 0
320
321def create_image(name, size):
322    '''Create a fully-allocated raw image with sector markers'''
323    file = open(name, 'wb')
324    i = 0
325    while i < size:
326        sector = struct.pack('>l504xl', i // 512, i // 512)
327        file.write(sector)
328        i = i + 512
329    file.close()
330
331def image_size(img):
332    '''Return image's virtual size'''
333    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
334    return json.loads(r)['virtual-size']
335
336def is_str(val):
337    return isinstance(val, str)
338
339test_dir_re = re.compile(r"%s" % test_dir)
340def filter_test_dir(msg):
341    return test_dir_re.sub("TEST_DIR", msg)
342
343win32_re = re.compile(r"\r")
344def filter_win32(msg):
345    return win32_re.sub("", msg)
346
347qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
348                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
349                        r"and [0-9\/.inf]* ops\/sec\)")
350def filter_qemu_io(msg):
351    msg = filter_win32(msg)
352    return qemu_io_re.sub("X ops; XX:XX:XX.X "
353                          "(XXX YYY/sec and XXX ops/sec)", msg)
354
355chown_re = re.compile(r"chown [0-9]+:[0-9]+")
356def filter_chown(msg):
357    return chown_re.sub("chown UID:GID", msg)
358
359def filter_qmp_event(event):
360    '''Filter a QMP event dict'''
361    event = dict(event)
362    if 'timestamp' in event:
363        event['timestamp']['seconds'] = 'SECS'
364        event['timestamp']['microseconds'] = 'USECS'
365    return event
366
367def filter_qmp(qmsg, filter_fn):
368    '''Given a string filter, filter a QMP object's values.
369    filter_fn takes a (key, value) pair.'''
370    # Iterate through either lists or dicts;
371    if isinstance(qmsg, list):
372        items = enumerate(qmsg)
373    else:
374        items = qmsg.items()
375
376    for k, v in items:
377        if isinstance(v, (dict, list)):
378            qmsg[k] = filter_qmp(v, filter_fn)
379        else:
380            qmsg[k] = filter_fn(k, v)
381    return qmsg
382
383def filter_testfiles(msg):
384    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
385    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
386    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
387
388def filter_qmp_testfiles(qmsg):
389    def _filter(_key, value):
390        if is_str(value):
391            return filter_testfiles(value)
392        return value
393    return filter_qmp(qmsg, _filter)
394
395def filter_generated_node_ids(msg):
396    return re.sub("#block[0-9]+", "NODE_NAME", msg)
397
398def filter_img_info(output, filename):
399    lines = []
400    for line in output.split('\n'):
401        if 'disk size' in line or 'actual-size' in line:
402            continue
403        line = line.replace(filename, 'TEST_IMG')
404        line = filter_testfiles(line)
405        line = line.replace(imgfmt, 'IMGFMT')
406        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
407        line = re.sub('uuid: [-a-f0-9]+',
408                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
409                      line)
410        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
411        lines.append(line)
412    return '\n'.join(lines)
413
414def filter_imgfmt(msg):
415    return msg.replace(imgfmt, 'IMGFMT')
416
417def filter_qmp_imgfmt(qmsg):
418    def _filter(_key, value):
419        if is_str(value):
420            return filter_imgfmt(value)
421        return value
422    return filter_qmp(qmsg, _filter)
423
424def filter_nbd_exports(output: str) -> str:
425    return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
426
427
428Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
429
430def log(msg: Msg,
431        filters: Iterable[Callable[[Msg], Msg]] = (),
432        indent: Optional[int] = None) -> None:
433    """
434    Logs either a string message or a JSON serializable message (like QMP).
435    If indent is provided, JSON serializable messages are pretty-printed.
436    """
437    for flt in filters:
438        msg = flt(msg)
439    if isinstance(msg, (dict, list)):
440        # Don't sort if it's already sorted
441        do_sort = not isinstance(msg, OrderedDict)
442        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
443    else:
444        test_logger.info(msg)
445
446class Timeout:
447    def __init__(self, seconds, errmsg="Timeout"):
448        self.seconds = seconds
449        self.errmsg = errmsg
450    def __enter__(self):
451        signal.signal(signal.SIGALRM, self.timeout)
452        signal.setitimer(signal.ITIMER_REAL, self.seconds)
453        return self
454    def __exit__(self, exc_type, value, traceback):
455        signal.setitimer(signal.ITIMER_REAL, 0)
456        return False
457    def timeout(self, signum, frame):
458        raise Exception(self.errmsg)
459
460def file_pattern(name):
461    return "{0}-{1}".format(os.getpid(), name)
462
463class FilePath:
464    """
465    Context manager generating multiple file names. The generated files are
466    removed when exiting the context.
467
468    Example usage:
469
470        with FilePath('a.img', 'b.img') as (img_a, img_b):
471            # Use img_a and img_b here...
472
473        # a.img and b.img are automatically removed here.
474
475    By default images are created in iotests.test_dir. To create sockets use
476    iotests.sock_dir:
477
478       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
479
480    For convenience, calling with one argument yields a single file instead of
481    a tuple with one item.
482
483    """
484    def __init__(self, *names, base_dir=test_dir):
485        self.paths = [os.path.join(base_dir, file_pattern(name))
486                      for name in names]
487
488    def __enter__(self):
489        if len(self.paths) == 1:
490            return self.paths[0]
491        else:
492            return self.paths
493
494    def __exit__(self, exc_type, exc_val, exc_tb):
495        for path in self.paths:
496            try:
497                os.remove(path)
498            except OSError:
499                pass
500        return False
501
502
503def file_path_remover():
504    for path in reversed(file_path_remover.paths):
505        try:
506            os.remove(path)
507        except OSError:
508            pass
509
510
511def file_path(*names, base_dir=test_dir):
512    ''' Another way to get auto-generated filename that cleans itself up.
513
514    Use is as simple as:
515
516    img_a, img_b = file_path('a.img', 'b.img')
517    sock = file_path('socket')
518    '''
519
520    if not hasattr(file_path_remover, 'paths'):
521        file_path_remover.paths = []
522        atexit.register(file_path_remover)
523
524    paths = []
525    for name in names:
526        filename = file_pattern(name)
527        path = os.path.join(base_dir, filename)
528        file_path_remover.paths.append(path)
529        paths.append(path)
530
531    return paths[0] if len(paths) == 1 else paths
532
533def remote_filename(path):
534    if imgproto == 'file':
535        return path
536    elif imgproto == 'ssh':
537        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
538    else:
539        raise Exception("Protocol %s not supported" % (imgproto))
540
541class VM(qtest.QEMUQtestMachine):
542    '''A QEMU VM'''
543
544    def __init__(self, path_suffix=''):
545        name = "qemu%s-%d" % (path_suffix, os.getpid())
546        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
547                                 test_dir=test_dir,
548                                 socket_scm_helper=socket_scm_helper,
549                                 sock_dir=sock_dir)
550        self._num_drives = 0
551
552    def add_object(self, opts):
553        self._args.append('-object')
554        self._args.append(opts)
555        return self
556
557    def add_device(self, opts):
558        self._args.append('-device')
559        self._args.append(opts)
560        return self
561
562    def add_drive_raw(self, opts):
563        self._args.append('-drive')
564        self._args.append(opts)
565        return self
566
567    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
568        '''Add a virtio-blk drive to the VM'''
569        options = ['if=%s' % interface,
570                   'id=drive%d' % self._num_drives]
571
572        if path is not None:
573            options.append('file=%s' % path)
574            options.append('format=%s' % img_format)
575            options.append('cache=%s' % cachemode)
576            options.append('aio=%s' % aiomode)
577
578        if opts:
579            options.append(opts)
580
581        if img_format == 'luks' and 'key-secret' not in opts:
582            # default luks support
583            if luks_default_secret_object not in self._args:
584                self.add_object(luks_default_secret_object)
585
586            options.append(luks_default_key_secret_opt)
587
588        self._args.append('-drive')
589        self._args.append(','.join(options))
590        self._num_drives += 1
591        return self
592
593    def add_blockdev(self, opts):
594        self._args.append('-blockdev')
595        if isinstance(opts, str):
596            self._args.append(opts)
597        else:
598            self._args.append(','.join(opts))
599        return self
600
601    def add_incoming(self, addr):
602        self._args.append('-incoming')
603        self._args.append(addr)
604        return self
605
606    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
607        cmd = 'human-monitor-command'
608        kwargs = {'command-line': command_line}
609        if use_log:
610            return self.qmp_log(cmd, **kwargs)
611        else:
612            return self.qmp(cmd, **kwargs)
613
614    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
615        """Pause drive r/w operations"""
616        if not event:
617            self.pause_drive(drive, "read_aio")
618            self.pause_drive(drive, "write_aio")
619            return
620        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
621
622    def resume_drive(self, drive: str) -> None:
623        """Resume drive r/w operations"""
624        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
625
626    def hmp_qemu_io(self, drive: str, cmd: str,
627                    use_log: bool = False) -> QMPMessage:
628        """Write to a given drive using an HMP command"""
629        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
630
631    def flatten_qmp_object(self, obj, output=None, basestr=''):
632        if output is None:
633            output = dict()
634        if isinstance(obj, list):
635            for i, item in enumerate(obj):
636                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
637        elif isinstance(obj, dict):
638            for key in obj:
639                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
640        else:
641            output[basestr[:-1]] = obj # Strip trailing '.'
642        return output
643
644    def qmp_to_opts(self, obj):
645        obj = self.flatten_qmp_object(obj)
646        output_list = list()
647        for key in obj:
648            output_list += [key + '=' + obj[key]]
649        return ','.join(output_list)
650
651    def get_qmp_events_filtered(self, wait=60.0):
652        result = []
653        for ev in self.get_qmp_events(wait=wait):
654            result.append(filter_qmp_event(ev))
655        return result
656
657    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
658        full_cmd = OrderedDict((
659            ("execute", cmd),
660            ("arguments", ordered_qmp(kwargs))
661        ))
662        log(full_cmd, filters, indent=indent)
663        result = self.qmp(cmd, **kwargs)
664        log(result, filters, indent=indent)
665        return result
666
667    # Returns None on success, and an error string on failure
668    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
669                pre_finalize=None, cancel=False, wait=60.0):
670        """
671        run_job moves a job from creation through to dismissal.
672
673        :param job: String. ID of recently-launched job
674        :param auto_finalize: Bool. True if the job was launched with
675                              auto_finalize. Defaults to True.
676        :param auto_dismiss: Bool. True if the job was launched with
677                             auto_dismiss=True. Defaults to False.
678        :param pre_finalize: Callback. A callable that takes no arguments to be
679                             invoked prior to issuing job-finalize, if any.
680        :param cancel: Bool. When true, cancels the job after the pre_finalize
681                       callback.
682        :param wait: Float. Timeout value specifying how long to wait for any
683                     event, in seconds. Defaults to 60.0.
684        """
685        match_device = {'data': {'device': job}}
686        match_id = {'data': {'id': job}}
687        events = [
688            ('BLOCK_JOB_COMPLETED', match_device),
689            ('BLOCK_JOB_CANCELLED', match_device),
690            ('BLOCK_JOB_ERROR', match_device),
691            ('BLOCK_JOB_READY', match_device),
692            ('BLOCK_JOB_PENDING', match_id),
693            ('JOB_STATUS_CHANGE', match_id)
694        ]
695        error = None
696        while True:
697            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
698            if ev['event'] != 'JOB_STATUS_CHANGE':
699                log(ev)
700                continue
701            status = ev['data']['status']
702            if status == 'aborting':
703                result = self.qmp('query-jobs')
704                for j in result['return']:
705                    if j['id'] == job:
706                        error = j['error']
707                        log('Job failed: %s' % (j['error']))
708            elif status == 'ready':
709                self.qmp_log('job-complete', id=job)
710            elif status == 'pending' and not auto_finalize:
711                if pre_finalize:
712                    pre_finalize()
713                if cancel:
714                    self.qmp_log('job-cancel', id=job)
715                else:
716                    self.qmp_log('job-finalize', id=job)
717            elif status == 'concluded' and not auto_dismiss:
718                self.qmp_log('job-dismiss', id=job)
719            elif status == 'null':
720                return error
721
722    # Returns None on success, and an error string on failure
723    def blockdev_create(self, options, job_id='job0', filters=None):
724        if filters is None:
725            filters = [filter_qmp_testfiles]
726        result = self.qmp_log('blockdev-create', filters=filters,
727                              job_id=job_id, options=options)
728
729        if 'return' in result:
730            assert result['return'] == {}
731            job_result = self.run_job(job_id)
732        else:
733            job_result = result['error']
734
735        log("")
736        return job_result
737
738    def enable_migration_events(self, name):
739        log('Enabling migration QMP events on %s...' % name)
740        log(self.qmp('migrate-set-capabilities', capabilities=[
741            {
742                'capability': 'events',
743                'state': True
744            }
745        ]))
746
747    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
748        while True:
749            event = self.event_wait('MIGRATION')
750            log(event, filters=[filter_qmp_event])
751            if event['data']['status'] in ('completed', 'failed'):
752                break
753
754        if event['data']['status'] == 'completed':
755            # The event may occur in finish-migrate, so wait for the expected
756            # post-migration runstate
757            runstate = None
758            while runstate != expect_runstate:
759                runstate = self.qmp('query-status')['return']['status']
760            return True
761        else:
762            return False
763
764    def node_info(self, node_name):
765        nodes = self.qmp('query-named-block-nodes')
766        for x in nodes['return']:
767            if x['node-name'] == node_name:
768                return x
769        return None
770
771    def query_bitmaps(self):
772        res = self.qmp("query-named-block-nodes")
773        return {device['node-name']: device['dirty-bitmaps']
774                for device in res['return'] if 'dirty-bitmaps' in device}
775
776    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
777        """
778        get a specific bitmap from the object returned by query_bitmaps.
779        :param recording: If specified, filter results by the specified value.
780        :param bitmaps: If specified, use it instead of call query_bitmaps()
781        """
782        if bitmaps is None:
783            bitmaps = self.query_bitmaps()
784
785        for bitmap in bitmaps[node_name]:
786            if bitmap.get('name', '') == bitmap_name:
787                if recording is None or bitmap.get('recording') == recording:
788                    return bitmap
789        return None
790
791    def check_bitmap_status(self, node_name, bitmap_name, fields):
792        ret = self.get_bitmap(node_name, bitmap_name)
793
794        return fields.items() <= ret.items()
795
796    def assert_block_path(self, root, path, expected_node, graph=None):
797        """
798        Check whether the node under the given path in the block graph
799        is @expected_node.
800
801        @root is the node name of the node where the @path is rooted.
802
803        @path is a string that consists of child names separated by
804        slashes.  It must begin with a slash.
805
806        Examples for @root + @path:
807          - root="qcow2-node", path="/backing/file"
808          - root="quorum-node", path="/children.2/file"
809
810        Hypothetically, @path could be empty, in which case it would
811        point to @root.  However, in practice this case is not useful
812        and hence not allowed.
813
814        @expected_node may be None.  (All elements of the path but the
815        leaf must still exist.)
816
817        @graph may be None or the result of an x-debug-query-block-graph
818        call that has already been performed.
819        """
820        if graph is None:
821            graph = self.qmp('x-debug-query-block-graph')['return']
822
823        iter_path = iter(path.split('/'))
824
825        # Must start with a /
826        assert next(iter_path) == ''
827
828        node = next((node for node in graph['nodes'] if node['name'] == root),
829                    None)
830
831        # An empty @path is not allowed, so the root node must be present
832        assert node is not None, 'Root node %s not found' % root
833
834        for child_name in iter_path:
835            assert node is not None, 'Cannot follow path %s%s' % (root, path)
836
837            try:
838                node_id = next(edge['child'] for edge in graph['edges']
839                               if (edge['parent'] == node['id'] and
840                                   edge['name'] == child_name))
841
842                node = next(node for node in graph['nodes']
843                            if node['id'] == node_id)
844
845            except StopIteration:
846                node = None
847
848        if node is None:
849            assert expected_node is None, \
850                   'No node found under %s (but expected %s)' % \
851                   (path, expected_node)
852        else:
853            assert node['name'] == expected_node, \
854                   'Found node %s under %s (but expected %s)' % \
855                   (node['name'], path, expected_node)
856
857index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
858
859class QMPTestCase(unittest.TestCase):
860    '''Abstract base class for QMP test cases'''
861
862    def __init__(self, *args, **kwargs):
863        super().__init__(*args, **kwargs)
864        # Many users of this class set a VM property we rely on heavily
865        # in the methods below.
866        self.vm = None
867
868    def dictpath(self, d, path):
869        '''Traverse a path in a nested dict'''
870        for component in path.split('/'):
871            m = index_re.match(component)
872            if m:
873                component, idx = m.groups()
874                idx = int(idx)
875
876            if not isinstance(d, dict) or component not in d:
877                self.fail(f'failed path traversal for "{path}" in "{d}"')
878            d = d[component]
879
880            if m:
881                if not isinstance(d, list):
882                    self.fail(f'path component "{component}" in "{path}" '
883                              f'is not a list in "{d}"')
884                try:
885                    d = d[idx]
886                except IndexError:
887                    self.fail(f'invalid index "{idx}" in path "{path}" '
888                              f'in "{d}"')
889        return d
890
891    def assert_qmp_absent(self, d, path):
892        try:
893            result = self.dictpath(d, path)
894        except AssertionError:
895            return
896        self.fail('path "%s" has value "%s"' % (path, str(result)))
897
898    def assert_qmp(self, d, path, value):
899        '''Assert that the value for a specific path in a QMP dict
900           matches.  When given a list of values, assert that any of
901           them matches.'''
902
903        result = self.dictpath(d, path)
904
905        # [] makes no sense as a list of valid values, so treat it as
906        # an actual single value.
907        if isinstance(value, list) and value != []:
908            for v in value:
909                if result == v:
910                    return
911            self.fail('no match for "%s" in %s' % (str(result), str(value)))
912        else:
913            self.assertEqual(result, value,
914                             '"%s" is "%s", expected "%s"'
915                             % (path, str(result), str(value)))
916
917    def assert_no_active_block_jobs(self):
918        result = self.vm.qmp('query-block-jobs')
919        self.assert_qmp(result, 'return', [])
920
921    def assert_has_block_node(self, node_name=None, file_name=None):
922        """Issue a query-named-block-nodes and assert node_name and/or
923        file_name is present in the result"""
924        def check_equal_or_none(a, b):
925            return a is None or b is None or a == b
926        assert node_name or file_name
927        result = self.vm.qmp('query-named-block-nodes')
928        for x in result["return"]:
929            if check_equal_or_none(x.get("node-name"), node_name) and \
930                    check_equal_or_none(x.get("file"), file_name):
931                return
932        self.fail("Cannot find %s %s in result:\n%s" %
933                  (node_name, file_name, result))
934
935    def assert_json_filename_equal(self, json_filename, reference):
936        '''Asserts that the given filename is a json: filename and that its
937           content is equal to the given reference object'''
938        self.assertEqual(json_filename[:5], 'json:')
939        self.assertEqual(
940            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
941            self.vm.flatten_qmp_object(reference)
942        )
943
944    def cancel_and_wait(self, drive='drive0', force=False,
945                        resume=False, wait=60.0):
946        '''Cancel a block job and wait for it to finish, returning the event'''
947        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
948        self.assert_qmp(result, 'return', {})
949
950        if resume:
951            self.vm.resume_drive(drive)
952
953        cancelled = False
954        result = None
955        while not cancelled:
956            for event in self.vm.get_qmp_events(wait=wait):
957                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
958                   event['event'] == 'BLOCK_JOB_CANCELLED':
959                    self.assert_qmp(event, 'data/device', drive)
960                    result = event
961                    cancelled = True
962                elif event['event'] == 'JOB_STATUS_CHANGE':
963                    self.assert_qmp(event, 'data/id', drive)
964
965
966        self.assert_no_active_block_jobs()
967        return result
968
969    def wait_until_completed(self, drive='drive0', check_offset=True,
970                             wait=60.0, error=None):
971        '''Wait for a block job to finish, returning the event'''
972        while True:
973            for event in self.vm.get_qmp_events(wait=wait):
974                if event['event'] == 'BLOCK_JOB_COMPLETED':
975                    self.assert_qmp(event, 'data/device', drive)
976                    if error is None:
977                        self.assert_qmp_absent(event, 'data/error')
978                        if check_offset:
979                            self.assert_qmp(event, 'data/offset',
980                                            event['data']['len'])
981                    else:
982                        self.assert_qmp(event, 'data/error', error)
983                    self.assert_no_active_block_jobs()
984                    return event
985                if event['event'] == 'JOB_STATUS_CHANGE':
986                    self.assert_qmp(event, 'data/id', drive)
987
988    def wait_ready(self, drive='drive0'):
989        """Wait until a BLOCK_JOB_READY event, and return the event."""
990        return self.vm.events_wait([
991            ('BLOCK_JOB_READY',
992             {'data': {'type': 'mirror', 'device': drive}}),
993            ('BLOCK_JOB_READY',
994             {'data': {'type': 'commit', 'device': drive}})
995        ])
996
997    def wait_ready_and_cancel(self, drive='drive0'):
998        self.wait_ready(drive=drive)
999        event = self.cancel_and_wait(drive=drive)
1000        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1001        self.assert_qmp(event, 'data/type', 'mirror')
1002        self.assert_qmp(event, 'data/offset', event['data']['len'])
1003
1004    def complete_and_wait(self, drive='drive0', wait_ready=True,
1005                          completion_error=None):
1006        '''Complete a block job and wait for it to finish'''
1007        if wait_ready:
1008            self.wait_ready(drive=drive)
1009
1010        result = self.vm.qmp('block-job-complete', device=drive)
1011        self.assert_qmp(result, 'return', {})
1012
1013        event = self.wait_until_completed(drive=drive, error=completion_error)
1014        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1015
1016    def pause_wait(self, job_id='job0'):
1017        with Timeout(3, "Timeout waiting for job to pause"):
1018            while True:
1019                result = self.vm.qmp('query-block-jobs')
1020                found = False
1021                for job in result['return']:
1022                    if job['device'] == job_id:
1023                        found = True
1024                        if job['paused'] and not job['busy']:
1025                            return job
1026                        break
1027                assert found
1028
1029    def pause_job(self, job_id='job0', wait=True):
1030        result = self.vm.qmp('block-job-pause', device=job_id)
1031        self.assert_qmp(result, 'return', {})
1032        if wait:
1033            return self.pause_wait(job_id)
1034        return result
1035
1036    def case_skip(self, reason):
1037        '''Skip this test case'''
1038        case_notrun(reason)
1039        self.skipTest(reason)
1040
1041
1042def notrun(reason):
1043    '''Skip this test suite'''
1044    # Each test in qemu-iotests has a number ("seq")
1045    seq = os.path.basename(sys.argv[0])
1046
1047    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1048    logger.warning("%s not run: %s", seq, reason)
1049    sys.exit(0)
1050
1051def case_notrun(reason):
1052    '''Mark this test case as not having been run (without actually
1053    skipping it, that is left to the caller).  See
1054    QMPTestCase.case_skip() for a variant that actually skips the
1055    current test case.'''
1056
1057    # Each test in qemu-iotests has a number ("seq")
1058    seq = os.path.basename(sys.argv[0])
1059
1060    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1061        '    [case not run] ' + reason + '\n')
1062
1063def _verify_image_format(supported_fmts: Sequence[str] = (),
1064                         unsupported_fmts: Sequence[str] = ()) -> None:
1065    if 'generic' in supported_fmts and \
1066            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1067        # similar to
1068        #   _supported_fmt generic
1069        # for bash tests
1070        supported_fmts = ()
1071
1072    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1073    if not_sup or (imgfmt in unsupported_fmts):
1074        notrun('not suitable for this image format: %s' % imgfmt)
1075
1076    if imgfmt == 'luks':
1077        verify_working_luks()
1078
1079def _verify_protocol(supported: Sequence[str] = (),
1080                     unsupported: Sequence[str] = ()) -> None:
1081    assert not (supported and unsupported)
1082
1083    if 'generic' in supported:
1084        return
1085
1086    not_sup = supported and (imgproto not in supported)
1087    if not_sup or (imgproto in unsupported):
1088        notrun('not suitable for this protocol: %s' % imgproto)
1089
1090def _verify_platform(supported: Sequence[str] = (),
1091                     unsupported: Sequence[str] = ()) -> None:
1092    if any((sys.platform.startswith(x) for x in unsupported)):
1093        notrun('not suitable for this OS: %s' % sys.platform)
1094
1095    if supported:
1096        if not any((sys.platform.startswith(x) for x in supported)):
1097            notrun('not suitable for this OS: %s' % sys.platform)
1098
1099def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1100    if supported_cache_modes and (cachemode not in supported_cache_modes):
1101        notrun('not suitable for this cache mode: %s' % cachemode)
1102
1103def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1104    if supported_aio_modes and (aiomode not in supported_aio_modes):
1105        notrun('not suitable for this aio mode: %s' % aiomode)
1106
1107def supports_quorum():
1108    return 'quorum' in qemu_img_pipe('--help')
1109
1110def verify_quorum():
1111    '''Skip test suite if quorum support is not available'''
1112    if not supports_quorum():
1113        notrun('quorum support missing')
1114
1115def has_working_luks() -> Tuple[bool, str]:
1116    """
1117    Check whether our LUKS driver can actually create images
1118    (this extends to LUKS encryption for qcow2).
1119
1120    If not, return the reason why.
1121    """
1122
1123    img_file = f'{test_dir}/luks-test.luks'
1124    (output, status) = \
1125        qemu_img_pipe_and_status('create', '-f', 'luks',
1126                                 '--object', luks_default_secret_object,
1127                                 '-o', luks_default_key_secret_opt,
1128                                 '-o', 'iter-time=10',
1129                                 img_file, '1G')
1130    try:
1131        os.remove(img_file)
1132    except OSError:
1133        pass
1134
1135    if status != 0:
1136        reason = output
1137        for line in output.splitlines():
1138            if img_file + ':' in line:
1139                reason = line.split(img_file + ':', 1)[1].strip()
1140                break
1141
1142        return (False, reason)
1143    else:
1144        return (True, '')
1145
1146def verify_working_luks():
1147    """
1148    Skip test suite if LUKS does not work
1149    """
1150    (working, reason) = has_working_luks()
1151    if not working:
1152        notrun(reason)
1153
1154def qemu_pipe(*args: str) -> str:
1155    """
1156    Run qemu with an option to print something and exit (e.g. a help option).
1157
1158    :return: QEMU's stdout output.
1159    """
1160    full_args = [qemu_prog] + qemu_opts + list(args)
1161    output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1162    return output
1163
1164def supported_formats(read_only=False):
1165    '''Set 'read_only' to True to check ro-whitelist
1166       Otherwise, rw-whitelist is checked'''
1167
1168    if not hasattr(supported_formats, "formats"):
1169        supported_formats.formats = {}
1170
1171    if read_only not in supported_formats.formats:
1172        format_message = qemu_pipe("-drive", "format=help")
1173        line = 1 if read_only else 0
1174        supported_formats.formats[read_only] = \
1175            format_message.splitlines()[line].split(":")[1].split()
1176
1177    return supported_formats.formats[read_only]
1178
1179def skip_if_unsupported(required_formats=(), read_only=False):
1180    '''Skip Test Decorator
1181       Runs the test if all the required formats are whitelisted'''
1182    def skip_test_decorator(func):
1183        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1184                         **kwargs: Dict[str, Any]) -> None:
1185            if callable(required_formats):
1186                fmts = required_formats(test_case)
1187            else:
1188                fmts = required_formats
1189
1190            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1191            if usf_list:
1192                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1193                test_case.case_skip(msg)
1194            else:
1195                func(test_case, *args, **kwargs)
1196        return func_wrapper
1197    return skip_test_decorator
1198
1199def skip_for_formats(formats: Sequence[str] = ()) \
1200    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1201                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1202    '''Skip Test Decorator
1203       Skips the test for the given formats'''
1204    def skip_test_decorator(func):
1205        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1206                         **kwargs: Dict[str, Any]) -> None:
1207            if imgfmt in formats:
1208                msg = f'{test_case}: Skipped for format {imgfmt}'
1209                test_case.case_skip(msg)
1210            else:
1211                func(test_case, *args, **kwargs)
1212        return func_wrapper
1213    return skip_test_decorator
1214
1215def skip_if_user_is_root(func):
1216    '''Skip Test Decorator
1217       Runs the test only without root permissions'''
1218    def func_wrapper(*args, **kwargs):
1219        if os.getuid() == 0:
1220            case_notrun('{}: cannot be run as root'.format(args[0]))
1221            return None
1222        else:
1223            return func(*args, **kwargs)
1224    return func_wrapper
1225
1226def execute_unittest(debug=False):
1227    """Executes unittests within the calling module."""
1228
1229    verbosity = 2 if debug else 1
1230
1231    if debug:
1232        output = sys.stdout
1233    else:
1234        # We need to filter out the time taken from the output so that
1235        # qemu-iotest can reliably diff the results against master output.
1236        output = io.StringIO()
1237
1238    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1239                                     verbosity=verbosity)
1240    try:
1241        # unittest.main() will use sys.exit(); so expect a SystemExit
1242        # exception
1243        unittest.main(testRunner=runner)
1244    finally:
1245        # We need to filter out the time taken from the output so that
1246        # qemu-iotest can reliably diff the results against master output.
1247        if not debug:
1248            out = output.getvalue()
1249            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1250
1251            # Hide skipped tests from the reference output
1252            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1253            out_first_line, out_rest = out.split('\n', 1)
1254            out = out_first_line.replace('s', '.') + '\n' + out_rest
1255
1256            sys.stderr.write(out)
1257
1258def execute_setup_common(supported_fmts: Sequence[str] = (),
1259                         supported_platforms: Sequence[str] = (),
1260                         supported_cache_modes: Sequence[str] = (),
1261                         supported_aio_modes: Sequence[str] = (),
1262                         unsupported_fmts: Sequence[str] = (),
1263                         supported_protocols: Sequence[str] = (),
1264                         unsupported_protocols: Sequence[str] = ()) -> bool:
1265    """
1266    Perform necessary setup for either script-style or unittest-style tests.
1267
1268    :return: Bool; Whether or not debug mode has been requested via the CLI.
1269    """
1270    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1271
1272    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1273    # indicate that we're not being run via "check". There may be
1274    # other things set up by "check" that individual test cases rely
1275    # on.
1276    if test_dir is None or qemu_default_machine is None:
1277        sys.stderr.write('Please run this test via the "check" script\n')
1278        sys.exit(os.EX_USAGE)
1279
1280    debug = '-d' in sys.argv
1281    if debug:
1282        sys.argv.remove('-d')
1283    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1284
1285    _verify_image_format(supported_fmts, unsupported_fmts)
1286    _verify_protocol(supported_protocols, unsupported_protocols)
1287    _verify_platform(supported=supported_platforms)
1288    _verify_cache_mode(supported_cache_modes)
1289    _verify_aio_mode(supported_aio_modes)
1290
1291    return debug
1292
1293def execute_test(*args, test_function=None, **kwargs):
1294    """Run either unittest or script-style tests."""
1295
1296    debug = execute_setup_common(*args, **kwargs)
1297    if not test_function:
1298        execute_unittest(debug)
1299    else:
1300        test_function()
1301
1302def activate_logging():
1303    """Activate iotests.log() output to stdout for script-style tests."""
1304    handler = logging.StreamHandler(stream=sys.stdout)
1305    formatter = logging.Formatter('%(message)s')
1306    handler.setFormatter(formatter)
1307    test_logger.addHandler(handler)
1308    test_logger.setLevel(logging.INFO)
1309    test_logger.propagate = False
1310
1311# This is called from script-style iotests without a single point of entry
1312def script_initialize(*args, **kwargs):
1313    """Initialize script-style tests without running any tests."""
1314    activate_logging()
1315    execute_setup_common(*args, **kwargs)
1316
1317# This is called from script-style iotests with a single point of entry
1318def script_main(test_function, *args, **kwargs):
1319    """Run script-style tests outside of the unittest framework"""
1320    activate_logging()
1321    execute_test(*args, test_function=test_function, **kwargs)
1322
1323# This is called from unittest style iotests
1324def main(*args, **kwargs):
1325    """Run tests using the unittest framework"""
1326    execute_test(*args, **kwargs)
1327