xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision cfb5387a)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Have Python dump tracebacks if the interpreter hits a fatal error
faulthandler.enable()

# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# Each *_args list starts with the program name (overridable through the
# environment) followed by any extra options split on single spaces.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options taken from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to ['']
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Test environment settings; those without a default are None when the
# corresponding environment variable is not set
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used when a luks image is handled without an explicit key-secret:
# a secret object with id keysec0 and the option referring to it
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Execute an external tool and capture what it prints.

    :param tool: Name used only in the diagnostic written when the
                 process is terminated by a signal.
    :param args: Full command line, program included.
    :param connect_stderr: When true, stderr is merged into the captured
                           output; otherwise it is left untouched.
    :return: Tuple of (captured output, exit code).
    """
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    else:
        stderr_target = None

    proc = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr_target,
                            universal_newlines=True)
    captured, _ = proc.communicate()
    status = proc.returncode

    # A negative return code means the child was killed by a signal
    if status < 0:
        command_line = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-status}: {command_line}\n')
    return (captured, status)
107
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with the given arguments.

    :return: Tuple of (output, exit code).
    """
    command = [*qemu_img_args, *args]
    return qemu_tool_pipe_and_status('qemu-img', command)
114
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img, discard its output and hand back the exit code'''
    _output, exitcode = qemu_img_pipe_and_status(*args)
    return exitcode
118
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message whose dicts are key-sorted OrderedDicts.

    With conv_keys set, underscores in the keys of the dict passed in are
    replaced by dashes; dicts nested directly below it keep their keys.
    Anything that is neither a list nor a dict is returned unchanged.
    """
    # Plain dicts give no ordering guarantee before Python 3.6, hence
    # the explicit OrderedDict
    if isinstance(qmsg, dict):
        ordered = OrderedDict()
        for key, value in sorted(qmsg.items()):
            new_key = key.replace('_', '-') if conv_keys else key
            ordered[new_key] = ordered_qmp(value, conv_keys=False)
        return ordered
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    return qmsg
131
def qemu_img_create(*args):
    """
    Run 'qemu-img create' with the given arguments and return the exit code.

    If the format passed with -f is luks and no key-secret option was
    given, the default secret object and key-secret option are added.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is a comma-separated option
                # string, so the default option is appended to it
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
150
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the parsed result'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
153
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the parsed result'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
156
def qemu_img_verbose(*args):
    '''Invoke qemu-img with its output left visible; return the exit code'''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    # A negative status means qemu-img was terminated by a signal
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(command)))
    return status
164
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img and hand back only what it printed'''
    output, _exitcode = qemu_img_pipe_and_status(*args)
    return output
