# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

import logging
import os
import os.path
import subprocess

from .config import BUILD_DIR


def has_cmd(name, args=None):
    """
    Check whether a command can be run successfully.

    This function is for use in a @skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: name of the command, used in the error message and,
                 when args is not given, looked up with "which"
    :param args: optional command line (sequence of strings) to run as
                 the availability check; defaults to ('which', name)
    :returns: a (ok, message) tuple: (True, '') if the check command
              exited with status 0, otherwise (False, <reason>)
    """

    if args is None:
        args = ('which', name)

    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        # Failing to even start the command (e.g. binary not found) is
        # reported like a non-zero exit instead of being propagated.
        exitcode = -1
        stderr = str(e)

    if exitcode != 0:
        cmd_line = ' '.join(args)
        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
        return (False, err)

    return (True, '')


def has_cmds(*cmds):
    """
    Check whether several commands can be run successfully.

    This function is for use in a @skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each entry is either a command name (str) or a tuple of
                 positional arguments for has_cmd()
    :returns: (True, '') if every check passed, otherwise the
              (False, <reason>) tuple of the first failing check
    """

    for cmd in cmds:
        # A bare name is shorthand for has_cmd(name)
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')


def run_cmd(args):
    """
    Run a command and capture its output as text.

    :param args: the command line, passed on to subprocess unchanged
    :returns: a (stdout, stderr, returncode) tuple
    """
    # subprocess.run() waits for the child and cleans it up even if
    # collecting the output is interrupted, unlike a bare Popen object.
    result = subprocess.run(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True,
                            check=False)

    return (result.stdout, result.stderr, result.returncode)


def is_readable_executable_file(path):
    """
    Return True if path is a regular file that is both readable and
    executable according to os.access().
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Common helper for sending data to and/or waiting on a VM console.

    :param test: the test; test.vm is used when vm is None and
                 test.fail() is called when failure_message is seen
    :param success_message: stop once a console line contains this text;
                            if None (with a failure_message set), stop at
                            the first non-empty line
    :param failure_message: fail the test if a console line contains this
    :param send_string: string to send to the console socket, or None
    :param keep_sending: if True, re-send send_string on every iteration
                         instead of only once
    :param vm: the VM whose console is used; defaults to test.vm
    """
    assert not keep_sending or send_string
    if vm is None:
        vm = test.vm
    console = vm.console_file
    console_logger = logging.getLogger('console')
    while True:
        if send_string:
            vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None  # send only once

        # Only consume console output if waiting for something
        if success_message is None and failure_message is None:
            if send_string is None:
                break
            continue

        try:
            msg = console.readline().decode().strip()
        except UnicodeDecodeError:
            # Undecodable lines are skipped rather than aborting the wait
            msg = None
        if not msg:
            continue
        console_logger.debug(msg)
        if success_message is None or success_message in msg:
            break
        if failure_message and failure_message in msg:
            console.close()
            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                    (failure_message, success_message)
            test.fail(fail)


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt,
    such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; defaults to test.vm
    """
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), without
    waiting for any console output.

    :param test: a test containing a VM.
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message, command + '\r')


def get_qemu_img(test):
    """
    Select the qemu-img binary to use, skipping the test if none exists.

    :param test: the running test; used for logging and for skipTest()
    :returns: the path of the qemu-img in the build directory if present,
              otherwise the bare name 'qemu-img' if the system provides it
    """
    test.log.debug('Looking for and selecting a qemu-img binary')

    # If qemu-img has been built, use it, otherwise the system wide one
    # will be used.
    qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
    if os.path.exists(qemu_img):
        return qemu_img

    # has_cmd() returns an (ok, message) tuple, which is always truthy,
    # so the flag must be unpacked before it is tested.
    has_system_qemu_img, _ = has_cmd('qemu-img')
    if has_system_qemu_img:
        return 'qemu-img'

    test.skipTest('Could not find "qemu-img", which is required to '
                  'create temporary images')