xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision f343346b)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
# Use this logger for logging messages directly from the iotests module.
# A no-op NullHandler is attached to it here.
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump a traceback if the interpreter receives a fatal signal
faulthandler.enable()
52
# Tool command lines are assembled from the override variables that ./check
# supports.  The option strings are split on whitespace, so this will not
# work if an individual argument contains spaces.  str.split() is called
# without a separator so that repeated spaces, or an unset/empty variable,
# never produce empty ('') arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += os.environ.get('QEMU_IO_OPTIONS_NO_FMT', '').split()

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()

# Test configuration taken from the environment; values without a default
# are None when the corresponding variable is not set.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Secret object definition and the matching option that refers to it; they
# are added automatically by the helpers below when a luks image is used
# without an explicit key-secret.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
89
90
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the given arguments.

    stderr is merged into stdout.  Returns an (output, exit code) tuple.
    If the process was terminated by a signal, a message naming the signal
    and the command line is written to stderr.
    """
    full_args = qemu_img_args + list(args)
    proc = subprocess.Popen(full_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    output, _ = proc.communicate()
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(full_args)))
    return (output, status)
105
def qemu_img(*args: str) -> int:
    """Invoke qemu-img, ignore its output, and return the exit code."""
    _output, status = qemu_img_pipe_and_status(*args)
    return status
109
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object in which every dict is an OrderedDict
    sorted by key.

    With conv_keys set, underscores in the keys of the outermost dict are
    replaced by dashes; dicts nested inside a dict keep their keys, while
    dicts that are elements of a list are converted again.  Values that
    are neither dict nor list are returned unchanged.
    """
    # Dictionaries are not ordered prior to 3.6, hence the OrderedDict
    if isinstance(qmsg, dict):
        result = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            result[new_key] = ordered_qmp(value, conv_keys=False)
        return result
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
122
def qemu_img_create(*args):
    """
    Run 'qemu-img create' with the given arguments and return the exit code.

    When the arguments select the luks format ('-f luks') and no key-secret
    is given, the default secret object and the matching key-secret option
    are added automatically.
    """
    args = list(args)

    # default luks support
    fmt_idx = args.index('-f') if '-f' in args else -1
    if 0 <= fmt_idx < len(args) - 1 and args[fmt_idx + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value of -o is a comma-separated option string, so the
                # default has to be appended to that string (a str has no
                # append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
140    return qemu_img(*args)
141
def qemu_img_measure(*args):
    """Run 'qemu-img measure --output json' and return the decoded JSON."""
    output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(output)
144
def qemu_img_check(*args):
    """Run 'qemu-img check --output json' and return the decoded JSON."""
    output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(output)
147
def qemu_img_verbose(*args):
    """
    Invoke qemu-img with its output left untouched (not captured) and
    return the exit code.  A message is written to stderr if the process
    was terminated by a signal.
    """
    full_args = qemu_img_args + list(args)
    status = subprocess.call(full_args)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(full_args)))
    return status
155
def qemu_img_pipe(*args: str) -> str:
    """Invoke qemu-img and return its output, ignoring the exit code."""
    output, _status = qemu_img_pipe_and_status(*args)
    return output
