# Common utilities and Python wrappers for qemu-iotests
#
# Copyright (C) 2012 IBM Corp.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import errno
import os
import re
import string
import struct
import subprocess
import sys
import unittest

# The QMP/qtest helper modules live in the scripts/ directory of the tree, so
# the search path has to be extended before they can be imported.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts', 'qmp'))
import qmp
import qtest


# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
# Command lines are built from the environment.  split() without an argument
# is used so that repeated or trailing spaces in the *_OPTIONS variables do
# not produce empty arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].split()

qemu_args = [os.environ.get('QEMU_PROG', 'qemu')]
if os.environ.get('QEMU_OPTIONS'):
    qemu_args += os.environ['QEMU_OPTIONS'].split()

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
test_dir = os.environ.get('TEST_DIR', '/var/tmp')
output_dir = os.environ.get('OUTPUT_DIR', '.')
cachemode = os.environ.get('CACHEMODE')
qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')


def _report_signal(prog, exitcode, cmd):
    '''Write a note to stderr if a child process was killed by a signal

    subprocess reports death by signal N as exit code -N.'''
    if exitcode < 0:
        sys.stderr.write('%s received signal %i: %s\n'
                         % (prog, -exitcode, ' '.join(cmd)))


def qemu_img(*args):
    '''Run qemu-img with stdin/stdout on /dev/null and return the exit code'''
    cmd = qemu_img_args + list(args)
    with open('/dev/null', 'r+') as devnull:
        exitcode = subprocess.call(cmd, stdin=devnull, stdout=devnull)
    _report_signal('qemu-img', exitcode, cmd)
    return exitcode


def qemu_img_verbose(*args):
    '''Run qemu-img without suppressing its output and return the exit code'''
    cmd = qemu_img_args + list(args)
    exitcode = subprocess.call(cmd)
    _report_signal('qemu-img', exitcode, cmd)
    return exitcode


def qemu_img_pipe(*args):
    '''Run qemu-img and return its combined stdout/stderr output'''
    cmd = qemu_img_args + list(args)
    subp = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # communicate() must drain the pipe *before* we wait for the child:
    # calling wait() first deadlocks once the child fills the pipe buffer.
    output = subp.communicate()[0]
    _report_signal('qemu-img', subp.returncode, cmd)
    return output


def qemu_io(*args):
    '''Run qemu-io and return its combined stdout/stderr output'''
    cmd = qemu_io_args + list(args)
    subp = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # See qemu_img_pipe(): read the output first to avoid a pipe deadlock.
    output = subp.communicate()[0]
    _report_signal('qemu-io', subp.returncode, cmd)
    return output


def compare_images(img1, img2):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    Both images are opened with the configured image format (imgfmt).'''
    return qemu_img('compare', '-f', imgfmt,
                    '-F', imgfmt, img1, img2) == 0


def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Every 512-byte sector starts and ends with its own sector number as a
    big-endian 32-bit integer; the bytes in between are zero.'''
    # The payload is binary, so the file must not be opened in text mode.
    with open(name, 'wb') as image:
        i = 0
        while i < size:
            sector_num = i // 512
            image.write(struct.pack('>l504xl', sector_num, sector_num))
            i = i + 512


# test_dir is a literal path, not a pattern: escape it so that characters
# such as '.' or '+' in the directory name are not interpreted as regex.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory with "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)

win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Strip carriage returns from msg'''
    return win32_re.sub("", msg)

qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Replace the timing/throughput statistics printed by qemu-io with
    fixed placeholders (carriage returns are stripped as well)'''
    msg = filter_win32(msg)
    return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)

chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown UID:GID" arguments with a placeholder'''
    return chown_re.sub("chown UID:GID", msg)


def log(msg, filters=()):
    '''Print msg after passing it through each filter function in order'''
    for flt in filters:
        msg = flt(msg)
    print(msg)


# Test if 'match' is a recursive subset of 'event'
def event_match(event, match=None):
    '''Return True if every key in match is present in event with an equal
    value; nested dicts in event are compared recursively.  A match of None
    matches everything.'''
    if match is None:
        return True

    for key in match:
        if key not in event:
            return False
        if isinstance(event[key], dict):
            if not event_match(event[key], match[key]):
                return False
        elif event[key] != match[key]:
            return False

    return True


