# Test class and utilities for functional tests
#
# Copyright (c) 2018 Red Hat, Inc.
#
# Author:
#  Cleber Rosa <crosa@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
import uuid

import avocado
from avocado.utils import cloudinit, datadrainer, network, process, ssh, vmimage
from avocado.utils.path import find_command

#: The QEMU build root directory.  It may also be the source directory
#: if building from the source dir, but it's safer to use BUILD_DIR for
#: that purpose.  Be aware that if this code is moved outside of a source
#: and build tree, it will not be accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
    # The link to the avocado tests dir in the source code directory
    lnk = os.path.dirname(os.path.dirname(__file__))
    #: The QEMU root source directory
    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
else:
    SOURCE_DIR = BUILD_DIR

# The 'qemu' python package lives in the source tree; it must be on the
# import path before the imports right below.
sys.path.append(os.path.join(SOURCE_DIR, 'python'))

from qemu.machine import QEMUMachine
from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
                        tcg_available)


def has_cmd(name, args=None):
    """
    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the command, used in the error
                 message (and as the argument to ``which`` when no
                 ``args`` are given)
    :type name: str
    :param args: the command line (sequence of strings) to run as the
                 availability probe.  If None (the default), ``which name``
                 is executed.
    :returns: a ``(available, message)`` tuple; ``message`` is empty when
              the command is available
    :rtype: tuple(bool, str)
    """

    if args is None:
        args = ('which', name)

    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        # Failure to even spawn the command counts as "not available"
        exitcode = -1
        stderr = str(e)

    if exitcode == 0:
        return (True, '')

    cmd_line = ' '.join(args)
    err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
    return (False, err)


def has_cmds(*cmds):
    """
    This function is for use in a @avocado.skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each item is either a command name (str) or a tuple with
                 the positional arguments accepted by :func:`has_cmd`
    :returns: ``(True, '')`` if all commands are available, otherwise
              ``(False, message)`` for the first one that is not
    :rtype: tuple(bool, str)
    """

    for cmd in cmds:
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')


def run_cmd(args):
    """
    Runs a command to completion, capturing its output as text.

    :param args: the command line, as accepted by :mod:`subprocess`
    :returns: a ``(stdout, stderr, returncode)`` tuple
    :rtype: tuple(str, str, int)
    """
    completed = subprocess.run(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True,
                               check=False)
    return (completed.stdout, completed.stderr, completed.returncode)


