xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 3a37f239)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31from typing import (Any, Callable, Dict, Iterable,
32                    List, Optional, Sequence, TypeVar)
33import unittest
34
35# pylint: disable=import-error, wrong-import-position
36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37from qemu import qtest
38
39assert sys.version_info >= (3, 6)
40
# Type Aliases
# A QMP response as handled in this module: a dict with string keys.
QMPResponse = Dict[str, Any]


# Use this logger for logging messages directly from the iotests module.
# Only a NullHandler is attached to it here.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Enabled for the whole process as a side effect of importing this module.
faulthandler.enable()
54
def _env_argv(prog_var, default_prog, options_var):
    """
    Build an argument vector from two environment variables.

    The first element is the value of prog_var (default_prog if unset).
    If options_var is set and non-empty, its stripped value is split on
    single spaces and appended.
    """
    argv = [os.environ.get(prog_var, default_prog)]
    extra = os.environ.get(options_var)
    if extra:
        argv += extra.strip().split(' ')
    return argv


# Splitting on spaces will not work if arguments contain spaces, but it is
# necessary if we want to support the override options that ./check supports.
qemu_img_args = _env_argv('QEMU_IMG_PROG', 'qemu-img', 'QEMU_IMG_OPTIONS')
qemu_io_args = _env_argv('QEMU_IO_PROG', 'qemu-io', 'QEMU_IO_OPTIONS')
qemu_io_args_no_fmt = _env_argv('QEMU_IO_PROG', 'qemu-io',
                                'QEMU_IO_OPTIONS_NO_FMT')
qemu_nbd_args = _env_argv('QEMU_NBD_PROG', 'qemu-nbd', 'QEMU_NBD_OPTIONS')

# NOTE(review): with QEMU_OPTIONS unset this evaluates to [''] (a single
# empty argument), not to an empty list.
qemu_prog = os.getenv('QEMU_PROG', 'qemu')
qemu_opts = os.getenv('QEMU_OPTIONS', '').strip().split(' ')

# Test parameters taken from the environment; those without a default are
# None when the variable is unset.
imgfmt = os.getenv('IMGFMT', 'raw')
imgproto = os.getenv('IMGPROTO', 'file')
test_dir = os.getenv('TEST_DIR')
sock_dir = os.getenv('SOCK_DIR')
output_dir = os.getenv('OUTPUT_DIR', '.')
cachemode = os.getenv('CACHEMODE')
aiomode = os.getenv('AIOMODE')
qemu_default_machine = os.getenv('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.getenv('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used for luks images when the caller supplies no key-secret.
_luks_secret_data = os.getenv('IMGKEYSECRET', '')
luks_default_secret_object = 'secret,id=keysec0,data=' + _luks_secret_data
luks_default_key_secret_opt = 'key-secret=keysec0'
92
def qemu_img(*args):
    '''Run qemu-img and return the exit code.

    stdin and stdout of the child are connected to the null device; stderr
    is inherited.  If the child was terminated by a signal (negative exit
    code), a diagnostic line is written to stderr; the negative exit code
    is returned as is.
    '''
    full_args = qemu_img_args + list(args)
    # subprocess.DEVNULL rather than a hand-opened /dev/null handle, so that
    # no file object is left open after each call.
    exitcode = subprocess.call(full_args,
                               stdin=subprocess.DEVNULL,
                               stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_args)))
    return exitcode
