xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 4c4465ff)
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
# Attach a do-nothing handler so this logger always has a handler of its own.
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Turn on the standard faulthandler module for this process.
faulthandler.enable()
52
# Command lines and settings taken from the environment.  Every *_args list
# starts with the program (overridable through the matching *_PROG variable)
# followed by the words of the matching *_OPTIONS variable, when that
# variable is set and non-empty.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or empty this evaluates to [''] (a list
# holding one empty string), not to an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Plain settings; those without a default are None when the variable is unset.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
sock_dir = os.environ.get('SOCK_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
aiomode = os.environ.get('AIOMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Secret object definition and the option referring to it; both are added
# automatically by qemu_img_create() and VM.add_drive() for luks images
# that do not name a key-secret themselves.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
90
91
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic written to
                 stderr when the process was terminated by a signal.
    :param args: The complete command line, including the program to run.
    :param connect_stderr: If True, the tool's stderr is merged into the
                           returned output; otherwise stderr is inherited
                           from this process.
    :return: Tuple of (captured output, return code).  A negative return
             code means the process was terminated by that signal number.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    # The context manager guarantees the pipe is closed and the child is
    # reaped even if communicate() raises.
    with subprocess.Popen(args,
                          stdout=subprocess.PIPE,
                          stderr=stderr,
                          universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # @args already is the full command line; do not prepend the
            # qemu-img arguments, this helper is used for other tools too.
            sys.stderr.write('%s received signal %i: %s\n'
                             % (tool, -subp.returncode, ' '.join(args)))
        return (output, subp.returncode)
108
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img with @args appended to the configured base command line.

    :return: Tuple of (captured output, exit code).
    """
    return qemu_tool_pipe_and_status('qemu-img', qemu_img_args + list(args))
115
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img with @args and hand back only its exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
119
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP object whose dicts are key-sorted OrderedDicts.

    :param qmsg: A QMP object: dict, list, or a scalar (returned unchanged).
    :param conv_keys: If True, underscores in the keys of the outermost
                      dict(s) are replaced by dashes.  Keys of dicts nested
                      below another dict are never converted.
    :return: The reordered structure; the input is not modified.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        # A list does not add a nesting level of keys, so its elements
        # inherit the caller's conv_keys setting.
        return [ordered_qmp(atom, conv_keys=conv_keys) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg
132
def qemu_img_create(*args):
    """
    Run "qemu-img create" with @args and return the exit code.

    When the format given with -f is luks and no key-secret option is
    present, the default key-secret option and the matching secret object
    are added to the command line.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is a comma-separated option string,
                # so the default option is joined on with a comma.
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
151
def qemu_img_measure(*args):
    '''Run "qemu-img measure" with JSON output and return the decoded result'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
154
def qemu_img_check(*args):
    '''Run "qemu-img check" with JSON output and return the decoded result'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
157
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code.

    A note is written to stderr if qemu-img was terminated by a signal.'''
    cmdline = qemu_img_args + list(args)
    status = subprocess.call(cmdline)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(cmdline)))
    return status
165
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img with @args and hand back only its captured output'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
169
def qemu_img_log(*args):
    '''Run qemu-img, log its output with test file names filtered, and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
174
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''Log the filtered output of "qemu-img info" for @filename.

    @filter_path is the path replaced in the output; it defaults to
    @filename.  With @imgopts, --image-opts is passed instead of -f imgfmt.
    @extra_args are inserted before the file name.'''
    fmt_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info', *fmt_args, *extra_args, filename]

    info = qemu_img_pipe(*cmd)
    log(filter_img_info(info, filter_path or filename))
188
def qemu_io(*args):
    '''Run qemu-io and return what it printed (stderr merged into stdout).

    A note is written to stderr if qemu-io was terminated by a signal.'''
    cmdline = qemu_io_args + list(args)
    proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    captured, _ = proc.communicate()
    if proc.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n'
                         % (-proc.returncode, ' '.join(cmdline)))
    return captured
200
def qemu_io_log(*args):
    '''Run qemu-io, log its output after filtering test file names and
    qemu-io statistics, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
205
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    If @args already selects a format (-f or --image-opts), the base
    command line without format options is used.  A note is written to
    stderr if qemu-io was terminated by a signal.'''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null handle.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
219
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded and return True if the
    subprocess returned 0'''
    args = qemu_io_args + list(args)
    # stderr is redirected into stdout, which goes to the null device;
    # subprocess.DEVNULL avoids opening (and leaking) a /dev/null handle.
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
226
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name matching the default machine'''
    on_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if on_ccw else 'virtio-scsi-pci'
231
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its stdin/stdout.

    Commands are sent with cmd(); the session is ended with close().'''

    def __init__(self, *args):
        '''Start qemu-io with @args and wait for its first prompt.

        Raises ValueError carrying the process output if the first thing
        printed is not the prompt.'''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        '''Send the quit command and wait for the process to finish.'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text preceding it.

        Raises AssertionError if the stream ends before a prompt is seen.'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF (explicit raise so that running with -O
            # cannot turn this into an endless loop)
            if c == '':
                raise AssertionError('unexpected EOF from qemu-io')
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # All characters of the prompt are distinct, so after a
                # mismatch the only possible partial match is this very
                # character being the first character of the prompt.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output.

        @cmd must be a single line and must not be the quit command.'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
275
276
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the
    launching (parent) process'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(cmdline)
280
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork and return the parent's exit code
       together with its output; the output is replaced by an empty
       string when the exit code is 0'''
    cmdline = qemu_nbd_args + ['--fork'] + list(args)
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', cmdline,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
288
def qemu_nbd_list_log(*args: str) -> str:
    '''List remote exports with "qemu-nbd -L", log the filtered listing
    and return the unfiltered output'''
    listing, _status = qemu_tool_pipe_and_status(
        'qemu-nbd', [qemu_nbd_prog, '-L'] + list(args))
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
295
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager that keeps a qemu-nbd server running for the
    duration of the with-block and kills it afterwards'''
    pid_file = file_path("pid")

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Enter the context only once the pid file exists; give up if the
        # server exits before that.
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
320
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
325
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector carries its own sector number as a big-endian
    32-bit integer in its first and its last four bytes; the bytes in
    between are zero.  The last sector is always written in full, so the
    file size is @size rounded up to a multiple of 512.'''
    # The with-statement closes the file even if a write fails.
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            image.write(sector)
            i = i + 512
335
def image_size(img):
    '''Return the 'virtual-size' field that "qemu-img info" reports for
    @img'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
340
def is_str(val):
    '''Tell whether @val is a str instance'''
    result = isinstance(val, str)
    return result
343
# test_dir is a plain path, not a regular expression: escape it so that
# characters such as '.' or '+' only match themselves.
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in @msg by TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
347
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Drop every carriage return from @msg'''
    return re.sub(win32_re, "", msg)
351
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Remove carriage returns and replace qemu-io's operation statistics
    by a fixed placeholder'''
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    without_cr = filter_win32(msg)
    return qemu_io_re.sub(placeholder, without_cr)
359
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric ids of "chown <uid>:<gid>" by UID:GID'''
    return re.sub(chown_re, "chown UID:GID", msg)
363
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of @event whose timestamp (if present) has its seconds
    and microseconds replaced by placeholders.  @event itself, including
    its nested timestamp dict, is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp must
        # be copied as well before the placeholders are filled in.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
371
def filter_qmp(qmsg, filter_fn):
    '''Apply @filter_fn to every leaf value of a QMP object, in place.

    @filter_fn is called as filter_fn(key, value), where key is the dict
    key or the list index of the value.  Nested dicts and lists are
    descended into.  Returns @qmsg itself.'''
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
387
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
    prefixes in @msg by TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_part = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_part)
    sock_prefix = os.path.join(sock_dir, pid_part)
    filtered = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return filtered.replace(sock_prefix, 'SOCK_DIR/PID-')
392
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _strings_only(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _strings_only)
399
def filter_virtio_scsi(output: str) -> str:
    '''Strip the -ccw / -pci suffix from virtio-scsi device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
402
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _strings_only(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _strings_only)
409
def filter_generated_node_ids(msg):
    '''Replace every "#block<number>" in @msg by NODE_NAME'''
    generated_id = "#block[0-9]+"
    return re.sub(generated_id, "NODE_NAME", msg)
412
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output for use in reference output.

    Lines mentioning the disk size / actual size are dropped; @filename,
    test file prefixes, the image format and the iters, uuid and cid
    values are replaced by placeholders.'''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    kept = []
    for raw in output.split('\n'):
        if any(tag in raw for tag in ('disk size', 'actual-size')):
            continue
        text = filter_testfiles(raw.replace(filename, 'TEST_IMG'))
        text = text.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            text = re.sub(pattern, placeholder, text)
        kept.append(text)
    return '\n'.join(kept)
428
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name by IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
431
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _strings_only(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _strings_only)
438
def filter_nbd_exports(output: str) -> str:
    '''Replace the numbers of "min block", "opt block" and "max block"
    by XXX'''
    block_size = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size, r'\1: XXX', output)
441
442
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Run @msg through each of @filters in order and emit the result on the
    diff-output logger.

    Strings are logged as they are.  Dicts and lists are serialized as
    JSON, pretty-printed if @indent is given.
    """
    for apply_filter in filters:
        msg = apply_filter(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # An OrderedDict already carries the intended order; everything else
    # gets its keys sorted.
    needs_sorting = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sorting, indent=indent)
    test_logger.info(serialized)
460
class Timeout:
    '''Context manager raising an exception if its body runs too long.

    On entry a SIGALRM handler is installed and a real-time interval timer
    of @seconds is armed; when it fires, Exception(@errmsg) is raised.  On
    exit the timer is disarmed and the previously installed SIGALRM handler
    is put back.'''
    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        # Handler that was active before __enter__, restored by __exit__
        self._old_handler = None
    def __enter__(self):
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, exc_type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() reports None for handlers that were not installed
        # from Python; those cannot be re-installed, so they are skipped.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False
    def timeout(self, signum, frame):
        '''Signal handler: raise the timeout exception.'''
        raise Exception(self.errmsg)
474
def file_pattern(name):
    '''Return @name prefixed with "<pid>-", the pid being this process's'''
    return f"{os.getpid()}-{name}"
477
class FilePath:
    """
    Context manager that hands out per-process file names and removes the
    corresponding files again when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    File names are placed in iotests.test_dir unless another directory is
    given, e.g. iotests.sock_dir for sockets:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When exactly one name is given, the context yields that single path
    rather than a one-element list.

    """
    def __init__(self, *names, base_dir=test_dir):
        generated = []
        for name in names:
            generated.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = generated

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for stale in self.paths:
            # Files that were never created (or cannot be removed) are
            # silently skipped.
            try:
                os.remove(stale)
            except OSError:
                pass
        return False
516
517
def file_path_remover():
    '''Remove the files recorded in file_path_remover.paths, newest first.

    Files that cannot be removed are silently skipped.'''
    for recorded in file_path_remover.paths[::-1]:
        try:
            os.remove(recorded)
        except OSError:
            pass
524
525
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are removed at interpreter exit.

    Typical use:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # The first call creates the list of paths and registers the cleanup.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = []
    for name in names:
        full_path = os.path.join(base_dir, file_pattern(name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
547
def remote_filename(path):
    '''Return the name under which @path is reached via the configured
    image protocol.

    Raises Exception for protocols other than file and ssh.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
555
556class VM(qtest.QEMUQtestMachine):
557    '''A QEMU VM'''
558
559    def __init__(self, path_suffix=''):
560        name = "qemu%s-%d" % (path_suffix, os.getpid())
561        super().__init__(qemu_prog, qemu_opts, name=name,
562                         test_dir=test_dir,
563                         socket_scm_helper=socket_scm_helper,
564                         sock_dir=sock_dir)
565        self._num_drives = 0
566
567    def add_object(self, opts):
568        self._args.append('-object')
569        self._args.append(opts)
570        return self
571
572    def add_device(self, opts):
573        self._args.append('-device')
574        self._args.append(opts)
575        return self
576
577    def add_drive_raw(self, opts):
578        self._args.append('-drive')
579        self._args.append(opts)
580        return self
581
582    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
583        '''Add a virtio-blk drive to the VM'''
584        options = ['if=%s' % interface,
585                   'id=drive%d' % self._num_drives]
586
587        if path is not None:
588            options.append('file=%s' % path)
589            options.append('format=%s' % img_format)
590            options.append('cache=%s' % cachemode)
591            options.append('aio=%s' % aiomode)
592
593        if opts:
594            options.append(opts)
595
596        if img_format == 'luks' and 'key-secret' not in opts:
597            # default luks support
598            if luks_default_secret_object not in self._args:
599                self.add_object(luks_default_secret_object)
600
601            options.append(luks_default_key_secret_opt)
602
603        self._args.append('-drive')
604        self._args.append(','.join(options))
605        self._num_drives += 1
606        return self
607
608    def add_blockdev(self, opts):
609        self._args.append('-blockdev')
610        if isinstance(opts, str):
611            self._args.append(opts)
612        else:
613            self._args.append(','.join(opts))
614        return self
615
616    def add_incoming(self, addr):
617        self._args.append('-incoming')
618        self._args.append(addr)
619        return self
620
621    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
622        cmd = 'human-monitor-command'
623        kwargs: Dict[str, Any] = {'command-line': command_line}
624        if use_log:
625            return self.qmp_log(cmd, **kwargs)
626        else:
627            return self.qmp(cmd, **kwargs)
628
629    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
630        """Pause drive r/w operations"""
631        if not event:
632            self.pause_drive(drive, "read_aio")
633            self.pause_drive(drive, "write_aio")
634            return
635        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
636
637    def resume_drive(self, drive: str) -> None:
638        """Resume drive r/w operations"""
639        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
640
641    def hmp_qemu_io(self, drive: str, cmd: str,
642                    use_log: bool = False) -> QMPMessage:
643        """Write to a given drive using an HMP command"""
644        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
645
646    def flatten_qmp_object(self, obj, output=None, basestr=''):
647        if output is None:
648            output = dict()
649        if isinstance(obj, list):
650            for i, item in enumerate(obj):
651                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
652        elif isinstance(obj, dict):
653            for key in obj:
654                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
655        else:
656            output[basestr[:-1]] = obj # Strip trailing '.'
657        return output
658
659    def qmp_to_opts(self, obj):
660        obj = self.flatten_qmp_object(obj)
661        output_list = list()
662        for key in obj:
663            output_list += [key + '=' + obj[key]]
664        return ','.join(output_list)
665
666    def get_qmp_events_filtered(self, wait=60.0):
667        result = []
668        for ev in self.get_qmp_events(wait=wait):
669            result.append(filter_qmp_event(ev))
670        return result
671
672    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
673        full_cmd = OrderedDict((
674            ("execute", cmd),
675            ("arguments", ordered_qmp(kwargs))
676        ))
677        log(full_cmd, filters, indent=indent)
678        result = self.qmp(cmd, **kwargs)
679        log(result, filters, indent=indent)
680        return result
681
    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None if the job never entered the 'aborting' state,
                 otherwise the 'error' field that query-jobs reported for
                 the job while it was aborting.
        """
        # The BLOCK_JOB_* events listed with match_device are matched on
        # their 'device' field, the other two on their 'id' field.
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            # Only JOB_STATUS_CHANGE drives the state machine below; every
            # other matching event is just logged.
            if ev['event'] != 'JOB_STATUS_CHANGE':
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Look up the job's error message and remember it as the
                # eventual return value.
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                # Manual finalization: run the callback, then either cancel
                # or finalize the job.
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                # The 'null' status ends the loop.
                return error
736
737    # Returns None on success, and an error string on failure
738    def blockdev_create(self, options, job_id='job0', filters=None):
739        if filters is None:
740            filters = [filter_qmp_testfiles]
741        result = self.qmp_log('blockdev-create', filters=filters,
742                              job_id=job_id, options=options)
743
744        if 'return' in result:
745            assert result['return'] == {}
746            job_result = self.run_job(job_id)
747        else:
748            job_result = result['error']
749
750        log("")
751        return job_result
752
753    def enable_migration_events(self, name):
754        log('Enabling migration QMP events on %s...' % name)
755        log(self.qmp('migrate-set-capabilities', capabilities=[
756            {
757                'capability': 'events',
758                'state': True
759            }
760        ]))
761
762    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
763        while True:
764            event = self.event_wait('MIGRATION')
765            # We use the default timeout, and with a timeout, event_wait()
766            # never returns None
767            assert event
768
769            log(event, filters=[filter_qmp_event])
770            if event['data']['status'] in ('completed', 'failed'):
771                break
772
773        if event['data']['status'] == 'completed':
774            # The event may occur in finish-migrate, so wait for the expected
775            # post-migration runstate
776            runstate = None
777            while runstate != expect_runstate:
778                runstate = self.qmp('query-status')['return']['status']
779            return True
780        else:
781            return False
782
783    def node_info(self, node_name):
784        nodes = self.qmp('query-named-block-nodes')
785        for x in nodes['return']:
786            if x['node-name'] == node_name:
787                return x
788        return None
789
790    def query_bitmaps(self):
791        res = self.qmp("query-named-block-nodes")
792        return {device['node-name']: device['dirty-bitmaps']
793                for device in res['return'] if 'dirty-bitmaps' in device}
794
795    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
796        """
797        get a specific bitmap from the object returned by query_bitmaps.
798        :param recording: If specified, filter results by the specified value.
799        :param bitmaps: If specified, use it instead of call query_bitmaps()
800        """
801        if bitmaps is None:
802            bitmaps = self.query_bitmaps()
803
804        for bitmap in bitmaps[node_name]:
805            if bitmap.get('name', '') == bitmap_name:
806                if recording is None or bitmap.get('recording') == recording:
807                    return bitmap
808        return None
809
810    def check_bitmap_status(self, node_name, bitmap_name, fields):
811        ret = self.get_bitmap(node_name, bitmap_name)
812
813        return fields.items() <= ret.items()
814
    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.

        Raises AssertionError if the check fails.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        # (splitting at '/' then yields an empty first component)
        assert next(iter_path) == ''

        # Find the graph node named @root; None if there is no such node
        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            # A previous step found no child, but the path continues
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge named @child_name that leaves the
                # current node ...
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                # ... and look up the node it points to
                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge or node
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
875
# Matches a path component that carries a list index, e.g. "foo[3]":
# group 1 is the key ("foo"), group 2 the text between the brackets ("3").
# Used by QMPTestCase.dictpath().
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
877
class QMPTestCase(unittest.TestCase):
    """Abstract base class for QMP test cases"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Subclasses usually assign their VM to self.vm; the helpers
        # below rely on it, so make sure the attribute always exists.
        self.vm = None

    def dictpath(self, d, path):
        """
        Walk a '/'-separated path through nested dicts and return the
        value found there.

        A component of the form "key[N]" first looks up "key" and then
        takes element N of the list stored under it.  The test fails
        when a key is missing, a non-dict is traversed, an indexed
        value is not a list, or the index is out of range.
        """
        for component in path.split('/'):
            indexed = index_re.match(component)
            idx = None
            if indexed is not None:
                component = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(d, dict) and component in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if indexed is None:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        """Fail the test if @path can be resolved in @d."""
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() could not resolve the path: it is absent
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        """
        Assert that the value stored under @path in the QMP dict @d
        equals @value.  If @value is a non-empty list, it is enough for
        any one of its elements to be equal to the stored value.
        """
        actual = self.dictpath(d, path)

        # An empty list cannot sensibly mean "one of these values", so
        # it is compared as a plain value like everything else.
        if not (isinstance(value, list) and value != []):
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        """Assert that query-block-jobs returns an empty list."""
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """
        Run query-named-block-nodes and assert that some entry matches
        @node_name and/or @file_name.

        A criterion that is None matches everything, and so does an
        entry that lacks the corresponding key.
        """
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for entry in result["return"]:
            entry_name = entry.get("node-name")
            if not (entry_name is None or node_name is None
                    or entry_name == node_name):
                continue
            entry_file = entry.get("file")
            if entry_file is None or file_name is None \
                    or entry_file == file_name:
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        """
        Assert that @json_filename starts with "json:" and that the JSON
        object following the prefix equals @reference once both have
        been passed through self.vm.flatten_qmp_object().
        """
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flat_actual = self.vm.flatten_qmp_object(json.loads(payload))
        flat_reference = self.vm.flatten_qmp_object(reference)
        self.assertEqual(flat_actual, flat_reference)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        """
        Cancel the block job on @drive and wait until it is gone.

        :param resume: If true, call self.vm.resume_drive(@drive) after
                       issuing the cancel command.
        :param wait: Passed as wait= to self.vm.get_qmp_events().
        :return: The BLOCK_JOB_COMPLETED or BLOCK_JOB_CANCELLED event.
        """
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # The whole batch is examined even after the final event has
            # been seen, so later events are still checked.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        """
        Wait for the block job on @drive to emit BLOCK_JOB_COMPLETED.

        :param check_offset: When no error is expected, also assert
                             that data/offset equals data/len.
        :param wait: Passed as wait= to self.vm.get_qmp_events().
        :param error: Expected data/error value; None means the event
                      must not carry an error at all.
        :return: The BLOCK_JOB_COMPLETED event.
        """
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """
        Wait for a BLOCK_JOB_READY event of a mirror or commit job on
        @drive and return what self.vm.events_wait() returns.
        """
        wanted = [('BLOCK_JOB_READY',
                   {'data': {'type': job_type, 'device': drive}})
                  for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(wanted)

    def wait_ready_and_cancel(self, drive='drive0'):
        """
        Wait for the job on @drive to become ready, cancel it, and
        assert that it ended as a fully completed mirror job.
        """
        self.wait_ready(drive=drive)
        final_event = self.cancel_and_wait(drive=drive)
        self.assertEqual(final_event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final_event, 'data/type', 'mirror')
        self.assert_qmp(final_event, 'data/offset',
                        final_event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        """
        Issue block-job-complete for @drive and wait for the job to
        finish.

        :param wait_ready: Wait for BLOCK_JOB_READY before completing.
        :param completion_error: Expected error of the completion event
                                 (None: no error expected).
        """
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        done = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(done['data']['type'] in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        """
        Poll query-block-jobs until the job @job_id is paused and not
        busy, then return its entry.  The job must be listed on every
        poll.  Polling runs inside a Timeout(3, ...) block.
        """
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                found = job is not None
                assert found
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        """
        Issue block-job-pause for @job_id.

        :return: The job entry from pause_wait() if @wait is true,
                 otherwise the reply to block-job-pause.
        """
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        """Record the case as not run and skip this test case"""
        case_notrun(reason)
        self.skipTest(reason)
1059
1060
def notrun(reason):
    '''Skip this test suite.

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    then ends the process via sys.exit(0), i.e. raises SystemExit.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of leaving the handle to the
    # garbage collector.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1069
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    Appends a "[case not run]" line with @reason to
    "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of leaving the handle to the
    # garbage collector.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as outfile:
        outfile.write('    [case not run] ' + reason + '\n')
1081
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) unless the current image format
    is acceptable.

    :param supported_fmts: If non-empty, imgfmt must be one of these.
                           'generic' lifts the restriction unless the
                           IMGFMT_GENERIC environment variable is set
                           to something other than 'true'.
    :param unsupported_fmts: Formats the test must not be run with.

    For imgfmt 'luks', additionally runs verify_working_luks().
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # Counterpart of "_supported_fmt generic" in the bash tests:
        # no restriction by format
        supported_fmts = ()

    outside_allowed = bool(supported_fmts) and imgfmt not in supported_fmts
    if outside_allowed or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1097
1098def _verify_protocol(supported: Sequence[str] = (),
1099                     unsupported: Sequence[str] = ()) -> None:
1100    assert not (supported and unsupported)
1101
1102    if 'generic' in supported:
1103        return
1104
1105    not_sup = supported and (imgproto not in supported)
1106    if not_sup or (imgproto in unsupported):
1107        notrun('not suitable for this protocol: %s' % imgproto)
1108
1109def _verify_platform(supported: Sequence[str] = (),
1110                     unsupported: Sequence[str] = ()) -> None:
1111    if any((sys.platform.startswith(x) for x in unsupported)):
1112        notrun('not suitable for this OS: %s' % sys.platform)
1113
1114    if supported:
1115        if not any((sys.platform.startswith(x) for x in supported)):
1116            notrun('not suitable for this OS: %s' % sys.platform)
1117
1118def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1119    if supported_cache_modes and (cachemode not in supported_cache_modes):
1120        notrun('not suitable for this cache mode: %s' % cachemode)
1121
1122def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1123    if supported_aio_modes and (aiomode not in supported_aio_modes):
1124        notrun('not suitable for this aio mode: %s' % aiomode)
1125
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if any of @required_formats is
    missing from the list returned by supported_formats().
    """
    wanted = set(required_formats)
    available = set(supported_formats())
    usf_list = list(wanted - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1130
def supports_quorum():
    """Return whether 'quorum' appears in the "qemu-img --help" output."""
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1133
def verify_quorum():
    """Skip the test suite (via notrun()) when supports_quorum() is false."""
    if supports_quorum():
        return
    notrun('quorum support missing')
1138
def has_working_luks() -> Tuple[bool, str]:
    """
    Find out whether the LUKS driver is able to create images (which
    also covers LUKS encryption for qcow2) by creating a throw-away
    image in the test directory.

    :return: (True, '') on success; otherwise (False, reason), where
             reason is the text following "<image file>:" in the first
             output line that contains it, or the complete output if
             there is no such line.
    """

    img_file = f'{test_dir}/luks-test.luks'
    (output, status) = \
        qemu_img_pipe_and_status('create', '-f', 'luks',
                                 '--object', luks_default_secret_object,
                                 '-o', luks_default_key_secret_opt,
                                 '-o', 'iter-time=10',
                                 img_file, '1G')

    # Best-effort cleanup; the file may never have been created
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        _, found, remainder = line.partition(marker)
        if found:
            return (False, remainder.strip())

    return (False, output)
1169
def verify_working_luks():
    """
    Skip the test suite (via notrun()) with the reason reported by
    has_working_luks() if LUKS does not work.
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1177
def qemu_pipe(*args: str) -> str:
    """
    Run qemu (qemu_prog with qemu_opts plus @args) with an option that
    makes it print something and exit, e.g. a help option.

    :return: The output part of what qemu_tool_pipe_and_status()
             returns; the exit status is discarded.
    """
    command = [qemu_prog, *qemu_opts, *args]
    text, _status = qemu_tool_pipe_and_status('qemu', command)
    return text
1187
def supported_formats(read_only=False):
    """
    Return the list of whitelisted formats as reported by
    "qemu -drive format=help".

    :param read_only: True selects the second line of that output
                      (ro-whitelist), False the first (rw-whitelist).

    Results are cached per @read_only value in the function attribute
    "formats", so qemu is started at most once for each variant.
    """
    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        format_message = qemu_pipe("-drive", "format=help")
        lines = format_message.splitlines()
        chosen = lines[1] if read_only else lines[0]
        # Everything between the first and second ':' holds the
        # whitespace-separated format names
        cache[read_only] = chosen.split(":")[1].split()

    return cache[read_only]
1202
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       :param required_formats: Sequence of format names, or a callable
                                that takes the test case and returns
                                such a sequence.
       :param read_only: Passed to supported_formats() to select the
                         whitelist to check against.'''
    # Function-scope import: only this decorator needs functools
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test method's __name__ and __doc__ on the
        # wrapper
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1222
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       :param formats: Image formats (compared against imgfmt) for which
                       the decorated test case is skipped via
                       case_skip().'''
    # Function-scope import: only this decorator needs functools
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test method's __name__ and __doc__ on the
        # wrapper
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1238
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When os.getuid() is 0, the wrapper records the case through
       case_notrun() and returns None without calling @func; otherwise
       it returns whatever @func returns.'''
    # Function-scope import: only this decorator needs functools
    import functools

    # wraps() keeps the test method's __name__ and __doc__ on the wrapper
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper
1249
def execute_unittest(debug=False):
    """Executes unittests within the calling module.

    :param debug: If true, run with verbosity 2 and let the runner
                  write straight to stdout.  Otherwise the runner's
                  output is captured, filtered so that it can be
                  diffed against reference output, and written to
                  stderr.

    unittest.main() ends with sys.exit(), so SystemExit propagates to
    the caller once the output has been written.
    """

    verbosity = 2 if debug else 1

    if debug:
        output = sys.stdout
    else:
        # Capture the output so it can be filtered below before it is
        # written out.
        output = io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # The first line is the progress line; show skipped tests
            # ('s') as passed ('.').  partition() copes with output that
            # has no newline at all (e.g. nothing was captured because
            # unittest.main() exited early); unpacking split('\n', 1)
            # would raise ValueError here and hide the pending
            # SystemExit.
            out_first_line, newline, out_rest = out.partition('\n')
            out = out_first_line.replace('s', '.') + newline + out_rest

            sys.stderr.write(out)
1281
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Common setup for script-style and unittest-style tests: make sure
    the test is run via "check", handle the '-d' command line switch,
    configure logging, and run the _verify_*() checks on the given
    constraints.

    :return: Bool; True if debug mode was requested with '-d' on the
             command line (the switch is removed from sys.argv).
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # TEST_DIR and QEMU_DEFAULT_MACHINE serve as proxies for "started
    # by the check script".  check may set up further things that
    # individual test cases rely on.
    started_by_check = (test_dir is not None and
                        qemu_default_machine is not None)
    if not started_by_check:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    # Consume the first '-d', if there is one
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)

    return debug
1318
def execute_test(*args, test_function=None, **kwargs):
    """
    Run the common setup with @args / @kwargs, then either call
    @test_function (script-style test) or, if none is given, run the
    unittest-style tests.
    """

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1327
def activate_logging():
    """
    Send test_logger messages of level INFO and above to stdout as bare
    message text (for script-style tests), without propagating them to
    parent loggers.
    """
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.propagate = False
    test_logger.setLevel(logging.INFO)
    test_logger.addHandler(stdout_handler)
1336
1337# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates logging to stdout and passes all arguments on to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1342
1343# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates logging to stdout, then hands @test_function and all
    remaining arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1348
1349# This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are passed on to execute_test() without a
    test_function.
    """
    execute_test(*args, **kwargs)
1353