1# Test class and utilities for functional tests 2# 3# Copyright (c) 2018 Red Hat, Inc. 4# 5# Author: 6# Cleber Rosa <crosa@redhat.com> 7# 8# This work is licensed under the terms of the GNU GPL, version 2 or 9# later. See the COPYING file in the top-level directory. 10 11import logging 12import os 13import subprocess 14import sys 15import tempfile 16import time 17import uuid 18 19import avocado 20from avocado.utils import ssh 21from avocado.utils.path import find_command 22 23from qemu.machine import QEMUMachine 24from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available, 25 tcg_available) 26 27 28#: The QEMU build root directory. It may also be the source directory 29#: if building from the source dir, but it's safer to use BUILD_DIR for 30#: that purpose. Be aware that if this code is moved outside of a source 31#: and build tree, it will not be accurate. 32BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) 33 34 35def has_cmd(name, args=None): 36 """ 37 This function is for use in a @avocado.skipUnless decorator, e.g.: 38 39 @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) 40 def test_something_that_needs_sudo(self): 41 ... 42 """ 43 44 if args is None: 45 args = ('which', name) 46 47 try: 48 _, stderr, exitcode = run_cmd(args) 49 except Exception as e: 50 exitcode = -1 51 stderr = str(e) 52 53 if exitcode != 0: 54 cmd_line = ' '.join(args) 55 err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' 56 return (False, err) 57 else: 58 return (True, '') 59 60def has_cmds(*cmds): 61 """ 62 This function is for use in a @avocado.skipUnless decorator and 63 allows checking for the availability of multiple commands, e.g.: 64 65 @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), 66 'cmd2', 'cmd3')) 67 def test_something_that_needs_cmd1_and_cmd2(self): 68 ... 69 """ 70 71 for cmd in cmds: 72 if isinstance(cmd, str): 73 cmd = (cmd,) 74 75 ok, errstr = has_cmd(*cmd) 76 if not ok: 77 return (False, errstr) 78 79 return (True, '') 80 81def run_cmd(args): 82 subp = subprocess.Popen(args, 83 stdout=subprocess.PIPE, 84 stderr=subprocess.PIPE, 85 universal_newlines=True) 86 stdout, stderr = subp.communicate() 87 ret = subp.returncode 88 89 return (stdout, stderr, ret) 90 91def is_readable_executable_file(path): 92 return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) 93 94 95def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None): 96 """ 97 Picks the path of a QEMU binary, starting either in the current working 98 directory or in the source tree root directory. 99 100 :param arch: the arch to use when looking for a QEMU binary (the target 101 will match the arch given). If None (the default), arch 102 will be the current host system arch (as given by 103 :func:`os.uname`). 104 :type arch: str 105 :returns: the path to the default QEMU binary or None if one could not 106 be found 107 :rtype: str or None 108 """ 109 if arch is None: 110 arch = os.uname()[4] 111 # qemu binary path does not match arch for powerpc, handle it 112 if 'ppc64le' in arch: 113 arch = 'ppc64' 114 qemu_bin_name = bin_prefix + arch 115 qemu_bin_paths = [ 116 os.path.join(".", qemu_bin_name), 117 os.path.join(BUILD_DIR, qemu_bin_name), 118 os.path.join(BUILD_DIR, "build", qemu_bin_name), 119 ] 120 for path in qemu_bin_paths: 121 if is_readable_executable_file(path): 122 return path 123 return None 124 125 126def _console_interaction(test, success_message, failure_message, 127 send_string, keep_sending=False, vm=None): 128 assert not keep_sending or send_string 129 if vm is None: 130 vm = test.vm 131 console = vm.console_file 132 console_logger = logging.getLogger('console') 133 while True: 134 if send_string: 135 vm.console_socket.sendall(send_string.encode()) 136 if not keep_sending: 137 send_string = None # send only once 138 139 # Only consume console output if waiting for something 140 if success_message is None and failure_message is None: 141 if send_string is None: 142 break 143 continue 144 145 try: 146 msg = console.readline().decode().strip() 147 except UnicodeDecodeError: 148 msg = None 149 if not msg: 150 continue 151 console_logger.debug(msg) 152 if success_message is None or success_message in msg: 153 break 154 if failure_message and failure_message in msg: 155 console.close() 156 fail = 'Failure message found in console: "%s". Expected: "%s"' % \ 157 (failure_message, success_message) 158 test.fail(fail) 159 160def interrupt_interactive_console_until_pattern(test, success_message, 161 failure_message=None, 162 interrupt_string='\r'): 163 """ 164 Keep sending a string to interrupt a console prompt, while logging the 165 console output. Typical use case is to break a boot loader prompt, such: 166 167 Press a key within 5 seconds to interrupt boot process. 168 5 169 4 170 3 171 2 172 1 173 Booting default image... 174 175 :param test: an Avocado test containing a VM that will have its console 176 read and probed for a success or failure message 177 :type test: :class:`avocado_qemu.QemuSystemTest` 178 :param success_message: if this message appears, test succeeds 179 :param failure_message: if this message appears, test fails 180 :param interrupt_string: a string to send to the console before trying 181 to read a new line 182 """ 183 _console_interaction(test, success_message, failure_message, 184 interrupt_string, True) 185 186def wait_for_console_pattern(test, success_message, failure_message=None, 187 vm=None): 188 """ 189 Waits for messages to appear on the console, while logging the content 190 191 :param test: an Avocado test containing a VM that will have its console 192 read and probed for a success or failure message 193 :type test: :class:`avocado_qemu.QemuSystemTest` 194 :param success_message: if this message appears, test succeeds 195 :param failure_message: if this message appears, test fails 196 """ 197 _console_interaction(test, success_message, failure_message, None, vm=vm) 198 199def exec_command(test, command): 200 """ 201 Send a command to a console (appending CRLF characters), while logging 202 the content. 203 204 :param test: an Avocado test containing a VM. 205 :type test: :class:`avocado_qemu.QemuSystemTest` 206 :param command: the command to send 207 :type command: str 208 """ 209 _console_interaction(test, None, None, command + '\r') 210 211def exec_command_and_wait_for_pattern(test, command, 212 success_message, failure_message=None): 213 """ 214 Send a command to a console (appending CRLF characters), then wait 215 for success_message to appear on the console, while logging the. 216 content. Mark the test as failed if failure_message is found instead. 217 218 :param test: an Avocado test containing a VM that will have its console 219 read and probed for a success or failure message 220 :type test: :class:`avocado_qemu.QemuSystemTest` 221 :param command: the command to send 222 :param success_message: if this message appears, test succeeds 223 :param failure_message: if this message appears, test fails 224 """ 225 _console_interaction(test, success_message, failure_message, command + '\r') 226 227class QemuBaseTest(avocado.Test): 228 229 # default timeout for all tests, can be overridden 230 timeout = 120 231 232 def _get_unique_tag_val(self, tag_name): 233 """ 234 Gets a tag value, if unique for a key 235 """ 236 vals = self.tags.get(tag_name, []) 237 if len(vals) == 1: 238 return vals.pop() 239 return None 240 241 def setUp(self, bin_prefix): 242 self.arch = self.params.get('arch', 243 default=self._get_unique_tag_val('arch')) 244 245 self.cpu = self.params.get('cpu', 246 default=self._get_unique_tag_val('cpu')) 247 248 default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch) 249 self.qemu_bin = self.params.get('qemu_bin', 250 default=default_qemu_bin) 251 if self.qemu_bin is None: 252 self.cancel("No QEMU binary defined or found in the build tree") 253 254 def fetch_asset(self, name, 255 asset_hash, algorithm=None, 256 locations=None, expire=None, 257 find_only=False, cancel_on_missing=True): 258 return super().fetch_asset(name, 259 asset_hash=asset_hash, 260 algorithm=algorithm, 261 locations=locations, 262 expire=expire, 263 find_only=find_only, 264 cancel_on_missing=cancel_on_missing) 265 266 267class QemuSystemTest(QemuBaseTest): 268 """Facilitates system emulation tests.""" 269 270 def setUp(self): 271 self._vms = {} 272 273 super().setUp('qemu-system-') 274 275 accel_required = self._get_unique_tag_val('accel') 276 if accel_required: 277 self.require_accelerator(accel_required) 278 279 self.machine = self.params.get('machine', 280 default=self._get_unique_tag_val('machine')) 281 282 def require_accelerator(self, accelerator): 283 """ 284 Requires an accelerator to be available for the test to continue 285 286 It takes into account the currently set qemu binary. 287 288 If the check fails, the test is canceled. If the check itself 289 for the given accelerator is not available, the test is also 290 canceled. 291 292 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 293 :type accelerator: str 294 """ 295 checker = {'tcg': tcg_available, 296 'kvm': kvm_available}.get(accelerator) 297 if checker is None: 298 self.cancel("Don't know how to check for the presence " 299 "of accelerator %s" % accelerator) 300 if not checker(qemu_bin=self.qemu_bin): 301 self.cancel("%s accelerator does not seem to be " 302 "available" % accelerator) 303 304 def require_netdev(self, netdevname): 305 netdevhelp = run_cmd([self.qemu_bin, 306 '-M', 'none', '-netdev', 'help'])[0]; 307 if netdevhelp.find('\n' + netdevname + '\n') < 0: 308 self.cancel('no support for user networking') 309 310 def _new_vm(self, name, *args): 311 self._sd = tempfile.TemporaryDirectory(prefix="qemu_") 312 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir, 313 log_dir=self.logdir) 314 self.log.debug('QEMUMachine "%s" created', name) 315 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 316 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) 317 if args: 318 vm.add_args(*args) 319 return vm 320 321 def get_qemu_img(self): 322 self.log.debug('Looking for and selecting a qemu-img binary') 323 324 # If qemu-img has been built, use it, otherwise the system wide one 325 # will be used. 326 qemu_img = os.path.join(BUILD_DIR, 'qemu-img') 327 if not os.path.exists(qemu_img): 328 qemu_img = find_command('qemu-img', False) 329 if qemu_img is False: 330 self.cancel('Could not find "qemu-img"') 331 332 return qemu_img 333 334 @property 335 def vm(self): 336 return self.get_vm(name='default') 337 338 def get_vm(self, *args, name=None): 339 if not name: 340 name = str(uuid.uuid4()) 341 if self._vms.get(name) is None: 342 self._vms[name] = self._new_vm(name, *args) 343 if self.cpu is not None: 344 self._vms[name].add_args('-cpu', self.cpu) 345 if self.machine is not None: 346 self._vms[name].set_machine(self.machine) 347 return self._vms[name] 348 349 def set_vm_arg(self, arg, value): 350 """ 351 Set an argument to list of extra arguments to be given to the QEMU 352 binary. If the argument already exists then its value is replaced. 353 354 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 355 :type arg: str 356 :param value: the argument value, such as "host" in "-cpu host" 357 :type value: str 358 """ 359 if not arg or not value: 360 return 361 if arg not in self.vm.args: 362 self.vm.args.extend([arg, value]) 363 else: 364 idx = self.vm.args.index(arg) + 1 365 if idx < len(self.vm.args): 366 self.vm.args[idx] = value 367 else: 368 self.vm.args.append(value) 369 370 def tearDown(self): 371 for vm in self._vms.values(): 372 vm.shutdown() 373 self._sd = None 374 super().tearDown() 375 376 377class LinuxSSHMixIn: 378 """Contains utility methods for interacting with a guest via SSH.""" 379 380 def ssh_connect(self, username, credential, credential_is_key=True): 381 self.ssh_logger = logging.getLogger('ssh') 382 res = self.vm.cmd('human-monitor-command', 383 command_line='info usernet') 384 port = get_info_usernet_hostfwd_port(res) 385 self.assertIsNotNone(port) 386 self.assertGreater(port, 0) 387 self.log.debug('sshd listening on port: %d', port) 388 if credential_is_key: 389 self.ssh_session = ssh.Session('127.0.0.1', port=port, 390 user=username, key=credential) 391 else: 392 self.ssh_session = ssh.Session('127.0.0.1', port=port, 393 user=username, password=credential) 394 for i in range(10): 395 try: 396 self.ssh_session.connect() 397 return 398 except: 399 time.sleep(i) 400 self.fail('ssh connection timeout') 401 402 def ssh_command(self, command): 403 self.ssh_logger.info(command) 404 result = self.ssh_session.cmd(command) 405 stdout_lines = [line.rstrip() for line 406 in result.stdout_text.splitlines()] 407 for line in stdout_lines: 408 self.ssh_logger.info(line) 409 stderr_lines = [line.rstrip() for line 410 in result.stderr_text.splitlines()] 411 for line in stderr_lines: 412 self.ssh_logger.warning(line) 413 414 self.assertEqual(result.exit_status, 0, 415 f'Guest command failed: {command}') 416 return stdout_lines, stderr_lines 417 418 def ssh_command_output_contains(self, cmd, exp): 419 stdout, _ = self.ssh_command(cmd) 420 for line in stdout: 421 if exp in line: 422 break 423 else: 424 self.fail('"%s" output does not contain "%s"' % (cmd, exp)) 425