# xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision 1da79ecc)
# Common utilities and Python wrappers for qemu-iotests
#
# Copyright (C) 2012 IBM Corp.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
18
19import atexit
20from collections import OrderedDict
21import faulthandler
22import io
23import json
24import logging
25import os
26import re
27import signal
28import struct
29import subprocess
30import sys
31import time
32from typing import (Any, Callable, Dict, Iterable,
33                    List, Optional, Sequence, Tuple, TypeVar)
34import unittest
35
36from contextlib import contextmanager
37
38# pylint: disable=import-error, wrong-import-position
39sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
40from qemu import qtest
41from qemu.qmp import QMPMessage
42
# Logger for messages emitted directly by the iotests module itself
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Logger for messages that are meant to end up in the diffed test output
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()

# The *_OPTIONS variables are split on single spaces.  This will not work
# if arguments contain spaces but is necessary if we want to support the
# override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args.extend(os.environ['QEMU_IMG_OPTIONS'].strip().split(' '))

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args.extend(os.environ['QEMU_IO_OPTIONS'].strip().split(' '))

# Same program as qemu_io_args, but with the QEMU_IO_OPTIONS_NO_FMT options
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt.extend(
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' '))

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args.extend(os.environ['QEMU_NBD_OPTIONS'].strip().split(' '))

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir, sock_dir, cachemode, aiomode, qemu_default_machine = (
        os.environ[var] for var in ('TEST_DIR', 'SOCK_DIR', 'CACHEMODE',
                                    'AIOMODE', 'QEMU_DEFAULT_MACHINE'))
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults used whenever a luks image is handled without an explicit
# key-secret option
luks_default_secret_object = 'secret,id=keysec0,data={}'.format(
    os.environ.get('IMGKEYSECRET', ''))
luks_default_key_secret_opt = 'key-secret=keysec0'
98
99
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the message written to
                 stderr when the process was terminated by a signal.
    :param args: Full command line, program first.
    :param connect_stderr: When True, the tool's stderr is merged into the
                           captured output; otherwise stderr is not
                           redirected.
    :return: Tuple of (captured output as text, exit code).
    """
    stderr_target = subprocess.STDOUT if connect_stderr else None
    proc = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=stderr_target,
                            universal_newlines=True)
    captured, _ = proc.communicate()
    status = proc.returncode
    if status < 0:
        # A negative return code means "killed by signal -status"
        cmdline = ' '.join(args)
        sys.stderr.write(f'{tool} received signal {-status}: {cmdline}\n')
    return (captured, status)
115
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img with the given arguments appended to qemu_img_args and
    return a tuple of (output, exit code)
    """
    return qemu_tool_pipe_and_status('qemu-img', [*qemu_img_args, *args])
122
def qemu_img(*args: str) -> int:
    '''Run qemu-img, discard its output and return the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
126
def ordered_qmp(qmsg, conv_keys=True):
    '''
    Return qmsg with every dict replaced by an OrderedDict sorted by key.

    When conv_keys is true, underscores in the keys of a dict passed in
    directly are replaced by dashes.  Dicts nested directly inside a dict
    keep their keys; dicts that are elements of a list are converted with
    the default conv_keys=True.  Other values are returned unchanged.
    '''
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        return [ordered_qmp(item) for item in qmsg]
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key in sorted(qmsg):
        value = qmsg[key]
        new_key = key.replace('_', '-') if conv_keys else key
        ordered[new_key] = ordered_qmp(value, conv_keys=False)
    return ordered
139
def qemu_img_create(*args):
    '''
    Run "qemu-img create" with the given arguments and return the exit code.

    When "-f luks" is among the arguments and no key-secret option is
    given, the default key-secret option and the matching secret object
    are added automatically.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is a comma-separated option
                # string, so the default option is appended to that string
                # (str has no append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
158
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
161
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
164
def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmd = qemu_img_args + list(args)
    exitcode = subprocess.call(cmd)
    if exitcode < 0:
        # Negative exit code: the process was terminated by a signal
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-exitcode, ' '.join(cmd)))
    return exitcode
