# Test class and utilities for functional tests
#
# Copyright (c) 2018 Red Hat, Inc.
#
# Author:
#  Cleber Rosa <crosa@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import shutil
import subprocess
import sys
import tempfile
import time
import uuid

import avocado
from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
from avocado.utils.path import find_command

from qemu.machine import QEMUMachine
from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
                        tcg_available)


#: The QEMU build root directory.  It may also be the source directory
#: if building from the source dir, but it's safer to use BUILD_DIR for
#: that purpose.  Be aware that if this code is moved outside of a source
#: and build tree, it will not be accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
    # The link to the avocado tests dir in the source code directory
    lnk = os.path.dirname(os.path.dirname(__file__))
    #: The QEMU root source directory
    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
else:
    SOURCE_DIR = BUILD_DIR


def has_cmd(name, args=None):
    """
    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the requirement, used in the
                 error message; also the command looked up with ``which``
                 when ``args`` is not given
    :type name: str
    :param args: the command line (sequence of strings) to run as the
                 availability check.  Defaults to ``('which', name)``.
    :returns: ``(True, '')`` if the command exited with status 0,
              otherwise ``(False, <error description>)``
    :rtype: tuple
    """

    if args is None:
        args = ('which', name)

    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        # Failing to even start the command counts as "not available"
        exitcode = -1
        stderr = str(e)

    if exitcode != 0:
        cmd_line = ' '.join(args)
        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
        return (False, err)
    else:
        return (True, '')


def has_cmds(*cmds):
    """
    This function is for use in a @avocado.skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each entry is either a command name (str) or a tuple
                 that is unpacked as the arguments of :func:`has_cmd`
    :returns: ``(True, '')`` if every check passed, otherwise the
              ``(False, <error description>)`` of the first failing check
    :rtype: tuple
    """

    for cmd in cmds:
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')


def run_cmd(args):
    """
    Runs a command to completion, capturing its output as text.

    :param args: the command line, as accepted by :class:`subprocess.Popen`
    :returns: a ``(stdout, stderr, returncode)`` tuple
    :rtype: tuple
    """
    subp = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    stdout, stderr = subp.communicate()
    ret = subp.returncode

    return (stdout, stderr, ret)