159
def qemu_img_log(*args):
    """
    Invoke qemu-img, log its output (with test file names filtered) and
    return the unfiltered output.
    """
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
164
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of 'qemu-img info' for filename.

    :param filename: Image to inspect; passed as the last argument.
    :param filter_path: Text that filter_img_info() replaces in the
                        output; defaults to filename.
    :param imgopts: If true, pass --image-opts instead of '-f <imgfmt>'.
    :param extra_args: Additional arguments inserted before filename.
    """
    fmt_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*cmd)
    log(filter_img_info(output, filter_path or filename))
178
def qemu_io(*args):
    """
    Invoke qemu-io and return its output (stderr is merged into stdout).
    A message is written to stderr if the process was terminated by a
    signal.
    """
    cmd = qemu_io_args + list(args)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    output, _ = proc.communicate()
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-status, ' '.join(cmd)))
    return output
190
def qemu_io_log(*args):
    """
    Invoke qemu-io, log its output (filtered for test file names and
    qemu-io statistics) and return the unfiltered output.
    """
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
195
def qemu_io_silent(*args):
    """
    Run qemu-io and return the exit code, suppressing stdout.

    stderr is not redirected.  A message is written to stderr if the
    process was terminated by a signal.
    """
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null here
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
204
def qemu_io_silent_check(*args):
    """
    Run qemu-io with stdout and stderr discarded and return True if the
    subprocess exited with status 0.
    """
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL avoids opening (and never closing) /dev/null here
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
211
def get_virtio_scsi_device():
    """
    Return the virtio-scsi device name matching the default machine:
    the ccw variant for 's390-ccw-virtio', the pci variant otherwise.
    """
    is_s390_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_s390_ccw else 'virtio-scsi-pci'
216
class QemuIoInteractive:
    """
    An interactive qemu-io subprocess driven through its stdin/stdout.

    The process is started with the format-less qemu-io arguments plus
    the ones given; cmd() sends one command and returns everything the
    process printed up to the next prompt.
    """

    # Prompt printed by qemu-io when it is ready for the next command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io and wait for its first prompt.

        Raises ValueError carrying the collected output if the process
        does not begin by printing the prompt.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read the process output character by character until the prompt
        has been seen, and return the text that preceded the prompt.
        """
        pattern = self._PROMPT
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # Restart the match.  The mismatching character may itself
                # be the first character of the prompt (e.g. output ending
                # in 'q' right before 'qemu-io> '); since that character
                # occurs nowhere else in the prompt, checking it alone is
                # sufficient.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Execute a single qemu-io command and return its output.

        The command must not contain a newline and must not be the quit
        command (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
260
261
def qemu_nbd(*args):
    """
    Start qemu-nbd with --fork (daemon mode) and return the exit code of
    the parent process.
    """
    cmd = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(cmd)
265
def qemu_nbd_early_pipe(*args):
    """
    Start qemu-nbd with --fork (daemon mode), capturing its stdout.

    Returns a tuple of the parent's exit code and its output; the output
    is replaced by an empty string when the exit code is 0.
    """
    cmd = qemu_nbd_args + ['--fork'] + list(args)
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    output, _ = proc.communicate()
    status = proc.returncode
    if status < 0:
        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
                         (-status, ' '.join(cmd)))

    return status, (output if status else '')
279
@contextmanager
def qemu_nbd_popen(*args):
    """
    Context manager that keeps a qemu-nbd server running for the duration
    of the with-block.

    The body is entered once the server's pid file exists; RuntimeError is
    raised if qemu-nbd exits before that.  On exit the server is killed
    and reaped.
    """
    pid_file = file_path("pid")
    cmd = qemu_nbd_args + ['--persistent', '--pid-file', pid_file]
    cmd += list(args)

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until the pid file shows up, bailing out on early exit
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
304
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    """
    Run 'qemu-img compare' on the two images (with formats fmt1 and fmt2)
    and return True if it exits with status 0.
    """
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
309
def create_image(name, size):
    """
    Create a fully-allocated raw image with sector markers.

    The file is written in whole 512-byte sectors until at least size
    bytes have been written.  Each sector starts and ends with its sector
    number as a big-endian 32-bit integer, with zero padding in between.
    """
    # The context manager closes the file even if a write fails
    with open(name, 'wb') as file:
        for offset in range(0, size, 512):
            sector_num = offset // 512
            file.write(struct.pack('>l504xl', sector_num, sector_num))
319
def image_size(img):
    """
    Return the 'virtual-size' field reported by 'qemu-img info' (JSON
    output) for the image, which is opened with the default image format.
    """
    info = json.loads(qemu_img_pipe('info', '--output=json',
                                    '-f', imgfmt, img))
    return info['virtual-size']
324
def is_str(val: Any) -> bool:
    """Tell whether val is an instance of str."""
    result = isinstance(val, str)
    return result
327
# test_dir is a filesystem path, not a regular expression: escape it so
# that characters such as '.' or '+' only match themselves.  (If test_dir
# is None, the pattern is the literal text "None", as it was before.)
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in msg by TEST_DIR."""
    return test_dir_re.sub("TEST_DIR", msg)
331
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return msg with every carriage return character removed."""
    stripped, _count = win32_re.subn("", msg)
    return stripped
335
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Remove carriage returns from msg and replace qemu-io's statistics
    (op count, elapsed time, throughput) by fixed placeholder text.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, filter_win32(msg))
