# Test class and utilities for functional tests
#
# Copyright (c) 2018 Red Hat, Inc.
#
# Author:
#  Cleber Rosa <crosa@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import subprocess
import sys
import tempfile
import time
import uuid

import avocado
from avocado.utils import ssh
from avocado.utils.path import find_command

from qemu.machine import QEMUMachine
from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
                        tcg_available)


#: The QEMU build root directory.  It may also be the source directory
#: if building from the source dir, but it's safer to use BUILD_DIR for
#: that purpose.  Be aware that if this code is moved outside of a source
#: and build tree, it will not be accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))


def has_cmd(name, args=None):
    """
    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the command, used in the error
                 message (and looked up with "which" if no args are given)
    :type name: str
    :param args: the command line to execute in order to probe for the
                 command; defaults to ('which', name)
    :type args: sequence of str or None
    :returns: (True, '') if the probe command exited with status 0,
              otherwise (False, <reason>)
    :rtype: tuple
    """

    if args is None:
        args = ('which', name)

    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        # Failing to even start the probe command (e.g. the binary does
        # not exist) is reported the same way as a non-zero exit status.
        exitcode = -1
        stderr = str(e)

    if exitcode != 0:
        cmd_line = ' '.join(args)
        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
        return (False, err)
    else:
        return (True, '')


def has_cmds(*cmds):
    """
    This function is for use in a @avocado.skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each item is either a command name (str) or a tuple of
                 positional arguments for :func:`has_cmd`
    :returns: (True, '') if every command is available, otherwise the
              (False, <reason>) result of the first command that failed
    :rtype: tuple
    """

    for cmd in cmds:
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')


def run_cmd(args):
    """
    Runs a command to completion, capturing its output as text.

    :param args: the command line to execute
    :type args: sequence of str
    :returns: a (stdout, stderr, exit status) tuple
    :rtype: tuple
    """
    completed = subprocess.run(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True,
                               check=False)

    return (completed.stdout, completed.stderr, completed.returncode)