def is_readable_executable_file(path):
    """
    Tells whether ``path`` is a regular file that the current process can
    both read and execute.

    :param path: the path to check
    :type path: str
    :rtype: bool
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, starting either in the current working
    directory or in the source tree root directory.

    :param bin_prefix: the prefix of the binary name; the arch is appended
                       to it to form the name that is looked for
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname()[4]
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    qemu_bin_name = bin_prefix + arch
    qemu_bin_paths = [
        os.path.join(".", qemu_bin_name),
        os.path.join(BUILD_DIR, qemu_bin_name),
        os.path.join(BUILD_DIR, "build", qemu_bin_name),
    ]
    for path in qemu_bin_paths:
        if is_readable_executable_file(path):
            return path
    return None


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Common implementation of the console helpers below: optionally sends
    a string to the VM console, then reads and logs console lines until a
    stop condition is met.

    :param test: the test whose ``vm`` is used when ``vm`` is not given,
                 and whose ``fail()`` is called on failure
    :param success_message: stop when a line contains this text.  If None,
                            stop after the first non-empty line is read.
    :param failure_message: if set and found in a line, fail the test
    :param send_string: text to write to the console, or None
    :param keep_sending: if True, ``send_string`` is re-sent before every
                         read instead of only once
    :param vm: the VM to interact with, defaults to ``test.vm``
    """
    assert not keep_sending or send_string
    if vm is None:
        vm = test.vm
    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
    console_logger = logging.getLogger('console')
    while True:
        if send_string:
            vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None # send only once
        try:
            msg = console.readline().decode().strip()
        except UnicodeDecodeError:
            # Lines that are not valid text are skipped like empty ones
            msg = None
        if not msg:
            continue
        console_logger.debug(msg)
        if success_message is None or success_message in msg:
            break
        if failure_message and failure_message in msg:
            console.close()
            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                    (failure_message, success_message)
            test.fail(fail)


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt, such:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read, defaults to ``test.vm``
    """
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.  Returns once the first non-empty line has been
    read back from the console.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message, command + '\r')


class QemuBaseTest(avocado.Test):
    """Common base of the system and user-mode emulation test classes."""

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the tag key to look up in ``self.tags``
        :returns: the single value of the tag, or None if the tag is
                  missing or has more than one value
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Read the value without removing it, so that the tag is
            # still there for later lookups.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolves ``self.arch``, ``self.cpu`` and ``self.qemu_bin`` from the
        test parameters, falling back to the test tags / default binary.
        Cancels the test if no QEMU binary could be determined.

        :param bin_prefix: binary name prefix given to
                           :func:`pick_default_qemu_bin`
        :type bin_prefix: str
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash=None, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forwards to the parent class ``fetch_asset()``; the only difference
        visible here is that ``cancel_on_missing`` defaults to True.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)


class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Sets up the VM registry, the QEMU binary and ``self.machine``, and
        enforces the accelerator requested through the 'accel' tag, if any.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Cancels the test unless the QEMU binary lists ``netdevname`` in the
        output of ``-netdev help``.

        :param netdevname: name of the network backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        # Match a whole line, so that a name is not found as a substring
        # of another backend's name.
        if '\n' + netdevname + '\n' not in netdevhelp:
            self.cancel('no support for ' + netdevname + ' networking')

    def require_multiprocess(self):
        """
        Test for the presence of the x-pci-proxy-dev which is required
        to support multiprocess.
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if 'x-pci-proxy-dev' not in devhelp:
            self.cancel('no support for multiprocess device emulation')

    def _new_vm(self, name, *args):
        """
        Creates a new :class:`QEMUMachine` for this test.

        :param name: name of the VM, only used in log messages here
        :param args: extra command line arguments added to the VM
        :returns: the new, not yet launched, VM
        """
        # NOTE(review): every call rebinds self._sd, so the previous
        # TemporaryDirectory object is no longer referenced from here --
        # confirm tests using several VMs are not affected.
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         sock_dir=self._sd.name, log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Returns the path of a qemu-img binary, canceling the test if none
        can be found.
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Returns the VM registered under ``name``, creating it first if
        needed.

        :param args: extra command line arguments, only used when the VM
                     is created by this call
        :param name: the VM name; if not given a random UUID is used, so
                     a new VM is created
        :returns: the VM
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                # arg was the last item, so it had no value yet
                self.vm.args.append(value)

    def tearDown(self):
        """Shuts down every VM created through :meth:`get_vm`."""
        for vm in self._vms.values():
            vm.shutdown()
        self._sd = None
        super().tearDown()


class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Sets up the QEMU binary and an empty list of ``-L`` paths."""
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """
        Adds a path to be given to QEMU with ``-L``.

        :param ldpath: the path, stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=None):
        """
        Runs a binary under the user-mode QEMU binary.

        :param bin_path: path of the binary to run
        :type bin_path: str
        :param args: arguments given to the binary, none by default
        :type args: list of str
        :returns: the result of :func:`avocado.utils.process.run`
        """
        if args is None:
            args = []
        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))


