1from __future__ import print_function 2# Common utilities and Python wrappers for qemu-iotests 3# 4# Copyright (C) 2012 IBM Corp. 5# 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 2 of the License, or 9# (at your option) any later version. 10# 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15# 16# You should have received a copy of the GNU General Public License 17# along with this program. If not, see <http://www.gnu.org/licenses/>. 18# 19 20import errno 21import os 22import re 23import subprocess 24import string 25import unittest 26import sys 27import struct 28import json 29import signal 30import logging 31import atexit 32import io 33from collections import OrderedDict 34 35sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts')) 36import qtest 37 38 39# This will not work if arguments contain spaces but is necessary if we 40# want to support the override options that ./check supports. 41qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')] 42if os.environ.get('QEMU_IMG_OPTIONS'): 43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ') 44 45qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')] 46if os.environ.get('QEMU_IO_OPTIONS'): 47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ') 48 49qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')] 50if os.environ.get('QEMU_NBD_OPTIONS'): 51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ') 52 53qemu_prog = os.environ.get('QEMU_PROG', 'qemu') 54qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ') 55 56imgfmt = os.environ.get('IMGFMT', 'raw') 57imgproto = os.environ.get('IMGPROTO', 'file') 58test_dir = os.environ.get('TEST_DIR') 59output_dir = os.environ.get('OUTPUT_DIR', '.') 60cachemode = os.environ.get('CACHEMODE') 61qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE') 62 63socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper') 64debug = False 65 66luks_default_secret_object = 'secret,id=keysec0,data=' + \ 67 os.environ.get('IMGKEYSECRET', '') 68luks_default_key_secret_opt = 'key-secret=keysec0' 69 70 71def qemu_img(*args): 72 '''Run qemu-img and return the exit code''' 73 devnull = open('/dev/null', 'r+') 74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull) 75 if exitcode < 0: 76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) 77 return exitcode 78 79def ordered_kwargs(kwargs): 80 # kwargs prior to 3.6 are not ordered, so: 81 od = OrderedDict() 82 for k, v in sorted(kwargs.items()): 83 if isinstance(v, dict): 84 od[k] = ordered_kwargs(v) 85 else: 86 od[k] = v 87 return od 88 89def qemu_img_create(*args): 90 args = list(args) 91 92 # default luks support 93 if '-f' in args and args[args.index('-f') + 1] == 'luks': 94 if '-o' in args: 95 i = args.index('-o') 96 if 'key-secret' not in args[i + 1]: 97 args[i + 1].append(luks_default_key_secret_opt) 98 args.insert(i + 2, '--object') 99 args.insert(i + 3, luks_default_secret_object) 100 else: 101 args = ['-o', luks_default_key_secret_opt, 102 '--object', luks_default_secret_object] + args 103 104 args.insert(0, 'create') 105 106 return qemu_img(*args) 107 108def qemu_img_verbose(*args): 109 '''Run qemu-img without suppressing its output and return the exit code''' 110 exitcode = subprocess.call(qemu_img_args + list(args)) 111 if exitcode < 0: 112 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) 113 return exitcode 114 115def qemu_img_pipe(*args): 116 '''Run qemu-img and return its output''' 117 subp = subprocess.Popen(qemu_img_args + list(args), 118 stdout=subprocess.PIPE, 119 stderr=subprocess.STDOUT, 120 universal_newlines=True) 121 exitcode = subp.wait() 122 if exitcode < 0: 123 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) 124 return subp.communicate()[0] 125 126def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]): 127 args = [ 'info' ] 128 if imgopts: 129 args.append('--image-opts') 130 else: 131 args += [ '-f', imgfmt ] 132 args += extra_args 133 args.append(filename) 134 135 output = qemu_img_pipe(*args) 136 if not filter_path: 137 filter_path = filename 138 log(filter_img_info(output, filter_path)) 139 140def qemu_io(*args): 141 '''Run qemu-io and return the stdout data''' 142 args = qemu_io_args + list(args) 143 subp = subprocess.Popen(args, stdout=subprocess.PIPE, 144 stderr=subprocess.STDOUT, 145 universal_newlines=True) 146 exitcode = subp.wait() 147 if exitcode < 0: 148 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args))) 149 return subp.communicate()[0] 150 151def qemu_io_silent(*args): 152 '''Run qemu-io and return the exit code, suppressing stdout''' 153 args = qemu_io_args + list(args) 154 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w')) 155 if exitcode < 0: 156 sys.stderr.write('qemu-io received signal %i: %s\n' % 157 (-exitcode, ' '.join(args))) 158 return exitcode 159 160 161class QemuIoInteractive: 162 def __init__(self, *args): 163 self.args = qemu_io_args + list(args) 164 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE, 165 stdout=subprocess.PIPE, 166 stderr=subprocess.STDOUT, 167 universal_newlines=True) 168 assert self._p.stdout.read(9) == 'qemu-io> ' 169 170 def close(self): 171 self._p.communicate('q\n') 172 173 def _read_output(self): 174 pattern = 'qemu-io> ' 175 n = len(pattern) 176 pos = 0 177 s = [] 178 while pos != n: 179 c = self._p.stdout.read(1) 180 # check unexpected EOF 181 assert c != '' 182 s.append(c) 183 if c == pattern[pos]: 184 pos += 1 185 else: 186 pos = 0 187 188 return ''.join(s[:-n]) 189 190 def cmd(self, cmd): 191 # quit command is in close(), '\n' is added automatically 192 assert '\n' not in cmd 193 cmd = cmd.strip() 194 assert cmd != 'q' and cmd != 'quit' 195 self._p.stdin.write(cmd + '\n') 196 self._p.stdin.flush() 197 return self._read_output() 198 199 200def qemu_nbd(*args): 201 '''Run qemu-nbd in daemon mode and return the parent's exit code''' 202 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args)) 203 204def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt): 205 '''Return True if two image files are identical''' 206 return qemu_img('compare', '-f', fmt1, 207 '-F', fmt2, img1, img2) == 0 208 209def create_image(name, size): 210 '''Create a fully-allocated raw image with sector markers''' 211 file = open(name, 'wb') 212 i = 0 213 while i < size: 214 sector = struct.pack('>l504xl', i // 512, i // 512) 215 file.write(sector) 216 i = i + 512 217 file.close() 218 219def image_size(img): 220 '''Return image's virtual size''' 221 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img) 222 return json.loads(r)['virtual-size'] 223 224test_dir_re = re.compile(r"%s" % test_dir) 225def filter_test_dir(msg): 226 return test_dir_re.sub("TEST_DIR", msg) 227 228win32_re = re.compile(r"\r") 229def filter_win32(msg): 230 return win32_re.sub("", msg) 231 232qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)") 233def filter_qemu_io(msg): 234 msg = filter_win32(msg) 235 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg) 236 237chown_re = re.compile(r"chown [0-9]+:[0-9]+") 238def filter_chown(msg): 239 return chown_re.sub("chown UID:GID", msg) 240 241def filter_qmp_event(event): 242 '''Filter a QMP event dict''' 243 event = dict(event) 244 if 'timestamp' in event: 245 event['timestamp']['seconds'] = 'SECS' 246 event['timestamp']['microseconds'] = 'USECS' 247 return event 248 249def filter_qmp(qmsg, filter_fn): 250 '''Given a string filter, filter a QMP object's values. 251 filter_fn takes a (key, value) pair.''' 252 # Iterate through either lists or dicts; 253 if isinstance(qmsg, list): 254 items = enumerate(qmsg) 255 else: 256 items = qmsg.items() 257 258 for k, v in items: 259 if isinstance(v, list) or isinstance(v, dict): 260 qmsg[k] = filter_qmp(v, filter_fn) 261 else: 262 qmsg[k] = filter_fn(k, v) 263 return qmsg 264 265def filter_testfiles(msg): 266 prefix = os.path.join(test_dir, "%s-" % (os.getpid())) 267 return msg.replace(prefix, 'TEST_DIR/PID-') 268 269def filter_qmp_testfiles(qmsg): 270 def _filter(key, value): 271 if key == 'filename' or key == 'backing-file': 272 return filter_testfiles(value) 273 return value 274 return filter_qmp(qmsg, _filter) 275 276def filter_generated_node_ids(msg): 277 return re.sub("#block[0-9]+", "NODE_NAME", msg) 278 279def filter_img_info(output, filename): 280 lines = [] 281 for line in output.split('\n'): 282 if 'disk size' in line or 'actual-size' in line: 283 continue 284 line = line.replace(filename, 'TEST_IMG') \ 285 .replace(imgfmt, 'IMGFMT') 286 line = re.sub('iters: [0-9]+', 'iters: XXX', line) 287 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line) 288 lines.append(line) 289 return '\n'.join(lines) 290 291def log(msg, filters=[], indent=None): 292 '''Logs either a string message or a JSON serializable message (like QMP). 293 If indent is provided, JSON serializable messages are pretty-printed.''' 294 for flt in filters: 295 msg = flt(msg) 296 if isinstance(msg, dict) or isinstance(msg, list): 297 # Python < 3.4 needs to know not to add whitespace when pretty-printing: 298 separators = (', ', ': ') if indent is None else (',', ': ') 299 # Don't sort if it's already sorted 300 do_sort = not isinstance(msg, OrderedDict) 301 print(json.dumps(msg, sort_keys=do_sort, 302 indent=indent, separators=separators)) 303 else: 304 print(msg) 305 306class Timeout: 307 def __init__(self, seconds, errmsg = "Timeout"): 308 self.seconds = seconds 309 self.errmsg = errmsg 310 def __enter__(self): 311 signal.signal(signal.SIGALRM, self.timeout) 312 signal.setitimer(signal.ITIMER_REAL, self.seconds) 313 return self 314 def __exit__(self, type, value, traceback): 315 signal.setitimer(signal.ITIMER_REAL, 0) 316 return False 317 def timeout(self, signum, frame): 318 raise Exception(self.errmsg) 319 320 321class FilePath(object): 322 '''An auto-generated filename that cleans itself up. 323 324 Use this context manager to generate filenames and ensure that the file 325 gets deleted:: 326 327 with TestFilePath('test.img') as img_path: 328 qemu_img('create', img_path, '1G') 329 # migration_sock_path is automatically deleted 330 ''' 331 def __init__(self, name): 332 filename = '{0}-{1}'.format(os.getpid(), name) 333 self.path = os.path.join(test_dir, filename) 334 335 def __enter__(self): 336 return self.path 337 338 def __exit__(self, exc_type, exc_val, exc_tb): 339 try: 340 os.remove(self.path) 341 except OSError: 342 pass 343 return False 344 345 346def file_path_remover(): 347 for path in reversed(file_path_remover.paths): 348 try: 349 os.remove(path) 350 except OSError: 351 pass 352 353 354def file_path(*names): 355 ''' Another way to get auto-generated filename that cleans itself up. 356 357 Use is as simple as: 358 359 img_a, img_b = file_path('a.img', 'b.img') 360 sock = file_path('socket') 361 ''' 362 363 if not hasattr(file_path_remover, 'paths'): 364 file_path_remover.paths = [] 365 atexit.register(file_path_remover) 366 367 paths = [] 368 for name in names: 369 filename = '{0}-{1}'.format(os.getpid(), name) 370 path = os.path.join(test_dir, filename) 371 file_path_remover.paths.append(path) 372 paths.append(path) 373 374 return paths[0] if len(paths) == 1 else paths 375 376def remote_filename(path): 377 if imgproto == 'file': 378 return path 379 elif imgproto == 'ssh': 380 return "ssh://127.0.0.1%s" % (path) 381 else: 382 raise Exception("Protocol %s not supported" % (imgproto)) 383 384class VM(qtest.QEMUQtestMachine): 385 '''A QEMU VM''' 386 387 def __init__(self, path_suffix=''): 388 name = "qemu%s-%d" % (path_suffix, os.getpid()) 389 super(VM, self).__init__(qemu_prog, qemu_opts, name=name, 390 test_dir=test_dir, 391 socket_scm_helper=socket_scm_helper) 392 self._num_drives = 0 393 394 def add_object(self, opts): 395 self._args.append('-object') 396 self._args.append(opts) 397 return self 398 399 def add_device(self, opts): 400 self._args.append('-device') 401 self._args.append(opts) 402 return self 403 404 def add_drive_raw(self, opts): 405 self._args.append('-drive') 406 self._args.append(opts) 407 return self 408 409 def add_drive(self, path, opts='', interface='virtio', format=imgfmt): 410 '''Add a virtio-blk drive to the VM''' 411 options = ['if=%s' % interface, 412 'id=drive%d' % self._num_drives] 413 414 if path is not None: 415 options.append('file=%s' % path) 416 options.append('format=%s' % format) 417 options.append('cache=%s' % cachemode) 418 419 if opts: 420 options.append(opts) 421 422 if format == 'luks' and 'key-secret' not in opts: 423 # default luks support 424 if luks_default_secret_object not in self._args: 425 self.add_object(luks_default_secret_object) 426 427 options.append(luks_default_key_secret_opt) 428 429 self._args.append('-drive') 430 self._args.append(','.join(options)) 431 self._num_drives += 1 432 return self 433 434 def add_blockdev(self, opts): 435 self._args.append('-blockdev') 436 if isinstance(opts, str): 437 self._args.append(opts) 438 else: 439 self._args.append(','.join(opts)) 440 return self 441 442 def add_incoming(self, addr): 443 self._args.append('-incoming') 444 self._args.append(addr) 445 return self 446 447 def pause_drive(self, drive, event=None): 448 '''Pause drive r/w operations''' 449 if not event: 450 self.pause_drive(drive, "read_aio") 451 self.pause_drive(drive, "write_aio") 452 return 453 self.qmp('human-monitor-command', 454 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive)) 455 456 def resume_drive(self, drive): 457 self.qmp('human-monitor-command', 458 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive)) 459 460 def hmp_qemu_io(self, drive, cmd): 461 '''Write to a given drive using an HMP command''' 462 return self.qmp('human-monitor-command', 463 command_line='qemu-io %s "%s"' % (drive, cmd)) 464 465 def flatten_qmp_object(self, obj, output=None, basestr=''): 466 if output is None: 467 output = dict() 468 if isinstance(obj, list): 469 for i in range(len(obj)): 470 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.') 471 elif isinstance(obj, dict): 472 for key in obj: 473 self.flatten_qmp_object(obj[key], output, basestr + key + '.') 474 else: 475 output[basestr[:-1]] = obj # Strip trailing '.' 476 return output 477 478 def qmp_to_opts(self, obj): 479 obj = self.flatten_qmp_object(obj) 480 output_list = list() 481 for key in obj: 482 output_list += [key + '=' + obj[key]] 483 return ','.join(output_list) 484 485 def get_qmp_events_filtered(self, wait=True): 486 result = [] 487 for ev in self.get_qmp_events(wait=wait): 488 result.append(filter_qmp_event(ev)) 489 return result 490 491 def qmp_log(self, cmd, filters=[], indent=None, **kwargs): 492 full_cmd = OrderedDict(( 493 ("execute", cmd), 494 ("arguments", ordered_kwargs(kwargs)) 495 )) 496 log(full_cmd, filters, indent=indent) 497 result = self.qmp(cmd, **kwargs) 498 log(result, filters, indent=indent) 499 return result 500 501 def run_job(self, job, auto_finalize=True, auto_dismiss=False): 502 while True: 503 for ev in self.get_qmp_events_filtered(wait=True): 504 if ev['event'] == 'JOB_STATUS_CHANGE': 505 status = ev['data']['status'] 506 if status == 'aborting': 507 result = self.qmp('query-jobs') 508 for j in result['return']: 509 if j['id'] == job: 510 log('Job failed: %s' % (j['error'])) 511 elif status == 'pending' and not auto_finalize: 512 self.qmp_log('job-finalize', id=job) 513 elif status == 'concluded' and not auto_dismiss: 514 self.qmp_log('job-dismiss', id=job) 515 elif status == 'null': 516 return 517 else: 518 iotests.log(ev) 519 520 521index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') 522 523class QMPTestCase(unittest.TestCase): 524 '''Abstract base class for QMP test cases''' 525 526 def dictpath(self, d, path): 527 '''Traverse a path in a nested dict''' 528 for component in path.split('/'): 529 m = index_re.match(component) 530 if m: 531 component, idx = m.groups() 532 idx = int(idx) 533 534 if not isinstance(d, dict) or component not in d: 535 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d))) 536 d = d[component] 537 538 if m: 539 if not isinstance(d, list): 540 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d))) 541 try: 542 d = d[idx] 543 except IndexError: 544 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d))) 545 return d 546 547 def assert_qmp_absent(self, d, path): 548 try: 549 result = self.dictpath(d, path) 550 except AssertionError: 551 return 552 self.fail('path "%s" has value "%s"' % (path, str(result))) 553 554 def assert_qmp(self, d, path, value): 555 '''Assert that the value for a specific path in a QMP dict matches''' 556 result = self.dictpath(d, path) 557 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value))) 558 559 def assert_no_active_block_jobs(self): 560 result = self.vm.qmp('query-block-jobs') 561 self.assert_qmp(result, 'return', []) 562 563 def assert_has_block_node(self, node_name=None, file_name=None): 564 """Issue a query-named-block-nodes and assert node_name and/or 565 file_name is present in the result""" 566 def check_equal_or_none(a, b): 567 return a == None or b == None or a == b 568 assert node_name or file_name 569 result = self.vm.qmp('query-named-block-nodes') 570 for x in result["return"]: 571 if check_equal_or_none(x.get("node-name"), node_name) and \ 572 check_equal_or_none(x.get("file"), file_name): 573 return 574 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \ 575 (node_name, file_name, result)) 576 577 def assert_json_filename_equal(self, json_filename, reference): 578 '''Asserts that the given filename is a json: filename and that its 579 content is equal to the given reference object''' 580 self.assertEqual(json_filename[:5], 'json:') 581 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])), 582 self.vm.flatten_qmp_object(reference)) 583 584 def cancel_and_wait(self, drive='drive0', force=False, resume=False): 585 '''Cancel a block job and wait for it to finish, returning the event''' 586 result = self.vm.qmp('block-job-cancel', device=drive, force=force) 587 self.assert_qmp(result, 'return', {}) 588 589 if resume: 590 self.vm.resume_drive(drive) 591 592 cancelled = False 593 result = None 594 while not cancelled: 595 for event in self.vm.get_qmp_events(wait=True): 596 if event['event'] == 'BLOCK_JOB_COMPLETED' or \ 597 event['event'] == 'BLOCK_JOB_CANCELLED': 598 self.assert_qmp(event, 'data/device', drive) 599 result = event 600 cancelled = True 601 elif event['event'] == 'JOB_STATUS_CHANGE': 602 self.assert_qmp(event, 'data/id', drive) 603 604 605 self.assert_no_active_block_jobs() 606 return result 607 608 def wait_until_completed(self, drive='drive0', check_offset=True): 609 '''Wait for a block job to finish, returning the event''' 610 while True: 611 for event in self.vm.get_qmp_events(wait=True): 612 if event['event'] == 'BLOCK_JOB_COMPLETED': 613 self.assert_qmp(event, 'data/device', drive) 614 self.assert_qmp_absent(event, 'data/error') 615 if check_offset: 616 self.assert_qmp(event, 'data/offset', event['data']['len']) 617 self.assert_no_active_block_jobs() 618 return event 619 elif event['event'] == 'JOB_STATUS_CHANGE': 620 self.assert_qmp(event, 'data/id', drive) 621 622 def wait_ready(self, drive='drive0'): 623 '''Wait until a block job BLOCK_JOB_READY event''' 624 f = {'data': {'type': 'mirror', 'device': drive } } 625 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f) 626 627 def wait_ready_and_cancel(self, drive='drive0'): 628 self.wait_ready(drive=drive) 629 event = self.cancel_and_wait(drive=drive) 630 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED') 631 self.assert_qmp(event, 'data/type', 'mirror') 632 self.assert_qmp(event, 'data/offset', event['data']['len']) 633 634 def complete_and_wait(self, drive='drive0', wait_ready=True): 635 '''Complete a block job and wait for it to finish''' 636 if wait_ready: 637 self.wait_ready(drive=drive) 638 639 result = self.vm.qmp('block-job-complete', device=drive) 640 self.assert_qmp(result, 'return', {}) 641 642 event = self.wait_until_completed(drive=drive) 643 self.assert_qmp(event, 'data/type', 'mirror') 644 645 def pause_wait(self, job_id='job0'): 646 with Timeout(1, "Timeout waiting for job to pause"): 647 while True: 648 result = self.vm.qmp('query-block-jobs') 649 found = False 650 for job in result['return']: 651 if job['device'] == job_id: 652 found = True 653 if job['paused'] == True and job['busy'] == False: 654 return job 655 break 656 assert found 657 658 def pause_job(self, job_id='job0', wait=True): 659 result = self.vm.qmp('block-job-pause', device=job_id) 660 self.assert_qmp(result, 'return', {}) 661 if wait: 662 return self.pause_wait(job_id) 663 return result 664 665 666def notrun(reason): 667 '''Skip this test suite''' 668 # Each test in qemu-iotests has a number ("seq") 669 seq = os.path.basename(sys.argv[0]) 670 671 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n') 672 print('%s not run: %s' % (seq, reason)) 673 sys.exit(0) 674 675def verify_image_format(supported_fmts=[], unsupported_fmts=[]): 676 assert not (supported_fmts and unsupported_fmts) 677 678 if 'generic' in supported_fmts and \ 679 os.environ.get('IMGFMT_GENERIC', 'true') == 'true': 680 # similar to 681 # _supported_fmt generic 682 # for bash tests 683 return 684 685 not_sup = supported_fmts and (imgfmt not in supported_fmts) 686 if not_sup or (imgfmt in unsupported_fmts): 687 notrun('not suitable for this image format: %s' % imgfmt) 688 689def verify_protocol(supported=[], unsupported=[]): 690 assert not (supported and unsupported) 691 692 if 'generic' in supported: 693 return 694 695 not_sup = supported and (imgproto not in supported) 696 if not_sup or (imgproto in unsupported): 697 notrun('not suitable for this protocol: %s' % imgproto) 698 699def verify_platform(supported_oses=['linux']): 700 if True not in [sys.platform.startswith(x) for x in supported_oses]: 701 notrun('not suitable for this OS: %s' % sys.platform) 702 703def verify_cache_mode(supported_cache_modes=[]): 704 if supported_cache_modes and (cachemode not in supported_cache_modes): 705 notrun('not suitable for this cache mode: %s' % cachemode) 706 707def supports_quorum(): 708 return 'quorum' in qemu_img_pipe('--help') 709 710def verify_quorum(): 711 '''Skip test suite if quorum support is not available''' 712 if not supports_quorum(): 713 notrun('quorum support missing') 714 715def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[], 716 unsupported_fmts=[]): 717 '''Run tests''' 718 719 global debug 720 721 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to 722 # indicate that we're not being run via "check". There may be 723 # other things set up by "check" that individual test cases rely 724 # on. 725 if test_dir is None or qemu_default_machine is None: 726 sys.stderr.write('Please run this test via the "check" script\n') 727 sys.exit(os.EX_USAGE) 728 729 debug = '-d' in sys.argv 730 verbosity = 1 731 verify_image_format(supported_fmts, unsupported_fmts) 732 verify_platform(supported_oses) 733 verify_cache_mode(supported_cache_modes) 734 735 if debug: 736 output = sys.stdout 737 verbosity = 2 738 sys.argv.remove('-d') 739 else: 740 # We need to filter out the time taken from the output so that 741 # qemu-iotest can reliably diff the results against master output. 742 if sys.version_info.major >= 3: 743 output = io.StringIO() 744 else: 745 # io.StringIO is for unicode strings, which is not what 746 # 2.x's test runner emits. 747 output = io.BytesIO() 748 749 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) 750 751 class MyTestRunner(unittest.TextTestRunner): 752 def __init__(self, stream=output, descriptions=True, verbosity=verbosity): 753 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity) 754 755 # unittest.main() will use sys.exit() so expect a SystemExit exception 756 try: 757 unittest.main(testRunner=MyTestRunner) 758 finally: 759 if not debug: 760 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue())) 761