1# Test class and utilities for functional tests 2# 3# Copyright (c) 2018 Red Hat, Inc. 4# 5# Author: 6# Cleber Rosa <crosa@redhat.com> 7# 8# This work is licensed under the terms of the GNU GPL, version 2 or 9# later. See the COPYING file in the top-level directory. 10 11import logging 12import os 13import subprocess 14import sys 15import tempfile 16import time 17import uuid 18 19import avocado 20from avocado.utils import ssh 21from avocado.utils.path import find_command 22 23from qemu.machine import QEMUMachine 24from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available, 25 tcg_available) 26 27 28#: The QEMU build root directory. It may also be the source directory 29#: if building from the source dir, but it's safer to use BUILD_DIR for 30#: that purpose. Be aware that if this code is moved outside of a source 31#: and build tree, it will not be accurate. 32BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) 33 34 35def has_cmd(name, args=None): 36 """ 37 This function is for use in a @avocado.skipUnless decorator, e.g.: 38 39 @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) 40 def test_something_that_needs_sudo(self): 41 ... 42 """ 43 44 if args is None: 45 args = ('which', name) 46 47 try: 48 _, stderr, exitcode = run_cmd(args) 49 except Exception as e: 50 exitcode = -1 51 stderr = str(e) 52 53 if exitcode != 0: 54 cmd_line = ' '.join(args) 55 err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' 56 return (False, err) 57 else: 58 return (True, '') 59 60def has_cmds(*cmds): 61 """ 62 This function is for use in a @avocado.skipUnless decorator and 63 allows checking for the availability of multiple commands, e.g.: 64 65 @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), 66 'cmd2', 'cmd3')) 67 def test_something_that_needs_cmd1_and_cmd2(self): 68 ... 69 """ 70 71 for cmd in cmds: 72 if isinstance(cmd, str): 73 cmd = (cmd,) 74 75 ok, errstr = has_cmd(*cmd) 76 if not ok: 77 return (False, errstr) 78 79 return (True, '') 80 81def run_cmd(args): 82 subp = subprocess.Popen(args, 83 stdout=subprocess.PIPE, 84 stderr=subprocess.PIPE, 85 universal_newlines=True) 86 stdout, stderr = subp.communicate() 87 ret = subp.returncode 88 89 return (stdout, stderr, ret) 90 91def is_readable_executable_file(path): 92 return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) 93 94 95def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None): 96 """ 97 Picks the path of a QEMU binary, starting either in the current working 98 directory or in the source tree root directory. 99 100 :param arch: the arch to use when looking for a QEMU binary (the target 101 will match the arch given). If None (the default), arch 102 will be the current host system arch (as given by 103 :func:`os.uname`). 104 :type arch: str 105 :returns: the path to the default QEMU binary or None if one could not 106 be found 107 :rtype: str or None 108 """ 109 if arch is None: 110 arch = os.uname()[4] 111 # qemu binary path does not match arch for powerpc, handle it 112 if 'ppc64le' in arch: 113 arch = 'ppc64' 114 qemu_bin_name = bin_prefix + arch 115 qemu_bin_paths = [ 116 os.path.join(".", qemu_bin_name), 117 os.path.join(BUILD_DIR, qemu_bin_name), 118 os.path.join(BUILD_DIR, "build", qemu_bin_name), 119 ] 120 for path in qemu_bin_paths: 121 if is_readable_executable_file(path): 122 return path 123 return None 124 125 126def _console_interaction(test, success_message, failure_message, 127 send_string, keep_sending=False, vm=None): 128 assert not keep_sending or send_string 129 if vm is None: 130 vm = test.vm 131 console = vm.console_file 132 console_logger = logging.getLogger('console') 133 while True: 134 if send_string: 135 vm.console_socket.sendall(send_string.encode()) 136 if not keep_sending: 137 send_string = None # send only once 138 try: 139 msg = console.readline().decode().strip() 140 except UnicodeDecodeError: 141 msg = None 142 if not msg: 143 continue 144 console_logger.debug(msg) 145 if success_message is None or success_message in msg: 146 break 147 if failure_message and failure_message in msg: 148 console.close() 149 fail = 'Failure message found in console: "%s". Expected: "%s"' % \ 150 (failure_message, success_message) 151 test.fail(fail) 152 153def interrupt_interactive_console_until_pattern(test, success_message, 154 failure_message=None, 155 interrupt_string='\r'): 156 """ 157 Keep sending a string to interrupt a console prompt, while logging the 158 console output. Typical use case is to break a boot loader prompt, such: 159 160 Press a key within 5 seconds to interrupt boot process. 161 5 162 4 163 3 164 2 165 1 166 Booting default image... 167 168 :param test: an Avocado test containing a VM that will have its console 169 read and probed for a success or failure message 170 :type test: :class:`avocado_qemu.QemuSystemTest` 171 :param success_message: if this message appears, test succeeds 172 :param failure_message: if this message appears, test fails 173 :param interrupt_string: a string to send to the console before trying 174 to read a new line 175 """ 176 _console_interaction(test, success_message, failure_message, 177 interrupt_string, True) 178 179def wait_for_console_pattern(test, success_message, failure_message=None, 180 vm=None): 181 """ 182 Waits for messages to appear on the console, while logging the content 183 184 :param test: an Avocado test containing a VM that will have its console 185 read and probed for a success or failure message 186 :type test: :class:`avocado_qemu.QemuSystemTest` 187 :param success_message: if this message appears, test succeeds 188 :param failure_message: if this message appears, test fails 189 """ 190 _console_interaction(test, success_message, failure_message, None, vm=vm) 191 192def exec_command(test, command): 193 """ 194 Send a command to a console (appending CRLF characters), while logging 195 the content. 196 197 :param test: an Avocado test containing a VM. 198 :type test: :class:`avocado_qemu.QemuSystemTest` 199 :param command: the command to send 200 :type command: str 201 """ 202 _console_interaction(test, None, None, command + '\r') 203 204def exec_command_and_wait_for_pattern(test, command, 205 success_message, failure_message=None): 206 """ 207 Send a command to a console (appending CRLF characters), then wait 208 for success_message to appear on the console, while logging the. 209 content. Mark the test as failed if failure_message is found instead. 210 211 :param test: an Avocado test containing a VM that will have its console 212 read and probed for a success or failure message 213 :type test: :class:`avocado_qemu.QemuSystemTest` 214 :param command: the command to send 215 :param success_message: if this message appears, test succeeds 216 :param failure_message: if this message appears, test fails 217 """ 218 _console_interaction(test, success_message, failure_message, command + '\r') 219 220class QemuBaseTest(avocado.Test): 221 222 # default timeout for all tests, can be overridden 223 timeout = 120 224 225 def _get_unique_tag_val(self, tag_name): 226 """ 227 Gets a tag value, if unique for a key 228 """ 229 vals = self.tags.get(tag_name, []) 230 if len(vals) == 1: 231 return vals.pop() 232 return None 233 234 def setUp(self, bin_prefix): 235 self.arch = self.params.get('arch', 236 default=self._get_unique_tag_val('arch')) 237 238 self.cpu = self.params.get('cpu', 239 default=self._get_unique_tag_val('cpu')) 240 241 default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch) 242 self.qemu_bin = self.params.get('qemu_bin', 243 default=default_qemu_bin) 244 if self.qemu_bin is None: 245 self.cancel("No QEMU binary defined or found in the build tree") 246 247 def fetch_asset(self, name, 248 asset_hash, algorithm=None, 249 locations=None, expire=None, 250 find_only=False, cancel_on_missing=True): 251 return super().fetch_asset(name, 252 asset_hash=asset_hash, 253 algorithm=algorithm, 254 locations=locations, 255 expire=expire, 256 find_only=find_only, 257 cancel_on_missing=cancel_on_missing) 258 259 260class QemuSystemTest(QemuBaseTest): 261 """Facilitates system emulation tests.""" 262 263 def setUp(self): 264 self._vms = {} 265 266 super().setUp('qemu-system-') 267 268 accel_required = self._get_unique_tag_val('accel') 269 if accel_required: 270 self.require_accelerator(accel_required) 271 272 self.machine = self.params.get('machine', 273 default=self._get_unique_tag_val('machine')) 274 275 def require_accelerator(self, accelerator): 276 """ 277 Requires an accelerator to be available for the test to continue 278 279 It takes into account the currently set qemu binary. 280 281 If the check fails, the test is canceled. If the check itself 282 for the given accelerator is not available, the test is also 283 canceled. 284 285 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 286 :type accelerator: str 287 """ 288 checker = {'tcg': tcg_available, 289 'kvm': kvm_available}.get(accelerator) 290 if checker is None: 291 self.cancel("Don't know how to check for the presence " 292 "of accelerator %s" % accelerator) 293 if not checker(qemu_bin=self.qemu_bin): 294 self.cancel("%s accelerator does not seem to be " 295 "available" % accelerator) 296 297 def require_netdev(self, netdevname): 298 netdevhelp = run_cmd([self.qemu_bin, 299 '-M', 'none', '-netdev', 'help'])[0]; 300 if netdevhelp.find('\n' + netdevname + '\n') < 0: 301 self.cancel('no support for user networking') 302 303 def require_multiprocess(self): 304 """ 305 Test for the presence of the x-pci-proxy-dev which is required 306 to support multiprocess. 307 """ 308 devhelp = run_cmd([self.qemu_bin, 309 '-M', 'none', '-device', 'help'])[0]; 310 if devhelp.find('x-pci-proxy-dev') < 0: 311 self.cancel('no support for multiprocess device emulation') 312 313 def _new_vm(self, name, *args): 314 self._sd = tempfile.TemporaryDirectory(prefix="qemu_") 315 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir, 316 log_dir=self.logdir) 317 self.log.debug('QEMUMachine "%s" created', name) 318 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 319 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) 320 if args: 321 vm.add_args(*args) 322 return vm 323 324 def get_qemu_img(self): 325 self.log.debug('Looking for and selecting a qemu-img binary') 326 327 # If qemu-img has been built, use it, otherwise the system wide one 328 # will be used. 329 qemu_img = os.path.join(BUILD_DIR, 'qemu-img') 330 if not os.path.exists(qemu_img): 331 qemu_img = find_command('qemu-img', False) 332 if qemu_img is False: 333 self.cancel('Could not find "qemu-img"') 334 335 return qemu_img 336 337 @property 338 def vm(self): 339 return self.get_vm(name='default') 340 341 def get_vm(self, *args, name=None): 342 if not name: 343 name = str(uuid.uuid4()) 344 if self._vms.get(name) is None: 345 self._vms[name] = self._new_vm(name, *args) 346 if self.cpu is not None: 347 self._vms[name].add_args('-cpu', self.cpu) 348 if self.machine is not None: 349 self._vms[name].set_machine(self.machine) 350 return self._vms[name] 351 352 def set_vm_arg(self, arg, value): 353 """ 354 Set an argument to list of extra arguments to be given to the QEMU 355 binary. If the argument already exists then its value is replaced. 356 357 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 358 :type arg: str 359 :param value: the argument value, such as "host" in "-cpu host" 360 :type value: str 361 """ 362 if not arg or not value: 363 return 364 if arg not in self.vm.args: 365 self.vm.args.extend([arg, value]) 366 else: 367 idx = self.vm.args.index(arg) + 1 368 if idx < len(self.vm.args): 369 self.vm.args[idx] = value 370 else: 371 self.vm.args.append(value) 372 373 def tearDown(self): 374 for vm in self._vms.values(): 375 vm.shutdown() 376 self._sd = None 377 super().tearDown() 378 379 380class QemuUserTest(QemuBaseTest): 381 """Facilitates user-mode emulation tests.""" 382 383 def setUp(self): 384 self._ldpath = [] 385 super().setUp('qemu-') 386 387 def add_ldpath(self, ldpath): 388 self._ldpath.append(os.path.abspath(ldpath)) 389 390 def run(self, bin_path, args=[]): 391 qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath]) 392 bin_args = " ".join(args) 393 return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args, 394 bin_path, bin_args)) 395 396 397class LinuxSSHMixIn: 398 """Contains utility methods for interacting with a guest via SSH.""" 399 400 def ssh_connect(self, username, credential, credential_is_key=True): 401 self.ssh_logger = logging.getLogger('ssh') 402 res = self.vm.cmd('human-monitor-command', 403 command_line='info usernet') 404 port = get_info_usernet_hostfwd_port(res) 405 self.assertIsNotNone(port) 406 self.assertGreater(port, 0) 407 self.log.debug('sshd listening on port: %d', port) 408 if credential_is_key: 409 self.ssh_session = ssh.Session('127.0.0.1', port=port, 410 user=username, key=credential) 411 else: 412 self.ssh_session = ssh.Session('127.0.0.1', port=port, 413 user=username, password=credential) 414 for i in range(10): 415 try: 416 self.ssh_session.connect() 417 return 418 except: 419 time.sleep(i) 420 self.fail('ssh connection timeout') 421 422 def ssh_command(self, command): 423 self.ssh_logger.info(command) 424 result = self.ssh_session.cmd(command) 425 stdout_lines = [line.rstrip() for line 426 in result.stdout_text.splitlines()] 427 for line in stdout_lines: 428 self.ssh_logger.info(line) 429 stderr_lines = [line.rstrip() for line 430 in result.stderr_text.splitlines()] 431 for line in stderr_lines: 432 self.ssh_logger.warning(line) 433 434 self.assertEqual(result.exit_status, 0, 435 f'Guest command failed: {command}') 436 return stdout_lines, stderr_lines 437 438 def ssh_command_output_contains(self, cmd, exp): 439 stdout, _ = self.ssh_command(cmd) 440 for line in stdout: 441 if exp in line: 442 break 443 else: 444 self.fail('"%s" output does not contain "%s"' % (cmd, exp)) 445