class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Opens ``self.ssh_session`` to the guest, through the host port that
        the VM reports as forwarded by its user network.

        Fails the test if no attempt to connect succeeded.

        :param username: the guest user name
        :param credential: a key or a password, see ``credential_is_key``
        :param credential_is_key: whether ``credential`` is passed to the
                                  session as ``key`` (True) or as
                                  ``password`` (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.command('human-monitor-command',
                              command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        # Up to 10 attempts, sleeping a bit longer after each failure
        for i in range(10):
            try:
                self.ssh_session.connect()
                return
            except Exception:
                time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Runs a command on the guest, logging the command and its output,
        and asserts that it exited with status 0.

        :param command: the command to run
        :type command: str
        :returns: a ``(stdout_lines, stderr_lines)`` tuple, each a list of
                  lines with trailing whitespace removed
        :rtype: tuple
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines

    def ssh_command_output_contains(self, cmd, exp):
        """
        Runs a command on the guest and fails the test unless at least one
        line of its standard output contains ``exp``.

        :param cmd: the command to run
        :param exp: the text expected in the output
        """
        stdout, _ = self.ssh_command(cmd)
        for line in stdout:
            if exp in line:
                break
        else:
            self.fail('"%s" output does not contain "%s"' % (cmd, exp))


class LinuxDistro:
    """Represents a Linux distribution

    Holds information of known distros.
    """
    #: A collection of known distros and their respective image checksum
    KNOWN_DISTROS = {
        'fedora': {
            '31': {
                'x86_64':
                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
                              '8a309c2d46215d8fc026954f3c5c27a0'),
                 'pxeboot_url': ('https://archives.fedoraproject.org/'
                                 'pub/archive/fedora/linux/releases/31/'
                                 'Everything/x86_64/os/images/pxeboot/'),
                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
                                   '08a96687f73c ro no_timer_check '
                                   'net.ifnames=0 console=tty1 '
                                   'console=ttyS0,115200n8'),
                },
                'aarch64':
                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
                              'd2af0ad0329383d5639c997fdf16fe49'),
                 'pxeboot_url': 'https://archives.fedoraproject.org/'
                                'pub/archive/fedora/linux/releases/31/'
                                'Everything/aarch64/os/images/pxeboot/',
                 'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
                                   '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
                                   ' ignore_loglevel no_timer_check'
                                   ' printk.time=1 rd_NO_PLYMOUTH'
                                   ' console=ttyAMA0'),
                },
                'ppc64':
                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
                              '3f991c506f2cc390dc4efa2026ad2f58')},
                's390x':
                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
                              '1e354f834d355069ca982fdcaf5a122d')},
            },
            '32': {
                'aarch64':
                {'checksum': ('b367755c664a2d7a26955bbfff985855'
                              'adfa2ca15e908baf15b4b176d68d3967'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/32/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
                                   '14d95c0e90c5 ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'),
                },
            },
            '33': {
                'aarch64':
                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
                              'a81f386a17f969c1d1c7c87031008a6b'),
                 'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                 'releases/33/Server/aarch64/os/images/'
                                 'pxeboot/'),
                 'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
                                   '1126a0208f8a ro no_timer_check net.ifnames=0'
                                   ' console=tty1 console=ttyS0,115200n8'
                                   ' console=tty0'),
                },
            },
        }
    }

    def __init__(self, name, version, arch):
        """
        :param name: the distro name, such as "fedora"
        :param version: the distro version, such as "31"
        :param arch: the architecture, such as "x86_64"

        Combinations that are not in ``KNOWN_DISTROS`` are accepted; their
        properties then simply return None.
        """
        self.name = name
        self.version = version
        self.arch = arch
        try:
            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
        except AttributeError:
            # Unknown distro
            info = None
        # Work on a copy: the checksum setter writes into this dict and
        # must not alter the class-wide KNOWN_DISTROS table.
        self._info = dict(info) if info else {}

    @property
    def checksum(self):
        """Gets the cloud-image file checksum"""
        return self._info.get('checksum', None)

    @checksum.setter
    def checksum(self, value):
        self._info['checksum'] = value

    @property
    def pxeboot_url(self):
        """Gets the repository url where pxeboot files can be found"""
        return self._info.get('pxeboot_url', None)

    @property
    def default_kernel_params(self):
        """Gets the default kernel parameters"""
        return self._info.get('kernel_params', None)


