1# Test class and utilities for functional tests 2# 3# Copyright (c) 2018 Red Hat, Inc. 4# 5# Author: 6# Cleber Rosa <crosa@redhat.com> 7# 8# This work is licensed under the terms of the GNU GPL, version 2 or 9# later. See the COPYING file in the top-level directory. 10 11import logging 12import os 13import shutil 14import subprocess 15import sys 16import tempfile 17import time 18import uuid 19 20import avocado 21from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage 22from avocado.utils.path import find_command 23 24from qemu.machine import QEMUMachine 25from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available, 26 tcg_available) 27 28 29#: The QEMU build root directory. It may also be the source directory 30#: if building from the source dir, but it's safer to use BUILD_DIR for 31#: that purpose. Be aware that if this code is moved outside of a source 32#: and build tree, it will not be accurate. 33BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) 34 35if os.path.islink(os.path.dirname(os.path.dirname(__file__))): 36 # The link to the avocado tests dir in the source code directory 37 lnk = os.path.dirname(os.path.dirname(__file__)) 38 #: The QEMU root source directory 39 SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk))) 40else: 41 SOURCE_DIR = BUILD_DIR 42 43 44def has_cmd(name, args=None): 45 """ 46 This function is for use in a @avocado.skipUnless decorator, e.g.: 47 48 @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true'))) 49 def test_something_that_needs_sudo(self): 50 ... 51 """ 52 53 if args is None: 54 args = ('which', name) 55 56 try: 57 _, stderr, exitcode = run_cmd(args) 58 except Exception as e: 59 exitcode = -1 60 stderr = str(e) 61 62 if exitcode != 0: 63 cmd_line = ' '.join(args) 64 err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}' 65 return (False, err) 66 else: 67 return (True, '') 68 69def has_cmds(*cmds): 70 """ 71 This function is for use in a @avocado.skipUnless decorator and 72 allows checking for the availability of multiple commands, e.g.: 73 74 @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')), 75 'cmd2', 'cmd3')) 76 def test_something_that_needs_cmd1_and_cmd2(self): 77 ... 78 """ 79 80 for cmd in cmds: 81 if isinstance(cmd, str): 82 cmd = (cmd,) 83 84 ok, errstr = has_cmd(*cmd) 85 if not ok: 86 return (False, errstr) 87 88 return (True, '') 89 90def run_cmd(args): 91 subp = subprocess.Popen(args, 92 stdout=subprocess.PIPE, 93 stderr=subprocess.PIPE, 94 universal_newlines=True) 95 stdout, stderr = subp.communicate() 96 ret = subp.returncode 97 98 return (stdout, stderr, ret) 99 100def is_readable_executable_file(path): 101 return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK) 102 103 104def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None): 105 """ 106 Picks the path of a QEMU binary, starting either in the current working 107 directory or in the source tree root directory. 108 109 :param arch: the arch to use when looking for a QEMU binary (the target 110 will match the arch given). If None (the default), arch 111 will be the current host system arch (as given by 112 :func:`os.uname`). 113 :type arch: str 114 :returns: the path to the default QEMU binary or None if one could not 115 be found 116 :rtype: str or None 117 """ 118 if arch is None: 119 arch = os.uname()[4] 120 # qemu binary path does not match arch for powerpc, handle it 121 if 'ppc64le' in arch: 122 arch = 'ppc64' 123 qemu_bin_relative_path = os.path.join(".", bin_prefix + arch) 124 if is_readable_executable_file(qemu_bin_relative_path): 125 return qemu_bin_relative_path 126 127 qemu_bin_from_bld_dir_path = os.path.join(BUILD_DIR, 128 qemu_bin_relative_path) 129 if is_readable_executable_file(qemu_bin_from_bld_dir_path): 130 return qemu_bin_from_bld_dir_path 131 return None 132 133 134def _console_interaction(test, success_message, failure_message, 135 send_string, keep_sending=False, vm=None): 136 assert not keep_sending or send_string 137 if vm is None: 138 vm = test.vm 139 console = vm.console_socket.makefile(mode='rb', encoding='utf-8') 140 console_logger = logging.getLogger('console') 141 while True: 142 if send_string: 143 vm.console_socket.sendall(send_string.encode()) 144 if not keep_sending: 145 send_string = None # send only once 146 try: 147 msg = console.readline().decode().strip() 148 except UnicodeDecodeError: 149 msg = None 150 if not msg: 151 continue 152 console_logger.debug(msg) 153 if success_message is None or success_message in msg: 154 break 155 if failure_message and failure_message in msg: 156 console.close() 157 fail = 'Failure message found in console: "%s". Expected: "%s"' % \ 158 (failure_message, success_message) 159 test.fail(fail) 160 161def interrupt_interactive_console_until_pattern(test, success_message, 162 failure_message=None, 163 interrupt_string='\r'): 164 """ 165 Keep sending a string to interrupt a console prompt, while logging the 166 console output. Typical use case is to break a boot loader prompt, such: 167 168 Press a key within 5 seconds to interrupt boot process. 169 5 170 4 171 3 172 2 173 1 174 Booting default image... 175 176 :param test: an Avocado test containing a VM that will have its console 177 read and probed for a success or failure message 178 :type test: :class:`avocado_qemu.QemuSystemTest` 179 :param success_message: if this message appears, test succeeds 180 :param failure_message: if this message appears, test fails 181 :param interrupt_string: a string to send to the console before trying 182 to read a new line 183 """ 184 _console_interaction(test, success_message, failure_message, 185 interrupt_string, True) 186 187def wait_for_console_pattern(test, success_message, failure_message=None, 188 vm=None): 189 """ 190 Waits for messages to appear on the console, while logging the content 191 192 :param test: an Avocado test containing a VM that will have its console 193 read and probed for a success or failure message 194 :type test: :class:`avocado_qemu.QemuSystemTest` 195 :param success_message: if this message appears, test succeeds 196 :param failure_message: if this message appears, test fails 197 """ 198 _console_interaction(test, success_message, failure_message, None, vm=vm) 199 200def exec_command(test, command): 201 """ 202 Send a command to a console (appending CRLF characters), while logging 203 the content. 204 205 :param test: an Avocado test containing a VM. 206 :type test: :class:`avocado_qemu.QemuSystemTest` 207 :param command: the command to send 208 :type command: str 209 """ 210 _console_interaction(test, None, None, command + '\r') 211 212def exec_command_and_wait_for_pattern(test, command, 213 success_message, failure_message=None): 214 """ 215 Send a command to a console (appending CRLF characters), then wait 216 for success_message to appear on the console, while logging the. 217 content. Mark the test as failed if failure_message is found instead. 218 219 :param test: an Avocado test containing a VM that will have its console 220 read and probed for a success or failure message 221 :type test: :class:`avocado_qemu.QemuSystemTest` 222 :param command: the command to send 223 :param success_message: if this message appears, test succeeds 224 :param failure_message: if this message appears, test fails 225 """ 226 _console_interaction(test, success_message, failure_message, command + '\r') 227 228class QemuBaseTest(avocado.Test): 229 def _get_unique_tag_val(self, tag_name): 230 """ 231 Gets a tag value, if unique for a key 232 """ 233 vals = self.tags.get(tag_name, []) 234 if len(vals) == 1: 235 return vals.pop() 236 return None 237 238 def setUp(self, bin_prefix): 239 self.arch = self.params.get('arch', 240 default=self._get_unique_tag_val('arch')) 241 242 self.cpu = self.params.get('cpu', 243 default=self._get_unique_tag_val('cpu')) 244 245 default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch) 246 self.qemu_bin = self.params.get('qemu_bin', 247 default=default_qemu_bin) 248 if self.qemu_bin is None: 249 self.cancel("No QEMU binary defined or found in the build tree") 250 251 def fetch_asset(self, name, 252 asset_hash=None, algorithm=None, 253 locations=None, expire=None, 254 find_only=False, cancel_on_missing=True): 255 return super().fetch_asset(name, 256 asset_hash=asset_hash, 257 algorithm=algorithm, 258 locations=locations, 259 expire=expire, 260 find_only=find_only, 261 cancel_on_missing=cancel_on_missing) 262 263 264class QemuSystemTest(QemuBaseTest): 265 """Facilitates system emulation tests.""" 266 267 def setUp(self): 268 self._vms = {} 269 270 super().setUp('qemu-system-') 271 272 self.machine = self.params.get('machine', 273 default=self._get_unique_tag_val('machine')) 274 275 def require_accelerator(self, accelerator): 276 """ 277 Requires an accelerator to be available for the test to continue 278 279 It takes into account the currently set qemu binary. 280 281 If the check fails, the test is canceled. If the check itself 282 for the given accelerator is not available, the test is also 283 canceled. 284 285 :param accelerator: name of the accelerator, such as "kvm" or "tcg" 286 :type accelerator: str 287 """ 288 checker = {'tcg': tcg_available, 289 'kvm': kvm_available}.get(accelerator) 290 if checker is None: 291 self.cancel("Don't know how to check for the presence " 292 "of accelerator %s" % accelerator) 293 if not checker(qemu_bin=self.qemu_bin): 294 self.cancel("%s accelerator does not seem to be " 295 "available" % accelerator) 296 297 def _new_vm(self, name, *args): 298 self._sd = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_") 299 vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir, 300 sock_dir=self._sd.name, log_dir=self.logdir) 301 self.log.debug('QEMUMachine "%s" created', name) 302 self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir) 303 self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir) 304 if args: 305 vm.add_args(*args) 306 return vm 307 308 @property 309 def vm(self): 310 return self.get_vm(name='default') 311 312 def get_vm(self, *args, name=None): 313 if not name: 314 name = str(uuid.uuid4()) 315 if self._vms.get(name) is None: 316 self._vms[name] = self._new_vm(name, *args) 317 if self.cpu is not None: 318 self._vms[name].add_args('-cpu', self.cpu) 319 if self.machine is not None: 320 self._vms[name].set_machine(self.machine) 321 return self._vms[name] 322 323 def set_vm_arg(self, arg, value): 324 """ 325 Set an argument to list of extra arguments to be given to the QEMU 326 binary. If the argument already exists then its value is replaced. 327 328 :param arg: the QEMU argument, such as "-cpu" in "-cpu host" 329 :type arg: str 330 :param value: the argument value, such as "host" in "-cpu host" 331 :type value: str 332 """ 333 if not arg or not value: 334 return 335 if arg not in self.vm.args: 336 self.vm.args.extend([arg, value]) 337 else: 338 idx = self.vm.args.index(arg) + 1 339 if idx < len(self.vm.args): 340 self.vm.args[idx] = value 341 else: 342 self.vm.args.append(value) 343 344 def tearDown(self): 345 for vm in self._vms.values(): 346 vm.shutdown() 347 self._sd = None 348 super().tearDown() 349 350 351class QemuUserTest(QemuBaseTest): 352 """Facilitates user-mode emulation tests.""" 353 354 def setUp(self): 355 self._ldpath = [] 356 super().setUp('qemu-') 357 358 def add_ldpath(self, ldpath): 359 self._ldpath.append(os.path.abspath(ldpath)) 360 361 def run(self, bin_path, args=[]): 362 qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath]) 363 bin_args = " ".join(args) 364 return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args, 365 bin_path, bin_args)) 366 367 368class LinuxSSHMixIn: 369 """Contains utility methods for interacting with a guest via SSH.""" 370 371 def ssh_connect(self, username, credential, credential_is_key=True): 372 self.ssh_logger = logging.getLogger('ssh') 373 res = self.vm.command('human-monitor-command', 374 command_line='info usernet') 375 port = get_info_usernet_hostfwd_port(res) 376 self.assertIsNotNone(port) 377 self.assertGreater(port, 0) 378 self.log.debug('sshd listening on port: %d', port) 379 if credential_is_key: 380 self.ssh_session = ssh.Session('127.0.0.1', port=port, 381 user=username, key=credential) 382 else: 383 self.ssh_session = ssh.Session('127.0.0.1', port=port, 384 user=username, password=credential) 385 for i in range(10): 386 try: 387 self.ssh_session.connect() 388 return 389 except: 390 time.sleep(i) 391 self.fail('ssh connection timeout') 392 393 def ssh_command(self, command): 394 self.ssh_logger.info(command) 395 result = self.ssh_session.cmd(command) 396 stdout_lines = [line.rstrip() for line 397 in result.stdout_text.splitlines()] 398 for line in stdout_lines: 399 self.ssh_logger.info(line) 400 stderr_lines = [line.rstrip() for line 401 in result.stderr_text.splitlines()] 402 for line in stderr_lines: 403 self.ssh_logger.warning(line) 404 405 self.assertEqual(result.exit_status, 0, 406 f'Guest command failed: {command}') 407 return stdout_lines, stderr_lines 408 409class LinuxDistro: 410 """Represents a Linux distribution 411 412 Holds information of known distros. 413 """ 414 #: A collection of known distros and their respective image checksum 415 KNOWN_DISTROS = { 416 'fedora': { 417 '31': { 418 'x86_64': 419 {'checksum': ('e3c1b309d9203604922d6e255c2c5d09' 420 '8a309c2d46215d8fc026954f3c5c27a0'), 421 'pxeboot_url': ('https://archives.fedoraproject.org/' 422 'pub/archive/fedora/linux/releases/31/' 423 'Everything/x86_64/os/images/pxeboot/'), 424 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-' 425 '08a96687f73c ro no_timer_check ' 426 'net.ifnames=0 console=tty1 ' 427 'console=ttyS0,115200n8'), 428 }, 429 'aarch64': 430 {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae' 431 'd2af0ad0329383d5639c997fdf16fe49'), 432 'pxeboot_url': 'https://archives.fedoraproject.org/' 433 'pub/archive/fedora/linux/releases/31/' 434 'Everything/aarch64/os/images/pxeboot/', 435 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-' 436 '355e8475b0a7 ro earlyprintk=pl011,0x9000000' 437 ' ignore_loglevel no_timer_check' 438 ' printk.time=1 rd_NO_PLYMOUTH' 439 ' console=ttyAMA0'), 440 }, 441 'ppc64': 442 {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4' 443 '3f991c506f2cc390dc4efa2026ad2f58')}, 444 's390x': 445 {'checksum': ('4caaab5a434fd4d1079149a072fdc789' 446 '1e354f834d355069ca982fdcaf5a122d')}, 447 }, 448 '32': { 449 'aarch64': 450 {'checksum': ('b367755c664a2d7a26955bbfff985855' 451 'adfa2ca15e908baf15b4b176d68d3967'), 452 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/' 453 'releases/32/Server/aarch64/os/images/' 454 'pxeboot/'), 455 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-' 456 '14d95c0e90c5 ro no_timer_check net.ifnames=0' 457 ' console=tty1 console=ttyS0,115200n8'), 458 }, 459 }, 460 '33': { 461 'aarch64': 462 {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1' 463 'a81f386a17f969c1d1c7c87031008a6b'), 464 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/' 465 'releases/33/Server/aarch64/os/images/' 466 'pxeboot/'), 467 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-' 468 '1126a0208f8a ro no_timer_check net.ifnames=0' 469 ' console=tty1 console=ttyS0,115200n8' 470 ' console=tty0'), 471 }, 472 }, 473 } 474 } 475 476 def __init__(self, name, version, arch): 477 self.name = name 478 self.version = version 479 self.arch = arch 480 try: 481 info = self.KNOWN_DISTROS.get(name).get(version).get(arch) 482 except AttributeError: 483 # Unknown distro 484 info = None 485 self._info = info or {} 486 487 @property 488 def checksum(self): 489 """Gets the cloud-image file checksum""" 490 return self._info.get('checksum', None) 491 492 @checksum.setter 493 def checksum(self, value): 494 self._info['checksum'] = value 495 496 @property 497 def pxeboot_url(self): 498 """Gets the repository url where pxeboot files can be found""" 499 return self._info.get('pxeboot_url', None) 500 501 @property 502 def default_kernel_params(self): 503 """Gets the default kernel parameters""" 504 return self._info.get('kernel_params', None) 505 506 507class LinuxTest(LinuxSSHMixIn, QemuSystemTest): 508 """Facilitates having a cloud-image Linux based available. 509 510 For tests that indent to interact with guests, this is a better choice 511 to start with than the more vanilla `QemuSystemTest` class. 512 """ 513 514 timeout = 900 515 distro = None 516 username = 'root' 517 password = 'password' 518 smp = '2' 519 memory = '1024' 520 521 def _set_distro(self): 522 distro_name = self.params.get( 523 'distro', 524 default=self._get_unique_tag_val('distro')) 525 if not distro_name: 526 distro_name = 'fedora' 527 528 distro_version = self.params.get( 529 'distro_version', 530 default=self._get_unique_tag_val('distro_version')) 531 if not distro_version: 532 distro_version = '31' 533 534 self.distro = LinuxDistro(distro_name, distro_version, self.arch) 535 536 # The distro checksum behaves differently than distro name and 537 # version. First, it does not respect a tag with the same 538 # name, given that it's not expected to be used for filtering 539 # (distro name versions are the natural choice). Second, the 540 # order of precedence is: parameter, attribute and then value 541 # from KNOWN_DISTROS. 542 distro_checksum = self.params.get('distro_checksum', 543 default=None) 544 if distro_checksum: 545 self.distro.checksum = distro_checksum 546 547 def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'): 548 super().setUp() 549 self._set_distro() 550 self.vm.add_args('-smp', self.smp) 551 self.vm.add_args('-m', self.memory) 552 # The following network device allows for SSH connections 553 self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22', 554 '-device', '%s,netdev=vnet' % network_device_type) 555 self.set_up_boot() 556 if ssh_pubkey is None: 557 ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys() 558 self.set_up_cloudinit(ssh_pubkey) 559 560 def set_up_existing_ssh_keys(self): 561 ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub') 562 source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa') 563 ssh_dir = os.path.join(self.workdir, '.ssh') 564 os.mkdir(ssh_dir, mode=0o700) 565 ssh_private_key = os.path.join(ssh_dir, 566 os.path.basename(source_private_key)) 567 shutil.copyfile(source_private_key, ssh_private_key) 568 os.chmod(ssh_private_key, 0o600) 569 return (ssh_public_key, ssh_private_key) 570 571 def download_boot(self): 572 self.log.debug('Looking for and selecting a qemu-img binary to be ' 573 'used to create the bootable snapshot image') 574 # If qemu-img has been built, use it, otherwise the system wide one 575 # will be used. If none is available, the test will cancel. 576 qemu_img = os.path.join(BUILD_DIR, 'qemu-img') 577 if not os.path.exists(qemu_img): 578 qemu_img = find_command('qemu-img', False) 579 if qemu_img is False: 580 self.cancel('Could not find "qemu-img", which is required to ' 581 'create the bootable image') 582 vmimage.QEMU_IMG = qemu_img 583 584 self.log.info('Downloading/preparing boot image') 585 # Fedora 31 only provides ppc64le images 586 image_arch = self.arch 587 if self.distro.name == 'fedora': 588 if image_arch == 'ppc64': 589 image_arch = 'ppc64le' 590 591 try: 592 boot = vmimage.get( 593 self.distro.name, arch=image_arch, version=self.distro.version, 594 checksum=self.distro.checksum, 595 algorithm='sha256', 596 cache_dir=self.cache_dirs[0], 597 snapshot_dir=self.workdir) 598 except: 599 self.cancel('Failed to download/prepare boot image') 600 return boot.path 601 602 def prepare_cloudinit(self, ssh_pubkey=None): 603 self.log.info('Preparing cloudinit image') 604 try: 605 cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso') 606 pubkey_content = None 607 if ssh_pubkey: 608 with open(ssh_pubkey) as pubkey: 609 pubkey_content = pubkey.read() 610 cloudinit.iso(cloudinit_iso, self.name, 611 username=self.username, 612 password=self.password, 613 # QEMU's hard coded usermode router address 614 phone_home_host='10.0.2.2', 615 phone_home_port=self.phone_server.server_port, 616 authorized_key=pubkey_content) 617 except Exception: 618 self.cancel('Failed to prepare the cloudinit image') 619 return cloudinit_iso 620 621 def set_up_boot(self): 622 path = self.download_boot() 623 self.vm.add_args('-drive', 'file=%s' % path) 624 625 def set_up_cloudinit(self, ssh_pubkey=None): 626 self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0), 627 self.name) 628 cloudinit_iso = self.prepare_cloudinit(ssh_pubkey) 629 self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso) 630 631 def launch_and_wait(self, set_up_ssh_connection=True): 632 self.vm.set_console() 633 self.vm.launch() 634 console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(), 635 logger=self.log.getChild('console')) 636 console_drainer.start() 637 self.log.info('VM launched, waiting for boot confirmation from guest') 638 while not self.phone_server.instance_phoned_back: 639 self.phone_server.handle_request() 640 641 if set_up_ssh_connection: 642 self.log.info('Setting up the SSH connection') 643 self.ssh_connect(self.username, self.ssh_key) 644