class VM(object):
    '''A QEMU VM'''

    def __init__(self):
        pid = os.getpid()
        self._monitor_path = os.path.join(test_dir, 'qemu-mon.%d' % pid)
        self._qemu_log_path = os.path.join(test_dir, 'qemu-log.%d' % pid)
        self._qtest_path = os.path.join(test_dir, 'qemu-qtest.%d' % pid)
        self._args = qemu_args + ['-chardev',
                     'socket,id=mon,path=' + self._monitor_path,
                     '-mon', 'chardev=mon,mode=control',
                     '-qtest', 'unix:path=' + self._qtest_path,
                     '-machine', 'accel=qtest',
                     '-display', 'none', '-vga', 'none']
        self._num_drives = 0
        # Events pulled from QMP by event_wait() that nobody has consumed yet
        self._events = []
        # Set by launch(); initialised here so that shutdown() is safe to
        # call on a VM that was never (successfully) launched.
        self._popen = None

    # This can be used to add an unused monitor instance.
    def add_monitor_telnet(self, ip, port):
        '''Add a "-monitor" telnet server listening on ip:port'''
        args = 'tcp:%s:%d,server,nowait,telnet' % (ip, port)
        self._args.append('-monitor')
        self._args.append(args)

    def add_drive_raw(self, opts):
        '''Add a "-drive" option with a verbatim option string'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio'):
        '''Add a drive (virtio by default) to the VM

        The drive gets the id "drive<N>", N being the number of drives
        added through this method so far.  If path is None no file, format
        or cache option is emitted.  Returns self to allow chaining.'''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % imgfmt)
            # CACHEMODE may be unset; "cache=None" is not a valid option.
            if cachemode is not None:
                options.append('cache=%s' % cachemode)

        if opts:
            options.append(opts)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations

        Without an event, breakpoints are set on both "read_aio" and
        "write_aio".'''
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.qmp('human-monitor-command',
                 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))

    def resume_drive(self, drive):
        '''Remove the breakpoint set by pause_drive()'''
        self.qmp('human-monitor-command',
                 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))

    def hmp_qemu_io(self, drive, cmd):
        '''Run a qemu-io command on a given drive using an HMP command'''
        return self.qmp('human-monitor-command',
                        command_line='qemu-io %s "%s"' % (drive, cmd))

    def add_fd(self, fd, fdset, opaque, opts=''):
        '''Pass a file descriptor to the VM'''
        options = ['fd=%d' % fd,
                   'set=%d' % fdset,
                   'opaque=%s' % opaque]
        if opts:
            options.append(opts)

        self._args.append('-add-fd')
        self._args.append(','.join(options))
        return self

    def send_fd_scm(self, fd_file_path):
        '''Send a file descriptor for fd_file_path over the QMP socket using
        the external SCM helper program

        Returns the helper's exit code, or -1 if the helper does not exist.'''
        # In iotest.py, the qmp should always use unix socket.
        assert self._qmp.is_scm_available()
        helper = socket_scm_helper
        if not os.path.exists(helper):
            print("Scm help program does not present, path '%s'." % helper)
            return -1
        fd_param = ["%s" % helper,
                    "%d" % self._qmp.get_sock_fd(),
                    "%s" % fd_file_path]
        with open('/dev/null', 'rb') as devnull:
            p = subprocess.Popen(fd_param, stdin=devnull, stdout=sys.stdout,
                                 stderr=sys.stderr)
            return p.wait()

    def launch(self):
        '''Launch the VM and establish a QMP connection'''
        # The child inherits its own copies of these descriptors, so ours can
        # be closed as soon as the connections are established.
        with open('/dev/null', 'rb') as devnull, \
             open(self._qemu_log_path, 'wb') as qemulog:
            try:
                self._qmp = qmp.QEMUMonitorProtocol(self._monitor_path, server=True)
                self._qtest = qtest.QEMUQtestProtocol(self._qtest_path, server=True)
                self._popen = subprocess.Popen(self._args, stdin=devnull, stdout=qemulog,
                                               stderr=subprocess.STDOUT)
                self._qmp.accept()
                self._qtest.accept()
            except:
                # Deliberately catch everything (including interrupts): the
                # sockets are cleaned up and the exception is re-raised.
                _remove_if_exists(self._monitor_path)
                _remove_if_exists(self._qtest_path)
                raise

    def shutdown(self):
        '''Terminate the VM and clean up

        Does nothing if the VM is not running.'''
        if self._popen is not None:
            self._qmp.cmd('quit')
            exitcode = self._popen.wait()
            if exitcode < 0:
                sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode, ' '.join(self._args)))
            os.remove(self._monitor_path)
            os.remove(self._qtest_path)
            os.remove(self._qemu_log_path)
            self._popen = None

    def qmp(self, cmd, conv_keys=True, **args):
        '''Invoke a QMP command and return the result dict

        With conv_keys set, underscores in the keyword argument names are
        converted to dashes before they are sent.'''
        qmp_args = dict()
        for key, value in args.items():
            if conv_keys:
                key = key.replace('_', '-')
            qmp_args[key] = value

        return self._qmp.cmd(cmd, args=qmp_args)

    def qtest(self, cmd):
        '''Send a qtest command to guest'''
        return self._qtest.cmd(cmd)

    def get_qmp_event(self, wait=False):
        '''Return one QMP event: a cached one if available, otherwise the
        next one pulled from the monitor'''
        if len(self._events) > 0:
            return self._events.pop(0)
        return self._qmp.pull_event(wait=wait)

    def get_qmp_events(self, wait=False):
        '''Poll for queued QMP events and return a list of dicts

        The list holds the events fetched from the monitor followed by the
        events cached by event_wait(); both queues are emptied.'''
        events = self._qmp.get_events(wait=wait)
        events.extend(self._events)
        del self._events[:]
        self._qmp.clear_events()
        return events

    def event_wait(self, name='BLOCK_JOB_COMPLETED', timeout=60.0, match=None):
        '''Return the first event called name that satisfies event_match()

        Cached events are searched first; other events received while
        polling are cached for later retrieval.'''
        # Search cached events
        for event in self._events:
            if (event['event'] == name) and event_match(event, match):
                # Safe although we are iterating: we return right away.
                self._events.remove(event)
                return event

        # Poll for new events
        # NOTE(review): presumably pull_event() raises once the timeout
        # expires, which is the only way out of this loop besides a match --
        # verify against qmp.py.
        while True:
            event = self._qmp.pull_event(wait=timeout)
            if (event['event'] == name) and event_match(event, match):
                return event
            self._events.append(event)


