xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 3aaebc0cce4a4963f331f45643e17266646411e6)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31from typing import (Any, Callable, Dict, Iterable,
32                    List, Optional, Sequence, Tuple, TypeVar)
33import unittest
34
35# pylint: disable=import-error, wrong-import-position
36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37from qemu import qtest
38from qemu.qmp import QMPMessage
39
40assert sys.version_info >= (3, 6)
41
42# Use this logger for logging messages directly from the iotests module
43logger = logging.getLogger('qemu.iotests')
44logger.addHandler(logging.NullHandler())
45
46# Use this logger for messages that ought to be used for diff output.
47test_logger = logging.getLogger('qemu.iotests.diff_io')
48
49
50faulthandler.enable()
51
52# This will not work if arguments contain spaces but is necessary if we
53# want to support the override options that ./check supports.
54qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
55if os.environ.get('QEMU_IMG_OPTIONS'):
56    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
57
58qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
59if os.environ.get('QEMU_IO_OPTIONS'):
60    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
61
62qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
63if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
64    qemu_io_args_no_fmt += \
65        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
66
67qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
68if os.environ.get('QEMU_NBD_OPTIONS'):
69    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
70
71qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
72qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
73
74imgfmt = os.environ.get('IMGFMT', 'raw')
75imgproto = os.environ.get('IMGPROTO', 'file')
76test_dir = os.environ.get('TEST_DIR')
77sock_dir = os.environ.get('SOCK_DIR')
78output_dir = os.environ.get('OUTPUT_DIR', '.')
79cachemode = os.environ.get('CACHEMODE')
80aiomode = os.environ.get('AIOMODE')
81qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
82
83socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
84
85luks_default_secret_object = 'secret,id=keysec0,data=' + \
86                             os.environ.get('IMGKEYSECRET', '')
87luks_default_key_secret_opt = 'key-secret=keysec0'
88
89
90def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
91    """
92    Run qemu-img and return both its output and its exit code
93    """
94    subp = subprocess.Popen(qemu_img_args + list(args),
95                            stdout=subprocess.PIPE,
96                            stderr=subprocess.STDOUT,
97                            universal_newlines=True)
98    output = subp.communicate()[0]
99    if subp.returncode < 0:
100        sys.stderr.write('qemu-img received signal %i: %s\n'
101                         % (-subp.returncode,
102                            ' '.join(qemu_img_args + list(args))))
103    return (output, subp.returncode)
104
105def qemu_img(*args: str) -> int:
106    '''Run qemu-img and return the exit code'''
107    return qemu_img_pipe_and_status(*args)[1]
108
109def ordered_qmp(qmsg, conv_keys=True):
110    # Dictionaries are not ordered prior to 3.6, therefore:
111    if isinstance(qmsg, list):
112        return [ordered_qmp(atom) for atom in qmsg]
113    if isinstance(qmsg, dict):
114        od = OrderedDict()
115        for k, v in sorted(qmsg.items()):
116            if conv_keys:
117                k = k.replace('_', '-')
118            od[k] = ordered_qmp(v, conv_keys=False)
119        return od
120    return qmsg
121
122def qemu_img_create(*args):
123    args = list(args)
124
125    # default luks support
126    if '-f' in args and args[args.index('-f') + 1] == 'luks':
127        if '-o' in args:
128            i = args.index('-o')
129            if 'key-secret' not in args[i + 1]:
130                args[i + 1].append(luks_default_key_secret_opt)
131                args.insert(i + 2, '--object')
132                args.insert(i + 3, luks_default_secret_object)
133        else:
134            args = ['-o', luks_default_key_secret_opt,
135                    '--object', luks_default_secret_object] + args
136
137    args.insert(0, 'create')
138
139    return qemu_img(*args)
140
141def qemu_img_verbose(*args):
142    '''Run qemu-img without suppressing its output and return the exit code'''
143    exitcode = subprocess.call(qemu_img_args + list(args))
144    if exitcode < 0:
145        sys.stderr.write('qemu-img received signal %i: %s\n'
146                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
147    return exitcode
148
149def qemu_img_pipe(*args: str) -> str:
150    '''Run qemu-img and return its output'''
151    return qemu_img_pipe_and_status(*args)[0]
152
153def qemu_img_log(*args):
154    result = qemu_img_pipe(*args)
155    log(result, filters=[filter_testfiles])
156    return result
157
158def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
159    args = ['info']
160    if imgopts:
161        args.append('--image-opts')
162    else:
163        args += ['-f', imgfmt]
164    args += extra_args
165    args.append(filename)
166
167    output = qemu_img_pipe(*args)
168    if not filter_path:
169        filter_path = filename
170    log(filter_img_info(output, filter_path))
171
172def qemu_io(*args):
173    '''Run qemu-io and return the stdout data'''
174    args = qemu_io_args + list(args)
175    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
176                            stderr=subprocess.STDOUT,
177                            universal_newlines=True)
178    output = subp.communicate()[0]
179    if subp.returncode < 0:
180        sys.stderr.write('qemu-io received signal %i: %s\n'
181                         % (-subp.returncode, ' '.join(args)))
182    return output
183
184def qemu_io_log(*args):
185    result = qemu_io(*args)
186    log(result, filters=[filter_testfiles, filter_qemu_io])
187    return result
188
189def qemu_io_silent(*args):
190    '''Run qemu-io and return the exit code, suppressing stdout'''
191    args = qemu_io_args + list(args)
192    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
193    if exitcode < 0:
194        sys.stderr.write('qemu-io received signal %i: %s\n' %
195                         (-exitcode, ' '.join(args)))
196    return exitcode
197
198def qemu_io_silent_check(*args):
199    '''Run qemu-io and return the true if subprocess returned 0'''
200    args = qemu_io_args + list(args)
201    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
202                               stderr=subprocess.STDOUT)
203    return exitcode == 0
204
205def get_virtio_scsi_device():
206    if qemu_default_machine == 's390-ccw-virtio':
207        return 'virtio-scsi-ccw'
208    return 'virtio-scsi-pci'
209
210class QemuIoInteractive:
211    def __init__(self, *args):
212        self.args = qemu_io_args_no_fmt + list(args)
213        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
214                                   stdout=subprocess.PIPE,
215                                   stderr=subprocess.STDOUT,
216                                   universal_newlines=True)
217        out = self._p.stdout.read(9)
218        if out != 'qemu-io> ':
219            # Most probably qemu-io just failed to start.
220            # Let's collect the whole output and exit.
221            out += self._p.stdout.read()
222            self._p.wait(timeout=1)
223            raise ValueError(out)
224
225    def close(self):
226        self._p.communicate('q\n')
227
228    def _read_output(self):
229        pattern = 'qemu-io> '
230        n = len(pattern)
231        pos = 0
232        s = []
233        while pos != n:
234            c = self._p.stdout.read(1)
235            # check unexpected EOF
236            assert c != ''
237            s.append(c)
238            if c == pattern[pos]:
239                pos += 1
240            else:
241                pos = 0
242
243        return ''.join(s[:-n])
244
245    def cmd(self, cmd):
246        # quit command is in close(), '\n' is added automatically
247        assert '\n' not in cmd
248        cmd = cmd.strip()
249        assert cmd not in ('q', 'quit')
250        self._p.stdin.write(cmd + '\n')
251        self._p.stdin.flush()
252        return self._read_output()
253
254
255def qemu_nbd(*args):
256    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
257    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
258
259def qemu_nbd_early_pipe(*args):
260    '''Run qemu-nbd in daemon mode and return both the parent's exit code
261       and its output in case of an error'''
262    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
263                            stdout=subprocess.PIPE,
264                            universal_newlines=True)
265    output = subp.communicate()[0]
266    if subp.returncode < 0:
267        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
268                         (-subp.returncode,
269                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
270
271    return subp.returncode, output if subp.returncode else ''
272
273def qemu_nbd_popen(*args):
274    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
275    return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
276
277def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
278    '''Return True if two image files are identical'''
279    return qemu_img('compare', '-f', fmt1,
280                    '-F', fmt2, img1, img2) == 0
281
282def create_image(name, size):
283    '''Create a fully-allocated raw image with sector markers'''
284    file = open(name, 'wb')
285    i = 0
286    while i < size:
287        sector = struct.pack('>l504xl', i // 512, i // 512)
288        file.write(sector)
289        i = i + 512
290    file.close()
291
292def image_size(img):
293    '''Return image's virtual size'''
294    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
295    return json.loads(r)['virtual-size']
296
297def is_str(val):
298    return isinstance(val, str)
299
300test_dir_re = re.compile(r"%s" % test_dir)
301def filter_test_dir(msg):
302    return test_dir_re.sub("TEST_DIR", msg)
303
304win32_re = re.compile(r"\r")
305def filter_win32(msg):
306    return win32_re.sub("", msg)
307
308qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
309                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
310                        r"and [0-9\/.inf]* ops\/sec\)")
311def filter_qemu_io(msg):
312    msg = filter_win32(msg)
313    return qemu_io_re.sub("X ops; XX:XX:XX.X "
314                          "(XXX YYY/sec and XXX ops/sec)", msg)
315
316chown_re = re.compile(r"chown [0-9]+:[0-9]+")
317def filter_chown(msg):
318    return chown_re.sub("chown UID:GID", msg)
319
320def filter_qmp_event(event):
321    '''Filter a QMP event dict'''
322    event = dict(event)
323    if 'timestamp' in event:
324        event['timestamp']['seconds'] = 'SECS'
325        event['timestamp']['microseconds'] = 'USECS'
326    return event
327
328def filter_qmp(qmsg, filter_fn):
329    '''Given a string filter, filter a QMP object's values.
330    filter_fn takes a (key, value) pair.'''
331    # Iterate through either lists or dicts;
332    if isinstance(qmsg, list):
333        items = enumerate(qmsg)
334    else:
335        items = qmsg.items()
336
337    for k, v in items:
338        if isinstance(v, (dict, list)):
339            qmsg[k] = filter_qmp(v, filter_fn)
340        else:
341            qmsg[k] = filter_fn(k, v)
342    return qmsg
343
344def filter_testfiles(msg):
345    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
346    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
347    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
348
349def filter_qmp_testfiles(qmsg):
350    def _filter(_key, value):
351        if is_str(value):
352            return filter_testfiles(value)
353        return value
354    return filter_qmp(qmsg, _filter)
355
356def filter_generated_node_ids(msg):
357    return re.sub("#block[0-9]+", "NODE_NAME", msg)
358
359def filter_img_info(output, filename):
360    lines = []
361    for line in output.split('\n'):
362        if 'disk size' in line or 'actual-size' in line:
363            continue
364        line = line.replace(filename, 'TEST_IMG')
365        line = filter_testfiles(line)
366        line = line.replace(imgfmt, 'IMGFMT')
367        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
368        line = re.sub('uuid: [-a-f0-9]+',
369                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
370                      line)
371        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
372        lines.append(line)
373    return '\n'.join(lines)
374
375def filter_imgfmt(msg):
376    return msg.replace(imgfmt, 'IMGFMT')
377
378def filter_qmp_imgfmt(qmsg):
379    def _filter(_key, value):
380        if is_str(value):
381            return filter_imgfmt(value)
382        return value
383    return filter_qmp(qmsg, _filter)
384
385
386Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
387
388def log(msg: Msg,
389        filters: Iterable[Callable[[Msg], Msg]] = (),
390        indent: Optional[int] = None) -> None:
391    """
392    Logs either a string message or a JSON serializable message (like QMP).
393    If indent is provided, JSON serializable messages are pretty-printed.
394    """
395    for flt in filters:
396        msg = flt(msg)
397    if isinstance(msg, (dict, list)):
398        # Don't sort if it's already sorted
399        do_sort = not isinstance(msg, OrderedDict)
400        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
401    else:
402        test_logger.info(msg)
403
404class Timeout:
405    def __init__(self, seconds, errmsg="Timeout"):
406        self.seconds = seconds
407        self.errmsg = errmsg
408    def __enter__(self):
409        signal.signal(signal.SIGALRM, self.timeout)
410        signal.setitimer(signal.ITIMER_REAL, self.seconds)
411        return self
412    def __exit__(self, exc_type, value, traceback):
413        signal.setitimer(signal.ITIMER_REAL, 0)
414        return False
415    def timeout(self, signum, frame):
416        raise Exception(self.errmsg)
417
418def file_pattern(name):
419    return "{0}-{1}".format(os.getpid(), name)
420
421class FilePaths:
422    """
423    FilePaths is an auto-generated filename that cleans itself up.
424
425    Use this context manager to generate filenames and ensure that the file
426    gets deleted::
427
428        with FilePaths(['test.img']) as img_path:
429            qemu_img('create', img_path, '1G')
430        # migration_sock_path is automatically deleted
431    """
432    def __init__(self, names, base_dir=test_dir):
433        self.paths = []
434        for name in names:
435            self.paths.append(os.path.join(base_dir, file_pattern(name)))
436
437    def __enter__(self):
438        return self.paths
439
440    def __exit__(self, exc_type, exc_val, exc_tb):
441        try:
442            for path in self.paths:
443                os.remove(path)
444        except OSError:
445            pass
446        return False
447
448class FilePath(FilePaths):
449    """
450    FilePath is a specialization of FilePaths that takes a single filename.
451    """
452    def __init__(self, name, base_dir=test_dir):
453        super(FilePath, self).__init__([name], base_dir)
454
455    def __enter__(self):
456        return self.paths[0]
457
458def file_path_remover():
459    for path in reversed(file_path_remover.paths):
460        try:
461            os.remove(path)
462        except OSError:
463            pass
464
465
466def file_path(*names, base_dir=test_dir):
467    ''' Another way to get auto-generated filename that cleans itself up.
468
469    Use is as simple as:
470
471    img_a, img_b = file_path('a.img', 'b.img')
472    sock = file_path('socket')
473    '''
474
475    if not hasattr(file_path_remover, 'paths'):
476        file_path_remover.paths = []
477        atexit.register(file_path_remover)
478
479    paths = []
480    for name in names:
481        filename = file_pattern(name)
482        path = os.path.join(base_dir, filename)
483        file_path_remover.paths.append(path)
484        paths.append(path)
485
486    return paths[0] if len(paths) == 1 else paths
487
488def remote_filename(path):
489    if imgproto == 'file':
490        return path
491    elif imgproto == 'ssh':
492        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
493    else:
494        raise Exception("Protocol %s not supported" % (imgproto))
495
496class VM(qtest.QEMUQtestMachine):
497    '''A QEMU VM'''
498
499    def __init__(self, path_suffix=''):
500        name = "qemu%s-%d" % (path_suffix, os.getpid())
501        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
502                                 test_dir=test_dir,
503                                 socket_scm_helper=socket_scm_helper,
504                                 sock_dir=sock_dir)
505        self._num_drives = 0
506
507    def add_object(self, opts):
508        self._args.append('-object')
509        self._args.append(opts)
510        return self
511
512    def add_device(self, opts):
513        self._args.append('-device')
514        self._args.append(opts)
515        return self
516
517    def add_drive_raw(self, opts):
518        self._args.append('-drive')
519        self._args.append(opts)
520        return self
521
522    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
523        '''Add a virtio-blk drive to the VM'''
524        options = ['if=%s' % interface,
525                   'id=drive%d' % self._num_drives]
526
527        if path is not None:
528            options.append('file=%s' % path)
529            options.append('format=%s' % img_format)
530            options.append('cache=%s' % cachemode)
531            options.append('aio=%s' % aiomode)
532
533        if opts:
534            options.append(opts)
535
536        if img_format == 'luks' and 'key-secret' not in opts:
537            # default luks support
538            if luks_default_secret_object not in self._args:
539                self.add_object(luks_default_secret_object)
540
541            options.append(luks_default_key_secret_opt)
542
543        self._args.append('-drive')
544        self._args.append(','.join(options))
545        self._num_drives += 1
546        return self
547
548    def add_blockdev(self, opts):
549        self._args.append('-blockdev')
550        if isinstance(opts, str):
551            self._args.append(opts)
552        else:
553            self._args.append(','.join(opts))
554        return self
555
556    def add_incoming(self, addr):
557        self._args.append('-incoming')
558        self._args.append(addr)
559        return self
560
561    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
562        cmd = 'human-monitor-command'
563        kwargs = {'command-line': command_line}
564        if use_log:
565            return self.qmp_log(cmd, **kwargs)
566        else:
567            return self.qmp(cmd, **kwargs)
568
569    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
570        """Pause drive r/w operations"""
571        if not event:
572            self.pause_drive(drive, "read_aio")
573            self.pause_drive(drive, "write_aio")
574            return
575        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
576
577    def resume_drive(self, drive: str) -> None:
578        """Resume drive r/w operations"""
579        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
580
581    def hmp_qemu_io(self, drive: str, cmd: str,
582                    use_log: bool = False) -> QMPMessage:
583        """Write to a given drive using an HMP command"""
584        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
585
586    def flatten_qmp_object(self, obj, output=None, basestr=''):
587        if output is None:
588            output = dict()
589        if isinstance(obj, list):
590            for i, item in enumerate(obj):
591                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
592        elif isinstance(obj, dict):
593            for key in obj:
594                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
595        else:
596            output[basestr[:-1]] = obj # Strip trailing '.'
597        return output
598
599    def qmp_to_opts(self, obj):
600        obj = self.flatten_qmp_object(obj)
601        output_list = list()
602        for key in obj:
603            output_list += [key + '=' + obj[key]]
604        return ','.join(output_list)
605
606    def get_qmp_events_filtered(self, wait=60.0):
607        result = []
608        for ev in self.get_qmp_events(wait=wait):
609            result.append(filter_qmp_event(ev))
610        return result
611
612    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
613        full_cmd = OrderedDict((
614            ("execute", cmd),
615            ("arguments", ordered_qmp(kwargs))
616        ))
617        log(full_cmd, filters, indent=indent)
618        result = self.qmp(cmd, **kwargs)
619        log(result, filters, indent=indent)
620        return result
621
622    # Returns None on success, and an error string on failure
623    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
624                pre_finalize=None, cancel=False, wait=60.0):
625        """
626        run_job moves a job from creation through to dismissal.
627
628        :param job: String. ID of recently-launched job
629        :param auto_finalize: Bool. True if the job was launched with
630                              auto_finalize. Defaults to True.
631        :param auto_dismiss: Bool. True if the job was launched with
632                             auto_dismiss=True. Defaults to False.
633        :param pre_finalize: Callback. A callable that takes no arguments to be
634                             invoked prior to issuing job-finalize, if any.
635        :param cancel: Bool. When true, cancels the job after the pre_finalize
636                       callback.
637        :param wait: Float. Timeout value specifying how long to wait for any
638                     event, in seconds. Defaults to 60.0.
639        """
640        match_device = {'data': {'device': job}}
641        match_id = {'data': {'id': job}}
642        events = [
643            ('BLOCK_JOB_COMPLETED', match_device),
644            ('BLOCK_JOB_CANCELLED', match_device),
645            ('BLOCK_JOB_ERROR', match_device),
646            ('BLOCK_JOB_READY', match_device),
647            ('BLOCK_JOB_PENDING', match_id),
648            ('JOB_STATUS_CHANGE', match_id)
649        ]
650        error = None
651        while True:
652            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
653            if ev['event'] != 'JOB_STATUS_CHANGE':
654                log(ev)
655                continue
656            status = ev['data']['status']
657            if status == 'aborting':
658                result = self.qmp('query-jobs')
659                for j in result['return']:
660                    if j['id'] == job:
661                        error = j['error']
662                        log('Job failed: %s' % (j['error']))
663            elif status == 'ready':
664                self.qmp_log('job-complete', id=job)
665            elif status == 'pending' and not auto_finalize:
666                if pre_finalize:
667                    pre_finalize()
668                if cancel:
669                    self.qmp_log('job-cancel', id=job)
670                else:
671                    self.qmp_log('job-finalize', id=job)
672            elif status == 'concluded' and not auto_dismiss:
673                self.qmp_log('job-dismiss', id=job)
674            elif status == 'null':
675                return error
676
677    # Returns None on success, and an error string on failure
678    def blockdev_create(self, options, job_id='job0', filters=None):
679        if filters is None:
680            filters = [filter_qmp_testfiles]
681        result = self.qmp_log('blockdev-create', filters=filters,
682                              job_id=job_id, options=options)
683
684        if 'return' in result:
685            assert result['return'] == {}
686            job_result = self.run_job(job_id)
687        else:
688            job_result = result['error']
689
690        log("")
691        return job_result
692
693    def enable_migration_events(self, name):
694        log('Enabling migration QMP events on %s...' % name)
695        log(self.qmp('migrate-set-capabilities', capabilities=[
696            {
697                'capability': 'events',
698                'state': True
699            }
700        ]))
701
702    def wait_migration(self, expect_runstate):
703        while True:
704            event = self.event_wait('MIGRATION')
705            log(event, filters=[filter_qmp_event])
706            if event['data']['status'] == 'completed':
707                break
708        # The event may occur in finish-migrate, so wait for the expected
709        # post-migration runstate
710        while self.qmp('query-status')['return']['status'] != expect_runstate:
711            pass
712
713    def node_info(self, node_name):
714        nodes = self.qmp('query-named-block-nodes')
715        for x in nodes['return']:
716            if x['node-name'] == node_name:
717                return x
718        return None
719
720    def query_bitmaps(self):
721        res = self.qmp("query-named-block-nodes")
722        return {device['node-name']: device['dirty-bitmaps']
723                for device in res['return'] if 'dirty-bitmaps' in device}
724
725    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
726        """
727        get a specific bitmap from the object returned by query_bitmaps.
728        :param recording: If specified, filter results by the specified value.
729        :param bitmaps: If specified, use it instead of call query_bitmaps()
730        """
731        if bitmaps is None:
732            bitmaps = self.query_bitmaps()
733
734        for bitmap in bitmaps[node_name]:
735            if bitmap.get('name', '') == bitmap_name:
736                if recording is None or bitmap.get('recording') == recording:
737                    return bitmap
738        return None
739
740    def check_bitmap_status(self, node_name, bitmap_name, fields):
741        ret = self.get_bitmap(node_name, bitmap_name)
742
743        return fields.items() <= ret.items()
744
745    def assert_block_path(self, root, path, expected_node, graph=None):
746        """
747        Check whether the node under the given path in the block graph
748        is @expected_node.
749
750        @root is the node name of the node where the @path is rooted.
751
752        @path is a string that consists of child names separated by
753        slashes.  It must begin with a slash.
754
755        Examples for @root + @path:
756          - root="qcow2-node", path="/backing/file"
757          - root="quorum-node", path="/children.2/file"
758
759        Hypothetically, @path could be empty, in which case it would
760        point to @root.  However, in practice this case is not useful
761        and hence not allowed.
762
763        @expected_node may be None.  (All elements of the path but the
764        leaf must still exist.)
765
766        @graph may be None or the result of an x-debug-query-block-graph
767        call that has already been performed.
768        """
769        if graph is None:
770            graph = self.qmp('x-debug-query-block-graph')['return']
771
772        iter_path = iter(path.split('/'))
773
774        # Must start with a /
775        assert next(iter_path) == ''
776
777        node = next((node for node in graph['nodes'] if node['name'] == root),
778                    None)
779
780        # An empty @path is not allowed, so the root node must be present
781        assert node is not None, 'Root node %s not found' % root
782
783        for child_name in iter_path:
784            assert node is not None, 'Cannot follow path %s%s' % (root, path)
785
786            try:
787                node_id = next(edge['child'] for edge in graph['edges']
788                               if (edge['parent'] == node['id'] and
789                                   edge['name'] == child_name))
790
791                node = next(node for node in graph['nodes']
792                            if node['id'] == node_id)
793
794            except StopIteration:
795                node = None
796
797        if node is None:
798            assert expected_node is None, \
799                   'No node found under %s (but expected %s)' % \
800                   (path, expected_node)
801        else:
802            assert node['name'] == expected_node, \
803                   'Found node %s under %s (but expected %s)' % \
804                   (node['name'], path, expected_node)
805
806index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
807
808class QMPTestCase(unittest.TestCase):
809    '''Abstract base class for QMP test cases'''
810
811    def __init__(self, *args, **kwargs):
812        super().__init__(*args, **kwargs)
813        # Many users of this class set a VM property we rely on heavily
814        # in the methods below.
815        self.vm = None
816
817    def dictpath(self, d, path):
818        '''Traverse a path in a nested dict'''
819        for component in path.split('/'):
820            m = index_re.match(component)
821            if m:
822                component, idx = m.groups()
823                idx = int(idx)
824
825            if not isinstance(d, dict) or component not in d:
826                self.fail(f'failed path traversal for "{path}" in "{d}"')
827            d = d[component]
828
829            if m:
830                if not isinstance(d, list):
831                    self.fail(f'path component "{component}" in "{path}" '
832                              f'is not a list in "{d}"')
833                try:
834                    d = d[idx]
835                except IndexError:
836                    self.fail(f'invalid index "{idx}" in path "{path}" '
837                              f'in "{d}"')
838        return d
839
840    def assert_qmp_absent(self, d, path):
841        try:
842            result = self.dictpath(d, path)
843        except AssertionError:
844            return
845        self.fail('path "%s" has value "%s"' % (path, str(result)))
846
847    def assert_qmp(self, d, path, value):
848        '''Assert that the value for a specific path in a QMP dict
849           matches.  When given a list of values, assert that any of
850           them matches.'''
851
852        result = self.dictpath(d, path)
853
854        # [] makes no sense as a list of valid values, so treat it as
855        # an actual single value.
856        if isinstance(value, list) and value != []:
857            for v in value:
858                if result == v:
859                    return
860            self.fail('no match for "%s" in %s' % (str(result), str(value)))
861        else:
862            self.assertEqual(result, value,
863                             '"%s" is "%s", expected "%s"'
864                             % (path, str(result), str(value)))
865
866    def assert_no_active_block_jobs(self):
867        result = self.vm.qmp('query-block-jobs')
868        self.assert_qmp(result, 'return', [])
869
870    def assert_has_block_node(self, node_name=None, file_name=None):
871        """Issue a query-named-block-nodes and assert node_name and/or
872        file_name is present in the result"""
873        def check_equal_or_none(a, b):
874            return a is None or b is None or a == b
875        assert node_name or file_name
876        result = self.vm.qmp('query-named-block-nodes')
877        for x in result["return"]:
878            if check_equal_or_none(x.get("node-name"), node_name) and \
879                    check_equal_or_none(x.get("file"), file_name):
880                return
881        self.fail("Cannot find %s %s in result:\n%s" %
882                  (node_name, file_name, result))
883
884    def assert_json_filename_equal(self, json_filename, reference):
885        '''Asserts that the given filename is a json: filename and that its
886           content is equal to the given reference object'''
887        self.assertEqual(json_filename[:5], 'json:')
888        self.assertEqual(
889            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
890            self.vm.flatten_qmp_object(reference)
891        )
892
893    def cancel_and_wait(self, drive='drive0', force=False,
894                        resume=False, wait=60.0):
895        '''Cancel a block job and wait for it to finish, returning the event'''
896        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
897        self.assert_qmp(result, 'return', {})
898
899        if resume:
900            self.vm.resume_drive(drive)
901
902        cancelled = False
903        result = None
904        while not cancelled:
905            for event in self.vm.get_qmp_events(wait=wait):
906                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
907                   event['event'] == 'BLOCK_JOB_CANCELLED':
908                    self.assert_qmp(event, 'data/device', drive)
909                    result = event
910                    cancelled = True
911                elif event['event'] == 'JOB_STATUS_CHANGE':
912                    self.assert_qmp(event, 'data/id', drive)
913
914
915        self.assert_no_active_block_jobs()
916        return result
917
918    def wait_until_completed(self, drive='drive0', check_offset=True,
919                             wait=60.0, error=None):
920        '''Wait for a block job to finish, returning the event'''
921        while True:
922            for event in self.vm.get_qmp_events(wait=wait):
923                if event['event'] == 'BLOCK_JOB_COMPLETED':
924                    self.assert_qmp(event, 'data/device', drive)
925                    if error is None:
926                        self.assert_qmp_absent(event, 'data/error')
927                        if check_offset:
928                            self.assert_qmp(event, 'data/offset',
929                                            event['data']['len'])
930                    else:
931                        self.assert_qmp(event, 'data/error', error)
932                    self.assert_no_active_block_jobs()
933                    return event
934                if event['event'] == 'JOB_STATUS_CHANGE':
935                    self.assert_qmp(event, 'data/id', drive)
936
937    def wait_ready(self, drive='drive0'):
938        """Wait until a BLOCK_JOB_READY event, and return the event."""
939        f = {'data': {'type': 'mirror', 'device': drive}}
940        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
941
942    def wait_ready_and_cancel(self, drive='drive0'):
943        self.wait_ready(drive=drive)
944        event = self.cancel_and_wait(drive=drive)
945        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
946        self.assert_qmp(event, 'data/type', 'mirror')
947        self.assert_qmp(event, 'data/offset', event['data']['len'])
948
949    def complete_and_wait(self, drive='drive0', wait_ready=True,
950                          completion_error=None):
951        '''Complete a block job and wait for it to finish'''
952        if wait_ready:
953            self.wait_ready(drive=drive)
954
955        result = self.vm.qmp('block-job-complete', device=drive)
956        self.assert_qmp(result, 'return', {})
957
958        event = self.wait_until_completed(drive=drive, error=completion_error)
959        self.assert_qmp(event, 'data/type', 'mirror')
960
961    def pause_wait(self, job_id='job0'):
962        with Timeout(3, "Timeout waiting for job to pause"):
963            while True:
964                result = self.vm.qmp('query-block-jobs')
965                found = False
966                for job in result['return']:
967                    if job['device'] == job_id:
968                        found = True
969                        if job['paused'] and not job['busy']:
970                            return job
971                        break
972                assert found
973
974    def pause_job(self, job_id='job0', wait=True):
975        result = self.vm.qmp('block-job-pause', device=job_id)
976        self.assert_qmp(result, 'return', {})
977        if wait:
978            return self.pause_wait(job_id)
979        return result
980
981    def case_skip(self, reason):
982        '''Skip this test case'''
983        case_notrun(reason)
984        self.skipTest(reason)
985
986
987def notrun(reason):
988    '''Skip this test suite'''
989    # Each test in qemu-iotests has a number ("seq")
990    seq = os.path.basename(sys.argv[0])
991
992    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
993    logger.warning("%s not run: %s", seq, reason)
994    sys.exit(0)
995
996def case_notrun(reason):
997    '''Mark this test case as not having been run (without actually
998    skipping it, that is left to the caller).  See
999    QMPTestCase.case_skip() for a variant that actually skips the
1000    current test case.'''
1001
1002    # Each test in qemu-iotests has a number ("seq")
1003    seq = os.path.basename(sys.argv[0])
1004
1005    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1006        '    [case not run] ' + reason + '\n')
1007
1008def _verify_image_format(supported_fmts: Sequence[str] = (),
1009                         unsupported_fmts: Sequence[str] = ()) -> None:
1010    assert not (supported_fmts and unsupported_fmts)
1011
1012    if 'generic' in supported_fmts and \
1013            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1014        # similar to
1015        #   _supported_fmt generic
1016        # for bash tests
1017        if imgfmt == 'luks':
1018            verify_working_luks()
1019        return
1020
1021    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1022    if not_sup or (imgfmt in unsupported_fmts):
1023        notrun('not suitable for this image format: %s' % imgfmt)
1024
1025    if imgfmt == 'luks':
1026        verify_working_luks()
1027
1028def _verify_protocol(supported: Sequence[str] = (),
1029                     unsupported: Sequence[str] = ()) -> None:
1030    assert not (supported and unsupported)
1031
1032    if 'generic' in supported:
1033        return
1034
1035    not_sup = supported and (imgproto not in supported)
1036    if not_sup or (imgproto in unsupported):
1037        notrun('not suitable for this protocol: %s' % imgproto)
1038
1039def _verify_platform(supported: Sequence[str] = (),
1040                     unsupported: Sequence[str] = ()) -> None:
1041    if any((sys.platform.startswith(x) for x in unsupported)):
1042        notrun('not suitable for this OS: %s' % sys.platform)
1043
1044    if supported:
1045        if not any((sys.platform.startswith(x) for x in supported)):
1046            notrun('not suitable for this OS: %s' % sys.platform)
1047
1048def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1049    if supported_cache_modes and (cachemode not in supported_cache_modes):
1050        notrun('not suitable for this cache mode: %s' % cachemode)
1051
1052def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1053    if supported_aio_modes and (aiomode not in supported_aio_modes):
1054        notrun('not suitable for this aio mode: %s' % aiomode)
1055
1056def supports_quorum():
1057    return 'quorum' in qemu_img_pipe('--help')
1058
1059def verify_quorum():
1060    '''Skip test suite if quorum support is not available'''
1061    if not supports_quorum():
1062        notrun('quorum support missing')
1063
1064def has_working_luks() -> Tuple[bool, str]:
1065    """
1066    Check whether our LUKS driver can actually create images
1067    (this extends to LUKS encryption for qcow2).
1068
1069    If not, return the reason why.
1070    """
1071
1072    img_file = f'{test_dir}/luks-test.luks'
1073    (output, status) = \
1074        qemu_img_pipe_and_status('create', '-f', 'luks',
1075                                 '--object', luks_default_secret_object,
1076                                 '-o', luks_default_key_secret_opt,
1077                                 '-o', 'iter-time=10',
1078                                 img_file, '1G')
1079    try:
1080        os.remove(img_file)
1081    except OSError:
1082        pass
1083
1084    if status != 0:
1085        reason = output
1086        for line in output.splitlines():
1087            if img_file + ':' in line:
1088                reason = line.split(img_file + ':', 1)[1].strip()
1089                break
1090
1091        return (False, reason)
1092    else:
1093        return (True, '')
1094
1095def verify_working_luks():
1096    """
1097    Skip test suite if LUKS does not work
1098    """
1099    (working, reason) = has_working_luks()
1100    if not working:
1101        notrun(reason)
1102
1103def qemu_pipe(*args):
1104    """
1105    Run qemu with an option to print something and exit (e.g. a help option).
1106
1107    :return: QEMU's stdout output.
1108    """
1109    args = [qemu_prog] + qemu_opts + list(args)
1110    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1111                            stderr=subprocess.STDOUT,
1112                            universal_newlines=True)
1113    output = subp.communicate()[0]
1114    if subp.returncode < 0:
1115        sys.stderr.write('qemu received signal %i: %s\n' %
1116                         (-subp.returncode, ' '.join(args)))
1117    return output
1118
1119def supported_formats(read_only=False):
1120    '''Set 'read_only' to True to check ro-whitelist
1121       Otherwise, rw-whitelist is checked'''
1122
1123    if not hasattr(supported_formats, "formats"):
1124        supported_formats.formats = {}
1125
1126    if read_only not in supported_formats.formats:
1127        format_message = qemu_pipe("-drive", "format=help")
1128        line = 1 if read_only else 0
1129        supported_formats.formats[read_only] = \
1130            format_message.splitlines()[line].split(":")[1].split()
1131
1132    return supported_formats.formats[read_only]
1133
1134def skip_if_unsupported(required_formats=(), read_only=False):
1135    '''Skip Test Decorator
1136       Runs the test if all the required formats are whitelisted'''
1137    def skip_test_decorator(func):
1138        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1139                         **kwargs: Dict[str, Any]) -> None:
1140            if callable(required_formats):
1141                fmts = required_formats(test_case)
1142            else:
1143                fmts = required_formats
1144
1145            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1146            if usf_list:
1147                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1148                test_case.case_skip(msg)
1149            else:
1150                func(test_case, *args, **kwargs)
1151        return func_wrapper
1152    return skip_test_decorator
1153
1154def skip_for_formats(formats: Sequence[str] = ()) \
1155    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1156                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1157    '''Skip Test Decorator
1158       Skips the test for the given formats'''
1159    def skip_test_decorator(func):
1160        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1161                         **kwargs: Dict[str, Any]) -> None:
1162            if imgfmt in formats:
1163                msg = f'{test_case}: Skipped for format {imgfmt}'
1164                test_case.case_skip(msg)
1165            else:
1166                func(test_case, *args, **kwargs)
1167        return func_wrapper
1168    return skip_test_decorator
1169
1170def skip_if_user_is_root(func):
1171    '''Skip Test Decorator
1172       Runs the test only without root permissions'''
1173    def func_wrapper(*args, **kwargs):
1174        if os.getuid() == 0:
1175            case_notrun('{}: cannot be run as root'.format(args[0]))
1176            return None
1177        else:
1178            return func(*args, **kwargs)
1179    return func_wrapper
1180
1181def execute_unittest(debug=False):
1182    """Executes unittests within the calling module."""
1183
1184    verbosity = 2 if debug else 1
1185
1186    if debug:
1187        output = sys.stdout
1188    else:
1189        # We need to filter out the time taken from the output so that
1190        # qemu-iotest can reliably diff the results against master output.
1191        output = io.StringIO()
1192
1193    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1194                                     verbosity=verbosity)
1195    try:
1196        # unittest.main() will use sys.exit(); so expect a SystemExit
1197        # exception
1198        unittest.main(testRunner=runner)
1199    finally:
1200        # We need to filter out the time taken from the output so that
1201        # qemu-iotest can reliably diff the results against master output.
1202        if not debug:
1203            out = output.getvalue()
1204            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1205
1206            # Hide skipped tests from the reference output
1207            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1208            out_first_line, out_rest = out.split('\n', 1)
1209            out = out_first_line.replace('s', '.') + '\n' + out_rest
1210
1211            sys.stderr.write(out)
1212
1213def execute_setup_common(supported_fmts: Sequence[str] = (),
1214                         supported_platforms: Sequence[str] = (),
1215                         supported_cache_modes: Sequence[str] = (),
1216                         supported_aio_modes: Sequence[str] = (),
1217                         unsupported_fmts: Sequence[str] = (),
1218                         supported_protocols: Sequence[str] = (),
1219                         unsupported_protocols: Sequence[str] = ()) -> bool:
1220    """
1221    Perform necessary setup for either script-style or unittest-style tests.
1222
1223    :return: Bool; Whether or not debug mode has been requested via the CLI.
1224    """
1225    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1226
1227    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1228    # indicate that we're not being run via "check". There may be
1229    # other things set up by "check" that individual test cases rely
1230    # on.
1231    if test_dir is None or qemu_default_machine is None:
1232        sys.stderr.write('Please run this test via the "check" script\n')
1233        sys.exit(os.EX_USAGE)
1234
1235    debug = '-d' in sys.argv
1236    if debug:
1237        sys.argv.remove('-d')
1238    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1239
1240    _verify_image_format(supported_fmts, unsupported_fmts)
1241    _verify_protocol(supported_protocols, unsupported_protocols)
1242    _verify_platform(supported=supported_platforms)
1243    _verify_cache_mode(supported_cache_modes)
1244    _verify_aio_mode(supported_aio_modes)
1245
1246    return debug
1247
1248def execute_test(*args, test_function=None, **kwargs):
1249    """Run either unittest or script-style tests."""
1250
1251    debug = execute_setup_common(*args, **kwargs)
1252    if not test_function:
1253        execute_unittest(debug)
1254    else:
1255        test_function()
1256
1257def activate_logging():
1258    """Activate iotests.log() output to stdout for script-style tests."""
1259    handler = logging.StreamHandler(stream=sys.stdout)
1260    formatter = logging.Formatter('%(message)s')
1261    handler.setFormatter(formatter)
1262    test_logger.addHandler(handler)
1263    test_logger.setLevel(logging.INFO)
1264    test_logger.propagate = False
1265
1266# This is called from script-style iotests without a single point of entry
1267def script_initialize(*args, **kwargs):
1268    """Initialize script-style tests without running any tests."""
1269    activate_logging()
1270    execute_setup_common(*args, **kwargs)
1271
1272# This is called from script-style iotests with a single point of entry
1273def script_main(test_function, *args, **kwargs):
1274    """Run script-style tests outside of the unittest framework"""
1275    activate_logging()
1276    execute_test(*args, test_function=test_function, **kwargs)
1277
1278# This is called from unittest style iotests
1279def main(*args, **kwargs):
1280    """Run tests using the unittest framework"""
1281    execute_test(*args, **kwargs)
1282