172
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output, ignoring the exit code'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
176
def qemu_img_log(*args):
    '''Run qemu-img, log its output (test file names filtered) and
    return the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
181
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    '''
    Log the filtered output of "qemu-img info" for filename.

    :param filename: Image to inspect (an option string when imgopts is set).
    :param filter_path: String replaced by TEST_IMG in the output; defaults
                        to filename.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments inserted before filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmd = ['info', *format_args, *extra_args, filename]

    output = qemu_img_pipe(*cmd)
    log(filter_img_info(output, filter_path or filename))
195
def qemu_io(*args):
    '''Run qemu-io and return the stdout data'''
    output, _status = qemu_tool_pipe_and_status('qemu-io',
                                                qemu_io_args + list(args))
    return output
200
def qemu_io_log(*args):
    '''Run qemu-io, log its output (test file names and qemu-io
    statistics filtered) and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
205
def qemu_io_silent(*args):
    '''
    Run qemu-io and return the exit code, suppressing stdout

    If the arguments already select a format (-f) or use --image-opts,
    the qemu_io_args_no_fmt defaults are used instead of qemu_io_args.
    '''
    if '-f' in args or '--image-opts' in args:
        default_args = qemu_io_args_no_fmt
    else:
        default_args = qemu_io_args

    args = default_args + list(args)
    # subprocess.DEVNULL discards stdout without leaving an open file
    # object behind, unlike open('/dev/null', 'w').
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL)
    if exitcode < 0:
        # Negative exit code: the process was terminated by a signal
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
219
def qemu_io_silent_check(*args):
    '''Run qemu-io, discarding stdout and stderr, and return True if the
    subprocess returned 0'''
    args = qemu_io_args + list(args)
    # subprocess.DEVNULL discards the output without leaving an open file
    # object behind, unlike open('/dev/null', 'w').
    exitcode = subprocess.call(args, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT)
    return exitcode == 0
226
def get_virtio_scsi_device():
    '''Return the virtio-scsi device name for the default machine type:
    the ccw variant on s390-ccw-virtio, the pci variant otherwise'''
    is_ccw = qemu_default_machine == 's390-ccw-virtio'
    return 'virtio-scsi-ccw' if is_ccw else 'virtio-scsi-pci'
231
class QemuIoInteractive:
    '''
    Wrapper around an interactive qemu-io process.

    Commands are written to the process' stdin and the output up to the
    next "qemu-io> " prompt is read back from its stdout (stderr is
    merged into stdout).
    '''

    def __init__(self, *args):
        '''Start qemu-io with args; raise ValueError carrying its output
        if the first thing it prints is not the prompt'''
        self.args = qemu_io_args_no_fmt + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # 9 == len('qemu-io> ')
        banner = self._p.stdout.read(9)
        if banner == 'qemu-io> ':
            return

        # Most probably qemu-io just failed to start.
        # Let's collect the whole output and exit.
        banner += self._p.stdout.read()
        self._p.wait(timeout=1)
        raise ValueError(banner)

    def close(self):
        '''Send the quit command and wait for qemu-io to terminate'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read stdout character by character until the prompt has been
        seen and return everything that preceded it'''
        prompt = 'qemu-io> '
        prompt_len = len(prompt)
        matched = 0
        chars = []
        while matched != prompt_len:
            ch = self._p.stdout.read(1)
            # check unexpected EOF
            assert ch != ''
            chars.append(ch)
            # Any mismatch restarts the prompt match from scratch
            matched = matched + 1 if ch == prompt[matched] else 0

        return ''.join(chars[:-prompt_len])

    def cmd(self, cmd):
        '''Execute one command (not quit) and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        stdin = self._p.stdin
        stdin.write(cmd + '\n')
        stdin.flush()
        return self._read_output()