102
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return qmsg with every dict replaced by a key-sorted OrderedDict.

    Lists are rebuilt element by element; anything that is neither a list
    nor a dict is returned unchanged.  When conv_keys is true, '_' in the
    keys of the dict passed in is replaced by '-'; dicts nested directly
    inside it keep their keys.
    """
    # Dictionaries are not ordered prior to 3.6, hence the OrderedDict.
    if isinstance(qmsg, list):
        # NB: list members are always processed with the default
        # conv_keys=True, whatever value this call received.
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        out_key = key.replace('_', '-') if conv_keys else key
        ordered[out_key] = ordered_qmp(qmsg[key], conv_keys=False)
    return ordered
115
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments; return the exit code.

    If the arguments select the luks format ('-f' followed by 'luks') and
    no key-secret is given, the default key-secret option and the matching
    default secret object are added to the command line:

    - with an existing '-o' option string that lacks 'key-secret', the
      default is appended to that string and '--object <secret>' is
      inserted right after it;
    - without any '-o', '-o <key-secret> --object <secret>' is prepended.
    '''
    args = list(args)

    # default luks support
    # (slicing instead of indexing: a trailing '-f' without a value simply
    # does not match rather than raising IndexError)
    fmt_is_luks = False
    if '-f' in args:
        fmt_pos = args.index('-f') + 1
        fmt_is_luks = args[fmt_pos:fmt_pos + 1] == ['luks']

    if fmt_is_luks:
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value of -o is a comma-separated option string, so
                # the default has to be concatenated onto it (a str has no
                # append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
134
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code.

    A negative exit code (termination by signal) is additionally reported
    on stderr.
    '''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    killed_by_signal = status < 0
    if killed_by_signal:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(command)))
    return status
142
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output.

    stdout and stderr of the child are merged and returned as one text
    string.  If the child was terminated by a signal, a diagnostic line is
    written to this process's stderr as well.
    '''
    full_args = qemu_img_args + list(args)
    subp = subprocess.Popen(full_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Read the output *before* looking at the exit status: waiting first
    # can deadlock when the child produces more output than fits into the
    # pipe buffer.  communicate() reads to EOF and then reaps the child.
    output = subp.communicate()[0]
    exitcode = subp.returncode
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(full_args)))
    return output
154
def qemu_img_log(*args):
    '''Run qemu-img via qemu_img_pipe(), log its output with the test file
    names filtered, and return the unfiltered output.'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
159
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for filename.

    With imgopts, filename is passed after --image-opts; otherwise the
    image format is given with -f.  extra_args are inserted before the
    filename.  filter_path (default: filename) is the string that
    filter_img_info() replaces in the output.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    command = ['info', *format_args, *extra_args, filename]

    info = qemu_img_pipe(*command)
    log(filter_img_info(info, filter_path or filename))
173
def qemu_io(*args):
    '''Run qemu-io and return the stdout data.

    stderr of the child is merged into stdout.  If the child was
    terminated by a signal, a diagnostic line is written to this
    process's stderr as well.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # Drain the pipe before checking the exit status; waiting first can
    # deadlock once the child has filled the pipe buffer.
    output = subp.communicate()[0]
    exitcode = subp.returncode
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-exitcode, ' '.join(args)))
    return output
185
def qemu_io_log(*args):
    '''Run qemu-io via qemu_io(), log its output through the test-file and
    qemu-io filters, and return the unfiltered output.'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
190
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout.

    stderr is inherited.  Termination by a signal (negative exit code) is
    additionally reported on this process's stderr.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaving a hand-opened /dev/null file
    # object behind on every call.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
199
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0.

    Both stdout and stderr of the child are discarded.
    '''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids leaving a hand-opened /dev/null file
    # object behind on every call.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
206
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name for the default machine:
    the ccw variant for s390-ccw-virtio, the pci variant otherwise.'''
    uses_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if uses_ccw else 'virtio-scsi-pci'
211
class QemuIoInteractive:
    '''An interactive qemu-io process driven through pipes.

    The process is started in __init__; cmd() sends one command and
    returns everything printed up to the next prompt; close() sends the
    quit command and waits for the process.
    '''

    # Prompt that qemu-io prints whenever it is ready for a command.
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        '''Start qemu-io with the given extra arguments and consume the
        initial prompt.'''
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        banner = self._p.stdout.read(len(self._PROMPT))
        assert banner == self._PROMPT, \
            'expected qemu-io prompt, got %r' % banner

    def close(self):
        '''Send the quit command and wait for qemu-io to terminate.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read the child's output up to and including the next prompt and
        return it without the prompt.'''
        pattern = self._PROMPT
        n = len(pattern)
        chars = []
        tail = ''
        # Compare the last n characters read against the prompt.  (A
        # position counter that is reset to 0 on every mismatch would miss
        # a prompt directly preceded by its own first character, such as
        # "...qqemu-io> ".)
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != '', 'unexpected EOF while waiting for qemu-io prompt'
            chars.append(c)
            tail = (tail + c)[-n:]

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        '''Run a single qemu-io command and return its output.

        cmd must not contain a newline and must not be the quit command;
        use close() to quit.
        '''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
249
250
def qemu_nbd(*args):
    '''Run qemu-nbd with --fork followed by the given arguments and return
    the exit code of the process that was started.'''
    command = qemu_nbd_args + ['--fork']
    command.extend(args)
    return subprocess.call(command)
254
def qemu_nbd_early_pipe(*args):
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error.

       Returns a tuple (exitcode, output).  output is the merged
       stdout/stderr text when exitcode is non-zero and '' otherwise.'''
    full_args = qemu_nbd_args + ['--fork'] + list(args)
    # Using Popen as a context manager makes sure the stdout pipe is
    # closed on every path, including the success path where the output
    # is never read.
    with subprocess.Popen(full_args,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          universal_newlines=True) as subp:
        # The exit status is collected first and the pipe is only read on
        # failure; the order is kept as is on purpose.
        # NOTE(review): presumably reading to EOF on success could block
        # on the forked daemon -- confirm before reordering.
        exitcode = subp.wait()
        if exitcode < 0:
            sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                             (-exitcode, ' '.join(full_args)))
        output = subp.communicate()[0] if exitcode else ''

    return exitcode, output
269
def qemu_nbd_popen(*args):
    '''Start qemu-nbd with --persistent followed by the given arguments and
    return the subprocess.Popen object without waiting for it.'''
    command = qemu_nbd_args + ['--persistent']
    command.extend(args)
    return subprocess.Popen(command)
273
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images.

    fmt1 and fmt2 are the formats of img1 and img2 (passed as -f and -F).
    '''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
278
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers.

    The file is written in 512-byte sectors.  Each sector starts and ends
    with its own sector number as a big-endian 32-bit integer; the 504
    bytes in between are zero.  A size that is not a multiple of 512 is
    rounded up to the next full sector.
    '''
    sector_fmt = struct.Struct('>l504xl')
    # "with" closes the file even if a write fails part-way through.
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            sector_num = offset // 512
            image.write(sector_fmt.pack(sector_num, sector_num))
            offset += 512
288
def image_size(img):
    '''Return the 'virtual-size' field that "qemu-img info --output=json"
    reports for img (opened with the current image format).'''
    raw_info = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(raw_info)
    return info['virtual-size']
293
def is_str(val: Any) -> bool:
    '''Tell whether val is a str (or an instance of a str subclass).'''
    result = isinstance(val, str)
    return result
296
# The test directory is matched literally: the path is escaped because it may
# contain regex metacharacters.  If TEST_DIR is unset or empty, a pattern that
# can never match is used, so that nothing is replaced (instead of matching
# the text "None" or the empty string).
test_dir_re = re.compile(re.escape(test_dir) if test_dir else r"(?!)")
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
300
# Matches the carriage returns that filter_win32() strips.
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage return characters removed'''
    return re.sub(win32_re, "", msg)
304
# Matches the "N ops; <time> (<rate>/sec and <n> ops/sec)" statistics text
# in qemu-io output.
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns from msg and replace the qemu-io statistics
    text by a fixed placeholder'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    without_cr = filter_win32(msg)
    return qemu_io_re.sub(placeholder, without_cr)
312
# Matches "chown" followed by a numeric uid:gid pair.
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric ids of every "chown UID:GID" in msg'''
    return re.sub(chown_re, "chown UID:GID", msg)
316
def filter_qmp_event(event):
    '''Filter a QMP event dict.

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of the timestamp (if any) are replaced by the placeholders
    'SECS' and 'USECS'.  The event passed in is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested timestamp as
        # well so that the caller's event keeps its real time values.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
324
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a list or a dict.  Nested lists and dicts are processed
    recursively; every other value is replaced by filter_fn(key, value),
    where key is the dict key or list index.  Returns qmsg itself.
    '''
    # Lists are addressed by index, dicts by key.
    if isinstance(qmsg, list):
        slots = range(len(qmsg))
    else:
        slots = list(qmsg.keys())

    for slot in slots:
        current = qmsg[slot]
        is_container = isinstance(current, (dict, list))
        if is_container:
            qmsg[slot] = filter_qmp(current, filter_fn)
        else:
            qmsg[slot] = filter_fn(slot, current)
    return qmsg
340
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" prefix of this process's test files
    in msg by TEST_DIR/PID-'''
    pid_part = "%s-" % (os.getpid())
    file_prefix = os.path.join(test_dir, pid_part)
    return msg.replace(file_prefix, 'TEST_DIR/PID-')
344
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles() over every string value of a QMP object;
    other values are left as they are.'''
    return filter_qmp(
        qmsg,
        lambda _key, value: (filter_testfiles(value)
                             if is_str(value) else value))
351
def filter_generated_node_ids(msg):
    '''Replace every "#block<digits>" in msg by NODE_NAME'''
    node_id = re.compile("#block[0-9]+")
    return node_id.sub("NODE_NAME", msg)
354
def filter_img_info(output, filename):
    '''Filter the text output of "qemu-img info".

    Lines containing 'disk size' or 'actual-size' are dropped.  In the
    remaining lines, filename becomes TEST_IMG, the test file prefix is
    filtered, the image format name becomes IMGFMT, and the values after
    "iters:", "uuid:" and "cid:" are replaced by X placeholders.
    '''
    placeholders = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for raw_line in output.split('\n'):
        if 'disk size' in raw_line or 'actual-size' in raw_line:
            continue
        text = raw_line.replace(filename, 'TEST_IMG')
        text = filter_testfiles(text)
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in placeholders:
            text = re.sub(pattern, replacement, text)
        kept.append(text)
    return '\n'.join(kept)
370
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in msg by IMGFMT'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
373
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt() over every string value of a QMP object;
    other values are left as they are.'''
    return filter_qmp(
        qmsg,
        lambda _key, value: (filter_imgfmt(value)
                             if is_str(value) else value))
380
381
# A loggable message: a JSON-style dict or list, or a plain string.
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Send a message to the diff-output logger.

    The filters are applied to msg in order first.  A resulting dict or
    list is serialized with json.dumps(), pretty-printed if indent is
    given, with keys sorted unless the message is an OrderedDict.  Any
    other message is logged as is.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # An OrderedDict already carries the intended key order; don't sort it.
    keep_order = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not keep_order, indent=indent)
    test_logger.info(serialized)
398        test_logger.info(msg)
399
class Timeout:
    """
    Context manager that raises an Exception when its body runs too long.

    On entry a SIGALRM handler is installed and a real-time interval timer
    is armed for `seconds`.  If the timer fires, Exception(errmsg) is
    raised from the signal handler.  On exit the timer is disarmed and the
    SIGALRM handler that was active before entry is put back.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        # SIGALRM handler found on entry, to be restored on exit
        self._old_handler = None
    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None for a handler that was not
        # installed from Python; such a handler cannot be re-installed.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
            self._old_handler = None
        return False
    def timeout(self, signum, frame):
        """SIGALRM handler: raise the timeout exception."""
        raise Exception(self.errmsg)
