xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision a68694cd)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43assert sys.version_info >= (3, 6)
44
45# Use this logger for logging messages directly from the iotests module
46logger = logging.getLogger('qemu.iotests')
47logger.addHandler(logging.NullHandler())
48
49# Use this logger for messages that ought to be used for diff output.
50test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53faulthandler.enable()
54
55# This will not work if arguments contain spaces but is necessary if we
56# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58if os.environ.get('QEMU_IMG_OPTIONS'):
59    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62if os.environ.get('QEMU_IO_OPTIONS'):
63    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67    qemu_io_args_no_fmt += \
68        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79test_dir = os.environ.get('TEST_DIR')
80sock_dir = os.environ.get('SOCK_DIR')
81output_dir = os.environ.get('OUTPUT_DIR', '.')
82cachemode = os.environ.get('CACHEMODE')
83aiomode = os.environ.get('AIOMODE')
84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88luks_default_secret_object = 'secret,id=keysec0,data=' + \
89                             os.environ.get('IMGKEYSECRET', '')
90luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94    """
95    Run qemu-img and return both its output and its exit code
96    """
97    subp = subprocess.Popen(qemu_img_args + list(args),
98                            stdout=subprocess.PIPE,
99                            stderr=subprocess.STDOUT,
100                            universal_newlines=True)
101    output = subp.communicate()[0]
102    if subp.returncode < 0:
103        sys.stderr.write('qemu-img received signal %i: %s\n'
104                         % (-subp.returncode,
105                            ' '.join(qemu_img_args + list(args))))
106    return (output, subp.returncode)
107
108def qemu_img(*args: str) -> int:
109    '''Run qemu-img and return the exit code'''
110    return qemu_img_pipe_and_status(*args)[1]
111
112def ordered_qmp(qmsg, conv_keys=True):
113    # Dictionaries are not ordered prior to 3.6, therefore:
114    if isinstance(qmsg, list):
115        return [ordered_qmp(atom) for atom in qmsg]
116    if isinstance(qmsg, dict):
117        od = OrderedDict()
118        for k, v in sorted(qmsg.items()):
119            if conv_keys:
120                k = k.replace('_', '-')
121            od[k] = ordered_qmp(v, conv_keys=False)
122        return od
123    return qmsg
124
125def qemu_img_create(*args):
126    args = list(args)
127
128    # default luks support
129    if '-f' in args and args[args.index('-f') + 1] == 'luks':
130        if '-o' in args:
131            i = args.index('-o')
132            if 'key-secret' not in args[i + 1]:
133                args[i + 1].append(luks_default_key_secret_opt)
134                args.insert(i + 2, '--object')
135                args.insert(i + 3, luks_default_secret_object)
136        else:
137            args = ['-o', luks_default_key_secret_opt,
138                    '--object', luks_default_secret_object] + args
139
140    args.insert(0, 'create')
141
142    return qemu_img(*args)
143
144def qemu_img_measure(*args):
145    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
146
147def qemu_img_check(*args):
148    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
149
150def qemu_img_verbose(*args):
151    '''Run qemu-img without suppressing its output and return the exit code'''
152    exitcode = subprocess.call(qemu_img_args + list(args))
153    if exitcode < 0:
154        sys.stderr.write('qemu-img received signal %i: %s\n'
155                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
156    return exitcode
157
158def qemu_img_pipe(*args: str) -> str:
159    '''Run qemu-img and return its output'''
160    return qemu_img_pipe_and_status(*args)[0]
161
162def qemu_img_log(*args):
163    result = qemu_img_pipe(*args)
164    log(result, filters=[filter_testfiles])
165    return result
166
167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168    args = ['info']
169    if imgopts:
170        args.append('--image-opts')
171    else:
172        args += ['-f', imgfmt]
173    args += extra_args
174    args.append(filename)
175
176    output = qemu_img_pipe(*args)
177    if not filter_path:
178        filter_path = filename
179    log(filter_img_info(output, filter_path))
180
181def qemu_io(*args):
182    '''Run qemu-io and return the stdout data'''
183    args = qemu_io_args + list(args)
184    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185                            stderr=subprocess.STDOUT,
186                            universal_newlines=True)
187    output = subp.communicate()[0]
188    if subp.returncode < 0:
189        sys.stderr.write('qemu-io received signal %i: %s\n'
190                         % (-subp.returncode, ' '.join(args)))
191    return output
192
193def qemu_io_log(*args):
194    result = qemu_io(*args)
195    log(result, filters=[filter_testfiles, filter_qemu_io])
196    return result
197
198def qemu_io_silent(*args):
199    '''Run qemu-io and return the exit code, suppressing stdout'''
200    args = qemu_io_args + list(args)
201    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202    if exitcode < 0:
203        sys.stderr.write('qemu-io received signal %i: %s\n' %
204                         (-exitcode, ' '.join(args)))
205    return exitcode
206
207def qemu_io_silent_check(*args):
208    '''Run qemu-io and return the true if subprocess returned 0'''
209    args = qemu_io_args + list(args)
210    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211                               stderr=subprocess.STDOUT)
212    return exitcode == 0
213
214def get_virtio_scsi_device():
215    if qemu_default_machine == 's390-ccw-virtio':
216        return 'virtio-scsi-ccw'
217    return 'virtio-scsi-pci'
218
219class QemuIoInteractive:
220    def __init__(self, *args):
221        self.args = qemu_io_args_no_fmt + list(args)
222        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223                                   stdout=subprocess.PIPE,
224                                   stderr=subprocess.STDOUT,
225                                   universal_newlines=True)
226        out = self._p.stdout.read(9)
227        if out != 'qemu-io> ':
228            # Most probably qemu-io just failed to start.
229            # Let's collect the whole output and exit.
230            out += self._p.stdout.read()
231            self._p.wait(timeout=1)
232            raise ValueError(out)
233
234    def close(self):
235        self._p.communicate('q\n')
236
237    def _read_output(self):
238        pattern = 'qemu-io> '
239        n = len(pattern)
240        pos = 0
241        s = []
242        while pos != n:
243            c = self._p.stdout.read(1)
244            # check unexpected EOF
245            assert c != ''
246            s.append(c)
247            if c == pattern[pos]:
248                pos += 1
249            else:
250                pos = 0
251
252        return ''.join(s[:-n])
253
254    def cmd(self, cmd):
255        # quit command is in close(), '\n' is added automatically
256        assert '\n' not in cmd
257        cmd = cmd.strip()
258        assert cmd not in ('q', 'quit')
259        self._p.stdin.write(cmd + '\n')
260        self._p.stdin.flush()
261        return self._read_output()
262
263
264def qemu_nbd(*args):
265    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
267
268def qemu_nbd_early_pipe(*args):
269    '''Run qemu-nbd in daemon mode and return both the parent's exit code
270       and its output in case of an error'''
271    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272                            stdout=subprocess.PIPE,
273                            universal_newlines=True)
274    output = subp.communicate()[0]
275    if subp.returncode < 0:
276        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277                         (-subp.returncode,
278                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
279
280    return subp.returncode, output if subp.returncode else ''
281
282@contextmanager
283def qemu_nbd_popen(*args):
284    '''Context manager running qemu-nbd within the context'''
285    pid_file = file_path("pid")
286
287    cmd = list(qemu_nbd_args)
288    cmd.extend(('--persistent', '--pid-file', pid_file))
289    cmd.extend(args)
290
291    log('Start NBD server')
292    p = subprocess.Popen(cmd)
293    try:
294        while not os.path.exists(pid_file):
295            if p.poll() is not None:
296                raise RuntimeError(
297                    "qemu-nbd terminated with exit code {}: {}"
298                    .format(p.returncode, ' '.join(cmd)))
299
300            time.sleep(0.01)
301        yield
302    finally:
303        log('Kill NBD server')
304        p.kill()
305        p.wait()
306
307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308    '''Return True if two image files are identical'''
309    return qemu_img('compare', '-f', fmt1,
310                    '-F', fmt2, img1, img2) == 0
311
312def create_image(name, size):
313    '''Create a fully-allocated raw image with sector markers'''
314    file = open(name, 'wb')
315    i = 0
316    while i < size:
317        sector = struct.pack('>l504xl', i // 512, i // 512)
318        file.write(sector)
319        i = i + 512
320    file.close()
321
322def image_size(img):
323    '''Return image's virtual size'''
324    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325    return json.loads(r)['virtual-size']
326
327def is_str(val):
328    return isinstance(val, str)
329
330test_dir_re = re.compile(r"%s" % test_dir)
331def filter_test_dir(msg):
332    return test_dir_re.sub("TEST_DIR", msg)
333
334win32_re = re.compile(r"\r")
335def filter_win32(msg):
336    return win32_re.sub("", msg)
337
338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340                        r"and [0-9\/.inf]* ops\/sec\)")
341def filter_qemu_io(msg):
342    msg = filter_win32(msg)
343    return qemu_io_re.sub("X ops; XX:XX:XX.X "
344                          "(XXX YYY/sec and XXX ops/sec)", msg)
345
346chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347def filter_chown(msg):
348    return chown_re.sub("chown UID:GID", msg)
349
350def filter_qmp_event(event):
351    '''Filter a QMP event dict'''
352    event = dict(event)
353    if 'timestamp' in event:
354        event['timestamp']['seconds'] = 'SECS'
355        event['timestamp']['microseconds'] = 'USECS'
356    return event
357
358def filter_qmp(qmsg, filter_fn):
359    '''Given a string filter, filter a QMP object's values.
360    filter_fn takes a (key, value) pair.'''
361    # Iterate through either lists or dicts;
362    if isinstance(qmsg, list):
363        items = enumerate(qmsg)
364    else:
365        items = qmsg.items()
366
367    for k, v in items:
368        if isinstance(v, (dict, list)):
369            qmsg[k] = filter_qmp(v, filter_fn)
370        else:
371            qmsg[k] = filter_fn(k, v)
372    return qmsg
373
374def filter_testfiles(msg):
375    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
378
379def filter_qmp_testfiles(qmsg):
380    def _filter(_key, value):
381        if is_str(value):
382            return filter_testfiles(value)
383        return value
384    return filter_qmp(qmsg, _filter)
385
386def filter_generated_node_ids(msg):
387    return re.sub("#block[0-9]+", "NODE_NAME", msg)
388
389def filter_img_info(output, filename):
390    lines = []
391    for line in output.split('\n'):
392        if 'disk size' in line or 'actual-size' in line:
393            continue
394        line = line.replace(filename, 'TEST_IMG')
395        line = filter_testfiles(line)
396        line = line.replace(imgfmt, 'IMGFMT')
397        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398        line = re.sub('uuid: [-a-f0-9]+',
399                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400                      line)
401        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402        lines.append(line)
403    return '\n'.join(lines)
404
405def filter_imgfmt(msg):
406    return msg.replace(imgfmt, 'IMGFMT')
407
408def filter_qmp_imgfmt(qmsg):
409    def _filter(_key, value):
410        if is_str(value):
411            return filter_imgfmt(value)
412        return value
413    return filter_qmp(qmsg, _filter)
414
415
416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
417
418def log(msg: Msg,
419        filters: Iterable[Callable[[Msg], Msg]] = (),
420        indent: Optional[int] = None) -> None:
421    """
422    Logs either a string message or a JSON serializable message (like QMP).
423    If indent is provided, JSON serializable messages are pretty-printed.
424    """
425    for flt in filters:
426        msg = flt(msg)
427    if isinstance(msg, (dict, list)):
428        # Don't sort if it's already sorted
429        do_sort = not isinstance(msg, OrderedDict)
430        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431    else:
432        test_logger.info(msg)
433
434class Timeout:
435    def __init__(self, seconds, errmsg="Timeout"):
436        self.seconds = seconds
437        self.errmsg = errmsg
438    def __enter__(self):
439        signal.signal(signal.SIGALRM, self.timeout)
440        signal.setitimer(signal.ITIMER_REAL, self.seconds)
441        return self
442    def __exit__(self, exc_type, value, traceback):
443        signal.setitimer(signal.ITIMER_REAL, 0)
444        return False
445    def timeout(self, signum, frame):
446        raise Exception(self.errmsg)
447
448def file_pattern(name):
449    return "{0}-{1}".format(os.getpid(), name)
450
451class FilePaths:
452    """
453    FilePaths is an auto-generated filename that cleans itself up.
454
455    Use this context manager to generate filenames and ensure that the file
456    gets deleted::
457
458        with FilePaths(['test.img']) as img_path:
459            qemu_img('create', img_path, '1G')
460        # migration_sock_path is automatically deleted
461    """
462    def __init__(self, names, base_dir=test_dir):
463        self.paths = []
464        for name in names:
465            self.paths.append(os.path.join(base_dir, file_pattern(name)))
466
467    def __enter__(self):
468        return self.paths
469
470    def __exit__(self, exc_type, exc_val, exc_tb):
471        try:
472            for path in self.paths:
473                os.remove(path)
474        except OSError:
475            pass
476        return False
477
478class FilePath(FilePaths):
479    """
480    FilePath is a specialization of FilePaths that takes a single filename.
481    """
482    def __init__(self, name, base_dir=test_dir):
483        super(FilePath, self).__init__([name], base_dir)
484
485    def __enter__(self):
486        return self.paths[0]
487
488def file_path_remover():
489    for path in reversed(file_path_remover.paths):
490        try:
491            os.remove(path)
492        except OSError:
493            pass
494
495
496def file_path(*names, base_dir=test_dir):
497    ''' Another way to get auto-generated filename that cleans itself up.
498
499    Use is as simple as:
500
501    img_a, img_b = file_path('a.img', 'b.img')
502    sock = file_path('socket')
503    '''
504
505    if not hasattr(file_path_remover, 'paths'):
506        file_path_remover.paths = []
507        atexit.register(file_path_remover)
508
509    paths = []
510    for name in names:
511        filename = file_pattern(name)
512        path = os.path.join(base_dir, filename)
513        file_path_remover.paths.append(path)
514        paths.append(path)
515
516    return paths[0] if len(paths) == 1 else paths
517
518def remote_filename(path):
519    if imgproto == 'file':
520        return path
521    elif imgproto == 'ssh':
522        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
523    else:
524        raise Exception("Protocol %s not supported" % (imgproto))
525
526class VM(qtest.QEMUQtestMachine):
527    '''A QEMU VM'''
528
529    def __init__(self, path_suffix=''):
530        name = "qemu%s-%d" % (path_suffix, os.getpid())
531        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
532                                 test_dir=test_dir,
533                                 socket_scm_helper=socket_scm_helper,
534                                 sock_dir=sock_dir)
535        self._num_drives = 0
536
537    def add_object(self, opts):
538        self._args.append('-object')
539        self._args.append(opts)
540        return self
541
542    def add_device(self, opts):
543        self._args.append('-device')
544        self._args.append(opts)
545        return self
546
547    def add_drive_raw(self, opts):
548        self._args.append('-drive')
549        self._args.append(opts)
550        return self
551
552    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
553        '''Add a virtio-blk drive to the VM'''
554        options = ['if=%s' % interface,
555                   'id=drive%d' % self._num_drives]
556
557        if path is not None:
558            options.append('file=%s' % path)
559            options.append('format=%s' % img_format)
560            options.append('cache=%s' % cachemode)
561            options.append('aio=%s' % aiomode)
562
563        if opts:
564            options.append(opts)
565
566        if img_format == 'luks' and 'key-secret' not in opts:
567            # default luks support
568            if luks_default_secret_object not in self._args:
569                self.add_object(luks_default_secret_object)
570
571            options.append(luks_default_key_secret_opt)
572
573        self._args.append('-drive')
574        self._args.append(','.join(options))
575        self._num_drives += 1
576        return self
577
578    def add_blockdev(self, opts):
579        self._args.append('-blockdev')
580        if isinstance(opts, str):
581            self._args.append(opts)
582        else:
583            self._args.append(','.join(opts))
584        return self
585
586    def add_incoming(self, addr):
587        self._args.append('-incoming')
588        self._args.append(addr)
589        return self
590
591    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
592        cmd = 'human-monitor-command'
593        kwargs = {'command-line': command_line}
594        if use_log:
595            return self.qmp_log(cmd, **kwargs)
596        else:
597            return self.qmp(cmd, **kwargs)
598
599    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
600        """Pause drive r/w operations"""
601        if not event:
602            self.pause_drive(drive, "read_aio")
603            self.pause_drive(drive, "write_aio")
604            return
605        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
606
607    def resume_drive(self, drive: str) -> None:
608        """Resume drive r/w operations"""
609        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
610
611    def hmp_qemu_io(self, drive: str, cmd: str,
612                    use_log: bool = False) -> QMPMessage:
613        """Write to a given drive using an HMP command"""
614        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
615
616    def flatten_qmp_object(self, obj, output=None, basestr=''):
617        if output is None:
618            output = dict()
619        if isinstance(obj, list):
620            for i, item in enumerate(obj):
621                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
622        elif isinstance(obj, dict):
623            for key in obj:
624                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
625        else:
626            output[basestr[:-1]] = obj # Strip trailing '.'
627        return output
628
629    def qmp_to_opts(self, obj):
630        obj = self.flatten_qmp_object(obj)
631        output_list = list()
632        for key in obj:
633            output_list += [key + '=' + obj[key]]
634        return ','.join(output_list)
635
636    def get_qmp_events_filtered(self, wait=60.0):
637        result = []
638        for ev in self.get_qmp_events(wait=wait):
639            result.append(filter_qmp_event(ev))
640        return result
641
642    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
643        full_cmd = OrderedDict((
644            ("execute", cmd),
645            ("arguments", ordered_qmp(kwargs))
646        ))
647        log(full_cmd, filters, indent=indent)
648        result = self.qmp(cmd, **kwargs)
649        log(result, filters, indent=indent)
650        return result
651
652    # Returns None on success, and an error string on failure
653    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
654                pre_finalize=None, cancel=False, wait=60.0):
655        """
656        run_job moves a job from creation through to dismissal.
657
658        :param job: String. ID of recently-launched job
659        :param auto_finalize: Bool. True if the job was launched with
660                              auto_finalize. Defaults to True.
661        :param auto_dismiss: Bool. True if the job was launched with
662                             auto_dismiss=True. Defaults to False.
663        :param pre_finalize: Callback. A callable that takes no arguments to be
664                             invoked prior to issuing job-finalize, if any.
665        :param cancel: Bool. When true, cancels the job after the pre_finalize
666                       callback.
667        :param wait: Float. Timeout value specifying how long to wait for any
668                     event, in seconds. Defaults to 60.0.
669        """
670        match_device = {'data': {'device': job}}
671        match_id = {'data': {'id': job}}
672        events = [
673            ('BLOCK_JOB_COMPLETED', match_device),
674            ('BLOCK_JOB_CANCELLED', match_device),
675            ('BLOCK_JOB_ERROR', match_device),
676            ('BLOCK_JOB_READY', match_device),
677            ('BLOCK_JOB_PENDING', match_id),
678            ('JOB_STATUS_CHANGE', match_id)
679        ]
680        error = None
681        while True:
682            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
683            if ev['event'] != 'JOB_STATUS_CHANGE':
684                log(ev)
685                continue
686            status = ev['data']['status']
687            if status == 'aborting':
688                result = self.qmp('query-jobs')
689                for j in result['return']:
690                    if j['id'] == job:
691                        error = j['error']
692                        log('Job failed: %s' % (j['error']))
693            elif status == 'ready':
694                self.qmp_log('job-complete', id=job)
695            elif status == 'pending' and not auto_finalize:
696                if pre_finalize:
697                    pre_finalize()
698                if cancel:
699                    self.qmp_log('job-cancel', id=job)
700                else:
701                    self.qmp_log('job-finalize', id=job)
702            elif status == 'concluded' and not auto_dismiss:
703                self.qmp_log('job-dismiss', id=job)
704            elif status == 'null':
705                return error
706
707    # Returns None on success, and an error string on failure
708    def blockdev_create(self, options, job_id='job0', filters=None):
709        if filters is None:
710            filters = [filter_qmp_testfiles]
711        result = self.qmp_log('blockdev-create', filters=filters,
712                              job_id=job_id, options=options)
713
714        if 'return' in result:
715            assert result['return'] == {}
716            job_result = self.run_job(job_id)
717        else:
718            job_result = result['error']
719
720        log("")
721        return job_result
722
723    def enable_migration_events(self, name):
724        log('Enabling migration QMP events on %s...' % name)
725        log(self.qmp('migrate-set-capabilities', capabilities=[
726            {
727                'capability': 'events',
728                'state': True
729            }
730        ]))
731
732    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
733        while True:
734            event = self.event_wait('MIGRATION')
735            log(event, filters=[filter_qmp_event])
736            if event['data']['status'] in ('completed', 'failed'):
737                break
738
739        if event['data']['status'] == 'completed':
740            # The event may occur in finish-migrate, so wait for the expected
741            # post-migration runstate
742            runstate = None
743            while runstate != expect_runstate:
744                runstate = self.qmp('query-status')['return']['status']
745            return True
746        else:
747            return False
748
749    def node_info(self, node_name):
750        nodes = self.qmp('query-named-block-nodes')
751        for x in nodes['return']:
752            if x['node-name'] == node_name:
753                return x
754        return None
755
756    def query_bitmaps(self):
757        res = self.qmp("query-named-block-nodes")
758        return {device['node-name']: device['dirty-bitmaps']
759                for device in res['return'] if 'dirty-bitmaps' in device}
760
761    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
762        """
763        get a specific bitmap from the object returned by query_bitmaps.
764        :param recording: If specified, filter results by the specified value.
765        :param bitmaps: If specified, use it instead of call query_bitmaps()
766        """
767        if bitmaps is None:
768            bitmaps = self.query_bitmaps()
769
770        for bitmap in bitmaps[node_name]:
771            if bitmap.get('name', '') == bitmap_name:
772                if recording is None or bitmap.get('recording') == recording:
773                    return bitmap
774        return None
775
776    def check_bitmap_status(self, node_name, bitmap_name, fields):
777        ret = self.get_bitmap(node_name, bitmap_name)
778
779        return fields.items() <= ret.items()
780
781    def assert_block_path(self, root, path, expected_node, graph=None):
782        """
783        Check whether the node under the given path in the block graph
784        is @expected_node.
785
786        @root is the node name of the node where the @path is rooted.
787
788        @path is a string that consists of child names separated by
789        slashes.  It must begin with a slash.
790
791        Examples for @root + @path:
792          - root="qcow2-node", path="/backing/file"
793          - root="quorum-node", path="/children.2/file"
794
795        Hypothetically, @path could be empty, in which case it would
796        point to @root.  However, in practice this case is not useful
797        and hence not allowed.
798
799        @expected_node may be None.  (All elements of the path but the
800        leaf must still exist.)
801
802        @graph may be None or the result of an x-debug-query-block-graph
803        call that has already been performed.
804        """
805        if graph is None:
806            graph = self.qmp('x-debug-query-block-graph')['return']
807
808        iter_path = iter(path.split('/'))
809
810        # Must start with a /
811        assert next(iter_path) == ''
812
813        node = next((node for node in graph['nodes'] if node['name'] == root),
814                    None)
815
816        # An empty @path is not allowed, so the root node must be present
817        assert node is not None, 'Root node %s not found' % root
818
819        for child_name in iter_path:
820            assert node is not None, 'Cannot follow path %s%s' % (root, path)
821
822            try:
823                node_id = next(edge['child'] for edge in graph['edges']
824                               if (edge['parent'] == node['id'] and
825                                   edge['name'] == child_name))
826
827                node = next(node for node in graph['nodes']
828                            if node['id'] == node_id)
829
830            except StopIteration:
831                node = None
832
833        if node is None:
834            assert expected_node is None, \
835                   'No node found under %s (but expected %s)' % \
836                   (path, expected_node)
837        else:
838            assert node['name'] == expected_node, \
839                   'Found node %s under %s (but expected %s)' % \
840                   (node['name'], path, expected_node)
841
842index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
843
844class QMPTestCase(unittest.TestCase):
845    '''Abstract base class for QMP test cases'''
846
847    def __init__(self, *args, **kwargs):
848        super().__init__(*args, **kwargs)
849        # Many users of this class set a VM property we rely on heavily
850        # in the methods below.
851        self.vm = None
852
853    def dictpath(self, d, path):
854        '''Traverse a path in a nested dict'''
855        for component in path.split('/'):
856            m = index_re.match(component)
857            if m:
858                component, idx = m.groups()
859                idx = int(idx)
860
861            if not isinstance(d, dict) or component not in d:
862                self.fail(f'failed path traversal for "{path}" in "{d}"')
863            d = d[component]
864
865            if m:
866                if not isinstance(d, list):
867                    self.fail(f'path component "{component}" in "{path}" '
868                              f'is not a list in "{d}"')
869                try:
870                    d = d[idx]
871                except IndexError:
872                    self.fail(f'invalid index "{idx}" in path "{path}" '
873                              f'in "{d}"')
874        return d
875
876    def assert_qmp_absent(self, d, path):
877        try:
878            result = self.dictpath(d, path)
879        except AssertionError:
880            return
881        self.fail('path "%s" has value "%s"' % (path, str(result)))
882
883    def assert_qmp(self, d, path, value):
884        '''Assert that the value for a specific path in a QMP dict
885           matches.  When given a list of values, assert that any of
886           them matches.'''
887
888        result = self.dictpath(d, path)
889
890        # [] makes no sense as a list of valid values, so treat it as
891        # an actual single value.
892        if isinstance(value, list) and value != []:
893            for v in value:
894                if result == v:
895                    return
896            self.fail('no match for "%s" in %s' % (str(result), str(value)))
897        else:
898            self.assertEqual(result, value,
899                             '"%s" is "%s", expected "%s"'
900                             % (path, str(result), str(value)))
901
902    def assert_no_active_block_jobs(self):
903        result = self.vm.qmp('query-block-jobs')
904        self.assert_qmp(result, 'return', [])
905
906    def assert_has_block_node(self, node_name=None, file_name=None):
907        """Issue a query-named-block-nodes and assert node_name and/or
908        file_name is present in the result"""
909        def check_equal_or_none(a, b):
910            return a is None or b is None or a == b
911        assert node_name or file_name
912        result = self.vm.qmp('query-named-block-nodes')
913        for x in result["return"]:
914            if check_equal_or_none(x.get("node-name"), node_name) and \
915                    check_equal_or_none(x.get("file"), file_name):
916                return
917        self.fail("Cannot find %s %s in result:\n%s" %
918                  (node_name, file_name, result))
919
920    def assert_json_filename_equal(self, json_filename, reference):
921        '''Asserts that the given filename is a json: filename and that its
922           content is equal to the given reference object'''
923        self.assertEqual(json_filename[:5], 'json:')
924        self.assertEqual(
925            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
926            self.vm.flatten_qmp_object(reference)
927        )
928
929    def cancel_and_wait(self, drive='drive0', force=False,
930                        resume=False, wait=60.0):
931        '''Cancel a block job and wait for it to finish, returning the event'''
932        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
933        self.assert_qmp(result, 'return', {})
934
935        if resume:
936            self.vm.resume_drive(drive)
937
938        cancelled = False
939        result = None
940        while not cancelled:
941            for event in self.vm.get_qmp_events(wait=wait):
942                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
943                   event['event'] == 'BLOCK_JOB_CANCELLED':
944                    self.assert_qmp(event, 'data/device', drive)
945                    result = event
946                    cancelled = True
947                elif event['event'] == 'JOB_STATUS_CHANGE':
948                    self.assert_qmp(event, 'data/id', drive)
949
950
951        self.assert_no_active_block_jobs()
952        return result
953
954    def wait_until_completed(self, drive='drive0', check_offset=True,
955                             wait=60.0, error=None):
956        '''Wait for a block job to finish, returning the event'''
957        while True:
958            for event in self.vm.get_qmp_events(wait=wait):
959                if event['event'] == 'BLOCK_JOB_COMPLETED':
960                    self.assert_qmp(event, 'data/device', drive)
961                    if error is None:
962                        self.assert_qmp_absent(event, 'data/error')
963                        if check_offset:
964                            self.assert_qmp(event, 'data/offset',
965                                            event['data']['len'])
966                    else:
967                        self.assert_qmp(event, 'data/error', error)
968                    self.assert_no_active_block_jobs()
969                    return event
970                if event['event'] == 'JOB_STATUS_CHANGE':
971                    self.assert_qmp(event, 'data/id', drive)
972
973    def wait_ready(self, drive='drive0'):
974        """Wait until a BLOCK_JOB_READY event, and return the event."""
975        return self.vm.events_wait([
976            ('BLOCK_JOB_READY',
977             {'data': {'type': 'mirror', 'device': drive}}),
978            ('BLOCK_JOB_READY',
979             {'data': {'type': 'commit', 'device': drive}})
980        ])
981
982    def wait_ready_and_cancel(self, drive='drive0'):
983        self.wait_ready(drive=drive)
984        event = self.cancel_and_wait(drive=drive)
985        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
986        self.assert_qmp(event, 'data/type', 'mirror')
987        self.assert_qmp(event, 'data/offset', event['data']['len'])
988
989    def complete_and_wait(self, drive='drive0', wait_ready=True,
990                          completion_error=None):
991        '''Complete a block job and wait for it to finish'''
992        if wait_ready:
993            self.wait_ready(drive=drive)
994
995        result = self.vm.qmp('block-job-complete', device=drive)
996        self.assert_qmp(result, 'return', {})
997
998        event = self.wait_until_completed(drive=drive, error=completion_error)
999        self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1000
1001    def pause_wait(self, job_id='job0'):
1002        with Timeout(3, "Timeout waiting for job to pause"):
1003            while True:
1004                result = self.vm.qmp('query-block-jobs')
1005                found = False
1006                for job in result['return']:
1007                    if job['device'] == job_id:
1008                        found = True
1009                        if job['paused'] and not job['busy']:
1010                            return job
1011                        break
1012                assert found
1013
1014    def pause_job(self, job_id='job0', wait=True):
1015        result = self.vm.qmp('block-job-pause', device=job_id)
1016        self.assert_qmp(result, 'return', {})
1017        if wait:
1018            return self.pause_wait(job_id)
1019        return result
1020
1021    def case_skip(self, reason):
1022        '''Skip this test case'''
1023        case_notrun(reason)
1024        self.skipTest(reason)
1025
1026
1027def notrun(reason):
1028    '''Skip this test suite'''
1029    # Each test in qemu-iotests has a number ("seq")
1030    seq = os.path.basename(sys.argv[0])
1031
1032    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1033    logger.warning("%s not run: %s", seq, reason)
1034    sys.exit(0)
1035
1036def case_notrun(reason):
1037    '''Mark this test case as not having been run (without actually
1038    skipping it, that is left to the caller).  See
1039    QMPTestCase.case_skip() for a variant that actually skips the
1040    current test case.'''
1041
1042    # Each test in qemu-iotests has a number ("seq")
1043    seq = os.path.basename(sys.argv[0])
1044
1045    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1046        '    [case not run] ' + reason + '\n')
1047
1048def _verify_image_format(supported_fmts: Sequence[str] = (),
1049                         unsupported_fmts: Sequence[str] = ()) -> None:
1050    assert not (supported_fmts and unsupported_fmts)
1051
1052    if 'generic' in supported_fmts and \
1053            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1054        # similar to
1055        #   _supported_fmt generic
1056        # for bash tests
1057        if imgfmt == 'luks':
1058            verify_working_luks()
1059        return
1060
1061    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1062    if not_sup or (imgfmt in unsupported_fmts):
1063        notrun('not suitable for this image format: %s' % imgfmt)
1064
1065    if imgfmt == 'luks':
1066        verify_working_luks()
1067
1068def _verify_protocol(supported: Sequence[str] = (),
1069                     unsupported: Sequence[str] = ()) -> None:
1070    assert not (supported and unsupported)
1071
1072    if 'generic' in supported:
1073        return
1074
1075    not_sup = supported and (imgproto not in supported)
1076    if not_sup or (imgproto in unsupported):
1077        notrun('not suitable for this protocol: %s' % imgproto)
1078
1079def _verify_platform(supported: Sequence[str] = (),
1080                     unsupported: Sequence[str] = ()) -> None:
1081    if any((sys.platform.startswith(x) for x in unsupported)):
1082        notrun('not suitable for this OS: %s' % sys.platform)
1083
1084    if supported:
1085        if not any((sys.platform.startswith(x) for x in supported)):
1086            notrun('not suitable for this OS: %s' % sys.platform)
1087
1088def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1089    if supported_cache_modes and (cachemode not in supported_cache_modes):
1090        notrun('not suitable for this cache mode: %s' % cachemode)
1091
1092def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1093    if supported_aio_modes and (aiomode not in supported_aio_modes):
1094        notrun('not suitable for this aio mode: %s' % aiomode)
1095
1096def supports_quorum():
1097    return 'quorum' in qemu_img_pipe('--help')
1098
1099def verify_quorum():
1100    '''Skip test suite if quorum support is not available'''
1101    if not supports_quorum():
1102        notrun('quorum support missing')
1103
1104def has_working_luks() -> Tuple[bool, str]:
1105    """
1106    Check whether our LUKS driver can actually create images
1107    (this extends to LUKS encryption for qcow2).
1108
1109    If not, return the reason why.
1110    """
1111
1112    img_file = f'{test_dir}/luks-test.luks'
1113    (output, status) = \
1114        qemu_img_pipe_and_status('create', '-f', 'luks',
1115                                 '--object', luks_default_secret_object,
1116                                 '-o', luks_default_key_secret_opt,
1117                                 '-o', 'iter-time=10',
1118                                 img_file, '1G')
1119    try:
1120        os.remove(img_file)
1121    except OSError:
1122        pass
1123
1124    if status != 0:
1125        reason = output
1126        for line in output.splitlines():
1127            if img_file + ':' in line:
1128                reason = line.split(img_file + ':', 1)[1].strip()
1129                break
1130
1131        return (False, reason)
1132    else:
1133        return (True, '')
1134
1135def verify_working_luks():
1136    """
1137    Skip test suite if LUKS does not work
1138    """
1139    (working, reason) = has_working_luks()
1140    if not working:
1141        notrun(reason)
1142
1143def qemu_pipe(*args):
1144    """
1145    Run qemu with an option to print something and exit (e.g. a help option).
1146
1147    :return: QEMU's stdout output.
1148    """
1149    args = [qemu_prog] + qemu_opts + list(args)
1150    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1151                            stderr=subprocess.STDOUT,
1152                            universal_newlines=True)
1153    output = subp.communicate()[0]
1154    if subp.returncode < 0:
1155        sys.stderr.write('qemu received signal %i: %s\n' %
1156                         (-subp.returncode, ' '.join(args)))
1157    return output
1158
1159def supported_formats(read_only=False):
1160    '''Set 'read_only' to True to check ro-whitelist
1161       Otherwise, rw-whitelist is checked'''
1162
1163    if not hasattr(supported_formats, "formats"):
1164        supported_formats.formats = {}
1165
1166    if read_only not in supported_formats.formats:
1167        format_message = qemu_pipe("-drive", "format=help")
1168        line = 1 if read_only else 0
1169        supported_formats.formats[read_only] = \
1170            format_message.splitlines()[line].split(":")[1].split()
1171
1172    return supported_formats.formats[read_only]
1173
1174def skip_if_unsupported(required_formats=(), read_only=False):
1175    '''Skip Test Decorator
1176       Runs the test if all the required formats are whitelisted'''
1177    def skip_test_decorator(func):
1178        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1179                         **kwargs: Dict[str, Any]) -> None:
1180            if callable(required_formats):
1181                fmts = required_formats(test_case)
1182            else:
1183                fmts = required_formats
1184
1185            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1186            if usf_list:
1187                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1188                test_case.case_skip(msg)
1189            else:
1190                func(test_case, *args, **kwargs)
1191        return func_wrapper
1192    return skip_test_decorator
1193
1194def skip_for_formats(formats: Sequence[str] = ()) \
1195    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1196                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1197    '''Skip Test Decorator
1198       Skips the test for the given formats'''
1199    def skip_test_decorator(func):
1200        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1201                         **kwargs: Dict[str, Any]) -> None:
1202            if imgfmt in formats:
1203                msg = f'{test_case}: Skipped for format {imgfmt}'
1204                test_case.case_skip(msg)
1205            else:
1206                func(test_case, *args, **kwargs)
1207        return func_wrapper
1208    return skip_test_decorator
1209
1210def skip_if_user_is_root(func):
1211    '''Skip Test Decorator
1212       Runs the test only without root permissions'''
1213    def func_wrapper(*args, **kwargs):
1214        if os.getuid() == 0:
1215            case_notrun('{}: cannot be run as root'.format(args[0]))
1216            return None
1217        else:
1218            return func(*args, **kwargs)
1219    return func_wrapper
1220
1221def execute_unittest(debug=False):
1222    """Executes unittests within the calling module."""
1223
1224    verbosity = 2 if debug else 1
1225
1226    if debug:
1227        output = sys.stdout
1228    else:
1229        # We need to filter out the time taken from the output so that
1230        # qemu-iotest can reliably diff the results against master output.
1231        output = io.StringIO()
1232
1233    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1234                                     verbosity=verbosity)
1235    try:
1236        # unittest.main() will use sys.exit(); so expect a SystemExit
1237        # exception
1238        unittest.main(testRunner=runner)
1239    finally:
1240        # We need to filter out the time taken from the output so that
1241        # qemu-iotest can reliably diff the results against master output.
1242        if not debug:
1243            out = output.getvalue()
1244            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1245
1246            # Hide skipped tests from the reference output
1247            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1248            out_first_line, out_rest = out.split('\n', 1)
1249            out = out_first_line.replace('s', '.') + '\n' + out_rest
1250
1251            sys.stderr.write(out)
1252
1253def execute_setup_common(supported_fmts: Sequence[str] = (),
1254                         supported_platforms: Sequence[str] = (),
1255                         supported_cache_modes: Sequence[str] = (),
1256                         supported_aio_modes: Sequence[str] = (),
1257                         unsupported_fmts: Sequence[str] = (),
1258                         supported_protocols: Sequence[str] = (),
1259                         unsupported_protocols: Sequence[str] = ()) -> bool:
1260    """
1261    Perform necessary setup for either script-style or unittest-style tests.
1262
1263    :return: Bool; Whether or not debug mode has been requested via the CLI.
1264    """
1265    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1266
1267    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1268    # indicate that we're not being run via "check". There may be
1269    # other things set up by "check" that individual test cases rely
1270    # on.
1271    if test_dir is None or qemu_default_machine is None:
1272        sys.stderr.write('Please run this test via the "check" script\n')
1273        sys.exit(os.EX_USAGE)
1274
1275    debug = '-d' in sys.argv
1276    if debug:
1277        sys.argv.remove('-d')
1278    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1279
1280    _verify_image_format(supported_fmts, unsupported_fmts)
1281    _verify_protocol(supported_protocols, unsupported_protocols)
1282    _verify_platform(supported=supported_platforms)
1283    _verify_cache_mode(supported_cache_modes)
1284    _verify_aio_mode(supported_aio_modes)
1285
1286    return debug
1287
1288def execute_test(*args, test_function=None, **kwargs):
1289    """Run either unittest or script-style tests."""
1290
1291    debug = execute_setup_common(*args, **kwargs)
1292    if not test_function:
1293        execute_unittest(debug)
1294    else:
1295        test_function()
1296
1297def activate_logging():
1298    """Activate iotests.log() output to stdout for script-style tests."""
1299    handler = logging.StreamHandler(stream=sys.stdout)
1300    formatter = logging.Formatter('%(message)s')
1301    handler.setFormatter(formatter)
1302    test_logger.addHandler(handler)
1303    test_logger.setLevel(logging.INFO)
1304    test_logger.propagate = False
1305
1306# This is called from script-style iotests without a single point of entry
1307def script_initialize(*args, **kwargs):
1308    """Initialize script-style tests without running any tests."""
1309    activate_logging()
1310    execute_setup_common(*args, **kwargs)
1311
1312# This is called from script-style iotests with a single point of entry
1313def script_main(test_function, *args, **kwargs):
1314    """Run script-style tests outside of the unittest framework"""
1315    activate_logging()
1316    execute_test(*args, test_function=test_function, **kwargs)
1317
1318# This is called from unittest style iotests
1319def main(*args, **kwargs):
1320    """Run tests using the unittest framework"""
1321    execute_test(*args, **kwargs)
1322