275
276
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    cmd = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(cmd)
280
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
       and its output in case of an error'''
    cmd = [*qemu_nbd_args, '--fork', *args]
    output, status = qemu_tool_pipe_and_status('qemu-nbd', cmd,
                                               connect_stderr=False)
    if not status:
        # The output is only reported for a non-zero exit code
        output = ''
    return status, output
288
def qemu_nbd_list_log(*args: str) -> str:
    '''Run qemu-nbd to list remote exports, log the filtered output and
    return the unfiltered output'''
    cmd = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', cmd)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
295
@contextmanager
def qemu_nbd_popen(*args):
    '''
    Context manager running qemu-nbd within the context

    The context is entered once qemu-nbd has created its pid file; a
    RuntimeError is raised if the process terminates before that.  On
    leaving the context the server process is killed and waited for.
    '''
    pid_file = file_path("pid")

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    server = subprocess.Popen(cmd)
    try:
        # Poll until the pid file shows up
        while not os.path.exists(pid_file):
            if server.poll() is not None:
                raise RuntimeError(
                    "qemu-nbd terminated with exit code {}: {}"
                    .format(server.returncode, ' '.join(cmd)))

            time.sleep(0.01)
        yield
    finally:
        log('Kill NBD server')
        server.kill()
        server.wait()
320
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images,
    i.e. if they are identical'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
325
def create_image(name, size):
    '''
    Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors; each sector carries its own
    sector number as a big-endian 32-bit integer in its first and last
    four bytes.  A size that is not a multiple of 512 is rounded up to
    the next full sector.

    :param name: Path of the file to create (an existing file is
                 overwritten).
    :param size: Image size in bytes.
    '''
    # The with-statement closes the file even if a write fails
    with open(name, 'wb') as image:
        for offset in range(0, size, 512):
            sector_num = offset // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
335
def image_size(img):
    '''Return image's virtual size'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
340
def is_str(val):
    '''Return True if val is a str instance'''
    return isinstance(val, str)
343
# The directory is escaped so that regex metacharacters in the path
# (".", "+", ...) are matched literally instead of being interpreted.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg with the
    placeholder "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
347
# Matches the carriage returns of Windows-style line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage return characters removed'''
    return win32_re.sub("", msg)
351
# Matches the operation/throughput statistics printed by qemu-io
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns from msg and replace the qemu-io statistics
    in it with fixed placeholders'''
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, filter_win32(msg))
359
# Matches "chown" followed by a numeric UID:GID pair
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric IDs of "chown UID:GID" occurrences in msg with
    the literal placeholder'''
    return chown_re.sub("chown UID:GID", msg)
363
def filter_qmp_event(event):
    '''
    Filter a QMP event dict

    Returns a copy of event in which the seconds and microseconds of the
    timestamp (if there is one) are replaced by the placeholders 'SECS'
    and 'USECS'.  The passed-in event, including its nested timestamp
    dict, is left unmodified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy: the nested timestamp dict
        # must be copied too, or the placeholders would be written into
        # the caller's object.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
371
def filter_qmp(qmsg, filter_fn):
    '''
    Apply filter_fn to every leaf value of a QMP object (dict or list).

    filter_fn takes a (key, value) pair, where key is the dict key or the
    list index of the value, and returns the replacement value.  Nested
    dicts and lists are processed recursively.  qmsg is modified in place
    and also returned.
    '''
    if isinstance(qmsg, list):
        entries = enumerate(qmsg)
    else:
        entries = qmsg.items()

    for key, value in entries:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
387
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" file name
    prefixes of this process in msg with fixed placeholders'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
392
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
399
def filter_virtio_scsi(output: str) -> str:
    '''Strip the "-ccw" / "-pci" suffix from virtio-scsi device names'''
    bus_specific = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(bus_specific, r'\1', output)
402
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
409
def filter_generated_node_ids(msg):
    '''Replace generated node names ("#block" followed by digits) in msg
    with NODE_NAME'''
    generated_name = "#block[0-9]+"
    return re.sub(generated_name, "NODE_NAME", msg)
412
def filter_img_info(output, filename):
    '''
    Filter "qemu-img info" output.

    Lines containing 'disk size' or 'actual-size' are dropped; in the
    remaining lines filename, test file prefixes, the image format and
    the iters/uuid/cid values are replaced by placeholders.
    '''
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )

    kept = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, placeholder in substitutions:
            line = re.sub(pattern, placeholder, line)
        kept.append(line)
    return '\n'.join(kept)
428
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name in msg with
    IMGFMT'''
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
431
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
438
def filter_nbd_exports(output: str) -> str:
    '''Replace the numeric min/opt/max block sizes in output with XXX'''
    block_size = r'((min|opt|max) block): [0-9]+'
    return re.sub(block_size, r'\1: XXX', output)