168
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles, return it raw'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
173
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of 'qemu-img info' for an image.

    :param filename: Image to inspect (passed last on the command line).
    :param filter_path: Path replaced in the output; falls back to
                        filename when not given.
    :param imgopts: Pass --image-opts instead of '-f <imgfmt>'.
    :param extra_args: Extra arguments inserted before the filename.
    """
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    command = ['info', *format_args, *extra_args, filename]

    info = qemu_img_pipe(*command)
    log(filter_img_info(info, filter_path or filename))
187
def qemu_io(*args):
    '''Invoke qemu-io and hand back its captured output'''
    command = [*qemu_io_args, *args]
    output, _exitcode = qemu_tool_pipe_and_status('qemu-io', command)
    return output
192
def qemu_io_log(*args):
    '''Run qemu-io, log its filtered output and return the raw output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
197
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    # When the caller selects the format itself, use the argument list
    # built from QEMU_IO_OPTIONS_NO_FMT instead of the default one
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL discards stdout without leaving an unclosed
    # /dev/null file object behind
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
211
def qemu_io_silent_check(*args):
    '''Run qemu-io and return True if the subprocess returned 0'''
    args = qemu_io_args + list(args)
    # stdout goes to the null device and stderr follows it, so nothing is
    # printed; DEVNULL avoids leaking an open /dev/null file object
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
218
def get_virtio_scsi_device():
    '''Name the virtio-scsi device variant for the default machine type'''
    on_s390_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_s390_ccw else 'virtio-scsi-pci'
223
class QemuIoInteractive:
    """
    A qemu-io process driven through its interactive 'qemu-io> ' prompt.

    The process is started in the constructor; use cmd() to run commands
    and close() to quit.
    """
    def __init__(self, *args):
        """
        Start qemu-io with the given extra arguments and wait for the prompt.

        Raises ValueError carrying the collected output if the first thing
        qemu-io prints is not the prompt.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish"""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read from qemu-io until the next prompt appears.

        Returns everything read before the prompt.  Hitting EOF before a
        prompt is seen triggers an AssertionError.
        """
        pattern = 'qemu-io> '
        n = len(pattern)
        expected_tail = list(pattern)
        s = []
        # Compare the tail of everything read so far against the prompt.
        # A position counter that resets to zero on a mismatch would miss
        # a prompt directly preceded by 'q'.
        while s[-n:] != expected_tail:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Run a single-line qemu-io command and return its output.

        The quit commands are rejected here; use close() instead.
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
267
268
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and hand back the parent's exit code'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
272
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork; return the parent's exit code together
       with its output, the latter being empty unless the code is non-zero'''
    command = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
280
def qemu_nbd_list_log(*args: str) -> str:
    '''Invoke "qemu-nbd -L", log the filtered listing and return it raw'''
    command = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', command)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
287
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager keeping a qemu-nbd server alive for its body.

    The body only runs once qemu-nbd has created its pid file; the server
    is killed when the context is left.'''
    pid_file = file_path("pid")
    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll for the pid file, bailing out if the server exits first
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                failure = "qemu-nbd terminated with exit code {}: {}".format(
                    server.returncode, ' '.join(cmd))
                raise RuntimeError(failure)

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
312
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Tell whether "qemu-img compare" exits with 0 for the two images'''
    exitcode = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return exitcode == 0
317
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Every 512-byte sector starts and ends with its own sector number as a
    big-endian 32-bit integer, with zero padding in between.  A size that
    is not a multiple of 512 is rounded up to whole sectors.'''
    # The with statement closes the file even if a write fails
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            image.write(sector)
            i = i + 512