412        raise Exception(self.errmsg)
413
def file_pattern(name):
    '''Return name prefixed with the current process id and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
416
class FilePaths:
    """
    FilePaths is a list of auto-generated filenames that cleans itself up.

    Use this context manager to generate filenames and ensure that the
    files get deleted::

        with FilePaths(['a.img', 'b.img']) as (img_a, img_b):
            qemu_img('create', img_a, '1G')
        # img_a and img_b are automatically deleted

    Each name is prefixed with the process id (see file_pattern()) and
    placed in base_dir.  The context manager yields the list of paths.
    """
    def __init__(self, names, base_dir=test_dir):
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        return self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Handle each path on its own: a file that is missing or cannot be
        # removed must not prevent the removal of the remaining ones.
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
443
class FilePath(FilePaths):
    """
    FilePath is the single-file form of FilePaths: it is constructed from
    one name, and entering the context yields that one path rather than
    a list.
    """
    def __init__(self, name, base_dir=test_dir):
        super().__init__([name], base_dir)

    def __enter__(self):
        only_path = self.paths[0]
        return only_path
453
def file_path_remover():
    '''Remove the files listed in file_path_remover.paths, last one first.

    Files that cannot be removed (OSError) are skipped silently.
    '''
    for victim in file_path_remover.paths[::-1]:
        try:
            os.remove(victim)
        except OSError:
            continue

460
461
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    Every generated path is recorded for file_path_remover(), which is
    registered with atexit on first use.  A single name yields a single
    path; otherwise a list of paths is returned.
    '''

    # First call: set up the shared path list and the exit-time cleanup.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = []
    for name in names:
        full_path = os.path.join(base_dir, file_pattern(name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
483
def remote_filename(path):
    '''Return the name under which path is reached via the image protocol.

    For 'file' that is path itself; for 'ssh' an ssh:// URL to 127.0.0.1
    port 22 with the user name taken from $USER.  Any other protocol
    raises an Exception.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
491
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        '''Set up a machine named "qemu<path_suffix>-<pid>" from the
        module-level qemu_prog, qemu_opts, test_dir, sock_dir and
        socket_scm_helper settings.'''
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added with add_drive(); used for the drive ids
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line without adding any
        options of its own; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a drive (virtio interface by default) to the VM.

        The drive gets the id "drive<N>", N being the number of drives
        added so far.  If path is not None, file, format, cache and aio
        options are added for it.  opts is an additional option string.
        For luks images without a 'key-secret' in opts, the default secret
        object and key-secret option are added.  Returns self.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>" to the command line; opts is either
        an option string or an iterable of options that are joined with
        commas.  Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
        '''Run an HMP command through QMP's human-monitor-command.

        With use_log, the command and its response are logged via
        qmp_log().  Returns the QMP response.
        '''
        cmd = 'human-monitor-command'
        kwargs = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations

        Sets a qemu-io breakpoint named bp_<drive> on the given event.
        Without an event, both read_aio and write_aio are used.
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations (removes breakpoint bp_<drive>)"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPResponse:
        """Run a qemu-io command on a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten a nested QMP object into a dict with dotted keys.

        Dict keys and list indices along the way are joined with '.';
        e.g. {'a': [{'b': 1}]} becomes {'a.0.b': 1}.  Results are added to
        output (a new dict if None), which is returned.
        '''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    @staticmethod
    def _opt_value(value):
        '''Render a flattened QMP value for a key=value option string'''
        if isinstance(value, str):
            return value
        if isinstance(value, bool):
            return 'on' if value else 'off'
        return str(value)

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated key=value option
        string, using the dotted keys of flatten_qmp_object().

        String values are used as they are; booleans become on/off and
        other values are converted with str().
        '''
        obj = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + self._opt_value(value)
                        for key, value in obj.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each passed through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its response
        with the given filters and indentation.  Returns the response.'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Everything but JOB_STATUS_CHANGE is only logged; the state
            # machine below is driven by the status changes alone.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create with the given options and run the
        resulting job to completion.

        filters are the log filters for the command (default: the test
        file filter).  If the command itself fails, its error object is
        returned instead of a job result.
        '''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the 'events' migration capability and log the result;
        name is only used in the log message.'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate):
        '''Wait for a MIGRATION event with status 'completed' (logging all
        MIGRATION events seen), then poll query-status until the VM is in
        expect_runstate.'''
        while True:
            event = self.event_wait('MIGRATION')
            log(event, filters=[filter_qmp_event])
            if event['data']['status'] == 'completed':
                break
        # The event may occur in finish-migrate, so wait for the expected
        # post-migration runstate
        while self.qmp('query-status')['return']['status'] != expect_runstate:
            pass

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or None
        if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their 'dirty-bitmaps' lists;
        nodes without that field are left out.'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: the bitmap dict, or None if there is no matching bitmap
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # A node without any bitmaps is missing from the query_bitmaps()
        # result; treat it like a node without a matching bitmap.
        for bitmap in bitmaps.get(node_name, ()):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the given bitmap exists and contains all of the
        key/value pairs in fields'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            # Follow the edge called child_name from the current node; a
            # missing edge or child node leaves node as None.
            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
801
# Matches a path component of the form "name[index]"; group 1 is the name,
# group 2 the text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
803
804class QMPTestCase(unittest.TestCase):
805    '''Abstract base class for QMP test cases'''
806
807    def __init__(self, *args, **kwargs):
808        super().__init__(*args, **kwargs)
809        # Many users of this class set a VM property we rely on heavily
810        # in the methods below.
811        self.vm = None
812
813    def dictpath(self, d, path):
814        '''Traverse a path in a nested dict'''
815        for component in path.split('/'):
816            m = index_re.match(component)
817            if m:
818                component, idx = m.groups()
819                idx = int(idx)
820
821            if not isinstance(d, dict) or component not in d:
822                self.fail(f'failed path traversal for "{path}" in "{d}"')
823            d = d[component]
824
825            if m:
826                if not isinstance(d, list):
827                    self.fail(f'path component "{component}" in "{path}" '
828                              f'is not a list in "{d}"')
829                try:
830                    d = d[idx]
831                except IndexError:
832                    self.fail(f'invalid index "{idx}" in path "{path}" '
833                              f'in "{d}"')
834        return d
835
836    def assert_qmp_absent(self, d, path):
837        try:
838            result = self.dictpath(d, path)
839        except AssertionError:
840            return
841        self.fail('path "%s" has value "%s"' % (path, str(result)))
842
843    def assert_qmp(self, d, path, value):
844        '''Assert that the value for a specific path in a QMP dict
845           matches.  When given a list of values, assert that any of
846           them matches.'''
847
848        result = self.dictpath(d, path)
849
850        # [] makes no sense as a list of valid values, so treat it as
851        # an actual single value.
852        if isinstance(value, list) and value != []:
853            for v in value:
854                if result == v:
855                    return
856            self.fail('no match for "%s" in %s' % (str(result), str(value)))
857        else:
858            self.assertEqual(result, value,
859                             '"%s" is "%s", expected "%s"'
860                             % (path, str(result), str(value)))
861
862    def assert_no_active_block_jobs(self):
863        result = self.vm.qmp('query-block-jobs')
864        self.assert_qmp(result, 'return', [])
865
866    def assert_has_block_node(self, node_name=None, file_name=None):
867        """Issue a query-named-block-nodes and assert node_name and/or
868        file_name is present in the result"""
869        def check_equal_or_none(a, b):
870            return a is None or b is None or a == b
871        assert node_name or file_name
872        result = self.vm.qmp('query-named-block-nodes')
873        for x in result["return"]:
874            if check_equal_or_none(x.get("node-name"), node_name) and \
875                    check_equal_or_none(x.get("file"), file_name):
876                return
877        self.fail("Cannot find %s %s in result:\n%s" %
878                  (node_name, file_name, result))
879
880    def assert_json_filename_equal(self, json_filename, reference):
881        '''Asserts that the given filename is a json: filename and that its
882           content is equal to the given reference object'''
883        self.assertEqual(json_filename[:5], 'json:')
884        self.assertEqual(
885            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
886            self.vm.flatten_qmp_object(reference)
887        )
888
889    def cancel_and_wait(self, drive='drive0', force=False,
890                        resume=False, wait=60.0):
891        '''Cancel a block job and wait for it to finish, returning the event'''
892        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
893        self.assert_qmp(result, 'return', {})
894
895        if resume:
896            self.vm.resume_drive(drive)
897
898        cancelled = False
899        result = None
900        while not cancelled:
901            for event in self.vm.get_qmp_events(wait=wait):
902                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
903                   event['event'] == 'BLOCK_JOB_CANCELLED':
904                    self.assert_qmp(event, 'data/device', drive)
905                    result = event
906                    cancelled = True
907                elif event['event'] == 'JOB_STATUS_CHANGE':
908                    self.assert_qmp(event, 'data/id', drive)
909
910
911        self.assert_no_active_block_jobs()
912        return result
913
914    def wait_until_completed(self, drive='drive0', check_offset=True,
915                             wait=60.0, error=None):
916        '''Wait for a block job to finish, returning the event'''
917        while True:
918            for event in self.vm.get_qmp_events(wait=wait):
919                if event['event'] == 'BLOCK_JOB_COMPLETED':
920                    self.assert_qmp(event, 'data/device', drive)
921                    if error is None:
922                        self.assert_qmp_absent(event, 'data/error')
923                        if check_offset:
924                            self.assert_qmp(event, 'data/offset',
925                                            event['data']['len'])
926                    else:
927                        self.assert_qmp(event, 'data/error', error)
928                    self.assert_no_active_block_jobs()
929                    return event
930                if event['event'] == 'JOB_STATUS_CHANGE':
931                    self.assert_qmp(event, 'data/id', drive)
932
933    def wait_ready(self, drive='drive0'):
934        """Wait until a BLOCK_JOB_READY event, and return the event."""
935        f = {'data': {'type': 'mirror', 'device': drive}}
936        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
937
938    def wait_ready_and_cancel(self, drive='drive0'):
939        self.wait_ready(drive=drive)
940        event = self.cancel_and_wait(drive=drive)
941        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
942        self.assert_qmp(event, 'data/type', 'mirror')
943        self.assert_qmp(event, 'data/offset', event['data']['len'])
944
945    def complete_and_wait(self, drive='drive0', wait_ready=True,
946                          completion_error=None):
947        '''Complete a block job and wait for it to finish'''
948        if wait_ready:
949            self.wait_ready(drive=drive)
950
951        result = self.vm.qmp('block-job-complete', device=drive)
952        self.assert_qmp(result, 'return', {})
953
954        event = self.wait_until_completed(drive=drive, error=completion_error)
955        self.assert_qmp(event, 'data/type', 'mirror')
956
957    def pause_wait(self, job_id='job0'):
958        with Timeout(3, "Timeout waiting for job to pause"):
959            while True:
960                result = self.vm.qmp('query-block-jobs')
961                found = False
962                for job in result['return']:
963                    if job['device'] == job_id:
964                        found = True
965                        if job['paused'] and not job['busy']:
966                            return job
967                        break
968                assert found
969
970    def pause_job(self, job_id='job0', wait=True):
971        result = self.vm.qmp('block-job-pause', device=job_id)
972        self.assert_qmp(result, 'return', {})
973        if wait:
974            return self.pause_wait(job_id)
975        return result
976
977    def case_skip(self, reason):
978        '''Skip this test case'''
979        case_notrun(reason)
980        self.skipTest(reason)
981
982
def notrun(reason):
    '''Skip this test suite

    Writes reason to the "SEQ.notrun" file in output_dir, logs a warning
    and terminates the process with exit status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly rather than relying on garbage collection
    # to flush it before the process exits.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
991
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to the "SEQ.casenotrun" file in output_dir.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly so the line is on disk as soon as this
    # function returns, instead of whenever the object is collected.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1003
1004def _verify_image_format(supported_fmts: Sequence[str] = (),
1005                         unsupported_fmts: Sequence[str] = ()) -> None:
1006    assert not (supported_fmts and unsupported_fmts)
1007
1008    if 'generic' in supported_fmts and \
1009            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1010        # similar to
1011        #   _supported_fmt generic
1012        # for bash tests
1013        return
1014
1015    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1016    if not_sup or (imgfmt in unsupported_fmts):
1017        notrun('not suitable for this image format: %s' % imgfmt)
1018
1019def _verify_protocol(supported: Sequence[str] = (),
1020                     unsupported: Sequence[str] = ()) -> None:
1021    assert not (supported and unsupported)
1022
1023    if 'generic' in supported:
1024        return
1025
1026    not_sup = supported and (imgproto not in supported)
1027    if not_sup or (imgproto in unsupported):
1028        notrun('not suitable for this protocol: %s' % imgproto)
1029
1030def _verify_platform(supported: Sequence[str] = (),
1031                     unsupported: Sequence[str] = ()) -> None:
1032    if any((sys.platform.startswith(x) for x in unsupported)):
1033        notrun('not suitable for this OS: %s' % sys.platform)
1034
1035    if supported:
1036        if not any((sys.platform.startswith(x) for x in supported)):
1037            notrun('not suitable for this OS: %s' % sys.platform)
1038
1039def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1040    if supported_cache_modes and (cachemode not in supported_cache_modes):
1041        notrun('not suitable for this cache mode: %s' % cachemode)
1042
1043def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1044    if supported_aio_modes and (aiomode not in supported_aio_modes):
1045        notrun('not suitable for this aio mode: %s' % aiomode)
1046
def supports_quorum():
    '''Return True if 'quorum' appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1049
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1054
def qemu_pipe(*args):
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by args.
    stderr is redirected into stdout.  If the process was killed by a
    signal, a note is written to this process's stderr.

    :return: QEMU's stdout output.
    """
    args = [qemu_prog] + qemu_opts + list(args)
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          universal_newlines=True) as subp:
        # communicate() drains the pipe while waiting for the process.
        # Calling wait() first can deadlock once the child has written
        # more than the OS pipe buffer holds.
        output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu received signal %i: %s\n' %
                         (-subp.returncode, ' '.join(args)))
    return output
1070
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per read_only value in the "formats" attribute
       of this function, so qemu is queried at most once for each.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # The second output line is used for the read-only list, the
        # first one otherwise.
        wanted = help_lines[1 if read_only else 0]
        cache[read_only] = wanted.split(":")[1].split()

    return cache[read_only]
1085
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats is either an iterable of format names or a
       callable that is given the test case and returns one.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1105
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as uid 0 the case is recorded via case_notrun() and
       the wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1116
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    In debug mode the runner writes verbosely to stdout.  Otherwise its
    output is captured, filtered and written to stderr once
    unittest.main() is done.  unittest.main() ends by raising SystemExit,
    which propagates to the caller.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() copes with output that has no newline (e.g.
            # nothing was written because unittest.main() exited early).
            # Unpacking split('\n', 1) would raise ValueError here and
            # mask the exception that is already propagating.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
1148
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Removes a '-d' argument from sys.argv if present, configures the
    logging level accordingly and runs the image format, protocol,
    platform, cache mode and aio mode checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    run_via_check = (test_dir is not None
                     and qemu_default_machine is not None)
    if not run_via_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Strip the first '-d' from the command line, remembering whether
    # it was there.
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
1183
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  A truthy
    test_function is called without arguments; otherwise
    execute_unittest() runs with the debug flag from the setup step.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1192
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without level or logger name.
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1201
1202# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates logging, then passes all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1207
1208# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates logging, then hands test_function and the remaining
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1213
1214# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed through to execute_test().
    """
    execute_test(*args, **kwargs)
1218