441
442
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Logs either a string message or a JSON serializable message (like QMP).
    If indent is provided, JSON serializable messages are pretty-printed.

    The filters are applied to msg in order before it is written to the
    test output logger.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sort = not isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=needs_sort, indent=indent)
    test_logger.info(serialized)
460
class Timeout:
    '''
    Context manager that raises an Exception with the message errmsg when
    SIGALRM is delivered, which is scheduled seconds after entering the
    context.  The timer is disarmed when the context is left.
    '''

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        # Install the handler first, then arm the real-time timer
        signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        # A value of 0 clears the timer; exceptions are not suppressed
        signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        '''Signal handler: raise the timeout exception'''
        raise Exception(self.errmsg)
474
def file_pattern(name):
    '''Return name prefixed with the PID of this process and a dash'''
    pid = os.getpid()
    return f"{pid}-{name}"
477
class FilePath:
    """
    Context manager generating multiple file names. The generated files are
    removed when exiting the context.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    By default images are created in iotests.test_dir. To create sockets use
    iotests.sock_dir:

       with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    For convenience, calling with one argument yields a single file instead of
    a tuple with one item.

    """
    def __init__(self, *names, base_dir=test_dir):
        # Every name gets the per-process prefix from file_pattern()
        self.paths = [os.path.join(base_dir, file_pattern(name))
                      for name in names]

    def __enter__(self):
        paths = self.paths
        return paths[0] if len(paths) == 1 else paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            # Removal is best effort: the file may never have been created
            try:
                os.remove(path)
            except OSError:
                continue
        return False
516
517
def try_remove(img):
    '''Remove the file img; an OSError (e.g. because the file does not
    exist) is silently ignored'''
    try:
        os.remove(img)
    except OSError:
        return
523
def file_path_remover():
    '''Remove all files recorded in file_path_remover.paths, most recently
    added first; registered as an atexit handler by file_path()'''
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
527
528
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    A single name yields a single path, several names yield a list.  The
    files are removed at interpreter exit.
    '''

    # The first call sets up the list of paths and the atexit handler
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = [os.path.join(base_dir, file_pattern(name))
                 for name in names]
    file_path_remover.paths.extend(generated)

    return generated[0] if len(generated) == 1 else generated
550
def remote_filename(path):
    '''Return the name under which path is accessed with the configured
    image protocol; raises an Exception for protocols other than file
    and ssh'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
