xref: /openbmc/qemu/tests/qemu-iotests/iotests.py (revision f550805d)
1from __future__ import print_function
2# Common utilities and Python wrappers for qemu-iotests
3#
4# Copyright (C) 2012 IBM Corp.
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 2 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18#
19
20import errno
21import os
22import re
23import subprocess
24import string
25import unittest
26import sys
27import struct
28import json
29import signal
30import logging
31import atexit
32import io
33from collections import OrderedDict
34
35sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36import qtest
37
38
# Command lines of the external tools.  The *_OPTIONS strings are split on
# single spaces, so an individual argument cannot itself contain a space;
# that limitation is accepted in order to support the override options
# that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args.extend(os.environ['QEMU_IMG_OPTIONS'].strip().split(' '))

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args.extend(os.environ['QEMU_IO_OPTIONS'].strip().split(' '))

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args.extend(os.environ['QEMU_NBD_OPTIONS'].strip().split(' '))

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Test parameters taken from the environment; the ones without a fallback
# value are None when the variable is not set.
imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
# Set by main() when '-d' is given on the command line
debug = False

# Secret object and matching option used by default for luks images
luks_default_secret_object = ('secret,id=keysec0,data=' +
                              os.environ.get('IMGKEYSECRET', ''))
luks_default_key_secret_opt = 'key-secret=keysec0'
69
70
def qemu_img(*args):
    '''Run qemu-img and return the exit code

    stdin and stdout of the child are connected to /dev/null; stderr is
    inherited.  A negative exit code means that the process was killed by
    a signal, which is additionally reported on stderr.
    '''
    cmd = qemu_img_args + list(args)
    # Close the /dev/null handle once the child has exited instead of
    # leaking one open file per call.
    with open('/dev/null', 'r+') as devnull:
        exitcode = subprocess.call(cmd, stdin=devnull, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmd)))
    return exitcode
78
def ordered_kwargs(kwargs):
    '''Return kwargs as an OrderedDict sorted by key, recursing into dicts

    Keyword arguments are not ordered before Python 3.6, so sorting them
    gives a stable order.
    '''
    return OrderedDict(
        (key, ordered_kwargs(kwargs[key])
              if isinstance(kwargs[key], dict) else kwargs[key])
        for key in sorted(kwargs))
88
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments; return the exit code

    When the arguments select the luks format ('-f luks') and no
    'key-secret' is found in the '-o' option string (or there is no '-o'
    at all), the default key secret option and the matching '--object'
    definition are added.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The value following -o is a comma-separated option
                # string, so the default has to be appended as text
                # (a str has no append() method).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
107
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code

    A negative exit code (process killed by a signal) is reported on stderr.
    '''
    full_cmd = qemu_img_args + list(args)
    status = subprocess.call(full_cmd)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-status, ' '.join(full_cmd)))
    return status
114
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output

    stderr is merged into stdout, and the combined output is returned as
    text.  If the process was killed by a signal, this is reported on
    stderr.
    '''
    cmd = qemu_img_args + list(args)
    subp = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child has written more output
    # than the pipe buffer holds.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-subp.returncode, ' '.join(cmd)))
    return output
125
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
    '''Log the filtered output of "qemu-img info" for filename

    With imgopts, filename is passed with --image-opts; otherwise the
    image format is given with -f.  filter_path is the string replaced in
    the output by filter_img_info(); it defaults to filename.
    '''
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    cmdline = ['info'] + format_args
    cmdline += extra_args
    cmdline.append(filename)

    info = qemu_img_pipe(*cmdline)
    log(filter_img_info(info, filter_path or filename))
139
def qemu_io(*args):
    '''Run qemu-io and return the stdout data

    stderr is merged into stdout.  If the process was killed by a signal,
    this is reported on stderr.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child has written more output
    # than the pipe buffer holds.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' % (-subp.returncode, ' '.join(args)))
    return output
150
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout

    If the process was killed by a signal, this is reported on stderr.
    '''
    args = qemu_io_args + list(args)
    # Close the /dev/null handle once the child has exited instead of
    # leaking one open file per call.
    with open('/dev/null', 'w') as devnull:
        exitcode = subprocess.call(args, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-exitcode, ' '.join(args)))
    return exitcode