index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        Path components are separated by "/"; a component of the form
        "name[N]" additionally selects element N of the list stored under
        "name".  The test fails if the path cannot be followed.'''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def assert_qmp_absent(self, d, path):
        '''Assert that path can not be traversed in the QMP dict d'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            # dictpath() reports a missing path through self.fail()
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict matches'''
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def cancel_and_wait(self, drive='drive0', force=False, resume=False):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True):
        '''Wait for a block job to finish, returning the event'''
        result = None
        while result is None:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset', event['data']['len'])
                    # Remember the completion event itself: later events in
                    # the same batch must not be returned in its place.
                    result = event

        self.assert_no_active_block_jobs()
        return result

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        f = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=f)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for a mirror job to become ready, cancel it and check that
        it reported completion at the end of the image'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive)
        self.assert_qmp(event, 'data/type', 'mirror')


def _remove_if_exists(path):
    '''Remove file object at path if it exists'''
    try:
        os.remove(path)
    except OSError as exception:
        if exception.errno == errno.ENOENT:
            return
        raise


def notrun(reason):
    '''Skip this test suite

    Writes the reason to "<seq>.notrun" in the output directory, prints it
    and exits with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    with open('%s/%s.notrun' % (output_dir, seq), 'wb') as notrun_file:
        notrun_file.write(reason + '\n')
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)


def verify_image_format(supported_fmts=()):
    '''Skip the test suite unless imgfmt is one of supported_fmts

    An empty supported_fmts means that every format is acceptable.'''
    if supported_fmts and (imgfmt not in supported_fmts):
        notrun('not suitable for this image format: %s' % imgfmt)


def verify_platform(supported_oses=('linux',)):
    '''Skip the test suite unless sys.platform starts with one of the
    given OS names'''
    if not any(sys.platform.startswith(x) for x in supported_oses):
        notrun('not suitable for this OS: %s' % sys.platform)


def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if 'quorum' not in qemu_img_pipe('--help'):
        notrun('quorum support missing')


def main(supported_fmts=(), supported_oses=('linux',)):
    '''Run tests

    A "-d" command line argument enables debug mode: verbose unittest
    output is written directly to stdout instead of being filtered.'''

    debug = '-d' in sys.argv
    verbosity = 1
    verify_image_format(supported_fmts)
    verify_platform(supported_oses)

    # We need to filter out the time taken from the output so that qemu-iotest
    # can reliably diff the results against master output.
    import StringIO
    if debug:
        output = sys.stdout
        verbosity = 2
        sys.argv.remove('-d')
    else:
        output = StringIO.StringIO()

    class MyTestRunner(unittest.TextTestRunner):
        def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
            unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

    # unittest.main() will use sys.exit() so expect a SystemExit exception
    try:
        unittest.main(testRunner=MyTestRunner)
    finally:
        if not debug:
            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))