558
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM'''

    def __init__(self, path_suffix=''):
        '''Set up a machine named "qemu<path_suffix>-<pid>" using the
        program, options and directories configured for the test run'''
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super().__init__(qemu_prog, qemu_opts, name=name,
                         test_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir)
        # Number of drives added through add_drive(); used for drive IDs
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line; returns self'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line; returns self'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line without adding any
        default options; returns self'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''
        Add a drive to the VM (a virtio drive by default); returns self.

        :param path: Image file, or None to add a drive without file,
                     format, cache and aio options.
        :param opts: Additional comma-separated -drive options.
        :param interface: Value of the drive's "if" option.
        :param img_format: Image format; for luks without a key-secret in
                           opts, the default secret object and key-secret
                           option are added.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev <opts>" to the command line; opts is either
        an option string or an iterable of options that are joined with
        commas.  Returns self.'''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line; returns self'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''Execute command_line through the human-monitor-command QMP
        command (via qmp_log() if use_log is set) and return the result'''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """Pause drive r/w operations"""
        if not event:
            # Without an explicit event, break on both reads and writes
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False) -> QMPMessage:
        """Write to a given drive using an HMP command"""
        return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into a single dict whose keys are
        the dot-separated paths (list elements by index) of the leaf
        values; the result is added to and returned as output'''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a comma-separated "key=value" option
        string using the flattened keys; the leaf values are concatenated
        as-is and therefore have to be strings'''
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the events from get_qmp_events(wait=wait), each passed
        through filter_qmp_event()'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''Execute a QMP command, logging both the command and its result
        with the given filters and indent; returns the result'''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # All other events are only logged
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                # Remember the job's error; it is returned once the job
                # has reached the 'null' status
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''Issue blockdev-create with the given options (logged with
        filters, by default filter_qmp_testfiles) and run the resulting
        job to completion.  If the command itself fails, its error is
        returned instead of a job result.'''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the result;
        name is only used in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''Log MIGRATION events until one reports the status completed or
        failed.  On completion, poll query-status until the VM is in
        expect_runstate and return True; return False on failure.'''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry for node_name, or
        None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping the name of every node that reports
        dirty bitmaps to its list of dirty bitmaps'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The matching bitmap, or None if the node has no such
                 bitmap.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        for bitmap in bitmaps[node_name]:
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap bitmap_name on node_name exists and
        contains all (key, value) pairs of fields; False otherwise'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            # get_bitmap() found no such bitmap, so its status cannot
            # match the expected fields
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                # Follow the edge named child_name from the current node
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
878
# Matches a path component of the form "name[index]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Common base for unittest-style test cases that talk QMP.

    The helper methods issue their QMP commands through self.vm, which
    the concrete test case is expected to set.'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Placeholder; test cases assign the VM object that the methods
        # below use heavily.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow the '/'-separated @path through the nested dict @d
           and return what is found there.  A component written as
           "name[N]" selects element N of the list stored under "name".
           The test fails if the path cannot be followed.'''
        for component in path.split('/'):
            indexed = index_re.match(component)
            if indexed is not None:
                component, idx = indexed.group(1), int(indexed.group(2))

            reachable = isinstance(d, dict) and component in d
            if not reachable:
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[component]

            if indexed is None:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{component}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that @path cannot be followed in @d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed, i.e. there is nothing under @path
            pass
        else:
            self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the entry under @path in the QMP dict @d equals
           @value.  If @value is a non-empty list, it is enough for the
           entry to equal any one of its elements.'''

        result = self.dictpath(d, path)

        # An empty list cannot sensibly enumerate the accepted values,
        # so it is compared like any other single value.
        one_of_many = isinstance(value, list) and value != []
        if not one_of_many:
            self.assertEqual(result, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(result), str(value)))
            return

        if any(result == v for v in value):
            return
        self.fail('no match for "%s" in %s' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs reports an empty job list'''
        jobs = self.vm.qmp('query-block-jobs')
        self.assert_qmp(jobs, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry
        agrees with node_name and/or file_name"""
        def compatible(actual, wanted):
            # A missing value on either side counts as agreement
            if actual is None or wanted is None:
                return True
            return actual == wanted

        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for entry in result["return"]:
            if not compatible(entry.get("node-name"), node_name):
                continue
            if compatible(entry.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
           JSON after that prefix, once flattened, equals the flattened
           @reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        flat_actual = self.vm.flatten_qmp_object(
            json.loads(json_filename[5:]))
        flat_reference = self.vm.flatten_qmp_object(reference)
        self.assertEqual(flat_actual, flat_reference)

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel the block job on @drive, wait until it is gone and
           return its completion/cancellation event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # The whole batch is processed even after a match, so the
            # last matching event in it is the one returned.
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for the BLOCK_JOB_COMPLETED event of the job on @drive,
           check it and return it'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for the BLOCK_JOB_READY event of a mirror or commit job
        on @drive and return what self.vm.events_wait() gives back."""
        accepted = [('BLOCK_JOB_READY',
                     {'data': {'type': job_type, 'device': drive}})
                    for job_type in ('mirror', 'commit')]
        return self.vm.events_wait(accepted)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait until the job on @drive is ready, cancel it and check
           that it completed as a fully copied mirror job'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete for @drive and wait for the job to
           finish, optionally waiting for it to become ready first'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        event = self.wait_until_completed(drive=drive, error=completion_error)
        self.assertTrue(event['data']['type'] in ('mirror', 'commit'))

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until job @job_id is paused and no
           longer busy, then return its entry'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                # The job has to be listed in every poll
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Send block-job-pause for @job_id.  With @wait, return the job
           entry once it has paused; otherwise return the QMP reply.'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if not wait:
            return result
        return self.pause_wait(job_id)

    def case_skip(self, reason):
        '''Record this test case as not run and skip it'''
        case_notrun(reason)
        self.skipTest(reason)
1062
1063
def notrun(reason):
    '''Skip this test suite

    Writes @reason to "<output_dir>/<seq>.notrun", logs a warning and
    then terminates the process with exit status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on the garbage
    # collector: we call sys.exit() right below.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1072
def case_notrun(reason):
    '''Mark this test case as not having been run (without actually
    skipping it, that is left to the caller).  See
    QMPTestCase.case_skip() for a variant that actually skips the
    current test case.

    The note is appended to "<output_dir>/<seq>.casenotrun".'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Use a context manager so the file is flushed and closed right
    # away instead of whenever the file object gets collected.
    with open('%s/%s.casenotrun' % (output_dir, seq), 'a') as notrun_file:
        notrun_file.write('    [case not run] ' + reason + '\n')
1084
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite if the current image format (imgfmt) is not
    acceptable, or if it is 'luks' and LUKS does not work.
    """
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            # Behave like
            #   _supported_fmt generic
            # does for bash tests: no restriction on the format
            supported_fmts = ()

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts

    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1100
1101def _verify_protocol(supported: Sequence[str] = (),
1102                     unsupported: Sequence[str] = ()) -> None:
1103    assert not (supported and unsupported)
1104
1105    if 'generic' in supported:
1106        return
1107
1108    not_sup = supported and (imgproto not in supported)
1109    if not_sup or (imgproto in unsupported):
1110        notrun('not suitable for this protocol: %s' % imgproto)
1111
1112def _verify_platform(supported: Sequence[str] = (),
1113                     unsupported: Sequence[str] = ()) -> None:
1114    if any((sys.platform.startswith(x) for x in unsupported)):
1115        notrun('not suitable for this OS: %s' % sys.platform)
1116
1117    if supported:
1118        if not any((sys.platform.startswith(x) for x in supported)):
1119            notrun('not suitable for this OS: %s' % sys.platform)
1120
1121def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1122    if supported_cache_modes and (cachemode not in supported_cache_modes):
1123        notrun('not suitable for this cache mode: %s' % cachemode)
1124
1125def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1126    if supported_aio_modes and (aiomode not in supported_aio_modes):
1127        notrun('not suitable for this aio mode: %s' % aiomode)
1128
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite if any of @required_formats is missing from
    the list returned by supported_formats().
    """
    wanted = set(required_formats)
    available = set(supported_formats())
    usf_list = list(wanted - available)
    if not usf_list:
        return
    notrun(f'formats {usf_list} are not whitelisted')
1133
1134
def _verify_virtio_blk() -> None:
    """
    Skip the test suite if 'virtio-blk' does not appear in the output
    of "qemu -M none -device help".
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1139
1140
def supports_quorum():
    '''Tell whether 'quorum' appears in the output of qemu-img --help'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1143
def verify_quorum():
    '''Skip the test suite unless supports_quorum() reports quorum
    support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1148
def has_working_luks() -> Tuple[bool, str]:
    """
    Find out whether the LUKS driver is able to create images by
    creating a throw-away image with qemu-img (the result also applies
    to LUKS encryption for qcow2).

    :return: (True, '') on success; otherwise (False, reason), where
             reason is taken from qemu-img's output.
    """

    img_file = f'{test_dir}/luks-test.luks'
    create_args = ['create', '-f', 'luks',
                   '--object', luks_default_secret_object,
                   '-o', luks_default_key_secret_opt,
                   '-o', 'iter-time=10',
                   img_file, '1G']
    output, status = qemu_img_pipe_and_status(*create_args)

    # The image was only a probe; it may not even exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text that follows "<img_file>:" on the first line that
    # mentions it; fall back to the complete output.
    marker = img_file + ':'
    reason = output
    for line in output.splitlines():
        if marker in line:
            reason = line.partition(marker)[2].strip()
            break

    return (False, reason)
1179
def verify_working_luks():
    """
    Skip the test suite, giving the reason reported by
    has_working_luks(), if LUKS does not work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
1187
def qemu_pipe(*args: str) -> str:
    """
    Run qemu (qemu_prog plus qemu_opts) with the extra arguments @args,
    typically an option that prints something and exits (e.g. a help
    option).

    :return: QEMU's stdout output.
    """
    command = [qemu_prog, *qemu_opts, *args]
    stdout_text, _ = qemu_tool_pipe_and_status('qemu', command)
    return stdout_text
1197
def supported_formats(read_only=False):
    '''Return the format whitelist printed by "qemu -drive format=help".

       With 'read_only' set to True the ro-whitelist (second output
       line) is returned, otherwise the rw-whitelist (first line).
       Results are cached on the function object.'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        wanted_line = help_lines[1 if read_only else 0]
        # The format names follow the first colon, separated by spaces
        cache[read_only] = wanted_line.split(":")[1].split()

    return cache[read_only]
1212
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Skips the test case unless every required format is whitelisted.

       required_formats is either the formats themselves or a callable
       that is handed the test case and returns them.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            wanted = set(fmts)
            available = set(supported_formats(read_only))
            usf_list = list(wanted - available)
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
                return
            func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1232
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test case when the current image format (imgfmt) is
       one of the given formats'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            msg = f'{test_case}: Skipped for format {imgfmt}'
            test_case.case_skip(msg)
        return func_wrapper
    return skip_test_decorator
1248
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only when not running as root (uid 0); otherwise
       the case is recorded as not run and None is returned'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1259
def execute_unittest(debug=False):
    """
    Executes unittests within the calling module.

    In debug mode the runner writes verbosely to stdout.  Otherwise its
    output is captured, filtered and written to stderr once
    unittest.main() is done, so that it can be compared against the
    reference output.
    """

    verbosity = 2 if debug else 1

    # Outside of debug mode the output must be buffered so that it can
    # be filtered before it is printed.
    output = sys.stdout if debug else io.StringIO()

    runner = unittest.TextTestRunner(stream=output, descriptions=True,
                                     verbosity=verbosity)
    try:
        # unittest.main() will use sys.exit(); so expect a SystemExit
        # exception
        unittest.main(testRunner=runner)
    finally:
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if not debug:
            out = output.getvalue()
            out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out)

            # Hide skipped tests from the reference output
            out = re.sub(r'OK \(skipped=\d+\)', 'OK', out)

            # Only the first line holds the per-test progress markers.
            # partition() also copes with output that contains no
            # newline (e.g. nothing was written because unittest.main()
            # bailed out early); unpacking split() would raise
            # ValueError here and hide the original exception.
            first_line, newline, rest = out.partition('\n')
            out = first_line.replace('s', '.') + newline + rest

            sys.stderr.write(out)
1291
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Setup shared by script-style and unittest-style tests: consume the
    '-d' command line flag, configure logging and run the environment
    checks for the given requirements.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # Strip the first '-d' from sys.argv; its presence selects debug mode
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1321
def execute_test(*args, test_function=None, **kwargs):
    """Perform the common setup, then call @test_function if one was
    given (script-style test) or run the unittests otherwise."""

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(debug)
1330
def activate_logging():
    """Make test_logger print bare messages of level INFO and above to
    stdout (used for iotests.log() output in script-style tests)."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
    test_logger.addHandler(stdout_handler)
1339
1340# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Set up a script-style test (logging to stdout plus the common
    setup) without running any test function."""
    activate_logging()
    execute_setup_common(*args, **kwargs)
1345
1346# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Activate logging to stdout, then set up and run the script-style
    test @test_function outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1351
1352# This is called from unittest style iotests
def main(*args, **kwargs):
    """Set up and run unittest-style tests; all arguments are handed on
    to execute_test()"""
    execute_test(*args, **kwargs)
1356