class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
    """Facilitates having a cloud-image Linux based available.

    For tests that intend to interact with guests, this is a better choice
    to start with than the more vanilla `QemuSystemTest` class.
    """

    distro = None
    username = 'root'
    password = 'password'
    smp = '2'
    memory = '1024'

    def _set_distro(self):
        """
        Sets ``self.distro`` from the 'distro' and 'distro_version'
        parameters or tags, defaulting to Fedora 31, and applies the
        'distro_checksum' parameter if given.
        """
        distro_name = self.params.get(
            'distro',
            default=self._get_unique_tag_val('distro'))
        if not distro_name:
            distro_name = 'fedora'

        distro_version = self.params.get(
            'distro_version',
            default=self._get_unique_tag_val('distro_version'))
        if not distro_version:
            distro_version = '31'

        self.distro = LinuxDistro(distro_name, distro_version, self.arch)

        # The distro checksum behaves differently than distro name and
        # version. First, it does not respect a tag with the same
        # name, given that it's not expected to be used for filtering
        # (distro name versions are the natural choice).  Second, the
        # order of precedence is: parameter, attribute and then value
        # from KNOWN_DISTROS.
        distro_checksum = self.params.get('distro_checksum',
                                          default=None)
        if distro_checksum:
            self.distro.checksum = distro_checksum

    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
        """
        Prepares the default VM with CPU count, memory, a user network
        device that forwards a host port to guest port 22, the boot image
        and the cloudinit image.

        :param ssh_pubkey: path of the public key to authorize in the
                           guest.  If None, the keys from the source
                           tree are used and ``self.ssh_key`` is set.
        :param network_device_type: the ``-device`` model of the NIC
        :type network_device_type: str
        """
        super().setUp()
        self.require_netdev('user')
        self._set_distro()
        self.vm.add_args('-smp', self.smp)
        self.vm.add_args('-m', self.memory)
        # The following network device allows for SSH connections
        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
                         '-device', '%s,netdev=vnet' % network_device_type)
        self.set_up_boot()
        # NOTE(review): self.ssh_key is only set in this branch, while
        # launch_and_wait() reads it -- callers passing their own
        # ssh_pubkey presumably set self.ssh_key themselves; confirm.
        if ssh_pubkey is None:
            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
        self.set_up_cloudinit(ssh_pubkey)

    def set_up_existing_ssh_keys(self):
        """
        Makes the key pair from the source tree usable for this test: the
        private key is copied into a '.ssh' directory under the test
        workdir, with mode 0600 (the directory is created with mode 0700).

        :returns: a ``(public_key_path, private_key_copy_path)`` tuple
        :rtype: tuple
        """
        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
        ssh_dir = os.path.join(self.workdir, '.ssh')
        os.mkdir(ssh_dir, mode=0o700)
        ssh_private_key = os.path.join(ssh_dir,
                                       os.path.basename(source_private_key))
        shutil.copyfile(source_private_key, ssh_private_key)
        os.chmod(ssh_private_key, 0o600)
        return (ssh_public_key, ssh_private_key)

    def download_boot(self):
        """
        Gets the boot image for ``self.distro``, canceling the test if
        that fails.

        :returns: the path of the image
        """
        # Set the qemu-img binary.
        # If none is available, the test will cancel.
        vmimage.QEMU_IMG = super().get_qemu_img()

        self.log.info('Downloading/preparing boot image')
        # Fedora 31 only provides ppc64le images
        image_arch = self.arch
        if self.distro.name == 'fedora':
            if image_arch == 'ppc64':
                image_arch = 'ppc64le'

        try:
            boot = vmimage.get(
                self.distro.name, arch=image_arch, version=self.distro.version,
                checksum=self.distro.checksum,
                algorithm='sha256',
                cache_dir=self.cache_dirs[0],
                snapshot_dir=self.workdir)
        except Exception:
            self.cancel('Failed to download/prepare boot image')
        return boot.path

    def prepare_cloudinit(self, ssh_pubkey=None):
        """
        Creates 'cloudinit.iso' in the test workdir, canceling the test if
        that fails.  Requires ``self.phone_server`` to be set.

        :param ssh_pubkey: path of a public key file whose content is
                           passed as the authorized key, or None
        :returns: the path of the ISO image
        """
        self.log.info('Preparing cloudinit image')
        try:
            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
            pubkey_content = None
            if ssh_pubkey:
                with open(ssh_pubkey) as pubkey:
                    pubkey_content = pubkey.read()
            cloudinit.iso(cloudinit_iso, self.name,
                          username=self.username,
                          password=self.password,
                          # QEMU's hard coded usermode router address
                          phone_home_host='10.0.2.2',
                          phone_home_port=self.phone_server.server_port,
                          authorized_key=pubkey_content)
        except Exception:
            self.cancel('Failed to prepare the cloudinit image')
        return cloudinit_iso

    def set_up_boot(self):
        """Adds the boot image as a drive of the default VM."""
        path = self.download_boot()
        self.vm.add_args('-drive', 'file=%s' % path)

    def set_up_cloudinit(self, ssh_pubkey=None):
        """
        Creates ``self.phone_server`` and adds the cloudinit image as a
        raw drive of the default VM.

        :param ssh_pubkey: passed on to :meth:`prepare_cloudinit`
        """
        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
                                                      self.name)
        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)

    def launch_and_wait(self, set_up_ssh_connection=True):
        """
        Launches the default VM and handles phone home server requests
        until the guest has phoned back, while the console output is
        logged.

        :param set_up_ssh_connection: whether to also open the SSH session
                                      (as ``self.username``, using
                                      ``self.ssh_key``) afterwards
        :type set_up_ssh_connection: bool
        """
        self.vm.set_console()
        self.vm.launch()
        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
                                                 logger=self.log.getChild('console'))
        console_drainer.start()
        self.log.info('VM launched, waiting for boot confirmation from guest')
        while not self.phone_server.instance_phoned_back:
            self.phone_server.handle_request()

        if set_up_ssh_connection:
            self.log.info('Setting up the SSH connection')
            self.ssh_connect(self.username, self.ssh_key)