xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision c27c1cc3)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
43assert sys.version_info >= (3, 6)
44
45# Use this logger for logging messages directly from the iotests module
46logger = logging.getLogger('qemu.iotests')
47logger.addHandler(logging.NullHandler())
48
49# Use this logger for messages that ought to be used for diff output.
50test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53faulthandler.enable()
54
55# This will not work if arguments contain spaces but is necessary if we
56# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58if os.environ.get('QEMU_IMG_OPTIONS'):
59    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62if os.environ.get('QEMU_IO_OPTIONS'):
63    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67    qemu_io_args_no_fmt += \
68        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79test_dir = os.environ.get('TEST_DIR')
80sock_dir = os.environ.get('SOCK_DIR')
81output_dir = os.environ.get('OUTPUT_DIR', '.')
82cachemode = os.environ.get('CACHEMODE')
83aiomode = os.environ.get('AIOMODE')
84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88luks_default_secret_object = 'secret,id=keysec0,data=' + \
89                             os.environ.get('IMGKEYSECRET', '')
90luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94    """
95    Run qemu-img and return both its output and its exit code
96    """
97    subp = subprocess.Popen(qemu_img_args + list(args),
98                            stdout=subprocess.PIPE,
99                            stderr=subprocess.STDOUT,
100                            universal_newlines=True)
101    output = subp.communicate()[0]
102    if subp.returncode < 0:
103        sys.stderr.write('qemu-img received signal %i: %s\n'
104                         % (-subp.returncode,
105                            ' '.join(qemu_img_args + list(args))))
106    return (output, subp.returncode)
107
108def qemu_img(*args: str) -> int:
109    '''Run qemu-img and return the exit code'''
110    return qemu_img_pipe_and_status(*args)[1]
111
112def ordered_qmp(qmsg, conv_keys=True):
113    # Dictionaries are not ordered prior to 3.6, therefore:
114    if isinstance(qmsg, list):
115        return [ordered_qmp(atom) for atom in qmsg]
116    if isinstance(qmsg, dict):
117        od = OrderedDict()
118        for k, v in sorted(qmsg.items()):
119            if conv_keys:
120                k = k.replace('_', '-')
121            od[k] = ordered_qmp(v, conv_keys=False)
122        return od
123    return qmsg
124
125def qemu_img_create(*args):
126    args = list(args)
127
128    # default luks support
129    if '-f' in args and args[args.index('-f') + 1] == 'luks':
130        if '-o' in args:
131            i = args.index('-o')
132            if 'key-secret' not in args[i + 1]:
133                args[i + 1].append(luks_default_key_secret_opt)
134                args.insert(i + 2, '--object')
135                args.insert(i + 3, luks_default_secret_object)
136        else:
137            args = ['-o', luks_default_key_secret_opt,
138                    '--object', luks_default_secret_object] + args
139
140    args.insert(0, 'create')
141
142    return qemu_img(*args)
143
144def qemu_img_measure(*args):
145    return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
146
147def qemu_img_check(*args):
148    return json.loads(qemu_img_pipe("check", "--output", "json", *args))
149
150def qemu_img_verbose(*args):
151    '''Run qemu-img without suppressing its output and return the exit code'''
152    exitcode = subprocess.call(qemu_img_args + list(args))
153    if exitcode < 0:
154        sys.stderr.write('qemu-img received signal %i: %s\n'
155                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
156    return exitcode
157
158def qemu_img_pipe(*args: str) -> str:
159    '''Run qemu-img and return its output'''
160    return qemu_img_pipe_and_status(*args)[0]
161
162def qemu_img_log(*args):
163    result = qemu_img_pipe(*args)
164    log(result, filters=[filter_testfiles])
165    return result
166
167def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
168    args = ['info']
169    if imgopts:
170        args.append('--image-opts')
171    else:
172        args += ['-f', imgfmt]
173    args += extra_args
174    args.append(filename)
175
176    output = qemu_img_pipe(*args)
177    if not filter_path:
178        filter_path = filename
179    log(filter_img_info(output, filter_path))
180
181def qemu_io(*args):
182    '''Run qemu-io and return the stdout data'''
183    args = qemu_io_args + list(args)
184    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
185                            stderr=subprocess.STDOUT,
186                            universal_newlines=True)
187    output = subp.communicate()[0]
188    if subp.returncode < 0:
189        sys.stderr.write('qemu-io received signal %i: %s\n'
190                         % (-subp.returncode, ' '.join(args)))
191    return output
192
193def qemu_io_log(*args):
194    result = qemu_io(*args)
195    log(result, filters=[filter_testfiles, filter_qemu_io])
196    return result
197
198def qemu_io_silent(*args):
199    '''Run qemu-io and return the exit code, suppressing stdout'''
200    args = qemu_io_args + list(args)
201    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
202    if exitcode < 0:
203        sys.stderr.write('qemu-io received signal %i: %s\n' %
204                         (-exitcode, ' '.join(args)))
205    return exitcode
206
207def qemu_io_silent_check(*args):
208    '''Run qemu-io and return the true if subprocess returned 0'''
209    args = qemu_io_args + list(args)
210    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
211                               stderr=subprocess.STDOUT)
212    return exitcode == 0
213
214def get_virtio_scsi_device():
215    if qemu_default_machine == 's390-ccw-virtio':
216        return 'virtio-scsi-ccw'
217    return 'virtio-scsi-pci'
218
219class QemuIoInteractive:
220    def __init__(self, *args):
221        self.args = qemu_io_args_no_fmt + list(args)
222        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
223                                   stdout=subprocess.PIPE,
224                                   stderr=subprocess.STDOUT,
225                                   universal_newlines=True)
226        out = self._p.stdout.read(9)
227        if out != 'qemu-io> ':
228            # Most probably qemu-io just failed to start.
229            # Let's collect the whole output and exit.
230            out += self._p.stdout.read()
231            self._p.wait(timeout=1)
232            raise ValueError(out)
233
234    def close(self):
235        self._p.communicate('q\n')
236
237    def _read_output(self):
238        pattern = 'qemu-io> '
239        n = len(pattern)
240        pos = 0
241        s = []
242        while pos != n:
243            c = self._p.stdout.read(1)
244            # check unexpected EOF
245            assert c != ''
246            s.append(c)
247            if c == pattern[pos]:
248                pos += 1
249            else:
250                pos = 0
251
252        return ''.join(s[:-n])
253
254    def cmd(self, cmd):
255        # quit command is in close(), '\n' is added automatically
256        assert '\n' not in cmd
257        cmd = cmd.strip()
258        assert cmd not in ('q', 'quit')
259        self._p.stdin.write(cmd + '\n')
260        self._p.stdin.flush()
261        return self._read_output()
262
263
264def qemu_nbd(*args):
265    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
266    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
267
268def qemu_nbd_early_pipe(*args):
269    '''Run qemu-nbd in daemon mode and return both the parent's exit code
270       and its output in case of an error'''
271    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
272                            stdout=subprocess.PIPE,
273                            universal_newlines=True)
274    output = subp.communicate()[0]
275    if subp.returncode < 0:
276        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
277                         (-subp.returncode,
278                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
279
280    return subp.returncode, output if subp.returncode else ''
281
282@contextmanager
283def qemu_nbd_popen(*args):
284    '''Context manager running qemu-nbd within the context'''
285    pid_file = file_path("pid")
286
287    cmd = list(qemu_nbd_args)
288    cmd.extend(('--persistent', '--pid-file', pid_file))
289    cmd.extend(args)
290
291    log('Start NBD server')
292    p = subprocess.Popen(cmd)
293    try:
294        while not os.path.exists(pid_file):
295            if p.poll() is not None:
296                raise RuntimeError(
297                    "qemu-nbd terminated with exit code {}: {}"
298                    .format(p.returncode, ' '.join(cmd)))
299
300            time.sleep(0.01)
301        yield
302    finally:
303        log('Kill NBD server')
304        p.kill()
305        p.wait()
306
307def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
308    '''Return True if two image files are identical'''
309    return qemu_img('compare', '-f', fmt1,
310                    '-F', fmt2, img1, img2) == 0
311
312def create_image(name, size):
313    '''Create a fully-allocated raw image with sector markers'''
314    file = open(name, 'wb')
315    i = 0
316    while i < size:
317        sector = struct.pack('>l504xl', i // 512, i // 512)
318        file.write(sector)
319        i = i + 512
320    file.close()
321
322def image_size(img):
323    '''Return image's virtual size'''
324    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
325    return json.loads(r)['virtual-size']
326
327def is_str(val):
328    return isinstance(val, str)
329
330test_dir_re = re.compile(r"%s" % test_dir)
331def filter_test_dir(msg):
332    return test_dir_re.sub("TEST_DIR", msg)
333
334win32_re = re.compile(r"\r")
335def filter_win32(msg):
336    return win32_re.sub("", msg)
337
338qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
339                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
340                        r"and [0-9\/.inf]* ops\/sec\)")
341def filter_qemu_io(msg):
342    msg = filter_win32(msg)
343    return qemu_io_re.sub("X ops; XX:XX:XX.X "
344                          "(XXX YYY/sec and XXX ops/sec)", msg)
345
346chown_re = re.compile(r"chown [0-9]+:[0-9]+")
347def filter_chown(msg):
348    return chown_re.sub("chown UID:GID", msg)
349
350def filter_qmp_event(event):
351    '''Filter a QMP event dict'''
352    event = dict(event)
353    if 'timestamp' in event:
354        event['timestamp']['seconds'] = 'SECS'
355        event['timestamp']['microseconds'] = 'USECS'
356    return event
357
358def filter_qmp(qmsg, filter_fn):
359    '''Given a string filter, filter a QMP object's values.
360    filter_fn takes a (key, value) pair.'''
361    # Iterate through either lists or dicts;
362    if isinstance(qmsg, list):
363        items = enumerate(qmsg)
364    else:
365        items = qmsg.items()
366
367    for k, v in items:
368        if isinstance(v, (dict, list)):
369            qmsg[k] = filter_qmp(v, filter_fn)
370        else:
371            qmsg[k] = filter_fn(k, v)
372    return qmsg
373
374def filter_testfiles(msg):
375    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
376    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
377    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
378
379def filter_qmp_testfiles(qmsg):
380    def _filter(_key, value):
381        if is_str(value):
382            return filter_testfiles(value)
383        return value
384    return filter_qmp(qmsg, _filter)
385
386def filter_generated_node_ids(msg):
387    return re.sub("#block[0-9]+", "NODE_NAME", msg)
388
389def filter_img_info(output, filename):
390    lines = []
391    for line in output.split('\n'):
392        if 'disk size' in line or 'actual-size' in line:
393            continue
394        line = line.replace(filename, 'TEST_IMG')
395        line = filter_testfiles(line)
396        line = line.replace(imgfmt, 'IMGFMT')
397        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
398        line = re.sub('uuid: [-a-f0-9]+',
399                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
400                      line)
401        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
402        lines.append(line)
403    return '\n'.join(lines)
404
405def filter_imgfmt(msg):
406    return msg.replace(imgfmt, 'IMGFMT')
407
408def filter_qmp_imgfmt(qmsg):
409    def _filter(_key, value):
410        if is_str(value):
411            return filter_imgfmt(value)
412        return value
413    return filter_qmp(qmsg, _filter)
414
415
416Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
417
418def log(msg: Msg,
419        filters: Iterable[Callable[[Msg], Msg]] = (),
420        indent: Optional[int] = None) -> None:
421    """
422    Logs either a string message or a JSON serializable message (like QMP).
423    If indent is provided, JSON serializable messages are pretty-printed.
424    """
425    for flt in filters:
426        msg = flt(msg)
427    if isinstance(msg, (dict, list)):
428        # Don't sort if it's already sorted
429        do_sort = not isinstance(msg, OrderedDict)
430        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
431    else:
432        test_logger.info(msg)
433
434class Timeout:
435    def __init__(self, seconds, errmsg="Timeout"):
436        self.seconds = seconds
437        self.errmsg = errmsg
438    def __enter__(self):
439        signal.signal(signal.SIGALRM, self.timeout)
440        signal.setitimer(signal.ITIMER_REAL, self.seconds)
441        return self
442    def __exit__(self, exc_type, value, traceback):
443        signal.setitimer(signal.ITIMER_REAL, 0)
444        return False
445    def timeout(self, signum, frame):
446        raise Exception(self.errmsg)
447
448def file_pattern(name):
449    return "{0}-{1}".format(os.getpid(), name)
450
451class FilePaths:
452    """
453    FilePaths is an auto-generated filename that cleans itself up.
454
455    Use this context manager to generate filenames and ensure that the file
456    gets deleted::
457
458        with FilePaths(['test.img']) as img_path:
459            qemu_img('create', img_path, '1G')
460        # migration_sock_path is automatically deleted
461    """
462    def __init__(self, names, base_dir=test_dir):
463        self.paths = []
464        for name in names:
465            self.paths.append(os.path.join(base_dir, file_pattern(name)))
466
467    def __enter__(self):
468        return self.paths
469
470    def __exit__(self, exc_type, exc_val, exc_tb):
471        try:
472            for path in self.paths:
473                os.remove(path)
474        except OSError:
475            pass
476        return False
477
478class FilePath(FilePaths):
479    """
480    FilePath is a specialization of FilePaths that takes a single filename.
481    """
482    def __init__(self, name, base_dir=test_dir):
483        super(FilePath, self).__init__([name], base_dir)
484
485    def __enter__(self):
486        return self.paths[0]
487
488def file_path_remover():
489    for path in reversed(file_path_remover.paths):
490        try:
491            os.remove(path)
492        except OSError:
493            pass
494
495
496def file_path(*names, base_dir=test_dir):
497    ''' Another way to get auto-generated filename that cleans itself up.
498
499    Use is as simple as:
500
501    img_a, img_b = file_path('a.img', 'b.img')
502    sock = file_path('socket')
503    '''
504
505    if not hasattr(file_path_remover, 'paths'):
506        file_path_remover.paths = []
507        atexit.register(file_path_remover)
508
509    paths = []
510    for name in names:
511        filename = file_pattern(name)
512        path = os.path.join(base_dir, filename)
513        file_path_remover.paths.append(path)
514        paths.append(path)
515
516    return paths[0] if len(paths) == 1 else paths
517
518def remote_filename(path):
519    if imgproto == 'file':
520        return path
521    elif imgproto == 'ssh':
522        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
523    else:
524        raise Exception("Protocol %s not supported" % (imgproto))
525
526class VM(qtest.QEMUQtestMachine):
527    '''A QEMU VM'''
528
529    def __init__(self, path_suffix=''):
530        name = "qemu%s-%d" % (path_suffix, os.getpid())
531        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
532                                 test_dir=test_dir,
533                                 socket_scm_helper=socket_scm_helper,
534                                 sock_dir=sock_dir)
535        self._num_drives = 0
536
537    def add_object(self, opts):
538        self._args.append('-object')
539        self._args.append(opts)
540        return self
541
542    def add_device(self, opts):
543        self._args.append('-device')
544        self._args.append(opts)
545        return self
546
547    def add_drive_raw(self, opts):
548        self._args.append('-drive')
549        self._args.append(opts)
550        return self
551
552    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
553        '''Add a virtio-blk drive to the VM'''
554        options = ['if=%s' % interface,
555                   'id=drive%d' % self._num_drives]
556
557        if path is not None:
558            options.append('file=%s' % path)
559            options.append('format=%s' % img_format)
560            options.append('cache=%s' % cachemode)
561            options.append('aio=%s' % aiomode)
562
563        if opts:
564            options.append(opts)
565
566        if img_format == 'luks' and 'key-secret' not in opts:
567            # default luks support
568            if luks_default_secret_object not in self._args:
569                self.add_object(luks_default_secret_object)
570
571            options.append(luks_default_key_secret_opt)
572
573        self._args.append('-drive')
574        self._args.append(','.join(options))
575        self._num_drives += 1
576        return self
577
578    def add_blockdev(self, opts):
579        self._args.append('-blockdev')
580        if isinstance(opts, str):
581            self._args.append(opts)
582        else:
583            self._args.append(','.join(opts))
584        return self
585
586    def add_incoming(self, addr):
587        self._args.append('-incoming')
588        self._args.append(addr)
589        return self
590
591    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
592        cmd = 'human-monitor-command'
593        kwargs = {'command-line': command_line}
594        if use_log:
595            return self.qmp_log(cmd, **kwargs)
596        else:
597            return self.qmp(cmd, **kwargs)
598
599    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
600        """Pause drive r/w operations"""
601        if not event:
602            self.pause_drive(drive, "read_aio")
603            self.pause_drive(drive, "write_aio")
604            return
605        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
606
607    def resume_drive(self, drive: str) -> None:
608        """Resume drive r/w operations"""
609        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
610
611    def hmp_qemu_io(self, drive: str, cmd: str,
612                    use_log: bool = False) -> QMPMessage:
613        """Write to a given drive using an HMP command"""
614        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
615
616    def flatten_qmp_object(self, obj, output=None, basestr=''):
617        if output is None:
618            output = dict()
619        if isinstance(obj, list):
620            for i, item in enumerate(obj):
621                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
622        elif isinstance(obj, dict):
623            for key in obj:
624                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
625        else:
626            output[basestr[:-1]] = obj # Strip trailing '.'
627        return output
628
629    def qmp_to_opts(self, obj):
630        obj = self.flatten_qmp_object(obj)
631        output_list = list()
632        for key in obj:
633            output_list += [key + '=' + obj[key]]
634        return ','.join(output_list)
635
636    def get_qmp_events_filtered(self, wait=60.0):
637        result = []
638        for ev in self.get_qmp_events(wait=wait):
639            result.append(filter_qmp_event(ev))
640        return result
641
642    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
643        full_cmd = OrderedDict((
644            ("execute", cmd),
645            ("arguments", ordered_qmp(kwargs))
646        ))
647        log(full_cmd, filters, indent=indent)
648        result = self.qmp(cmd, **kwargs)
649        log(result, filters, indent=indent)
650        return result
651
652    # Returns None on success, and an error string on failure
653    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
654                pre_finalize=None, cancel=False, wait=60.0):
655        """
656        run_job moves a job from creation through to dismissal.
657
658        :param job: String. ID of recently-launched job
659        :param auto_finalize: Bool. True if the job was launched with
660                              auto_finalize. Defaults to True.
661        :param auto_dismiss: Bool. True if the job was launched with
662                             auto_dismiss=True. Defaults to False.
663        :param pre_finalize: Callback. A callable that takes no arguments to be
664                             invoked prior to issuing job-finalize, if any.
665        :param cancel: Bool. When true, cancels the job after the pre_finalize
666                       callback.
667        :param wait: Float. Timeout value specifying how long to wait for any
668                     event, in seconds. Defaults to 60.0.
669        """
670        match_device = {'data': {'device': job}}
671        match_id = {'data': {'id': job}}
672        events = [
673            ('BLOCK_JOB_COMPLETED', match_device),
674            ('BLOCK_JOB_CANCELLED', match_device),
675            ('BLOCK_JOB_ERROR', match_device),
676            ('BLOCK_JOB_READY', match_device),
677            ('BLOCK_JOB_PENDING', match_id),
678            ('JOB_STATUS_CHANGE', match_id)
679        ]
680        error = None
681        while True:
682            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
683            if ev['event'] != 'JOB_STATUS_CHANGE':
684                log(ev)
685                continue
686            status = ev['data']['status']
687            if status == 'aborting':
688                result = self.qmp('query-jobs')
689                for j in result['return']:
690                    if j['id'] == job:
691                        error = j['error']
692                        log('Job failed: %s' % (j['error']))
693            elif status == 'ready':
694                self.qmp_log('job-complete', id=job)
695            elif status == 'pending' and not auto_finalize:
696                if pre_finalize:
697                    pre_finalize()
698                if cancel:
699                    self.qmp_log('job-cancel', id=job)
700                else:
701                    self.qmp_log('job-finalize', id=job)
702            elif status == 'concluded' and not auto_dismiss:
703                self.qmp_log('job-dismiss', id=job)
704            elif status == 'null':
705                return error
706
707    # Returns None on success, and an error string on failure
708    def blockdev_create(self, options, job_id='job0', filters=None):
709        if filters is None:
710            filters = [filter_qmp_testfiles]
711        result = self.qmp_log('blockdev-create', filters=filters,
712                              job_id=job_id, options=options)
713
714        if 'return' in result:
715            assert result['return'] == {}
716            job_result = self.run_job(job_id)
717        else:
718            job_result = result['error']
719
720        log("")
721        return job_result
722
723    def enable_migration_events(self, name):
724        log('Enabling migration QMP events on %s...' % name)
725        log(self.qmp('migrate-set-capabilities', capabilities=[
726            {
727                'capability': 'events',
728                'state': True
729            }
730        ]))
731
732    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
733        while True:
734            event = self.event_wait('MIGRATION')
735            log(event, filters=[filter_qmp_event])
736            if event['data']['status'] in ('completed', 'failed'):
737                break
738
739        if event['data']['status'] == 'completed':
740            # The event may occur in finish-migrate, so wait for the expected
741            # post-migration runstate
742            runstate = None
743            while runstate != expect_runstate:
744                runstate = self.qmp('query-status')['return']['status']
745            return True
746        else:
747            return False
748
749    def node_info(self, node_name):
750        nodes = self.qmp('query-named-block-nodes')
751        for x in nodes['return']:
752            if x['node-name'] == node_name:
753                return x
754        return None
755
756    def query_bitmaps(self):
757        res = self.qmp("query-named-block-nodes")
758        return {device['node-name']: device['dirty-bitmaps']
759                for device in res['return'] if 'dirty-bitmaps' in device}
760
761    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
762        """
763        get a specific bitmap from the object returned by query_bitmaps.
764        :param recording: If specified, filter results by the specified value.
765        :param bitmaps: If specified, use it instead of call query_bitmaps()
766        """
767        if bitmaps is None:
768            bitmaps = self.query_bitmaps()
769
770        for bitmap in bitmaps[node_name]:
771            if bitmap.get('name', '') == bitmap_name:
772                if recording is None or bitmap.get('recording') == recording:
773                    return bitmap
774        return None
775
776    def check_bitmap_status(self, node_name, bitmap_name, fields):
777        ret = self.get_bitmap(node_name, bitmap_name)
778
779        return fields.items() <= ret.items()
780
781    def assert_block_path(self, root, path, expected_node, graph=None):
782        """
783        Check whether the node under the given path in the block graph
784        is @expected_node.
785
786        @root is the node name of the node where the @path is rooted.
787
788        @path is a string that consists of child names separated by
789        slashes.  It must begin with a slash.
790
791        Examples for @root + @path:
792          - root="qcow2-node", path="/backing/file"
793          - root="quorum-node", path="/children.2/file"
794
795        Hypothetically, @path could be empty, in which case it would
796        point to @root.  However, in practice this case is not useful
797        and hence not allowed.
798
799        @expected_node may be None.  (All elements of the path but the
800        leaf must still exist.)
801
802        @graph may be None or the result of an x-debug-query-block-graph
803        call that has already been performed.
804        """
805        if graph is None:
806            graph = self.qmp('x-debug-query-block-graph')['return']
807
808        iter_path = iter(path.split('/'))
809
810        # Must start with a /
811        assert next(iter_path) == ''
812
813        node = next((node for node in graph['nodes'] if node['name'] == root),
814                    None)
815
816        # An empty @path is not allowed, so the root node must be present
817        assert node is not None, 'Root node %s not found' % root
818
819        for child_name in iter_path:
820            assert node is not None, 'Cannot follow path %s%s' % (root, path)
821
822            try:
823                node_id = next(edge['child'] for edge in graph['edges']
824                               if (edge['parent'] == node['id'] and
825                                   edge['name'] == child_name))
826
827                node = next(node for node in graph['nodes']
828                            if node['id'] == node_id)
829
830            except StopIteration:
831                node = None
832
833        if node is None:
834            assert expected_node is None, \
835                   'No node found under %s (but expected %s)' % \
836                   (path, expected_node)
837        else:
838            assert node['name'] == expected_node, \
839                   'Found node %s under %s (but expected %s)' % \
840                   (node['name'], path, expected_node)
841
842index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
843
844class QMPTestCase(unittest.TestCase):
845    '''Abstract base class for QMP test cases'''
846
847    def __init__(self, *args, **kwargs):
848        super().__init__(*args, **kwargs)
849        # Many users of this class set a VM property we rely on heavily
850        # in the methods below.
851        self.vm = None
852
853    def dictpath(self, d, path):
854        '''Traverse a path in a nested dict'''
855        for component in path.split('/'):
856            m = index_re.match(component)
857            if m:
858                component, idx = m.groups()
859                idx = int(idx)
860
861            if not isinstance(d, dict) or component not in d:
862                self.fail(f'failed path traversal for "{path}" in "{d}"')
863            d = d[component]
864
865            if m:
866                if not isinstance(d, list):
867                    self.fail(f'path component "{component}" in "{path}" '
868                              f'is not a list in "{d}"')
869                try:
870                    d = d[idx]
871                except IndexError:
872                    self.fail(f'invalid index "{idx}" in path "{path}" '
873                              f'in "{d}"')
874        return d
875
876    def assert_qmp_absent(self, d, path):
877        try:
878            result = self.dictpath(d, path)
879        except AssertionError:
880            return
881        self.fail('path "%s" has value "%s"' % (path, str(result)))
882
883    def assert_qmp(self, d, path, value):
884        '''Assert that the value for a specific path in a QMP dict
885           matches.  When given a list of values, assert that any of
886           them matches.'''
887
888        result = self.dictpath(d, path)
889
890        # [] makes no sense as a list of valid values, so treat it as
891        # an actual single value.
892        if isinstance(value, list) and value != []:
893            for v in value:
894                if result == v:
895                    return
896            self.fail('no match for "%s" in %s' % (str(result), str(value)))
897        else:
898            self.assertEqual(result, value,
899                             '"%s" is "%s", expected "%s"'
900                             % (path, str(result), str(value)))
901
902    def assert_no_active_block_jobs(self):
903        result = self.vm.qmp('query-block-jobs')
904        self.assert_qmp(result, 'return', [])
905
906    def assert_has_block_node(self, node_name=None, file_name=None):
907        """Issue a query-named-block-nodes and assert node_name and/or
908        file_name is present in the result"""
909        def check_equal_or_none(a, b):
910            return a is None or b is None or a == b
911        assert node_name or file_name
912        result = self.vm.qmp('query-named-block-nodes')
913        for x in result["return"]:
914            if check_equal_or_none(x.get("node-name"), node_name) and \
915                    check_equal_or_none(x.get("file"), file_name):
916                return
917        self.fail("Cannot find %s %s in result:\n%s" %
918                  (node_name, file_name, result))
919
920    def assert_json_filename_equal(self, json_filename, reference):
921        '''Asserts that the given filename is a json: filename and that its
922           content is equal to the given reference object'''
923        self.assertEqual(json_filename[:5], 'json:')
924        self.assertEqual(
925            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
926            self.vm.flatten_qmp_object(reference)
927        )
928
929    def cancel_and_wait(self, drive='drive0', force=False,
930                        resume=False, wait=60.0):
931        '''Cancel a block job and wait for it to finish, returning the event'''
932        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
933        self.assert_qmp(result, 'return', {})
934
935        if resume:
936            self.vm.resume_drive(drive)
937
938        cancelled = False
939        result = None
940        while not cancelled:
941            for event in self.vm.get_qmp_events(wait=wait):
942                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
943                   event['event'] == 'BLOCK_JOB_CANCELLED':
944                    self.assert_qmp(event, 'data/device', drive)
945                    result = event
946                    cancelled = True
947                elif event['event'] == 'JOB_STATUS_CHANGE':
948                    self.assert_qmp(event, 'data/id', drive)
949
950
951        self.assert_no_active_block_jobs()
952        return result
953
954    def wait_until_completed(self, drive='drive0', check_offset=True,
955                             wait=60.0, error=None):
956        '''Wait for a block job to finish, returning the event'''
957        while True:
958            for event in self.vm.get_qmp_events(wait=wait):
959                if event['event'] == 'BLOCK_JOB_COMPLETED':
960                    self.assert_qmp(event, 'data/device', drive)
961                    if error is None:
962                        self.assert_qmp_absent(event, 'data/error')
963                        if check_offset:
964                            self.assert_qmp(event, 'data/offset',
965                                            event['data']['len'])
966                    else:
967                        self.assert_qmp(event, 'data/error', error)
968                    self.assert_no_active_block_jobs()
969                    return event
970                if event['event'] == 'JOB_STATUS_CHANGE':
971                    self.assert_qmp(event, 'data/id', drive)
972
973    def wait_ready(self, drive='drive0'):
974        """Wait until a BLOCK_JOB_READY event, and return the event."""
975        f = {'data': {'type': 'mirror', 'device': drive}}
976        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
977
978    def wait_ready_and_cancel(self, drive='drive0'):
979        self.wait_ready(drive=drive)
980        event = self.cancel_and_wait(drive=drive)
981        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
982        self.assert_qmp(event, 'data/type', 'mirror')
983        self.assert_qmp(event, 'data/offset', event['data']['len'])
984
985    def complete_and_wait(self, drive='drive0', wait_ready=True,
986                          completion_error=None):
987        '''Complete a block job and wait for it to finish'''
988        if wait_ready:
989            self.wait_ready(drive=drive)
990
991        result = self.vm.qmp('block-job-complete', device=drive)
992        self.assert_qmp(result, 'return', {})
993
994        event = self.wait_until_completed(drive=drive, error=completion_error)
995        self.assert_qmp(event, 'data/type', 'mirror')
996
997    def pause_wait(self, job_id='job0'):
998        with Timeout(3, "Timeout waiting for job to pause"):
999            while True:
1000                result = self.vm.qmp('query-block-jobs')
1001                found = False
1002                for job in result['return']:
1003                    if job['device'] == job_id:
1004                        found = True
1005                        if job['paused'] and not job['busy']:
1006                            return job
1007                        break
1008                assert found
1009
1010    def pause_job(self, job_id='job0', wait=True):
1011        result = self.vm.qmp('block-job-pause', device=job_id)
1012        self.assert_qmp(result, 'return', {})
1013        if wait:
1014            return self.pause_wait(job_id)
1015        return result
1016
1017    def case_skip(self, reason):
1018        '''Skip this test case'''
1019        case_notrun(reason)
1020        self.skipTest(reason)
1021
1022
1023def notrun(reason):
1024    '''Skip this test suite'''
1025    # Each test in qemu-iotests has a number ("seq")
1026    seq = os.path.basename(sys.argv[0])
1027
1028    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1029    logger.warning("%s not run: %s", seq, reason)
1030    sys.exit(0)
1031
1032def case_notrun(reason):
1033    '''Mark this test case as not having been run (without actually
1034    skipping it, that is left to the caller).  See
1035    QMPTestCase.case_skip() for a variant that actually skips the
1036    current test case.'''
1037
1038    # Each test in qemu-iotests has a number ("seq")
1039    seq = os.path.basename(sys.argv[0])
1040
1041    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1042        '    [case not run] ' + reason + '\n')
1043
1044def _verify_image_format(supported_fmts: Sequence[str] = (),
1045                         unsupported_fmts: Sequence[str] = ()) -> None:
1046    assert not (supported_fmts and unsupported_fmts)
1047
1048    if 'generic' in supported_fmts and \
1049            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1050        # similar to
1051        #   _supported_fmt generic
1052        # for bash tests
1053        if imgfmt == 'luks':
1054            verify_working_luks()
1055        return
1056
1057    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1058    if not_sup or (imgfmt in unsupported_fmts):
1059        notrun('not suitable for this image format: %s' % imgfmt)
1060
1061    if imgfmt == 'luks':
1062        verify_working_luks()
1063
1064def _verify_protocol(supported: Sequence[str] = (),
1065                     unsupported: Sequence[str] = ()) -> None:
1066    assert not (supported and unsupported)
1067
1068    if 'generic' in supported:
1069        return
1070
1071    not_sup = supported and (imgproto not in supported)
1072    if not_sup or (imgproto in unsupported):
1073        notrun('not suitable for this protocol: %s' % imgproto)
1074
1075def _verify_platform(supported: Sequence[str] = (),
1076                     unsupported: Sequence[str] = ()) -> None:
1077    if any((sys.platform.startswith(x) for x in unsupported)):
1078        notrun('not suitable for this OS: %s' % sys.platform)
1079
1080    if supported:
1081        if not any((sys.platform.startswith(x) for x in supported)):
1082            notrun('not suitable for this OS: %s' % sys.platform)
1083
1084def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1085    if supported_cache_modes and (cachemode not in supported_cache_modes):
1086        notrun('not suitable for this cache mode: %s' % cachemode)
1087
1088def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1089    if supported_aio_modes and (aiomode not in supported_aio_modes):
1090        notrun('not suitable for this aio mode: %s' % aiomode)
1091
1092def supports_quorum():
1093    return 'quorum' in qemu_img_pipe('--help')
1094
1095def verify_quorum():
1096    '''Skip test suite if quorum support is not available'''
1097    if not supports_quorum():
1098        notrun('quorum support missing')
1099
1100def has_working_luks() -> Tuple[bool, str]:
1101    """
1102    Check whether our LUKS driver can actually create images
1103    (this extends to LUKS encryption for qcow2).
1104
1105    If not, return the reason why.
1106    """
1107
1108    img_file = f'{test_dir}/luks-test.luks'
1109    (output, status) = \
1110        qemu_img_pipe_and_status('create', '-f', 'luks',
1111                                 '--object', luks_default_secret_object,
1112                                 '-o', luks_default_key_secret_opt,
1113                                 '-o', 'iter-time=10',
1114                                 img_file, '1G')
1115    try:
1116        os.remove(img_file)
1117    except OSError:
1118        pass
1119
1120    if status != 0:
1121        reason = output
1122        for line in output.splitlines():
1123            if img_file + ':' in line:
1124                reason = line.split(img_file + ':', 1)[1].strip()
1125                break
1126
1127        return (False, reason)
1128    else:
1129        return (True, '')
1130
1131def verify_working_luks():
1132    """
1133    Skip test suite if LUKS does not work
1134    """
1135    (working, reason) = has_working_luks()
1136    if not working:
1137        notrun(reason)
1138
1139def qemu_pipe(*args):
1140    """
1141    Run qemu with an option to print something and exit (e.g. a help option).
1142
1143    :return: QEMU's stdout output.
1144    """
1145    args = [qemu_prog] + qemu_opts + list(args)
1146    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1147                            stderr=subprocess.STDOUT,
1148                            universal_newlines=True)
1149    output = subp.communicate()[0]
1150    if subp.returncode < 0:
1151        sys.stderr.write('qemu received signal %i: %s\n' %
1152                         (-subp.returncode, ' '.join(args)))
1153    return output
1154
1155def supported_formats(read_only=False):
1156    '''Set 'read_only' to True to check ro-whitelist
1157       Otherwise, rw-whitelist is checked'''
1158
1159    if not hasattr(supported_formats, "formats"):
1160        supported_formats.formats = {}
1161
1162    if read_only not in supported_formats.formats:
1163        format_message = qemu_pipe("-drive", "format=help")
1164        line = 1 if read_only else 0
1165        supported_formats.formats[read_only] = \
1166            format_message.splitlines()[line].split(":")[1].split()
1167
1168    return supported_formats.formats[read_only]
1169
1170def skip_if_unsupported(required_formats=(), read_only=False):
1171    '''Skip Test Decorator
1172       Runs the test if all the required formats are whitelisted'''
1173    def skip_test_decorator(func):
1174        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1175                         **kwargs: Dict[str, Any]) -> None:
1176            if callable(required_formats):
1177                fmts = required_formats(test_case)
1178            else:
1179                fmts = required_formats
1180
1181            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1182            if usf_list:
1183                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1184                test_case.case_skip(msg)
1185            else:
1186                func(test_case, *args, **kwargs)
1187        return func_wrapper
1188    return skip_test_decorator
1189
1190def skip_for_formats(formats: Sequence[str] = ()) \
1191    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1192                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1193    '''Skip Test Decorator
1194       Skips the test for the given formats'''
1195    def skip_test_decorator(func):
1196        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1197                         **kwargs: Dict[str, Any]) -> None:
1198            if imgfmt in formats:
1199                msg = f'{test_case}: Skipped for format {imgfmt}'
1200                test_case.case_skip(msg)
1201            else:
1202                func(test_case, *args, **kwargs)
1203        return func_wrapper
1204    return skip_test_decorator
1205
1206def skip_if_user_is_root(func):
1207    '''Skip Test Decorator
1208       Runs the test only without root permissions'''
1209    def func_wrapper(*args, **kwargs):
1210        if os.getuid() == 0:
1211            case_notrun('{}: cannot be run as root'.format(args[0]))
1212            return None
1213        else:
1214            return func(*args, **kwargs)
1215    return func_wrapper
1216
1217def execute_unittest(debug=False):
1218    """Executes unittests within the calling module."""
1219
1220    verbosity = 2 if debug else 1
1221
1222    if debug:
1223        output = sys.stdout
1224    else:
1225        # We need to filter out the time taken from the output so that
1226        # qemu-iotest can reliably diff the results against master output.
1227        output = io.StringIO()
1228
1229    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1230                                     verbosity=verbosity)
1231    try:
1232        # unittest.main() will use sys.exit(); so expect a SystemExit
1233        # exception
1234        unittest.main(testRunner=runner)
1235    finally:
1236        # We need to filter out the time taken from the output so that
1237        # qemu-iotest can reliably diff the results against master output.
1238        if not debug:
1239            out = output.getvalue()
1240            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1241
1242            # Hide skipped tests from the reference output
1243            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1244            out_first_line, out_rest = out.split('\n', 1)
1245            out = out_first_line.replace('s', '.') + '\n' + out_rest
1246
1247            sys.stderr.write(out)
1248
1249def execute_setup_common(supported_fmts: Sequence[str] = (),
1250                         supported_platforms: Sequence[str] = (),
1251                         supported_cache_modes: Sequence[str] = (),
1252                         supported_aio_modes: Sequence[str] = (),
1253                         unsupported_fmts: Sequence[str] = (),
1254                         supported_protocols: Sequence[str] = (),
1255                         unsupported_protocols: Sequence[str] = ()) -> bool:
1256    """
1257    Perform necessary setup for either script-style or unittest-style tests.
1258
1259    :return: Bool; Whether or not debug mode has been requested via the CLI.
1260    """
1261    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1262
1263    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1264    # indicate that we're not being run via "check". There may be
1265    # other things set up by "check" that individual test cases rely
1266    # on.
1267    if test_dir is None or qemu_default_machine is None:
1268        sys.stderr.write('Please run this test via the "check" script\n')
1269        sys.exit(os.EX_USAGE)
1270
1271    debug = '-d' in sys.argv
1272    if debug:
1273        sys.argv.remove('-d')
1274    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1275
1276    _verify_image_format(supported_fmts, unsupported_fmts)
1277    _verify_protocol(supported_protocols, unsupported_protocols)
1278    _verify_platform(supported=supported_platforms)
1279    _verify_cache_mode(supported_cache_modes)
1280    _verify_aio_mode(supported_aio_modes)
1281
1282    return debug
1283
1284def execute_test(*args, test_function=None, **kwargs):
1285    """Run either unittest or script-style tests."""
1286
1287    debug = execute_setup_common(*args, **kwargs)
1288    if not test_function:
1289        execute_unittest(debug)
1290    else:
1291        test_function()
1292
1293def activate_logging():
1294    """Activate iotests.log() output to stdout for script-style tests."""
1295    handler = logging.StreamHandler(stream=sys.stdout)
1296    formatter = logging.Formatter('%(message)s')
1297    handler.setFormatter(formatter)
1298    test_logger.addHandler(handler)
1299    test_logger.setLevel(logging.INFO)
1300    test_logger.propagate = False
1301
1302# This is called from script-style iotests without a single point of entry
1303def script_initialize(*args, **kwargs):
1304    """Initialize script-style tests without running any tests."""
1305    activate_logging()
1306    execute_setup_common(*args, **kwargs)
1307
1308# This is called from script-style iotests with a single point of entry
1309def script_main(test_function, *args, **kwargs):
1310    """Run script-style tests outside of the unittest framework"""
1311    activate_logging()
1312    execute_test(*args, test_function=test_function, **kwargs)
1313
1314# This is called from unittest style iotests
1315def main(*args, **kwargs):
1316    """Run tests using the unittest framework"""
1317    execute_test(*args, **kwargs)
1318