def is_readable_executable_file(path):
    """
    Tells whether path is a regular file that can be both read and
    executed by the current process.

    :param path: the path to check
    :type path: str
    :rtype: bool
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, starting either in the current working
    directory or in the source tree root directory.

    :param bin_prefix: the prefix of the binary name, to which the arch
                       is appended
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname()[4]
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    qemu_bin_name = bin_prefix + arch
    qemu_bin_paths = [
        os.path.join(".", qemu_bin_name),
        os.path.join(BUILD_DIR, qemu_bin_name),
        os.path.join(BUILD_DIR, "build", qemu_bin_name),
    ]
    for path in qemu_bin_paths:
        if is_readable_executable_file(path):
            return path
    return None


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Common implementation of the console helpers below: optionally sends
    a string to the console socket and then reads the console line by
    line until a success or failure message shows up.

    :param test: the test whose default VM is used when vm is None, and
                 whose fail() is called if failure_message is found
    :param success_message: substring that ends the wait successfully.
                            If None while failure_message is set, the
                            first non-empty line read ends the wait.
    :param failure_message: substring that makes the test fail, or None
    :param send_string: string to send to the console before reading,
                        or None/empty to send nothing
    :param keep_sending: if True, send_string is sent again before every
                         read attempt instead of only once
    :param vm: the VM to interact with (defaults to test.vm)

    If both messages are None nothing is read from the console: the
    string is sent once and the function returns.  Note that combining
    that with keep_sending=True would never terminate.
    """
    assert not keep_sending or send_string
    if vm is None:
        vm = test.vm
    console = vm.console_file
    console_logger = logging.getLogger('console')
    while True:
        if send_string:
            vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None # send only once

        # Only consume console output if waiting for something
        if success_message is None and failure_message is None:
            if send_string is None:
                break
            continue

        try:
            msg = console.readline().decode().strip()
        except UnicodeDecodeError:
            # Undecodable lines are skipped rather than failing the test
            msg = None
        if not msg:
            continue
        console_logger.debug(msg)
        if success_message is None or success_message in msg:
            break
        if failure_message and failure_message in msg:
            console.close()
            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                    (failure_message, success_message)
            test.fail(fail)


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt, such
    as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; if None (the default), the
               test's default VM is used
    """
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return character),
    without waiting for or reading any console output.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return character),
    then wait for success_message to appear on the console, while logging
    the content. Mark the test as failed if failure_message is found
    instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message, command + '\r')


class QemuBaseTest(avocado.Test):
    """Base class holding the QEMU binary, arch and cpu selection logic."""

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the name (key) of the tag
        :type tag_name: str
        :returns: the tag value if exactly one value is set for the key,
                  None otherwise
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Peek at the single value instead of pop()ing it, so that the
            # test's tag collection is left untouched and repeated lookups
            # of the same tag keep returning the same answer.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Sets self.arch, self.cpu and self.qemu_bin from the test
        parameters, falling back to the test tags (arch, cpu) and to
        :func:`pick_default_qemu_bin` (qemu_bin).  The test is canceled
        if no QEMU binary ends up being selected.

        :param bin_prefix: prefix of the QEMU binary name to look for
        :type bin_prefix: str
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forwards to the parent class fetch_asset().  The differences
        visible here are that asset_hash is a required argument and that
        cancel_on_missing defaults to True.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Selects a qemu-system-* binary, enforces the accelerator required
        by the "accel" tag (if any) and sets self.machine from the
        "machine" parameter or tag.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Requires a network backend to be supported by the QEMU binary.

        The test is canceled unless netdevname shows up on a line of its
        own in the output of "-netdev help".

        :param netdevname: name of the netdev backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            # Name the backend that was actually requested instead of
            # always blaming user networking.
            self.cancel(f'no support for {netdevname} networking')

    def require_multiprocess(self):
        """
        Test for the presence of the x-pci-proxy-dev which is required
        to support multiprocess.
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find('x-pci-proxy-dev') < 0:
            self.cancel('no support for multiprocess device emulation')

    def _new_vm(self, name, *args):
        """
        Creates a QEMUMachine for the selected QEMU binary.

        :param name: name of the VM, only used in the debug log here
        :param args: extra QEMU command line arguments for the VM
        :returns: the newly created (not yet launched) machine
        """
        # NOTE(review): this temporary directory is not used by anything
        # visible in this file, and each call replaces the previous
        # object; confirm whether it is still needed.
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Returns the path of a qemu-img binary, canceling the test if
        none can be found.
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            # False is the "not found" default handed to find_command()
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM named "default", created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Returns the VM registered under name, creating it first if it
        does not exist yet.

        :param args: extra QEMU command line arguments, only used when
                     the VM is created by this call
        :param name: name of the VM; if not given, a random UUID is used,
                     so every such call creates a new VM
        :type name: str or None
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        This acts on the default VM (self.vm).  Nothing is done if either
        arg or value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # arg was the last item and had no value: add one
                self.vm.args.append(value)

    def tearDown(self):
        """Shuts down every VM obtained through get_vm()."""
        for vm in self._vms.values():
            vm.shutdown()
        self._sd = None
        super().tearDown()


class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Opens self.ssh_session to the guest through 127.0.0.1, using the
        port extracted from the "info usernet" monitor output.

        Up to 10 connection attempts are made, sleeping 0, 1, 2, ...
        seconds after each failed one; the test fails if none succeeds.

        :param username: the user to log in as
        :param credential: the key or the password to authenticate with
        :param credential_is_key: whether credential is passed to the
                                  session as a key (True) or as a
                                  password (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.cmd('human-monitor-command',
                          command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                # NOTE(review): the return value of connect() is ignored;
                # if it reports failure by returning False rather than
                # raising, this returns as if connected - confirm against
                # the avocado.utils.ssh documentation.
                self.ssh_session.connect()
                return
            except Exception:
                # Only retry on ordinary errors: a bare "except" would
                # also swallow KeyboardInterrupt and SystemExit.
                time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Runs a command in the guest over the established SSH session,
        logging the command, its stdout (info) and its stderr (warning).

        The test fails if the command exit status is not 0.

        :param command: the command to run in the guest
        :type command: str
        :returns: the (stdout lines, stderr lines) of the command, with
                  trailing whitespace removed from each line
        :rtype: tuple of lists
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines

    def ssh_command_output_contains(self, cmd, exp):
        """
        Runs cmd in the guest and fails the test unless at least one line
        of its stdout contains exp.

        :param cmd: the command to run in the guest
        :type cmd: str
        :param exp: the substring expected in the command output
        :type exp: str
        """
        stdout, _ = self.ssh_command(cmd)
        for line in stdout:
            if exp in line:
                break
        else:
            self.fail('"%s" output does not contain "%s"' % (cmd, exp))