xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 01afa757)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31from typing import (Any, Callable, Dict, Iterable,
32                    List, Optional, Sequence, Tuple, TypeVar)
33import unittest
34
35# pylint: disable=import-error, wrong-import-position
36sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
37from qemu import qtest
38
39assert sys.version_info >= (3, 6)
40
41# Type Aliases
42QMPResponse = Dict[str, Any]
43
44
45# Use this logger for logging messages directly from the iotests module
46logger = logging.getLogger('qemu.iotests')
47logger.addHandler(logging.NullHandler())
48
49# Use this logger for messages that ought to be used for diff output.
50test_logger = logging.getLogger('qemu.iotests.diff_io')
51
52
53faulthandler.enable()
54
55# This will not work if arguments contain spaces but is necessary if we
56# want to support the override options that ./check supports.
57qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
58if os.environ.get('QEMU_IMG_OPTIONS'):
59    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
60
61qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
62if os.environ.get('QEMU_IO_OPTIONS'):
63    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
64
65qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
66if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
67    qemu_io_args_no_fmt += \
68        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69
70qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
71if os.environ.get('QEMU_NBD_OPTIONS'):
72    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77imgfmt = os.environ.get('IMGFMT', 'raw')
78imgproto = os.environ.get('IMGPROTO', 'file')
79test_dir = os.environ.get('TEST_DIR')
80sock_dir = os.environ.get('SOCK_DIR')
81output_dir = os.environ.get('OUTPUT_DIR', '.')
82cachemode = os.environ.get('CACHEMODE')
83aiomode = os.environ.get('AIOMODE')
84qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
85
86socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
87
88luks_default_secret_object = 'secret,id=keysec0,data=' + \
89                             os.environ.get('IMGKEYSECRET', '')
90luks_default_key_secret_opt = 'key-secret=keysec0'
91
92
93def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
94    """
95    Run qemu-img and return both its output and its exit code
96    """
97    subp = subprocess.Popen(qemu_img_args + list(args),
98                            stdout=subprocess.PIPE,
99                            stderr=subprocess.STDOUT,
100                            universal_newlines=True)
101    output = subp.communicate()[0]
102    if subp.returncode < 0:
103        sys.stderr.write('qemu-img received signal %i: %s\n'
104                         % (-subp.returncode,
105                            ' '.join(qemu_img_args + list(args))))
106    return (output, subp.returncode)
107
108def qemu_img(*args: str) -> int:
109    '''Run qemu-img and return the exit code'''
110    return qemu_img_pipe_and_status(*args)[1]
111
112def ordered_qmp(qmsg, conv_keys=True):
113    # Dictionaries are not ordered prior to 3.6, therefore:
114    if isinstance(qmsg, list):
115        return [ordered_qmp(atom) for atom in qmsg]
116    if isinstance(qmsg, dict):
117        od = OrderedDict()
118        for k, v in sorted(qmsg.items()):
119            if conv_keys:
120                k = k.replace('_', '-')
121            od[k] = ordered_qmp(v, conv_keys=False)
122        return od
123    return qmsg
124
125def qemu_img_create(*args):
126    args = list(args)
127
128    # default luks support
129    if '-f' in args and args[args.index('-f') + 1] == 'luks':
130        if '-o' in args:
131            i = args.index('-o')
132            if 'key-secret' not in args[i + 1]:
133                args[i + 1].append(luks_default_key_secret_opt)
134                args.insert(i + 2, '--object')
135                args.insert(i + 3, luks_default_secret_object)
136        else:
137            args = ['-o', luks_default_key_secret_opt,
138                    '--object', luks_default_secret_object] + args
139
140    args.insert(0, 'create')
141
142    return qemu_img(*args)
143
144def qemu_img_verbose(*args):
145    '''Run qemu-img without suppressing its output and return the exit code'''
146    exitcode = subprocess.call(qemu_img_args + list(args))
147    if exitcode < 0:
148        sys.stderr.write('qemu-img received signal %i: %s\n'
149                         % (-exitcode, ' '.join(qemu_img_args + list(args))))
150    return exitcode
151
152def qemu_img_pipe(*args: str) -> str:
153    '''Run qemu-img and return its output'''
154    return qemu_img_pipe_and_status(*args)[0]
155
156def qemu_img_log(*args):
157    result = qemu_img_pipe(*args)
158    log(result, filters=[filter_testfiles])
159    return result
160
161def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
162    args = ['info']
163    if imgopts:
164        args.append('--image-opts')
165    else:
166        args += ['-f', imgfmt]
167    args += extra_args
168    args.append(filename)
169
170    output = qemu_img_pipe(*args)
171    if not filter_path:
172        filter_path = filename
173    log(filter_img_info(output, filter_path))
174
175def qemu_io(*args):
176    '''Run qemu-io and return the stdout data'''
177    args = qemu_io_args + list(args)
178    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
179                            stderr=subprocess.STDOUT,
180                            universal_newlines=True)
181    output = subp.communicate()[0]
182    if subp.returncode < 0:
183        sys.stderr.write('qemu-io received signal %i: %s\n'
184                         % (-subp.returncode, ' '.join(args)))
185    return output
186
187def qemu_io_log(*args):
188    result = qemu_io(*args)
189    log(result, filters=[filter_testfiles, filter_qemu_io])
190    return result
191
192def qemu_io_silent(*args):
193    '''Run qemu-io and return the exit code, suppressing stdout'''
194    args = qemu_io_args + list(args)
195    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
196    if exitcode < 0:
197        sys.stderr.write('qemu-io received signal %i: %s\n' %
198                         (-exitcode, ' '.join(args)))
199    return exitcode
200
201def qemu_io_silent_check(*args):
202    '''Run qemu-io and return the true if subprocess returned 0'''
203    args = qemu_io_args + list(args)
204    exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
205                               stderr=subprocess.STDOUT)
206    return exitcode == 0
207
208def get_virtio_scsi_device():
209    if qemu_default_machine == 's390-ccw-virtio':
210        return 'virtio-scsi-ccw'
211    return 'virtio-scsi-pci'
212
213class QemuIoInteractive:
214    def __init__(self, *args):
215        self.args = qemu_io_args_no_fmt + list(args)
216        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
217                                   stdout=subprocess.PIPE,
218                                   stderr=subprocess.STDOUT,
219                                   universal_newlines=True)
220        out = self._p.stdout.read(9)
221        if out != 'qemu-io> ':
222            # Most probably qemu-io just failed to start.
223            # Let's collect the whole output and exit.
224            out += self._p.stdout.read()
225            self._p.wait(timeout=1)
226            raise ValueError(out)
227
228    def close(self):
229        self._p.communicate('q\n')
230
231    def _read_output(self):
232        pattern = 'qemu-io> '
233        n = len(pattern)
234        pos = 0
235        s = []
236        while pos != n:
237            c = self._p.stdout.read(1)
238            # check unexpected EOF
239            assert c != ''
240            s.append(c)
241            if c == pattern[pos]:
242                pos += 1
243            else:
244                pos = 0
245
246        return ''.join(s[:-n])
247
248    def cmd(self, cmd):
249        # quit command is in close(), '\n' is added automatically
250        assert '\n' not in cmd
251        cmd = cmd.strip()
252        assert cmd not in ('q', 'quit')
253        self._p.stdin.write(cmd + '\n')
254        self._p.stdin.flush()
255        return self._read_output()
256
257
258def qemu_nbd(*args):
259    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
260    return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
261
262def qemu_nbd_early_pipe(*args):
263    '''Run qemu-nbd in daemon mode and return both the parent's exit code
264       and its output in case of an error'''
265    subp = subprocess.Popen(qemu_nbd_args + ['--fork'] + list(args),
266                            stdout=subprocess.PIPE,
267                            universal_newlines=True)
268    output = subp.communicate()[0]
269    if subp.returncode < 0:
270        sys.stderr.write('qemu-nbd received signal %i: %s\n' %
271                         (-subp.returncode,
272                          ' '.join(qemu_nbd_args + ['--fork'] + list(args))))
273
274    return subp.returncode, output if subp.returncode else ''
275
276def qemu_nbd_popen(*args):
277    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
278    return subprocess.Popen(qemu_nbd_args + ['--persistent'] + list(args))
279
280def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
281    '''Return True if two image files are identical'''
282    return qemu_img('compare', '-f', fmt1,
283                    '-F', fmt2, img1, img2) == 0
284
285def create_image(name, size):
286    '''Create a fully-allocated raw image with sector markers'''
287    file = open(name, 'wb')
288    i = 0
289    while i < size:
290        sector = struct.pack('>l504xl', i // 512, i // 512)
291        file.write(sector)
292        i = i + 512
293    file.close()
294
295def image_size(img):
296    '''Return image's virtual size'''
297    r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
298    return json.loads(r)['virtual-size']
299
300def is_str(val):
301    return isinstance(val, str)
302
303test_dir_re = re.compile(r"%s" % test_dir)
304def filter_test_dir(msg):
305    return test_dir_re.sub("TEST_DIR", msg)
306
307win32_re = re.compile(r"\r")
308def filter_win32(msg):
309    return win32_re.sub("", msg)
310
311qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
312                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
313                        r"and [0-9\/.inf]* ops\/sec\)")
314def filter_qemu_io(msg):
315    msg = filter_win32(msg)
316    return qemu_io_re.sub("X ops; XX:XX:XX.X "
317                          "(XXX YYY/sec and XXX ops/sec)", msg)
318
319chown_re = re.compile(r"chown [0-9]+:[0-9]+")
320def filter_chown(msg):
321    return chown_re.sub("chown UID:GID", msg)
322
323def filter_qmp_event(event):
324    '''Filter a QMP event dict'''
325    event = dict(event)
326    if 'timestamp' in event:
327        event['timestamp']['seconds'] = 'SECS'
328        event['timestamp']['microseconds'] = 'USECS'
329    return event
330
331def filter_qmp(qmsg, filter_fn):
332    '''Given a string filter, filter a QMP object's values.
333    filter_fn takes a (key, value) pair.'''
334    # Iterate through either lists or dicts;
335    if isinstance(qmsg, list):
336        items = enumerate(qmsg)
337    else:
338        items = qmsg.items()
339
340    for k, v in items:
341        if isinstance(v, (dict, list)):
342            qmsg[k] = filter_qmp(v, filter_fn)
343        else:
344            qmsg[k] = filter_fn(k, v)
345    return qmsg
346
347def filter_testfiles(msg):
348    pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
349    pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
350    return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
351
352def filter_qmp_testfiles(qmsg):
353    def _filter(_key, value):
354        if is_str(value):
355            return filter_testfiles(value)
356        return value
357    return filter_qmp(qmsg, _filter)
358
359def filter_generated_node_ids(msg):
360    return re.sub("#block[0-9]+", "NODE_NAME", msg)
361
362def filter_img_info(output, filename):
363    lines = []
364    for line in output.split('\n'):
365        if 'disk size' in line or 'actual-size' in line:
366            continue
367        line = line.replace(filename, 'TEST_IMG')
368        line = filter_testfiles(line)
369        line = line.replace(imgfmt, 'IMGFMT')
370        line = re.sub('iters: [0-9]+', 'iters: XXX', line)
371        line = re.sub('uuid: [-a-f0-9]+',
372                      'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
373                      line)
374        line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
375        lines.append(line)
376    return '\n'.join(lines)
377
378def filter_imgfmt(msg):
379    return msg.replace(imgfmt, 'IMGFMT')
380
381def filter_qmp_imgfmt(qmsg):
382    def _filter(_key, value):
383        if is_str(value):
384            return filter_imgfmt(value)
385        return value
386    return filter_qmp(qmsg, _filter)
387
388
389Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
390
391def log(msg: Msg,
392        filters: Iterable[Callable[[Msg], Msg]] = (),
393        indent: Optional[int] = None) -> None:
394    """
395    Logs either a string message or a JSON serializable message (like QMP).
396    If indent is provided, JSON serializable messages are pretty-printed.
397    """
398    for flt in filters:
399        msg = flt(msg)
400    if isinstance(msg, (dict, list)):
401        # Don't sort if it's already sorted
402        do_sort = not isinstance(msg, OrderedDict)
403        test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
404    else:
405        test_logger.info(msg)
406
407class Timeout:
408    def __init__(self, seconds, errmsg="Timeout"):
409        self.seconds = seconds
410        self.errmsg = errmsg
411    def __enter__(self):
412        signal.signal(signal.SIGALRM, self.timeout)
413        signal.setitimer(signal.ITIMER_REAL, self.seconds)
414        return self
415    def __exit__(self, exc_type, value, traceback):
416        signal.setitimer(signal.ITIMER_REAL, 0)
417        return False
418    def timeout(self, signum, frame):
419        raise Exception(self.errmsg)
420
421def file_pattern(name):
422    return "{0}-{1}".format(os.getpid(), name)
423
424class FilePaths:
425    """
426    FilePaths is an auto-generated filename that cleans itself up.
427
428    Use this context manager to generate filenames and ensure that the file
429    gets deleted::
430
431        with FilePaths(['test.img']) as img_path:
432            qemu_img('create', img_path, '1G')
433        # migration_sock_path is automatically deleted
434    """
435    def __init__(self, names, base_dir=test_dir):
436        self.paths = []
437        for name in names:
438            self.paths.append(os.path.join(base_dir, file_pattern(name)))
439
440    def __enter__(self):
441        return self.paths
442
443    def __exit__(self, exc_type, exc_val, exc_tb):
444        try:
445            for path in self.paths:
446                os.remove(path)
447        except OSError:
448            pass
449        return False
450
451class FilePath(FilePaths):
452    """
453    FilePath is a specialization of FilePaths that takes a single filename.
454    """
455    def __init__(self, name, base_dir=test_dir):
456        super(FilePath, self).__init__([name], base_dir)
457
458    def __enter__(self):
459        return self.paths[0]
460
461def file_path_remover():
462    for path in reversed(file_path_remover.paths):
463        try:
464            os.remove(path)
465        except OSError:
466            pass
467
468
469def file_path(*names, base_dir=test_dir):
470    ''' Another way to get auto-generated filename that cleans itself up.
471
472    Use is as simple as:
473
474    img_a, img_b = file_path('a.img', 'b.img')
475    sock = file_path('socket')
476    '''
477
478    if not hasattr(file_path_remover, 'paths'):
479        file_path_remover.paths = []
480        atexit.register(file_path_remover)
481
482    paths = []
483    for name in names:
484        filename = file_pattern(name)
485        path = os.path.join(base_dir, filename)
486        file_path_remover.paths.append(path)
487        paths.append(path)
488
489    return paths[0] if len(paths) == 1 else paths
490
491def remote_filename(path):
492    if imgproto == 'file':
493        return path
494    elif imgproto == 'ssh':
495        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
496    else:
497        raise Exception("Protocol %s not supported" % (imgproto))
498
499class VM(qtest.QEMUQtestMachine):
500    '''A QEMU VM'''
501
502    def __init__(self, path_suffix=''):
503        name = "qemu%s-%d" % (path_suffix, os.getpid())
504        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
505                                 test_dir=test_dir,
506                                 socket_scm_helper=socket_scm_helper,
507                                 sock_dir=sock_dir)
508        self._num_drives = 0
509
510    def add_object(self, opts):
511        self._args.append('-object')
512        self._args.append(opts)
513        return self
514
515    def add_device(self, opts):
516        self._args.append('-device')
517        self._args.append(opts)
518        return self
519
520    def add_drive_raw(self, opts):
521        self._args.append('-drive')
522        self._args.append(opts)
523        return self
524
525    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
526        '''Add a virtio-blk drive to the VM'''
527        options = ['if=%s' % interface,
528                   'id=drive%d' % self._num_drives]
529
530        if path is not None:
531            options.append('file=%s' % path)
532            options.append('format=%s' % img_format)
533            options.append('cache=%s' % cachemode)
534            options.append('aio=%s' % aiomode)
535
536        if opts:
537            options.append(opts)
538
539        if img_format == 'luks' and 'key-secret' not in opts:
540            # default luks support
541            if luks_default_secret_object not in self._args:
542                self.add_object(luks_default_secret_object)
543
544            options.append(luks_default_key_secret_opt)
545
546        self._args.append('-drive')
547        self._args.append(','.join(options))
548        self._num_drives += 1
549        return self
550
551    def add_blockdev(self, opts):
552        self._args.append('-blockdev')
553        if isinstance(opts, str):
554            self._args.append(opts)
555        else:
556            self._args.append(','.join(opts))
557        return self
558
559    def add_incoming(self, addr):
560        self._args.append('-incoming')
561        self._args.append(addr)
562        return self
563
564    def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse:
565        cmd = 'human-monitor-command'
566        kwargs = {'command-line': command_line}
567        if use_log:
568            return self.qmp_log(cmd, **kwargs)
569        else:
570            return self.qmp(cmd, **kwargs)
571
572    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
573        """Pause drive r/w operations"""
574        if not event:
575            self.pause_drive(drive, "read_aio")
576            self.pause_drive(drive, "write_aio")
577            return
578        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
579
580    def resume_drive(self, drive: str) -> None:
581        """Resume drive r/w operations"""
582        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
583
584    def hmp_qemu_io(self, drive: str, cmd: str,
585                    use_log: bool = False) -> QMPResponse:
586        """Write to a given drive using an HMP command"""
587        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
588
589    def flatten_qmp_object(self, obj, output=None, basestr=''):
590        if output is None:
591            output = dict()
592        if isinstance(obj, list):
593            for i, item in enumerate(obj):
594                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
595        elif isinstance(obj, dict):
596            for key in obj:
597                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
598        else:
599            output[basestr[:-1]] = obj # Strip trailing '.'
600        return output
601
602    def qmp_to_opts(self, obj):
603        obj = self.flatten_qmp_object(obj)
604        output_list = list()
605        for key in obj:
606            output_list += [key + '=' + obj[key]]
607        return ','.join(output_list)
608
609    def get_qmp_events_filtered(self, wait=60.0):
610        result = []
611        for ev in self.get_qmp_events(wait=wait):
612            result.append(filter_qmp_event(ev))
613        return result
614
615    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
616        full_cmd = OrderedDict((
617            ("execute", cmd),
618            ("arguments", ordered_qmp(kwargs))
619        ))
620        log(full_cmd, filters, indent=indent)
621        result = self.qmp(cmd, **kwargs)
622        log(result, filters, indent=indent)
623        return result
624
625    # Returns None on success, and an error string on failure
626    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
627                pre_finalize=None, cancel=False, wait=60.0):
628        """
629        run_job moves a job from creation through to dismissal.
630
631        :param job: String. ID of recently-launched job
632        :param auto_finalize: Bool. True if the job was launched with
633                              auto_finalize. Defaults to True.
634        :param auto_dismiss: Bool. True if the job was launched with
635                             auto_dismiss=True. Defaults to False.
636        :param pre_finalize: Callback. A callable that takes no arguments to be
637                             invoked prior to issuing job-finalize, if any.
638        :param cancel: Bool. When true, cancels the job after the pre_finalize
639                       callback.
640        :param wait: Float. Timeout value specifying how long to wait for any
641                     event, in seconds. Defaults to 60.0.
642        """
643        match_device = {'data': {'device': job}}
644        match_id = {'data': {'id': job}}
645        events = [
646            ('BLOCK_JOB_COMPLETED', match_device),
647            ('BLOCK_JOB_CANCELLED', match_device),
648            ('BLOCK_JOB_ERROR', match_device),
649            ('BLOCK_JOB_READY', match_device),
650            ('BLOCK_JOB_PENDING', match_id),
651            ('JOB_STATUS_CHANGE', match_id)
652        ]
653        error = None
654        while True:
655            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
656            if ev['event'] != 'JOB_STATUS_CHANGE':
657                log(ev)
658                continue
659            status = ev['data']['status']
660            if status == 'aborting':
661                result = self.qmp('query-jobs')
662                for j in result['return']:
663                    if j['id'] == job:
664                        error = j['error']
665                        log('Job failed: %s' % (j['error']))
666            elif status == 'ready':
667                self.qmp_log('job-complete', id=job)
668            elif status == 'pending' and not auto_finalize:
669                if pre_finalize:
670                    pre_finalize()
671                if cancel:
672                    self.qmp_log('job-cancel', id=job)
673                else:
674                    self.qmp_log('job-finalize', id=job)
675            elif status == 'concluded' and not auto_dismiss:
676                self.qmp_log('job-dismiss', id=job)
677            elif status == 'null':
678                return error
679
680    # Returns None on success, and an error string on failure
681    def blockdev_create(self, options, job_id='job0', filters=None):
682        if filters is None:
683            filters = [filter_qmp_testfiles]
684        result = self.qmp_log('blockdev-create', filters=filters,
685                              job_id=job_id, options=options)
686
687        if 'return' in result:
688            assert result['return'] == {}
689            job_result = self.run_job(job_id)
690        else:
691            job_result = result['error']
692
693        log("")
694        return job_result
695
696    def enable_migration_events(self, name):
697        log('Enabling migration QMP events on %s...' % name)
698        log(self.qmp('migrate-set-capabilities', capabilities=[
699            {
700                'capability': 'events',
701                'state': True
702            }
703        ]))
704
705    def wait_migration(self, expect_runstate):
706        while True:
707            event = self.event_wait('MIGRATION')
708            log(event, filters=[filter_qmp_event])
709            if event['data']['status'] == 'completed':
710                break
711        # The event may occur in finish-migrate, so wait for the expected
712        # post-migration runstate
713        while self.qmp('query-status')['return']['status'] != expect_runstate:
714            pass
715
716    def node_info(self, node_name):
717        nodes = self.qmp('query-named-block-nodes')
718        for x in nodes['return']:
719            if x['node-name'] == node_name:
720                return x
721        return None
722
723    def query_bitmaps(self):
724        res = self.qmp("query-named-block-nodes")
725        return {device['node-name']: device['dirty-bitmaps']
726                for device in res['return'] if 'dirty-bitmaps' in device}
727
728    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
729        """
730        get a specific bitmap from the object returned by query_bitmaps.
731        :param recording: If specified, filter results by the specified value.
732        :param bitmaps: If specified, use it instead of call query_bitmaps()
733        """
734        if bitmaps is None:
735            bitmaps = self.query_bitmaps()
736
737        for bitmap in bitmaps[node_name]:
738            if bitmap.get('name', '') == bitmap_name:
739                if recording is None or bitmap.get('recording') == recording:
740                    return bitmap
741        return None
742
743    def check_bitmap_status(self, node_name, bitmap_name, fields):
744        ret = self.get_bitmap(node_name, bitmap_name)
745
746        return fields.items() <= ret.items()
747
748    def assert_block_path(self, root, path, expected_node, graph=None):
749        """
750        Check whether the node under the given path in the block graph
751        is @expected_node.
752
753        @root is the node name of the node where the @path is rooted.
754
755        @path is a string that consists of child names separated by
756        slashes.  It must begin with a slash.
757
758        Examples for @root + @path:
759          - root="qcow2-node", path="/backing/file"
760          - root="quorum-node", path="/children.2/file"
761
762        Hypothetically, @path could be empty, in which case it would
763        point to @root.  However, in practice this case is not useful
764        and hence not allowed.
765
766        @expected_node may be None.  (All elements of the path but the
767        leaf must still exist.)
768
769        @graph may be None or the result of an x-debug-query-block-graph
770        call that has already been performed.
771        """
772        if graph is None:
773            graph = self.qmp('x-debug-query-block-graph')['return']
774
775        iter_path = iter(path.split('/'))
776
777        # Must start with a /
778        assert next(iter_path) == ''
779
780        node = next((node for node in graph['nodes'] if node['name'] == root),
781                    None)
782
783        # An empty @path is not allowed, so the root node must be present
784        assert node is not None, 'Root node %s not found' % root
785
786        for child_name in iter_path:
787            assert node is not None, 'Cannot follow path %s%s' % (root, path)
788
789            try:
790                node_id = next(edge['child'] for edge in graph['edges']
791                               if (edge['parent'] == node['id'] and
792                                   edge['name'] == child_name))
793
794                node = next(node for node in graph['nodes']
795                            if node['id'] == node_id)
796
797            except StopIteration:
798                node = None
799
800        if node is None:
801            assert expected_node is None, \
802                   'No node found under %s (but expected %s)' % \
803                   (path, expected_node)
804        else:
805            assert node['name'] == expected_node, \
806                   'Found node %s under %s (but expected %s)' % \
807                   (node['name'], path, expected_node)
808
809index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
810
811class QMPTestCase(unittest.TestCase):
812    '''Abstract base class for QMP test cases'''
813
814    def __init__(self, *args, **kwargs):
815        super().__init__(*args, **kwargs)
816        # Many users of this class set a VM property we rely on heavily
817        # in the methods below.
818        self.vm = None
819
820    def dictpath(self, d, path):
821        '''Traverse a path in a nested dict'''
822        for component in path.split('/'):
823            m = index_re.match(component)
824            if m:
825                component, idx = m.groups()
826                idx = int(idx)
827
828            if not isinstance(d, dict) or component not in d:
829                self.fail(f'failed path traversal for "{path}" in "{d}"')
830            d = d[component]
831
832            if m:
833                if not isinstance(d, list):
834                    self.fail(f'path component "{component}" in "{path}" '
835                              f'is not a list in "{d}"')
836                try:
837                    d = d[idx]
838                except IndexError:
839                    self.fail(f'invalid index "{idx}" in path "{path}" '
840                              f'in "{d}"')
841        return d
842
843    def assert_qmp_absent(self, d, path):
844        try:
845            result = self.dictpath(d, path)
846        except AssertionError:
847            return
848        self.fail('path "%s" has value "%s"' % (path, str(result)))
849
850    def assert_qmp(self, d, path, value):
851        '''Assert that the value for a specific path in a QMP dict
852           matches.  When given a list of values, assert that any of
853           them matches.'''
854
855        result = self.dictpath(d, path)
856
857        # [] makes no sense as a list of valid values, so treat it as
858        # an actual single value.
859        if isinstance(value, list) and value != []:
860            for v in value:
861                if result == v:
862                    return
863            self.fail('no match for "%s" in %s' % (str(result), str(value)))
864        else:
865            self.assertEqual(result, value,
866                             '"%s" is "%s", expected "%s"'
867                             % (path, str(result), str(value)))
868
869    def assert_no_active_block_jobs(self):
870        result = self.vm.qmp('query-block-jobs')
871        self.assert_qmp(result, 'return', [])
872
873    def assert_has_block_node(self, node_name=None, file_name=None):
874        """Issue a query-named-block-nodes and assert node_name and/or
875        file_name is present in the result"""
876        def check_equal_or_none(a, b):
877            return a is None or b is None or a == b
878        assert node_name or file_name
879        result = self.vm.qmp('query-named-block-nodes')
880        for x in result["return"]:
881            if check_equal_or_none(x.get("node-name"), node_name) and \
882                    check_equal_or_none(x.get("file"), file_name):
883                return
884        self.fail("Cannot find %s %s in result:\n%s" %
885                  (node_name, file_name, result))
886
887    def assert_json_filename_equal(self, json_filename, reference):
888        '''Asserts that the given filename is a json: filename and that its
889           content is equal to the given reference object'''
890        self.assertEqual(json_filename[:5], 'json:')
891        self.assertEqual(
892            self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
893            self.vm.flatten_qmp_object(reference)
894        )
895
896    def cancel_and_wait(self, drive='drive0', force=False,
897                        resume=False, wait=60.0):
898        '''Cancel a block job and wait for it to finish, returning the event'''
899        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
900        self.assert_qmp(result, 'return', {})
901
902        if resume:
903            self.vm.resume_drive(drive)
904
905        cancelled = False
906        result = None
907        while not cancelled:
908            for event in self.vm.get_qmp_events(wait=wait):
909                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
910                   event['event'] == 'BLOCK_JOB_CANCELLED':
911                    self.assert_qmp(event, 'data/device', drive)
912                    result = event
913                    cancelled = True
914                elif event['event'] == 'JOB_STATUS_CHANGE':
915                    self.assert_qmp(event, 'data/id', drive)
916
917
918        self.assert_no_active_block_jobs()
919        return result
920
921    def wait_until_completed(self, drive='drive0', check_offset=True,
922                             wait=60.0, error=None):
923        '''Wait for a block job to finish, returning the event'''
924        while True:
925            for event in self.vm.get_qmp_events(wait=wait):
926                if event['event'] == 'BLOCK_JOB_COMPLETED':
927                    self.assert_qmp(event, 'data/device', drive)
928                    if error is None:
929                        self.assert_qmp_absent(event, 'data/error')
930                        if check_offset:
931                            self.assert_qmp(event, 'data/offset',
932                                            event['data']['len'])
933                    else:
934                        self.assert_qmp(event, 'data/error', error)
935                    self.assert_no_active_block_jobs()
936                    return event
937                if event['event'] == 'JOB_STATUS_CHANGE':
938                    self.assert_qmp(event, 'data/id', drive)
939
940    def wait_ready(self, drive='drive0'):
941        """Wait until a BLOCK_JOB_READY event, and return the event."""
942        f = {'data': {'type': 'mirror', 'device': drive}}
943        return self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
944
945    def wait_ready_and_cancel(self, drive='drive0'):
946        self.wait_ready(drive=drive)
947        event = self.cancel_and_wait(drive=drive)
948        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
949        self.assert_qmp(event, 'data/type', 'mirror')
950        self.assert_qmp(event, 'data/offset', event['data']['len'])
951
952    def complete_and_wait(self, drive='drive0', wait_ready=True,
953                          completion_error=None):
954        '''Complete a block job and wait for it to finish'''
955        if wait_ready:
956            self.wait_ready(drive=drive)
957
958        result = self.vm.qmp('block-job-complete', device=drive)
959        self.assert_qmp(result, 'return', {})
960
961        event = self.wait_until_completed(drive=drive, error=completion_error)
962        self.assert_qmp(event, 'data/type', 'mirror')
963
964    def pause_wait(self, job_id='job0'):
965        with Timeout(3, "Timeout waiting for job to pause"):
966            while True:
967                result = self.vm.qmp('query-block-jobs')
968                found = False
969                for job in result['return']:
970                    if job['device'] == job_id:
971                        found = True
972                        if job['paused'] and not job['busy']:
973                            return job
974                        break
975                assert found
976
977    def pause_job(self, job_id='job0', wait=True):
978        result = self.vm.qmp('block-job-pause', device=job_id)
979        self.assert_qmp(result, 'return', {})
980        if wait:
981            return self.pause_wait(job_id)
982        return result
983
984    def case_skip(self, reason):
985        '''Skip this test case'''
986        case_notrun(reason)
987        self.skipTest(reason)
988
989
990def notrun(reason):
991    '''Skip this test suite'''
992    # Each test in qemu-iotests has a number ("seq")
993    seq = os.path.basename(sys.argv[0])
994
995    open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
996    logger.warning("%s not run: %s", seq, reason)
997    sys.exit(0)
998
999def case_notrun(reason):
1000    '''Mark this test case as not having been run (without actually
1001    skipping it, that is left to the caller).  See
1002    QMPTestCase.case_skip() for a variant that actually skips the
1003    current test case.'''
1004
1005    # Each test in qemu-iotests has a number ("seq")
1006    seq = os.path.basename(sys.argv[0])
1007
1008    open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1009        '    [case not run] ' + reason + '\n')
1010
1011def _verify_image_format(supported_fmts: Sequence[str] = (),
1012                         unsupported_fmts: Sequence[str] = ()) -> None:
1013    assert not (supported_fmts and unsupported_fmts)
1014
1015    if 'generic' in supported_fmts and \
1016            os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1017        # similar to
1018        #   _supported_fmt generic
1019        # for bash tests
1020        if imgfmt == 'luks':
1021            verify_working_luks()
1022        return
1023
1024    not_sup = supported_fmts and (imgfmt not in supported_fmts)
1025    if not_sup or (imgfmt in unsupported_fmts):
1026        notrun('not suitable for this image format: %s' % imgfmt)
1027
1028    if imgfmt == 'luks':
1029        verify_working_luks()
1030
1031def _verify_protocol(supported: Sequence[str] = (),
1032                     unsupported: Sequence[str] = ()) -> None:
1033    assert not (supported and unsupported)
1034
1035    if 'generic' in supported:
1036        return
1037
1038    not_sup = supported and (imgproto not in supported)
1039    if not_sup or (imgproto in unsupported):
1040        notrun('not suitable for this protocol: %s' % imgproto)
1041
1042def _verify_platform(supported: Sequence[str] = (),
1043                     unsupported: Sequence[str] = ()) -> None:
1044    if any((sys.platform.startswith(x) for x in unsupported)):
1045        notrun('not suitable for this OS: %s' % sys.platform)
1046
1047    if supported:
1048        if not any((sys.platform.startswith(x) for x in supported)):
1049            notrun('not suitable for this OS: %s' % sys.platform)
1050
1051def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1052    if supported_cache_modes and (cachemode not in supported_cache_modes):
1053        notrun('not suitable for this cache mode: %s' % cachemode)
1054
1055def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1056    if supported_aio_modes and (aiomode not in supported_aio_modes):
1057        notrun('not suitable for this aio mode: %s' % aiomode)
1058
1059def supports_quorum():
1060    return 'quorum' in qemu_img_pipe('--help')
1061
1062def verify_quorum():
1063    '''Skip test suite if quorum support is not available'''
1064    if not supports_quorum():
1065        notrun('quorum support missing')
1066
1067def has_working_luks() -> Tuple[bool, str]:
1068    """
1069    Check whether our LUKS driver can actually create images
1070    (this extends to LUKS encryption for qcow2).
1071
1072    If not, return the reason why.
1073    """
1074
1075    img_file = f'{test_dir}/luks-test.luks'
1076    (output, status) = \
1077        qemu_img_pipe_and_status('create', '-f', 'luks',
1078                                 '--object', luks_default_secret_object,
1079                                 '-o', luks_default_key_secret_opt,
1080                                 '-o', 'iter-time=10',
1081                                 img_file, '1G')
1082    try:
1083        os.remove(img_file)
1084    except OSError:
1085        pass
1086
1087    if status != 0:
1088        reason = output
1089        for line in output.splitlines():
1090            if img_file + ':' in line:
1091                reason = line.split(img_file + ':', 1)[1].strip()
1092                break
1093
1094        return (False, reason)
1095    else:
1096        return (True, '')
1097
1098def verify_working_luks():
1099    """
1100    Skip test suite if LUKS does not work
1101    """
1102    (working, reason) = has_working_luks()
1103    if not working:
1104        notrun(reason)
1105
1106def qemu_pipe(*args):
1107    """
1108    Run qemu with an option to print something and exit (e.g. a help option).
1109
1110    :return: QEMU's stdout output.
1111    """
1112    args = [qemu_prog] + qemu_opts + list(args)
1113    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
1114                            stderr=subprocess.STDOUT,
1115                            universal_newlines=True)
1116    output = subp.communicate()[0]
1117    if subp.returncode < 0:
1118        sys.stderr.write('qemu received signal %i: %s\n' %
1119                         (-subp.returncode, ' '.join(args)))
1120    return output
1121
1122def supported_formats(read_only=False):
1123    '''Set 'read_only' to True to check ro-whitelist
1124       Otherwise, rw-whitelist is checked'''
1125
1126    if not hasattr(supported_formats, "formats"):
1127        supported_formats.formats = {}
1128
1129    if read_only not in supported_formats.formats:
1130        format_message = qemu_pipe("-drive", "format=help")
1131        line = 1 if read_only else 0
1132        supported_formats.formats[read_only] = \
1133            format_message.splitlines()[line].split(":")[1].split()
1134
1135    return supported_formats.formats[read_only]
1136
1137def skip_if_unsupported(required_formats=(), read_only=False):
1138    '''Skip Test Decorator
1139       Runs the test if all the required formats are whitelisted'''
1140    def skip_test_decorator(func):
1141        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1142                         **kwargs: Dict[str, Any]) -> None:
1143            if callable(required_formats):
1144                fmts = required_formats(test_case)
1145            else:
1146                fmts = required_formats
1147
1148            usf_list = list(set(fmts) - set(supported_formats(read_only)))
1149            if usf_list:
1150                msg = f'{test_case}: formats {usf_list} are not whitelisted'
1151                test_case.case_skip(msg)
1152            else:
1153                func(test_case, *args, **kwargs)
1154        return func_wrapper
1155    return skip_test_decorator
1156
1157def skip_for_formats(formats: Sequence[str] = ()) \
1158    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1159                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1160    '''Skip Test Decorator
1161       Skips the test for the given formats'''
1162    def skip_test_decorator(func):
1163        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1164                         **kwargs: Dict[str, Any]) -> None:
1165            if imgfmt in formats:
1166                msg = f'{test_case}: Skipped for format {imgfmt}'
1167                test_case.case_skip(msg)
1168            else:
1169                func(test_case, *args, **kwargs)
1170        return func_wrapper
1171    return skip_test_decorator
1172
1173def skip_if_user_is_root(func):
1174    '''Skip Test Decorator
1175       Runs the test only without root permissions'''
1176    def func_wrapper(*args, **kwargs):
1177        if os.getuid() == 0:
1178            case_notrun('{}: cannot be run as root'.format(args[0]))
1179            return None
1180        else:
1181            return func(*args, **kwargs)
1182    return func_wrapper
1183
1184def execute_unittest(debug=False):
1185    """Executes unittests within the calling module."""
1186
1187    verbosity = 2 if debug else 1
1188
1189    if debug:
1190        output = sys.stdout
1191    else:
1192        # We need to filter out the time taken from the output so that
1193        # qemu-iotest can reliably diff the results against master output.
1194        output = io.StringIO()
1195
1196    runner = unittest.TextTestRunner(stream=output, descriptions=True,
1197                                     verbosity=verbosity)
1198    try:
1199        # unittest.main() will use sys.exit(); so expect a SystemExit
1200        # exception
1201        unittest.main(testRunner=runner)
1202    finally:
1203        # We need to filter out the time taken from the output so that
1204        # qemu-iotest can reliably diff the results against master output.
1205        if not debug:
1206            out = output.getvalue()
1207            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)
1208
1209            # Hide skipped tests from the reference output
1210            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)
1211            out_first_line, out_rest = out.split('\n', 1)
1212            out = out_first_line.replace('s', '.') + '\n' + out_rest
1213
1214            sys.stderr.write(out)
1215
1216def execute_setup_common(supported_fmts: Sequence[str] = (),
1217                         supported_platforms: Sequence[str] = (),
1218                         supported_cache_modes: Sequence[str] = (),
1219                         supported_aio_modes: Sequence[str] = (),
1220                         unsupported_fmts: Sequence[str] = (),
1221                         supported_protocols: Sequence[str] = (),
1222                         unsupported_protocols: Sequence[str] = ()) -> bool:
1223    """
1224    Perform necessary setup for either script-style or unittest-style tests.
1225
1226    :return: Bool; Whether or not debug mode has been requested via the CLI.
1227    """
1228    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1229
1230    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
1231    # indicate that we're not being run via "check". There may be
1232    # other things set up by "check" that individual test cases rely
1233    # on.
1234    if test_dir is None or qemu_default_machine is None:
1235        sys.stderr.write('Please run this test via the "check" script\n')
1236        sys.exit(os.EX_USAGE)
1237
1238    debug = '-d' in sys.argv
1239    if debug:
1240        sys.argv.remove('-d')
1241    logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1242
1243    _verify_image_format(supported_fmts, unsupported_fmts)
1244    _verify_protocol(supported_protocols, unsupported_protocols)
1245    _verify_platform(supported=supported_platforms)
1246    _verify_cache_mode(supported_cache_modes)
1247    _verify_aio_mode(supported_aio_modes)
1248
1249    return debug
1250
1251def execute_test(*args, test_function=None, **kwargs):
1252    """Run either unittest or script-style tests."""
1253
1254    debug = execute_setup_common(*args, **kwargs)
1255    if not test_function:
1256        execute_unittest(debug)
1257    else:
1258        test_function()
1259
1260def activate_logging():
1261    """Activate iotests.log() output to stdout for script-style tests."""
1262    handler = logging.StreamHandler(stream=sys.stdout)
1263    formatter = logging.Formatter('%(message)s')
1264    handler.setFormatter(formatter)
1265    test_logger.addHandler(handler)
1266    test_logger.setLevel(logging.INFO)
1267    test_logger.propagate = False
1268
1269# This is called from script-style iotests without a single point of entry
1270def script_initialize(*args, **kwargs):
1271    """Initialize script-style tests without running any tests."""
1272    activate_logging()
1273    execute_setup_common(*args, **kwargs)
1274
1275# This is called from script-style iotests with a single point of entry
1276def script_main(test_function, *args, **kwargs):
1277    """Run script-style tests outside of the unittest framework"""
1278    activate_logging()
1279    execute_test(*args, test_function=test_function, **kwargs)
1280
1281# This is called from unittest style iotests
1282def main(*args, **kwargs):
1283    """Run tests using the unittest framework"""
1284    execute_test(*args, **kwargs)
1285