327
def image_size(img):
    '''Report the virtual size "qemu-img info" gives for the image'''
    info = json.loads(qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
332
def is_str(val):
    '''Tell whether val is a str instance'''
    result = isinstance(val, str)
    return result
335
# test_dir is a literal path, not a pattern: escape it so that regex
# metacharacters in the path (such as '.' or '+') only match themselves
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
339
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Drop every carriage return from msg'''
    cleaned = win32_re.sub("", msg)
    return cleaned
343
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and mask qemu-io's ops/timing statistics'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    without_cr = filter_win32(msg)
    return qemu_io_re.sub(placeholder, without_cr)
351
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Mask the numeric ids in "chown <uid>:<gid>" with UID:GID'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
355
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of the event whose timestamp fields are replaced by
    the placeholders SECS and USECS; the event passed in is not modified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp
        # dict is copied as well before it is changed
        event['timestamp'] = dict(event['timestamp'])
        event['timestamp']['seconds'] = 'SECS'
        event['timestamp']['microseconds'] = 'USECS'
    return event
363
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    filter_fn is called with a (key, value) pair, where the key is the
    dict key or list index of the value; nested dicts and lists are
    descended into.  The modified qmsg itself is returned.'''
    # Lists are walked by index, dicts by key
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
379
def filter_testfiles(msg):
    '''Mask the PID-prefixed file paths under test_dir and sock_dir'''
    test_prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
    sock_prefix = os.path.join(sock_dir, "%s-" % (os.getpid()))
    replacements = ((test_prefix, 'TEST_DIR/PID-'),
                    (sock_prefix, 'SOCK_DIR/PID-'))
    for real_prefix, placeholder in replacements:
        msg = msg.replace(real_prefix, placeholder)
    return msg
384
def filter_qmp_testfiles(qmsg):
    '''Run filter_testfiles over every string value of a QMP object'''
    def _mask(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _mask)
391
def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw / -pci suffix from virtio-scsi device names'''
    suffixed = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return suffixed.sub(r'\1', output)
394
def filter_qmp_virtio_scsi(qmsg):
    '''Run filter_virtio_scsi over every string value of a QMP object'''
    def _strip_suffix(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _strip_suffix)
401
def filter_generated_node_ids(msg):
    '''Mask "#block<number>" node names with NODE_NAME'''
    generated_id = re.compile("#block[0-9]+")
    return generated_id.sub("NODE_NAME", msg)
404
def filter_img_info(output, filename):
    """
    Make 'qemu-img info' output reproducible.

    Lines mentioning the disk size are dropped; the image path, test file
    prefixes, the image format name and the iters/uuid/cid values are
    replaced by placeholders.
    """
    masks = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in masks:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
420
def filter_imgfmt(msg):
    '''Mask every occurrence of the image format name with IMGFMT'''
    masked = msg.replace(imgfmt, 'IMGFMT')
    return masked
423
def filter_qmp_imgfmt(qmsg):
    '''Run filter_imgfmt over every string value of a QMP object'''
    def _mask(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _mask)
430
def filter_nbd_exports(output: str) -> str:
    '''Mask the numbers reported for min/opt/max block with XXX'''
    block_size = re.compile(r'((min|opt|max) block): [0-9]+')
    return block_size.sub(r'\1: XXX', output)
433
434
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Send a message to the diff-output logger after applying the filters.

    Dicts and lists are serialized as JSON (pretty-printed when indent is
    given); any other message is logged as it is.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # An OrderedDict already carries its intended order; leave it alone
    already_ordered = isinstance(msg, OrderedDict)
    text = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(text)
452
class Timeout:
    """
    Context manager that raises an exception carrying errmsg when its
    body is still running after the given number of seconds.

    Implemented with a SIGALRM handler and the ITIMER_REAL timer.
    """
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds, self.errmsg = seconds, errmsg

    def __enter__(self):
        # Install the handler first, then arm the timer
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # Disarm the timer; exceptions from the body are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise the timeout exception'''
        raise Exception(self.errmsg)
466
def file_pattern(name):
    '''Prefix name with the current process id and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
469
class FilePath:
    """
    Context manager handing out one or more generated file names and
    deleting the corresponding files when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir says otherwise;
    for sockets pass iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When exactly one name is given, the context yields that single path
    rather than a one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for generated in self.paths:
            try:
                os.remove(generated)
            except OSError:
                # Best effort: the file may never have been created
                pass
        return False
508
509
def file_path_remover():
    '''Delete the files recorded in file_path_remover.paths, last one first.

    Files that cannot be removed are silently skipped.'''
    for recorded in file_path_remover.paths[::-1]:
        try:
            os.remove(recorded)
        except OSError:
            # Best effort: the file may never have been created
            pass
516
517
def file_path(*names, base_dir=test_dir):
    ''' Generate file names whose files are deleted when the process exits.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # The first call sets up the bookkeeping list and the exit hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(generated)

    if len(generated) == 1:
        return generated[0]
    return generated
539
def remote_filename(path):
    '''Turn a local path into the name to use with the current protocol.

    Raises an Exception for protocols other than file and ssh.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
547
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        """Set up a VM whose name is built from path_suffix and the PID"""
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Counter used to generate the ids of drives added with add_drive()
        self._num_drives = 0

    def add_object(self, opts):
        """Append '-object <opts>' to the command line; returns self"""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append '-device <opts>' to the command line; returns self"""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append '-drive <opts>' to the command line as is; returns self"""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''Add a virtio-blk drive to the VM

        :param path: Image file, or None to add a drive without the file,
                     format, cache and aio options.
        :param opts: Additional comma-separated drive options.
        :param interface: Value of the drive's if= option.
        :param img_format: Image format; for luks the default key secret
                           is added unless opts names a key-secret.
        :return: self
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append a -blockdev option; returns self.

        opts is either the option string itself or an iterable of option
        strings that are joined with commas.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append '-incoming <addr>' to the command line; returns self"""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through QMP's human-monitor-command.

        With use_log set, the command and its result are logged as well.
        """
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        # Without an explicit event, break on both reads and writes
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested dicts and lists into one dict with dotted keys.

        List indices and dict keys along the way are joined with '.', so
        {'a': [{'b': 1}]} becomes {'a.0.b': 1}.  Results are added to
        output (a new dict if None), which is also returned.
        """
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        """
        Convert a QMP object into a comma-separated key=value string.

        Leaf values are concatenated as they are, so they must be strings.
        """
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, each run through filter_qmp_event"""
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Execute a QMP command, logging both the command and its result.

        filters and indent are passed on to log(); the unfiltered result
        is returned.
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; all
            # other events are merely logged
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue blockdev-create and run the resulting job to completion.

        filters are used for logging the command and default to
        [filter_qmp_testfiles].  If the command itself fails, its error
        is returned instead of a job result.
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """Enable the 'events' migration capability and log the result"""
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait until a MIGRATION event reports completion or failure.

        On completion, query-status is polled until the runstate equals
        expect_runstate, then True is returned.  Returns False if the
        migration failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        """Return the query-named-block-nodes entry for node_name, or None"""
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """
        Return a dict mapping node names to their dirty bitmap lists.

        Nodes that report no 'dirty-bitmaps' entry are left out.
        """
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap dict, or None if there is none.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes without bitmaps; treat a missing
        # node like a node with an empty bitmap list
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Tell whether the bitmap contains all key/value pairs in fields.

        Returns False if the bitmap does not exist.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge named child_name from the current node
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
867
# Splits a path component of the form "name[index]" into its name and
# index parts; used by QMPTestCase.dictpath().
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Starts out unset; the helper methods below go through
        # self.vm for every QMP interaction, so users of this class
        # are expected to assign it.
        self.vm = None

    def dictpath(self, d, path):
        '''Walk @d along the slash-separated @path and return what is
           found there.  A component may carry a list index, as in
           "name[2]".  Any step that cannot be taken fails the test.'''
        node = d
        for step in path.split('/'):
            indexed = index_re.match(step)
            if indexed is None:
                key, position = step, None
            else:
                key = indexed.group(1)
                position = int(indexed.group(2))

            if not (isinstance(node, dict) and key in node):
                self.fail(f'failed path traversal for "{path}" in "{node}"')
            node = node[key]

            if indexed is None:
                continue

            # An indexed component must resolve to a list first
            if not isinstance(node, list):
                self.fail(f'path component "{key}" in "{path}" '
                          f'is not a list in "{node}"')
            try:
                node = node[position]
            except IndexError:
                self.fail(f'invalid index "{position}" in path "{path}" '
                          f'in "{node}"')
        return node

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be resolved inside @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed the traversal, i.e. the path is absent
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that the entry at @path inside the QMP dict @d equals
           @value.  A non-empty list is taken as a set of acceptable
           values, any one of which may match.'''

        actual = self.dictpath(d, path)

        # [] cannot sensibly be a list of alternatives, so it is
        # compared as a plain value.
        alternatives = isinstance(value, list) and value != []
        if not alternatives:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if not any(actual == candidate for candidate in value):
            self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry agrees
        with @node_name and/or @file_name.  A value that is None on
        either side is not compared."""
        assert node_name or file_name
        reply = self.vm.qmp('query-named-block-nodes')

        def agrees(entry):
            wanted_by_key = (("node-name", node_name), ("file", file_name))
            for key, wanted in wanted_by_key:
                have = entry.get(key)
                if have is None or wanted is None:
                    continue
                if not have == wanted:
                    return False
            return True

        if not any(agrees(entry) for entry in reply["return"]):
            self.fail("Cannot find %s %s in result:\n%s" %
                      (node_name, file_name, reply))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON object following the prefix equals @reference once both
           have been flattened by the VM helper.'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flat_actual = self.vm.flatten_qmp_object(json.loads(payload))
        flat_reference = self.vm.flatten_qmp_object(reference)
        self.assertEqual(flat_actual, flat_reference)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Issue block-job-cancel for @drive, optionally resume the
           drive, then wait for the job's completion or cancellation
           event and return it.'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # The whole batch is processed even after a final event
            # has been seen; the last such event in the batch wins.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for the BLOCK_JOB_COMPLETED event of the job on @drive
           and return it.  With @error unset the event must carry no
           error (and, if @check_offset, offset must equal len);
           otherwise its error must match @error.'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for BLOCK_JOB_READY from a mirror or commit job on
        @drive, and return the event."""
        candidates = [
            ('BLOCK_JOB_READY',
             {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ]
        return self.vm.events_wait(candidates)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait until the job on @drive is ready, cancel it, and check
           that it finished as a fully copied mirror job.'''
        self.wait_ready(drive=drive)
        final_event = self.cancel_and_wait(drive=drive)
        self.assertEqual(final_event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final_event, 'data/type', 'mirror')
        self.assert_qmp(final_event, 'data/offset',
                        final_event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete for @drive (after waiting for the
           ready event if @wait_ready) and wait until it has finished'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        final_event = self.wait_until_completed(drive=drive,
                                                error=completion_error)
        job_type = final_event['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job whose device is @job_id
           is paused and not busy, then return its entry.  The job
           must be listed on every poll.'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((entry for entry in jobs
                            if entry['device'] == job_id), None)
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Issue block-job-pause for @job_id.  Returns the paused job
           entry when @wait is set, the QMP reply otherwise.'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        '''Record @reason via case_notrun() and skip this test case'''
        case_notrun(reason)
        self.skipTest(reason)
1051
1052
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    then exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly so the reason is flushed to disk before
    # sys.exit() below, instead of relying on garbage collection.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1061
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The reason is appended to "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the file handle is closed (and the line
    # flushed) right away rather than whenever the object is collected.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1073
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Skip the test suite if the current image format is not in
    @supported_fmts (when non-empty) or is in @unsupported_fmts.  For
    the luks format, additionally require a working LUKS driver.'''
    # 'generic' lifts the supported-format restriction unless
    # IMGFMT_GENERIC is set to something other than 'true'.  Similar to
    #   _supported_fmt generic
    # for bash tests.
    accept_any = ('generic' in supported_fmts and
                  os.environ.get('IMGFMT_GENERIC', 'true') == 'true')
    allowed = () if accept_any else supported_fmts

    rejected = bool(allowed) and imgfmt not in allowed
    if rejected or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1089
1090def _verify_protocol(supported: Sequence[str] = (),
1091                     unsupported: Sequence[str] = ()) -> None:
1092    assert not (supported and unsupported)
1093
1094    if 'generic' in supported:
1095        return
1096
1097    not_sup = supported and (imgproto not in supported)
1098    if not_sup or (imgproto in unsupported):
1099        notrun('not suitable for this protocol: %s' % imgproto)
1100
1101def _verify_platform(supported: Sequence[str] = (),
1102                     unsupported: Sequence[str] = ()) -> None:
1103    if any((sys.platform.startswith(x) for x in unsupported)):
1104        notrun('not suitable for this OS: %s' % sys.platform)
1105
1106    if supported:
1107        if not any((sys.platform.startswith(x) for x in supported)):
1108            notrun('not suitable for this OS: %s' % sys.platform)
1109
1110def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1111    if supported_cache_modes and (cachemode not in supported_cache_modes):
1112        notrun('not suitable for this cache mode: %s' % cachemode)
1113
1114def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1115    if supported_aio_modes and (aiomode not in supported_aio_modes):
1116        notrun('not suitable for this aio mode: %s' % aiomode)
1117
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    '''Skip the test suite if any of @required_formats is missing from
    the list returned by supported_formats().'''
    wanted = set(required_formats)
    available = set(supported_formats())
    usf_list = list(wanted - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1122
def supports_quorum():
    '''Return whether "quorum" appears in the qemu-img --help output'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1125
def verify_quorum():
    '''Skip the test suite unless supports_quorum() reports support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1130
def has_working_luks() -> Tuple[bool, str]:
    """
    Probe the LUKS driver by creating a throw-away image
    (the result extends to LUKS encryption for qcow2).

    :return: (True, '') if creation succeeded, else (False, reason).
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ('create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G')
    output, status = qemu_img_pipe_and_status(*create_args)

    # Best-effort cleanup; the image may never have been created
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text after "<img_file>:" on the first line that
    # contains it; fall back to the complete output.
    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1161
def verify_working_luks():
    """
    Skip the test suite, with the reason reported by
    has_working_luks(), if LUKS does not work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
1169
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option that makes it print something and exit
    (e.g. a help option).

    :param args: Extra command line arguments, appended after the
                 default qemu options.
    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    stdout_text, _status = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1179
def supported_formats(read_only=False):
    '''Return the whitelisted formats as a list of names.

       Set 'read_only' to True to check ro-whitelist
       Otherwise, rw-whitelist is checked

       Results are cached per 'read_only' value in the function
       attribute "formats", so qemu is queried at most once each.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only in cache:
        return cache[read_only]

    # The first line of the help output is parsed for the rw list,
    # the second line for the ro list.
    help_lines = qemu_pipe("-drive", "format=help").splitlines()
    chosen = help_lines[1 if read_only else 0]
    cache[read_only] = chosen.split(":")[1].split()
    return cache[read_only]
1194
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a collection of format names or a
       callable that takes the test case and returns such a collection.
       @read_only selects the ro-whitelist instead of the rw-whitelist
       (see supported_formats()).'''
    # functools is not imported at module level, so pull it in here.
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the decorated test's __name__ and __doc__
        # instead of exposing those of the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1214
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test is skipped via QMPTestCase.case_skip() when
       the current image format is listed in @formats.'''
    # functools is not imported at module level, so pull it in here.
    import functools  # pylint: disable=import-outside-toplevel

    def skip_test_decorator(func):
        # wraps() keeps the decorated test's __name__ and __doc__
        # instead of exposing those of the wrapper.
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1230
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root, the case is recorded via case_notrun()
       and the wrapper returns None without calling the test.'''
    # functools is not imported at module level, so pull it in here.
    import functools  # pylint: disable=import-outside-toplevel

    # wraps() keeps the decorated test's __name__ and __doc__ instead
    # of exposing those of the wrapper.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            # args[0] is the test case instance
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        return func(*args, **kwargs)
    return func_wrapper
1241
def execute_unittest(debug=False):
    """
    Executes unittests within the calling module.

    In debug mode the runner prints verbosely and directly to stdout.
    Otherwise its output is captured, filtered so that it is stable
    across runs, and written to stderr once unittest.main() is done.
    """

    verbosity = 2 if debug else 1

    # Without debug, buffer the output: we need to filter out the time
    # taken so that qemu-iotest can reliably diff the results against
    # master output.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # partition() also copes with output that has no newline
            # (e.g. nothing was written before unittest.main() exited);
            # unpacking split('\n', 1) would raise ValueError here and
            # mask the exception that is currently propagating.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
1273
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Common setup shared by script-style and unittest-style tests:
    check the environment, handle the '-d' flag, configure logging and
    run the _verify_* checks for the given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # TEST_DIR and QEMU_DEFAULT_MACHINE serve as proxies to detect that
    # we are not being run via "check". There may be other things set
    # up by "check" that individual test cases rely on.
    launched_by_check = (test_dir is not None and
                         qemu_default_machine is not None)
    if not launched_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Consume '-d' so that later argument parsing does not see it
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)

    return debug
1310
def execute_test(*args, test_function=None, **kwargs):
    """
    Run the common setup, then either @test_function (script-style)
    or, if none is given, the unittest runner.

    All other arguments are passed to execute_setup_common().
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1319
def activate_logging():
    """
    Activate iotests.log() output to stdout for script-style tests.

    Attaches a bare-message stdout handler to the diff logger, sets it
    to INFO level and stops it from propagating to parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1328
1329# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """
    Initialize script-style tests without running any tests.

    Activates iotests.log() output first, then forwards all arguments
    to execute_setup_common().  The debug flag returned by that call
    is discarded.
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1334
1335# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """
    Run script-style tests outside of the unittest framework.

    Activates iotests.log() output, then hands @test_function and all
    remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1340
1341# This is called from unittest style iotests
def main(*args, **kwargs):
    """
    Run tests using the unittest framework.

    All arguments are forwarded to execute_test(); no test_function is
    supplied here.
    """
    execute_test(*args, **kwargs)
1345