# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import os.path
import subprocess

from .config import BUILD_DIR


def has_cmd(name, args=None):
    """
    Check whether a command is available by running it.

    This function is for use in a @skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the command, used in the
                 error message and, when @args is None, looked up
                 with 'which'
    :param args: the command line (sequence of strings) to execute as
                 the probe; defaults to ('which', name)
    :returns: a (bool, str) tuple: (True, '') if the probe exited with
              status 0, otherwise (False, <reason>)
    """

    if args is None:
        args = ('which', name)

    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        # Failing to even launch the probe (e.g. missing binary) is
        # reported the same way as a non-zero exit status.
        exitcode = -1
        stderr = str(e)

    if exitcode != 0:
        cmd_line = ' '.join(args)
        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
        return (False, err)

    return (True, '')

def has_cmds(*cmds):
    """
    This function is for use in a @skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    Each entry is either a plain command name, or a tuple holding the
    positional arguments for has_cmd().

    :returns: (True, '') if every command is available, otherwise the
              (False, <reason>) result of the first failing check
    """

    for cmd in cmds:
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')

def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line to execute, as accepted by subprocess
    :returns: a (stdout, stderr, returncode) tuple
    """
    result = subprocess.run(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)

    return (result.stdout, result.stderr, result.returncode)

def is_readable_executable_file(path):
    """
    Return True if @path is a regular file that is both readable and
    executable according to os.access().
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)

# @test: functional test to fail if @failure is seen
# @vm: the VM whose console to process
# @success: a non-None string to look for
# @failure: a string to look for that triggers test failure, or None
#
# Read up to 1 line of text from @vm, looking for @success
# and optionally @failure.
#
# If @success is seen, immediately return True, even if end
# of line is not yet seen, i.e. the remainder of the line is
# left unread.
#
# If end of line is seen, with neither @success nor @failure,
# return False.
#
# If @failure is seen, close the console socket and mark @test
# as failed.
#
# If the console reaches EOF before @success is seen, mark
# @test as failed.
def _console_read_line_until_match(test, vm, success, failure):
    msg = bytes([])
    done = False
    while True:
        c = vm.console_socket.recv(1)
        # socket.recv() reports EOF as an empty bytes object rather
        # than None; treat both the same, otherwise a closed console
        # would make this loop spin forever.
        if not c:
            done = True
            test.fail(
                f"EOF in console, expected '{success}'")
            break
        msg += c

        if success in msg:
            done = True
            break
        if failure and failure in msg:
            done = True
            vm.console_socket.close()
            test.fail(
                f"'{failure}' found in console, expected '{success}'")

        if c == b'\n':
            break

    console_logger = logging.getLogger('console')
    try:
        console_logger.debug(msg.decode().strip())
    except UnicodeDecodeError:
        # The line may end in the middle of a multi-byte sequence or
        # contain raw binary; log the undecoded bytes instead.
        console_logger.debug(msg)

    return done

def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Common implementation of the console helpers below: optionally send
    a string to the console, then optionally read console output line
    by line until a success or failure message shows up.

    :param test: the test whose 'vm' attribute is used when @vm is None
                 and which gets marked as failed on @failure_message
    :param success_message: string to wait for, or None to only send
    :param failure_message: string that fails the test, or None
    :param send_string: string to send to the console, or None
    :param keep_sending: if True, resend @send_string before each line
                         is read instead of sending it only once
    :param vm: the VM whose console is used, defaults to test.vm
    """
    assert not keep_sending or send_string
    assert success_message or send_string

    if vm is None:
        vm = test.vm

    test.log.debug(
        f"Console interaction: success_msg='{success_message}' " +
        f"failure_msg='{failure_message}' send_string='{send_string}'")

    # We'll process console in bytes, to avoid having to
    # deal with unicode decode errors from receiving
    # partial utf8 byte sequences
    success_message_b = None
    if success_message is not None:
        success_message_b = success_message.encode()

    failure_message_b = None
    if failure_message is not None:
        failure_message_b = failure_message.encode()

    while True:
        if send_string:
            vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None # send only once

        # Only consume console output if waiting for something
        if success_message is None:
            if send_string is None:
                break
            continue

        if _console_read_line_until_match(test, vm,
                                          success_message_b,
                                          failure_message_b):
            break

def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt, such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    assert success_message
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)

def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read, defaults to test.vm
    """
    assert success_message
    _console_interaction(test, success_message, failure_message, None, vm=vm)

def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.

    :param test: a test containing a VM.
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')

def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    assert success_message
    _console_interaction(test, success_message, failure_message, command + '\r')

def get_qemu_img(test):
    """
    Select the qemu-img binary to use for @test.

    :param test: the test requesting the binary; test.skipTest() is
                 called with the reason if no qemu-img can be found
    :returns: the path of qemu-img inside BUILD_DIR if it exists there,
              otherwise the bare name 'qemu-img' when has_cmd() finds
              a system wide one
    """
    test.log.debug('Looking for and selecting a qemu-img binary')

    # If qemu-img has been built, use it, otherwise the system wide one
    # will be used.
    qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
    if os.path.exists(qemu_img):
        return qemu_img
    (has_system_qemu_img, errmsg) = has_cmd('qemu-img')
    if has_system_qemu_img:
        return 'qemu-img'
    test.skipTest(errmsg)