def is_readable_executable_file(path):
    """
    Tells whether path is a regular file that is both readable and
    executable by the current process.
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, starting either in the current working
    directory or in the build tree root directory (:data:`BUILD_DIR`).

    :param bin_prefix: the prefix of the binary name, to which the arch
                       is appended (such as "qemu-system-" or "qemu-")
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname().machine
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'

    relative_path = os.path.join(".", bin_prefix + arch)
    candidates = (relative_path, os.path.join(BUILD_DIR, relative_path))
    for candidate in candidates:
        if is_readable_executable_file(candidate):
            return candidate
    return None


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Optionally sends a string to the VM console, then reads the console
    line by line (logging it to the 'console' logger) until a line
    containing success_message is seen.

    :param test: the test owning the VM; ``test.fail()`` is called if
                 failure_message shows up on the console
    :param success_message: text that ends the interaction.  If None, the
                            first non-empty line read ends it.
    :param failure_message: text that makes the test fail, or None
    :param send_string: string to send to the console, or None
    :param keep_sending: if True, send_string is sent again before every
                         line is read, instead of only once
    :param vm: the VM whose console is used (defaults to ``test.vm``)
    """
    assert not keep_sending or send_string
    if vm is None:
        vm = test.vm
    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
    console_logger = logging.getLogger('console')
    try:
        while True:
            if send_string:
                vm.console_socket.sendall(send_string.encode())
                if not keep_sending:
                    send_string = None  # send only once
            try:
                msg = console.readline().decode().strip()
            except UnicodeDecodeError:
                # Undecodable console output is skipped, not fatal
                msg = None
            if not msg:
                continue
            console_logger.debug(msg)
            if success_message is None or success_message in msg:
                break
            if failure_message and failure_message in msg:
                fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                        (failure_message, success_message)
                test.fail(fail)
    finally:
        # Release the file object on every exit path, not only on failure
        console.close()


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt, such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read (defaults to ``test.vm``)
    """
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message, command + '\r')


class QemuBaseTest(avocado.Test):
    """Common setup shared by the system and user mode emulation tests."""

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the name (key) of the tag
        :returns: the single value set for the tag, or None if the tag is
                  not set or has more than one value
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Read the value without removing it: the collection belongs
            # to the test's tags and must survive repeated lookups.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolves the arch, cpu and QEMU binary to be used by the test.

        Each value comes from the test parameter of the same name, with
        the matching tag (or, for the binary, the one picked by
        :func:`pick_default_qemu_bin`) as the default.  The test is
        canceled if no QEMU binary is defined or found.

        :param bin_prefix: prefix of the QEMU binary name, passed on to
                           :func:`pick_default_qemu_bin`
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash=None, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Same as the parent class' fetch_asset(), to which all arguments
        are forwarded, but with different defaults for ``find_only`` and
        ``cancel_on_missing``.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """Resolves the machine type, on top of the base class setup."""
        self._vms = {}
        # One socket directory per VM created; they are all kept alive
        # until tearDown().
        self._sock_dirs = []

        super().setUp('qemu-system-')

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def _new_vm(self, name, *args):
        """
        Creates a new QEMUMachine with its own temporary socket directory.

        :param name: the name of the VM, used for logging only
        :param args: extra arguments to be given to the QEMU binary
        :returns: the newly created (not yet launched) VM
        """
        self._sd = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_")
        # Keep a reference to every directory: rebinding self._sd alone
        # would let the previous one be garbage collected, and thus
        # removed, while its VM may still need it.
        self._sock_dirs.append(self._sd)
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         sock_dir=self._sd.name, log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Gets the VM with the given name, creating it if it does not exist.

        :param args: extra QEMU arguments, used only when the VM is created
        :param name: the name of the VM.  If not given, a random (and
                     thus new) name is used.
        :returns: the VM associated with name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the (first) argument occurrence
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shuts down all VMs and removes their socket directories."""
        for vm in self._vms.values():
            vm.shutdown()
        for sock_dir in self._sock_dirs:
            sock_dir.cleanup()
        self._sock_dirs = []
        self._sd = None
        super().tearDown()


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Starts with no library paths, on top of the base class setup."""
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """Adds a path (made absolute) to be given to QEMU with ``-L``."""
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=()):
        """
        Runs a binary under the user-mode QEMU binary.

        :param bin_path: path of the binary to be run
        :param args: sequence of arguments (str) to be given to the binary
        :returns: the result of :func:`avocado.utils.process.run`
        """
        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))


class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Opens an SSH session to the guest, through the host forwarded
        port reported by the monitor command "info usernet".

        The connection is attempted up to 10 times, sleeping a bit longer
        after each failed attempt; the test fails if all of them fail.

        :param username: the user to log in as
        :param credential: the path to the private key or the password,
                           depending on credential_is_key
        :param credential_is_key: whether credential is a key (True, the
                                  default) or a password (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.command('human-monitor-command',
                              command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                self.ssh_session.connect()
                return
            except Exception:
                # Not a bare "except": KeyboardInterrupt and SystemExit
                # must not be retried.
                time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Runs a command in the guest over the established SSH session,
        logging the command and its output, and asserting that it exits
        with status 0.

        :param command: the command to run
        :returns: a tuple with the lists of stdout and stderr lines
        :rtype: tuple(list, list)
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines


class LinuxDistro:
    """Represents a Linux distribution

    Holds information of known distros.
    """
    #: A collection of known distros and their respective image checksum
    KNOWN_DISTROS = {
        'fedora': {
            '31': {
                'x86_64':
                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
                              '8a309c2d46215d8fc026954f3c5c27a0'),
                 'pxeboot_url': ('https://archives.fedoraproject.org/'
                                 'pub/archive/fedora/linux/releases/31/'
                                 'Everything/x86_64/os/images/pxeboot/'),
                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
                                   '08a96687f73c ro no_timer_check '
                                   'net.ifnames=0 console=tty1 '
                                   'console=ttyS0,115200n8'),
                },
                'aarch64':
                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
                              'd2af0ad0329383d5639c997fdf16fe49'),
                 'pxeboot_url': 'https://archives.fedoraproject.org/'
                                'pub/archive/fedora/linux/releases/31/'
                                'Everything/aarch64/os/images/pxeboot/',
                 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
                                   '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
                                   ' ignore_loglevel no_timer_check'
                                   ' printk.time=1 rd_NO_PLYMOUTH'
                                   ' console=ttyAMA0'),
                },
                'ppc64':
                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
                              '3f991c506f2cc390dc4efa2026ad2f58')},
                's390x':
                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
                              '1e354f834d355069ca982fdcaf5a122d')},
            },
            '32': {
                'aarch64':
                {'checksum': ('b367755c664a2d7a26955bbfff985855'
                              'adfa2ca15e908baf15b4b176d68d3967'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/32/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
                                   '14d95c0e90c5 ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'),
                },
            },
            '33': {
                'aarch64':
                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
                              'a81f386a17f969c1d1c7c87031008a6b'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/33/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
                                   '1126a0208f8a ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'
                                   ' console=tty0'),
                },
            },
        }
    }

    def __init__(self, name, version, arch):
        """
        :param name: the distro name, such as "fedora"
        :param version: the distro version, such as "31"
        :param arch: the architecture, such as "x86_64"

        Distros (or versions, or arches) not present in KNOWN_DISTROS are
        accepted; they simply have no information associated with them.
        """
        self.name = name
        self.version = version
        self.arch = arch
        known = (self.KNOWN_DISTROS.get(name, {})
                                   .get(version, {})
                                   .get(arch, {}))
        # Work on a copy: the checksum setter writes to this dict, and that
        # must not leak into the class level KNOWN_DISTROS.
        self._info = dict(known)

    @property
    def checksum(self):
        """Gets the cloud-image file checksum"""
        return self._info.get('checksum', None)

    @checksum.setter
    def checksum(self, value):
        self._info['checksum'] = value

    @property
    def pxeboot_url(self):
        """Gets the repository url where pxeboot files can be found"""
        return self._info.get('pxeboot_url', None)

    @property
    def default_kernel_params(self):
        """Gets the default kernel parameters"""
        return self._info.get('kernel_params', None)


class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
    """Facilitates having a cloud-image Linux based available.

    For tests that intend to interact with guests, this is a better choice
    to start with than the more vanilla `QemuSystemTest` class.
    """

    timeout = 900
    distro = None
    username = 'root'
    password = 'password'

    def _set_distro(self):
        """
        Sets self.distro from the 'distro' and 'distro_version' parameters
        (or tags of the same name), defaulting to fedora 31.
        """
        distro_name = self.params.get(
            'distro',
            default=self._get_unique_tag_val('distro'))
        if not distro_name:
            distro_name = 'fedora'

        distro_version = self.params.get(
            'distro_version',
            default=self._get_unique_tag_val('distro_version'))
        if not distro_version:
            distro_version = '31'

        self.distro = LinuxDistro(distro_name, distro_version, self.arch)

        # The distro checksum behaves differently than distro name and
        # version. First, it does not respect a tag with the same
        # name, given that it's not expected to be used for filtering
        # (distro name versions are the natural choice).  Second, the
        # order of precedence is: parameter, attribute and then value
        # from KNOWN_DISTROS.
        distro_checksum = self.params.get('distro_checksum',
                                          default=None)
        if distro_checksum:
            self.distro.checksum = distro_checksum

    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
        """
        Configures the default VM with the boot image, a cloudinit image
        and a user mode network device that forwards a host port to the
        guest's port 22.

        :param ssh_pubkey: path to the public key to be authorized in the
                           guest.  If None (the default), the keys from
                           the source tree are used.  Note that
                           ``self.ssh_key`` is only set in that case.
        :param network_device_type: the QEMU network device model
        """
        super().setUp()
        self._set_distro()
        self.vm.add_args('-smp', '2')
        self.vm.add_args('-m', '1024')
        # The following network device allows for SSH connections
        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
                         '-device', '%s,netdev=vnet' % network_device_type)
        self.set_up_boot()
        if ssh_pubkey is None:
            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
        self.set_up_cloudinit(ssh_pubkey)

    def set_up_existing_ssh_keys(self):
        """
        Copies the private key from the source tree into a '.ssh'
        directory inside the test's workdir, with permissions restricted
        to the owner.

        :returns: a tuple with the paths of the public key (in the source
                  tree) and of the private key (the copy)
        :rtype: tuple(str, str)
        """
        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
        ssh_dir = os.path.join(self.workdir, '.ssh')
        os.mkdir(ssh_dir, mode=0o700)
        ssh_private_key = os.path.join(ssh_dir,
                                       os.path.basename(source_private_key))
        shutil.copyfile(source_private_key, ssh_private_key)
        os.chmod(ssh_private_key, 0o600)
        return (ssh_public_key, ssh_private_key)

    def download_boot(self):
        """
        Downloads (or finds in the cache) the distro image and prepares a
        bootable snapshot of it.  The test is canceled if "qemu-img" can
        not be found or if the image can not be downloaded or prepared.

        :returns: the path of the bootable image
        """
        self.log.debug('Looking for and selecting a qemu-img binary to be '
                       'used to create the bootable snapshot image')
        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.  If none is available, the test will cancel.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img", which is required to '
                        'create the bootable image')
        vmimage.QEMU_IMG = qemu_img

        self.log.info('Downloading/preparing boot image')
        # Fedora 31 only provides ppc64le images
        image_arch = self.arch
        if self.distro.name == 'fedora':
            if image_arch == 'ppc64':
                image_arch = 'ppc64le'

        try:
            boot = vmimage.get(
                self.distro.name, arch=image_arch, version=self.distro.version,
                checksum=self.distro.checksum,
                algorithm='sha256',
                cache_dir=self.cache_dirs[0],
                snapshot_dir=self.workdir)
        except Exception:
            # Not a bare "except": KeyboardInterrupt and SystemExit must
            # not be turned into a test cancellation.
            self.cancel('Failed to download/prepare boot image')
        return boot.path

    def prepare_cloudinit(self, ssh_pubkey=None):
        """
        Creates the cloudinit ISO image in the test's workdir, and picks
        the port (self.phone_home_port) the guest will "phone home" to.
        The test is canceled if the image can not be created.

        :param ssh_pubkey: path to the public key to be authorized in the
                           guest, or None for no key
        :returns: the path of the cloudinit ISO image
        """
        self.log.info('Preparing cloudinit image')
        try:
            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
            self.phone_home_port = network.find_free_port()
            pubkey_content = None
            if ssh_pubkey:
                with open(ssh_pubkey) as pubkey:
                    pubkey_content = pubkey.read()
            cloudinit.iso(cloudinit_iso, self.name,
                          username=self.username,
                          password=self.password,
                          # QEMU's hard coded usermode router address
                          phone_home_host='10.0.2.2',
                          phone_home_port=self.phone_home_port,
                          authorized_key=pubkey_content)
        except Exception:
            self.cancel('Failed to prepare the cloudinit image')
        return cloudinit_iso

    def set_up_boot(self):
        """Adds the boot image as a drive of the default VM."""
        path = self.download_boot()
        self.vm.add_args('-drive', 'file=%s' % path)

    def set_up_cloudinit(self, ssh_pubkey=None):
        """Adds the cloudinit image as a (raw) drive of the default VM."""
        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)

    def launch_and_wait(self, set_up_ssh_connection=True):
        """
        Launches the default VM and waits for the guest to "phone home",
        while the console output is drained into the test log.

        :param set_up_ssh_connection: whether to also open an SSH session
                                      (using self.username and
                                      self.ssh_key) once the guest is up
        """
        self.vm.set_console()
        self.vm.launch()
        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
                                                 logger=self.log.getChild('console'))
        console_drainer.start()
        self.log.info('VM launched, waiting for boot confirmation from guest')
        cloudinit.wait_for_phone_home(('0.0.0.0', self.phone_home_port), self.name)
        if set_up_ssh_connection:
            self.log.info('Setting up the SSH connection')
            self.ssh_connect(self.username, self.ssh_key)