343
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric 'chown <uid>:<gid>' occurrences by 'chown UID:GID'."""
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
347
def filter_qmp_event(event):
    """
    Return a filtered copy of a QMP event dict.

    If the event has a 'timestamp', its 'seconds' and 'microseconds'
    values are replaced by the placeholders 'SECS' and 'USECS'.  The event
    passed in is left unmodified.
    """
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: copy the nested timestamp
        # dict as well so that the caller's event is not altered.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
355
def filter_qmp(qmsg, filter_fn):
    """
    Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a list or a dict; nested lists and dicts are processed
    recursively.  filter_fn is called as filter_fn(key, value), where key
    is the dict key or the list index, and its result replaces the value.
    The (modified) qmsg is returned.
    """
    # Lists are walked by index, everything else is treated as a dict
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
371
def filter_testfiles(msg):
    """
    Replace the '<test_dir>/<pid>-' and '<sock_dir>/<pid>-' file name
    prefixes of the current process by 'TEST_DIR/PID-' and
    'SOCK_DIR/PID-' respectively.
    """
    pid_prefix = "%s-" % (os.getpid())
    replacements = (
        (os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-'),
        (os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-'),
    )
    for prefix, placeholder in replacements:
        msg = msg.replace(prefix, placeholder)
    return msg
376
def filter_qmp_testfiles(qmsg):
    """Apply filter_testfiles() to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
383
def filter_generated_node_ids(msg):
    """Replace every '#block<digits>' token in msg by NODE_NAME."""
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
386
def filter_img_info(output, filename):
    """
    Filter 'qemu-img info' output for use in reference output.

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines filename becomes TEST_IMG, test file prefixes are
    filtered, the image format becomes IMGFMT, and iteration counts, uuids
    and cids are replaced by placeholders.
    """
    def _filter_line(line):
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
        line = re.sub('uuid: [-a-f0-9]+',
                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
                      line)
        return re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)

    kept = [_filter_line(line) for line in output.split('\n')
            if not ('disk size' in line or 'actual-size' in line)]
    return '\n'.join(kept)
402
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name in msg by IMGFMT."""
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
405
def filter_qmp_imgfmt(qmsg):
    """Apply filter_imgfmt() to every string value of a QMP object."""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
412
413
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Emit a message on the diff-output logger.

    The filters are applied to msg in order first.  A dict or list result
    is serialized as JSON (pretty-printed if indent is given); anything
    else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # An OrderedDict is taken to be sorted already; do not re-sort it
    sort_keys = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=sort_keys, indent=indent))
431
class Timeout:
    """
    Context manager that raises an exception if its body runs for longer
    than the given number of seconds.

    It is implemented with SIGALRM and the real-time interval timer.  On
    exit the timer is disarmed and the SIGALRM handler that was installed
    before entering the context is put back.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        # Handler to reinstate in __exit__; set by __enter__
        self._old_handler = None

    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None for a handler that was not
        # installed from Python; such a handler cannot be reinstated.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
            self._old_handler = None
        return False

    def timeout(self, signum, frame):
        """SIGALRM handler: raise an Exception carrying errmsg."""
        raise Exception(self.errmsg)
445
def file_pattern(name):
    """Return name prefixed with the current process ID and a dash."""
    return f"{os.getpid()}-{name}"
448
class FilePath:
    """
    Context manager that hands out file names and deletes the
    corresponding files when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless another base_dir is
    given; for sockets use iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When exactly one name is given, the context yields that single path
    rather than a sequence with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        paths = []
        for name in names:
            paths.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = paths

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: files that cannot be removed are ignored
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                pass
        return False
487
488
def file_path_remover():
    """
    Delete the files recorded in file_path_remover.paths, newest entry
    first.  Files that cannot be removed are silently skipped.
    """
    for path in file_path_remover.paths[::-1]:
        try:
            os.remove(path)
        except OSError:
            pass
495
496
def file_path(*names, base_dir=test_dir):
    """
    Return auto-generated file names whose files are removed when the
    interpreter exits.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path; several names yield a list.
    """
    # The first call sets up the path list and registers the cleanup hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
518
def remote_filename(path):
    """
    Return the name under which path is reached through the configured
    image protocol: the path itself for 'file', an ssh:// URL on
    127.0.0.1 port 22 for 'ssh'.  Any other protocol raises an Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