159
160
class QemuIoInteractive:
    '''An interactive qemu-io process driven through its prompt

    The process is started with stdin/stdout connected to pipes (stderr
    merged into stdout).  Commands are sent with cmd(); the session is
    ended with close().
    '''

    def __init__(self, *args):
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        # Consume the initial prompt so that the first cmd() call only
        # sees the output of its own command.
        assert self._p.stdout.read(9) == 'qemu-io> '

    def close(self):
        '''Send the quit command and wait for the process to terminate'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to the next prompt and return the text preceding it

        Fails an assertion if the output ends before a prompt is seen.
        '''
        pattern = 'qemu-io> '
        n = len(pattern)
        s = []
        while True:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            # Compare the tail of everything read so far with the prompt.
            # Unlike a position counter that is reset on a mismatch, this
            # also recognises a prompt that directly follows a character
            # equal to the prompt's first one (e.g. "...qqemu-io> ").
            if len(s) >= n and ''.join(s[-n:]) == pattern:
                break

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return the output it produced'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd != 'q' and cmd != 'quit'
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
198
199
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode); return the parent's exit code'''
    cmdline = qemu_nbd_args + ['--fork']
    cmdline.extend(args)
    return subprocess.call(cmdline)
203
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are the formats of img1 and img2 respectively.
    '''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
208
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector holds its own index as a big-endian 32-bit
    integer in its first and its last four bytes, with zero bytes in
    between.  Whole sectors are written while the offset is below size,
    so a size that is not a multiple of 512 is rounded up.
    '''
    # The with block closes the file even when a write fails.
    with open(name, 'wb') as img:
        i = 0
        while i < size:
            sector = struct.pack('>l504xl', i // 512, i // 512)
            img.write(sector)
            i = i + 512
218
def image_size(img):
    '''Return the 'virtual-size' field of "qemu-img info" for the image'''
    info = json.loads(qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
223
# The directory name has to match literally, so it is escaped: otherwise
# regex metacharacters in the path ('.', '+', '(' ...) would be interpreted
# as pattern syntax and match wrong text or fail to compile.
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory in msg by "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
227
# Carriage returns, as found in Windows-style line endings
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with all carriage return characters removed'''
    without_cr = win32_re.sub("", msg)
    return without_cr
231
# Matches the operation count / duration / throughput summary of qemu-io
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and mask qemu-io's statistics in msg'''
    cleaned = filter_win32(msg)
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, cleaned)
236
# "chown" followed by a numeric user and group id
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric ids of "chown UID:GID" in msg by placeholders'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
240
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event in which the 'seconds' and 'microseconds'
    fields of the timestamp (if there is one) are replaced by the
    placeholders 'SECS' and 'USECS'.  The event passed in is not modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; the nested timestamp dict
        # must be copied as well, or the caller's event would be changed.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
248
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a list or a dict; nested lists and dicts are processed
    recursively.  filter_fn is called as filter_fn(key, value), where key
    is the dict key or list index, and its result replaces the value.
    The (modified) qmsg itself is returned.'''
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        if isinstance(value, (list, dict)):
            qmsg[key] = filter_qmp(value, filter_fn)
        else:
            qmsg[key] = filter_fn(key, value)
    return qmsg
264
def filter_testfiles(msg):
    '''Replace the "<test_dir>/<pid>-" file name prefix in msg by a placeholder'''
    pid_tag = "%s-" % (os.getpid())
    return msg.replace(os.path.join(test_dir, pid_tag), 'TEST_DIR/PID-')
268
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to the 'filename' and 'backing-file' values
    of a QMP object (modified in place and returned)'''
    path_keys = ('filename', 'backing-file')

    def _filter(key, value):
        return filter_testfiles(value) if key in path_keys else value

    return filter_qmp(qmsg, _filter)
275
def filter_generated_node_ids(msg):
    '''Replace every "#block<number>" node name in msg by "NODE_NAME"'''
    node_id = re.compile("#block[0-9]+")
    return node_id.sub("NODE_NAME", msg)
278
def filter_img_info(output, filename):
    '''Filter the output of "qemu-img info"

    Lines mentioning 'disk size' or 'actual-size' are dropped.  In the
    remaining lines filename and the image format are replaced by
    placeholders, and the values of 'iters:' and 'uuid:' are masked.
    '''
    def _scrub(text):
        text = text.replace(filename, 'TEST_IMG')
        text = text.replace(imgfmt, 'IMGFMT')
        text = re.sub('iters: [0-9]+', 'iters: XXX', text)
        return re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', text)

    return '\n'.join(_scrub(text) for text in output.split('\n')
                     if not ('disk size' in text or 'actual-size' in text))
290
def log(msg, filters=[], indent=None):
    '''Print a string message or a JSON serializable message (like QMP).

    Every callable in filters is applied to msg in turn first.  A dict or
    list is then printed as JSON, pretty-printed if indent is provided;
    anything else is printed as is.'''
    for apply_filter in filters:
        msg = apply_filter(msg)

    if not isinstance(msg, (dict, list)):
        print(msg)
        return

    # Python < 3.4 needs to know not to add whitespace when pretty-printing:
    if indent is None:
        separators = (', ', ': ')
    else:
        separators = (',', ': ')
    # An OrderedDict already has the intended order, so do not sort it
    print(json.dumps(msg, sort_keys=not isinstance(msg, OrderedDict),
                     indent=indent, separators=separators))
305
class Timeout:
    '''Context manager that raises an exception when its body takes too long

    On entry a SIGALRM handler is installed and a real-time interval timer
    of the given number of seconds is armed; when it fires, an Exception
    with errmsg is raised.  On exit the timer is disarmed and the SIGALRM
    handler that was active before is reinstated.
    '''
    def __init__(self, seconds, errmsg = "Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        self._old_handler = None
    def __enter__(self):
        # signal.signal() returns the previously installed handler
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # Do not leave our handler installed for the rest of the process.
        # The previous handler is None if it was not installed from
        # Python, in which case it cannot be passed back to signal().
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False
    def timeout(self, signum, frame):
        '''SIGALRM handler: raise the timeout exception'''
        raise Exception(self.errmsg)
319
320
class FilePath(object):
    '''An auto-generated filename that cleans itself up.

    The path is "<pid>-<name>" inside the test directory.  Use this
    context manager to generate filenames and ensure that the file gets
    deleted::

        with FilePath('test.img') as img_path:
            qemu_img('create', img_path, '1G')
        # img_path is automatically deleted
    '''
    def __init__(self, name):
        filename = '{0}-{1}'.format(os.getpid(), name)
        self.path = os.path.join(test_dir, filename)

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Best effort: the file may never have been created
        try:
            os.remove(self.path)
        except OSError:
            pass
        # Never suppress an exception raised inside the with block
        return False
344
345
def file_path_remover():
    '''Delete the files recorded in file_path_remover.paths, newest first

    Files that cannot be removed are silently skipped.
    '''
    for stale in file_path_remover.paths[::-1]:
        try:
            os.remove(stale)
        except OSError:
            continue
352
353
def file_path(*names):
    ''' Generate file names in the test directory that get cleaned up at exit.

    Each name is turned into "<pid>-<name>" inside the test directory and
    recorded for removal by file_path_remover(), which is registered with
    atexit on first use.  A single name yields a single path, several
    names yield a list:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')
    '''

    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    generated = []
    for name in names:
        full_path = os.path.join(test_dir,
                                 '{0}-{1}'.format(os.getpid(), name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
375
def remote_filename(path):
    '''Return path in the form used by the current image protocol

    'file' leaves the path unchanged and 'ssh' turns it into an ssh URL
    on 127.0.0.1; any other protocol raises an Exception.
    '''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://127.0.0.1%s" % (path)
    raise Exception("Protocol %s not supported" % (imgproto))
383
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM

    The add_*() methods append options to the command line arguments and
    return self, so that calls can be chained.
    '''

    def __init__(self, path_suffix=''):
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
                                 test_dir=test_dir,
                                 socket_scm_helper=socket_scm_helper)
        # Number of drives added with add_drive(); used for their ids
        self._num_drives = 0

    def add_object(self, opts):
        '''Add an -object option with the given option string'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Add a -device option with the given option string'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Add a -drive option with exactly the given option string'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
        '''Add a drive to the VM (a virtio one unless interface is given)

        The drive gets the id "drive<N>", N counting the drives added so
        far.  If path is not None, file, format and cache options are
        generated for it.  opts is an extra option string appended as is.
        For the luks format the default key secret is used unless opts
        mentions 'key-secret'.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % format)
            options.append('cache=%s' % cachemode)

        if opts:
            options.append(opts)

        if format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Add a -blockdev option

        opts is either the option string itself or an iterable of options
        that are joined with commas.
        '''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Add an -incoming option with the given address'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations

        Sets the qemu-io breakpoint "bp_<drive>" on event; without an
        event, both "read_aio" and "write_aio" are used.
        '''
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.qmp('human-monitor-command',
                    command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))

    def resume_drive(self, drive):
        '''Remove the breakpoint "bp_<drive>" set by pause_drive()'''
        self.qmp('human-monitor-command',
                    command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))

    def hmp_qemu_io(self, drive, cmd):
        '''Write to a given drive using an HMP command'''
        return self.qmp('human-monitor-command',
                        command_line='qemu-io %s "%s"' % (drive, cmd))

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested lists/dicts into one dict with dotted keys

        List indices and dict keys along the way are joined with '.',
        e.g. {'a': [{'b': 1}]} becomes {'a.0.b': 1}.  Results are added to
        output (a new dict if None), which is returned.
        '''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,..." option string

        The object is flattened with flatten_qmp_object() first.  The
        values are concatenated as they are, so they must be strings.
        '''
        obj = self.flatten_qmp_object(obj)
        output_list = list()
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=True):
        '''Return the QMP events, each passed through filter_qmp_event()'''
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=[], indent=None, **kwargs):
        '''Run a QMP command, logging both the command and its result

        The arguments are logged in sorted order.  Returns the result.
        '''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_kwargs(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    def run_job(self, job, auto_finalize=True, auto_dismiss=False):
        '''Process QMP events until a job status change to 'null' is seen

        For JOB_STATUS_CHANGE events: on 'aborting' the error of the job
        with id job is logged; on 'pending' the job is finalized unless
        auto_finalize is set; on 'concluded' it is dismissed unless
        auto_dismiss is set; 'null' ends the loop.  All other events are
        logged.
        '''
        while True:
            for ev in self.get_qmp_events_filtered(wait=True):
                if ev['event'] == 'JOB_STATUS_CHANGE':
                    status = ev['data']['status']
                    if status == 'aborting':
                        result = self.qmp('query-jobs')
                        for j in result['return']:
                            if j['id'] == job:
                                log('Job failed: %s' % (j['error']))
                    elif status == 'pending' and not auto_finalize:
                        self.qmp_log('job-finalize', id=job)
                    elif status == 'concluded' and not auto_dismiss:
                        self.qmp_log('job-dismiss', id=job)
                    elif status == 'null':
                        return
                else:
                    # log() is this module's own function; there is no
                    # name "iotests" in scope here.
                    log(ev)
519
520
# Matches a path component that carries a list index, such as "return[0]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases

    The helpers that talk to QEMU use self.vm, which is not set by this
    class (presumably subclasses create it in setUp()).
    '''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        The path components are separated by '/'; a component of the form
        "name[N]" additionally selects element N of the list stored under
        name.  The test fails if the path cannot be followed.
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that the path does not exist in a QMP dict'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a missing path with self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict matches'''
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
                (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
           content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
                         self.vm.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False, resume=False):
        '''Cancel a block job and wait for it to finish, returning the event

        With resume, the drive is resumed right after the cancel request.
        '''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)


        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True):
        '''Wait for a block job to finish, returning the event

        The completion event must be for drive and carry no error; with
        check_offset, its offset must equal its len.
        '''
        while True:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset', event['data']['len'])
                    self.assert_no_active_block_jobs()
                    return event
                elif event['event'] == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        f = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=f)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it and check that it
        completed as a mirror job with offset equal to len'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy

        Returns the job's entry.  The job must exist; the wait is limited
        to one second by a Timeout.
        '''
        with Timeout(1, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                found = False
                for job in result['return']:
                    if job['device'] == job_id:
                        found = True
                        if job['paused'] == True and job['busy'] == False:
                            return job
                        break
                assert found

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job

        With wait, returns the job entry from pause_wait(); otherwise the
        result of the block-job-pause command.
        '''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result
664
665
def notrun(reason):
    '''Skip this test suite

    The reason is written to "<seq>.notrun" in the output directory and
    printed, then the process exits with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # reason is a str, so the file must be opened in text mode: writing
    # a str to a file opened with 'wb' raises TypeError on Python 3.
    with open('%s/%s.notrun' % (output_dir, seq), 'w') as notrun_file:
        notrun_file.write(reason + '\n')
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)
674
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
    '''Skip the test suite if the image format is not suitable

    Only one of the two lists may be given.  'generic' in supported_fmts
    accepts every format as long as the environment variable
    IMGFMT_GENERIC is 'true' (the default when it is unset).
    '''
    assert not (supported_fmts and unsupported_fmts)

    # similar to
    #   _supported_fmt generic
    # for bash tests
    if 'generic' in supported_fmts:
        if os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
            return

    if supported_fmts and imgfmt not in supported_fmts:
        unsuitable = True
    else:
        unsuitable = imgfmt in unsupported_fmts

    if unsuitable:
        notrun('not suitable for this image format: %s' % imgfmt)
688
def verify_protocol(supported=[], unsupported=[]):
    '''Skip the test suite if the image protocol is not suitable

    Only one of the two lists may be given.  'generic' in supported
    accepts every protocol.
    '''
    assert not (supported and unsupported)

    if 'generic' in supported:
        return

    if supported and imgproto not in supported:
        rejected = True
    else:
        rejected = imgproto in unsupported

    if rejected:
        notrun('not suitable for this protocol: %s' % imgproto)
698
def verify_platform(supported_oses=['linux']):
    '''Skip the test suite unless sys.platform starts with one of the
    given OS names'''
    matches = [sys.platform.startswith(prefix) for prefix in supported_oses]
    if not any(matches):
        notrun('not suitable for this OS: %s' % sys.platform)
702
def verify_cache_mode(supported_cache_modes=[]):
    '''Skip the test suite if the cache mode is not in the given list

    An empty list accepts every cache mode.
    '''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
706
def supports_quorum():
    '''Return True if 'quorum' appears in the output of "qemu-img --help"'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
709
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
714
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
         unsupported_fmts=[]):
    '''Run tests

    The suite is skipped if the image format, the OS or the cache mode is
    not suitable.  With '-d' on the command line the tests run verbosely
    and write directly to stdout; otherwise the runner's output is
    captured and written to stderr with the elapsed time removed.
    '''

    global debug

    # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
    # indicate that we're not being run via "check". There may be
    # other things set up by "check" that individual test cases rely
    # on.
    if test_dir is None or qemu_default_machine is None:
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    debug = '-d' in sys.argv
    verify_image_format(supported_fmts, unsupported_fmts)
    verify_platform(supported_oses)
    verify_cache_mode(supported_cache_modes)

    if debug:
        verbosity = 2
        output = sys.stdout
        sys.argv.remove('-d')
    else:
        verbosity = 1
        # We need to filter out the time taken from the output so that
        # qemu-iotest can reliably diff the results against master output.
        if sys.version_info.major >= 3:
            output = io.StringIO()
        else:
            # io.StringIO is for unicode strings, which is not what
            # 2.x's test runner emits.
            output = io.BytesIO()

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    # Runner whose default stream and verbosity are the ones chosen above
    class MyTestRunner(unittest.TextTestRunner):
        def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
            unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

    # unittest.main() will use sys.exit() so expect a SystemExit exception
    try:
        unittest.main(testRunner=MyTestRunner)
    finally:
        if not debug:
            captured = output.getvalue()
            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', captured))
761