526
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        """
        Set up a machine named 'qemu<path_suffix>-<pid>' that uses the
        QEMU binary, options and directories configured for the tests.
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added through add_drive(); used for drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        """Append '-object <opts>' to the command line; returns self."""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append '-device <opts>' to the command line; returns self."""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append '-drive <opts>' verbatim to the command line; returns
        self."""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM

        The drive gets the ID 'drive<N>', N counting the drives added so
        far.  If path is not None, the file and format options are added,
        plus the cache and aio modes when those are configured.  opts is
        an extra option string appended as is.  For luks images without a
        key-secret in opts, the default secret is used.  Returns self.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            # The modes are None when CACHEMODE/AIOMODE are not set; do
            # not hand a literal 'None' to QEMU in that case.
            if cachemode is not None:
                options.append('cache=%s' % cachemode)
            if aiomode is not None:
                options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in (opts or ''):
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append '-blockdev <opts>' to the command line; returns self.

        opts is either an option string or an iterable of option strings
        that is joined with commas.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append '-incoming <addr>' to the command line; returns self."""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Execute an HMP command line through QMP's human-monitor-command.

        With use_log set, command and response are logged via qmp_log().
        """
        cmd = 'human-monitor-command'
        kwargs = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations

        A breakpoint named bp_<drive> is set on the given event; without
        an event, both "read_aio" and "write_aio" are used.
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested dicts/lists into one dict with dotted keys.

        List elements contribute their index and dict entries their key
        as path components, e.g. {'a': [{'b': 1}]} gives {'a.0.b': 1}.
        The result is stored into output (a new dict by default), which
        is also returned.
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    @staticmethod
    def _opt_value(value):
        """Render a flattened QMP value for use in an option string."""
        # NOTE(review): on/off is assumed to be the boolean spelling that
        # QEMU's option parsers accept; confirm against the QEMU docs.
        if isinstance(value, bool):
            return 'on' if value else 'off'
        return str(value)

    def qmp_to_opts(self, obj):
        """
        Convert a QMP object into a comma-separated 'key=value' option
        string, using the dotted keys produced by flatten_qmp_object().
        Non-string values (numbers, booleans) are converted to text.
        """
        flat = self.flatten_qmp_object(obj)
        return ','.join('%s=%s' % (key, self._opt_value(value))
                        for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, each passed through
        filter_qmp_event()."""
        return [filter_qmp_event(ev)
                for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command, logging both the command and its response
        (after applying filters), and return the response.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        Returns None on success, and an error string on failure.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Block job events are only logged; the state machine
                # below is driven by JOB_STATUS_CHANGE alone.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue blockdev-create and run the resulting job to completion.

        Returns None on success, and an error string on failure (or the
        QMP error object if the command itself was rejected).  filters
        defaults to [filter_qmp_testfiles].
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """Enable the 'events' migration capability and log the result;
        name is only used in the log message."""
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait for migration to reach 'completed' or 'failed', logging every
        MIGRATION event seen.

        On completion, query-status is polled until the VM reports
        expect_runstate, then True is returned.  Returns False if the
        migration failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        return False

    def node_info(self, node_name):
        """Return the query-named-block-nodes entry for node_name, or None
        if there is no such node."""
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """Return a dict mapping node names to their 'dirty-bitmaps' list,
        for all named nodes that report one."""
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()

        Returns None if the node has no matching bitmap; raises KeyError
        if node_name is not present in the bitmaps object at all.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Return True if the named bitmap exists and contains every
        key/value pair of fields.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # No such bitmap: report a mismatch instead of crashing
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge called child_name from the current node
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
842
# Matches a path component with a bracketed suffix, e.g. 'name[3]':
# group 1 is the text before the '[', group 2 the text inside the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
844
class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The helpers below lean heavily on a VM attribute that many
        # users of this class assign; start out with a placeholder.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow the '/'-separated @path through the nested dict @d.

        A component written as "name[N]" looks up "name" and then takes
        element N of the list found there.  Any step that cannot be
        taken fails the test.  Returns the object the path leads to.'''
        for component in path.split('/'):
            indexed = index_re.match(component)
            if indexed:
                component, idx = indexed.groups()
                idx = int(idx)

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if not indexed:
                continue

            # "name[N]": the value just looked up has to be a list
            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Fail the test if @path can be resolved inside @d.'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed to resolve the path: nothing is there
            return
        else:
            self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Check the value found under @path in the QMP dict @d.

        If @value is a non-empty list, the check passes as soon as one
        of its elements equals the value found; anything else
        (including []) must be equal to the value found.'''

        result = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so it is
        # compared like any other single value.
        is_choice_list = isinstance(value, list) and value != []
        if not is_choice_list:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))
            return

        if any(result == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list.'''
        jobs = self.vm.qmp('query-block-jobs')
        self.assert_qmp(jobs, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry
        matches @node_name and/or @file_name (at least one of the two
        has to be given)"""
        assert node_name or file_name

        def matches(actual, wanted):
            # A value missing on either side is treated as a match
            return actual is None or wanted is None or actual == wanted

        result = self.vm.qmp('query-named-block-nodes')
        if any(matches(entry.get("node-name"), node_name) and
               matches(entry.get("file"), file_name)
               for entry in result["return"]):
            return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON document following the prefix equals @reference once
           both have gone through self.vm.flatten_qmp_object()'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flat_actual = self.vm.flatten_qmp_object(json.loads(payload))
        flat_reference = self.vm.flatten_qmp_object(reference)
        self.assertEqual(flat_actual, flat_reference)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Issue block-job-cancel for @drive, wait for the job to end
           and return the event that reported it'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # Every event of a batch is examined, so the last matching
            # event of the batch is the one that gets returned.
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for BLOCK_JOB_COMPLETED on @drive and return the event.

        With @error unset, the event must carry no error and (if
        @check_offset is set) its offset must equal its len; otherwise
        the event's error must match @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                name = event['event']
                if name == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if name != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for a BLOCK_JOB_READY event of a mirror or commit job
        on @drive, and return the event."""
        wanted = [('BLOCK_JOB_READY',
                   {'data': {'type': job_type, 'device': drive}})
                  for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(wanted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait until the job on @drive is ready, cancel it, and check
           that it ended as a fully processed mirror job'''
        self.wait_ready(drive=drive)
        final_event = self.cancel_and_wait(drive=drive)
        self.assertEqual(final_event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final_event, 'data/type', 'mirror')
        self.assert_qmp(final_event, 'data/offset',
                        final_event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete for @drive (optionally waiting for
           the job to become ready first) and wait until it is done'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until job @job_id is paused and no
           longer busy, then return its entry.  The job must show up in
           every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                # Only the first entry with a matching device counts
                job = next((candidate for candidate in jobs
                            if candidate['device'] == job_id), None)
                if job is not None and job['paused'] and not job['busy']:
                    return job
                assert job is not None

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for @job_id.  Returns the result of
           pause_wait() if @wait is set, else the command's reply.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        return self.pause_wait(job_id) if wait else result

    def case_skip(self, reason):
        '''Skip this test case, recording @reason via case_notrun()'''
        case_notrun(reason)
        self.skipTest(reason)
1026
1027
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<output_dir>/<seq>.notrun" (replacing any
    previous content), logs a warning and exits the process with
    status 0, so this function never returns.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on garbage
    # collection, so the reason is on disk before we exit.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1036
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended as one line to
    "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on garbage
    # collection to flush the appended line.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1048
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format is not acceptable.

    At most one of @supported_fmts and @unsupported_fmts may be
    non-empty.  Whatever the outcome of the format check, LUKS is
    additionally required to be working when the format is 'luks'.
    """
    assert not (supported_fmts and unsupported_fmts)

    # similar to
    #   _supported_fmt generic
    # for bash tests: no format restriction applies
    accepts_generic = (
        'generic' in supported_fmts and
        os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    )

    if not accepts_generic:
        listed = not supported_fmts or imgfmt in supported_fmts
        if not listed or imgfmt in unsupported_fmts:
            notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1068
def _verify_protocol(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current protocol is not acceptable.

    At most one of @supported and @unsupported may be non-empty.
    A 'generic' entry in @supported accepts every protocol.
    """
    assert not (supported and unsupported)

    if 'generic' not in supported:
        listed = not supported or imgproto in supported
        if not listed or imgproto in unsupported:
            notrun('not suitable for this protocol: %s' % imgproto)
1079
def _verify_platform(supported: Sequence[str] = (),
                     unsupported: Sequence[str] = ()) -> None:
    """
    Skip the test suite if sys.platform is not acceptable.

    Entries are compared as prefixes of sys.platform.  A match in
    @unsupported rejects the platform; a non-empty @supported rejects
    it unless one of its entries matches.
    """
    platform = sys.platform
    message = 'not suitable for this OS: %s' % platform

    if any(map(platform.startswith, unsupported)):
        notrun(message)

    if supported and not any(map(platform.startswith, supported)):
        notrun(message)
1088
def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
    """Skip the test suite unless the cache mode is among
    @supported_cache_modes (an empty sequence accepts every mode)."""
    if not supported_cache_modes or cachemode in supported_cache_modes:
        return
    notrun('not suitable for this cache mode: %s' % cachemode)
1092
def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
    """Skip the test suite unless the aio mode is among
    @supported_aio_modes (an empty sequence accepts every mode)."""
    if not supported_aio_modes or aiomode in supported_aio_modes:
        return
    notrun('not suitable for this aio mode: %s' % aiomode)
1096
def supports_quorum():
    '''Return whether "quorum" appears in the output of
    qemu_img_pipe('--help')'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1099
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1104
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    A throw-away image is created with qemu-img and removed again.

    :return: (True, '') if creation succeeded, else (False, reason).
             The reason is the text following "<image file>:" on the
             first output line containing it, or the whole output if
             there is no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ['create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G']
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup: the image may never have been created
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1135
def verify_working_luks():
    """
    Skip test suite if LUKS does not work; the reason reported by
    has_working_luks() is passed on to notrun().
    """
    usable, why_not = has_working_luks()
    if usable:
        return
    notrun(why_not)
1143
def qemu_pipe(*args):
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The command line is qemu_prog, followed by qemu_opts, followed by
    @args.  If qemu is terminated by a signal, a note is written to
    stderr.

    :return: QEMU's stdout output, with stderr merged into it.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    finished = subprocess.run(command, stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              universal_newlines=True, check=False)
    # A negative return code is the negated number of the fatal signal
    if finished.returncode < 0:
        sys.stderr.write('qemu received signal %i: %s\n' %
                         (-finished.returncode, ' '.join(command)))
    return finished.stdout
1159
def supported_formats(read_only=False):
    '''Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       The list is parsed from the output of "-drive format=help"
       (first line for read-write, second line for read-only) and
       cached in the supported_formats.formats dict.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted_line = help_lines[1 if read_only else 0]
        # The format names follow the first ':' on the line
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1174
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a collection of format names or a
       callable that is given the test case and returns one.
       @read_only is passed on to supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats)
                    else required_formats)

            wanted = set(fmts)
            whitelisted = set(supported_formats(read_only))
            usf_list = list(wanted - whitelisted)
            if not usf_list:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1194
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test is skipped through case_skip() whenever the
       current image format is one of @formats.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return

            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1210
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root, the case is recorded via case_notrun()
       and the wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        running_as_root = os.getuid() == 0
        if not running_as_root:
            return func(*args, **kwargs)

        # args[0] is the first positional argument of the decorated test
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1221
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    In debug mode the runner is verbose and prints straight to stdout.
    Otherwise its output is captured, stripped of everything that
    varies between runs, and written to stderr once unittest.main()
    is done.

    :param debug: Whether debug mode has been requested.
    :raise SystemExit: Propagated from unittest.main().
    """

    verbosity = 2 if debug else 1

    # Outside of debug mode the output is buffered so that it can be
    # filtered below before anybody gets to see it.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
            # partition() also copes with output that has no newline at
            # all (e.g. when unittest.main() exits before running any
            # test); unpacking a split() result would raise ValueError
            # here and mask the SystemExit that is in flight.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
1253
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Checks that the test was started by "check", consumes a '-d' flag
    from sys.argv, configures logging and runs the image format,
    protocol, platform, cache mode and aio mode checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # TEST_DIR and QEMU_DEFAULT_MACHINE serve as proxies for "we are
    # being run via check". There may be other things set up by
    # "check" that individual test cases rely on.
    started_by_check = (test_dir is not None and
                        qemu_default_machine is not None)
    if not started_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Only the first '-d' is consumed
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)

    return debug
1288
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All other arguments go to execute_setup_common().  If
    @test_function is given it is called afterwards; otherwise the
    unittests are executed.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
        return
    execute_unittest(debug)
1297
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests.

    Attaches a message-only stdout handler to test_logger, sets the
    logger to INFO and stops it from propagating to its ancestors.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1306
1307# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates log output to stdout, then passes all arguments on to
    execute_setup_common().  The debug flag returned by
    execute_setup_common() is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1312
1313# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates log output to stdout, then hands @test_function and all
    remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1318
1319# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test(); no test_function is
    supplied here.
    """
    execute_test(*args, **kwargs)
1323