# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.

import logging
import os
import os.path
import subprocess

from .config import BUILD_DIR


def has_cmd(name, args=None):
    """
    This function is for use in a @skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the command, used in the error text
    :param args: the command line (sequence of strings) to execute as the
                 probe; defaults to ``('which', name)``
    :returns: ``(True, '')`` if the probe exits with status 0, otherwise
              ``(False, <error description>)``
    """

    if args is None:
        args = ('which', name)

    # Any failure to even launch the probe (missing binary, bad arguments,
    # ...) is deliberately folded into the "command not available" result
    # rather than propagated, since this runs at decorator evaluation time.
    try:
        _, stderr, exitcode = run_cmd(args)
    except Exception as e:
        exitcode = -1
        stderr = str(e)

    if exitcode != 0:
        cmd_line = ' '.join(args)
        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
        return (False, err)

    return (True, '')


def has_cmds(*cmds):
    """
    This function is for use in a @skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    Each entry is either a plain command name, or a tuple of arguments
    that is passed on to has_cmd().

    :returns: ``(True, '')`` if all commands are available, otherwise
              ``(False, <error description of the first failing one>)``
    """

    for cmd in cmds:
        if isinstance(cmd, str):
            cmd = (cmd,)

        ok, errstr = has_cmd(*cmd)
        if not ok:
            return (False, errstr)

    return (True, '')


def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line to execute, as accepted by subprocess
    :returns: a ``(stdout, stderr, returncode)`` tuple
    """
    # universal_newlines is kept (instead of text=True) so that the
    # helper keeps working on the same Python versions as before.
    proc = subprocess.run(args,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE,
                          universal_newlines=True,
                          check=False)

    return (proc.stdout, proc.stderr, proc.returncode)


def is_readable_executable_file(path):
    """
    Return True if @path is a regular file that is both readable
    and executable by the current process.
    """
    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)


def _log_console_line(msg):
    """
    Send the raw console bytes in @msg to the 'console' logger, decoded
    as text when possible, otherwise as the bytes object itself.
    """
    console_logger = logging.getLogger('console')
    try:
        console_logger.debug(msg.decode().strip())
    except UnicodeDecodeError:
        # The line may hold a partial or invalid utf8 sequence
        console_logger.debug(msg)


# @test: functional test to fail if @failure is seen
# @vm: the VM whose console to process
# @success: a non-None bytes string to look for
# @failure: a bytes string to look for that triggers test failure, or None
#
# Read up to 1 line of text from @vm, looking for @success
# and optionally @failure.
#
# If @success or @failure are seen, immediately return True,
# even if end of line is not yet seen, i.e. the remainder of the
# line is left unread.
#
# If end of line is seen, with neither @success nor @failure,
# return False.
#
# If @failure is seen, or the console reaches EOF, then mark
# @test as failed.
#
# Whatever was read is sent to the 'console' logger, also when
# the test gets failed.
def _console_read_line_until_match(test, vm, success, failure):
    msg = bytes([])
    done = False
    try:
        while True:
            c = vm.console_socket.recv(1)
            # A socket-style recv() signals EOF with an empty bytes object,
            # not with None; testing for None alone would loop forever
            # once the peer has gone away.
            if not c:
                done = True
                test.fail(
                    f"EOF in console, expected '{success}'")
                break
            msg += c

            if success in msg:
                done = True
                break
            if failure and failure in msg:
                done = True
                vm.console_socket.close()
                test.fail(
                    f"'{failure}' found in console, expected '{success}'")
                # Only reached if test.fail() returns; never keep
                # reading from the socket that was just closed.
                break

            if c == b'\n':
                break
    finally:
        _log_console_line(msg)

    return done


def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Common implementation behind the console helper functions below.

    :param test: the test whose VM console is used, and which gets
                 failed if @failure_message shows up
    :param success_message: text to wait for, or None to only send
    :param failure_message: text that fails the test when seen, or None
    :param send_string: text to write to the console, or None
    :param keep_sending: if True, @send_string is written again before
                         each line that is read, instead of only once
    :param vm: the VM to talk to, defaults to ``test.vm``
    """
    assert not keep_sending or send_string
    assert success_message or send_string

    if vm is None:
        vm = test.vm

    test.log.debug(
        f"Console interaction: success_msg='{success_message}' " +
        f"failure_msg='{failure_message}' send_string='{send_string}'")

    # We'll process console in bytes, to avoid having to
    # deal with unicode decode errors from receiving
    # partial utf8 byte sequences
    success_message_b = None
    if success_message is not None:
        success_message_b = success_message.encode()

    failure_message_b = None
    if failure_message is not None:
        failure_message_b = failure_message.encode()

    while True:
        if send_string:
            vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None  # send only once

        # Only consume console output if waiting for something
        if success_message is None:
            if send_string is None:
                break
            continue

        if _console_read_line_until_match(test, vm,
                                          success_message_b,
                                          failure_message_b):
            break


def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Keep sending a string to interrupt a console prompt, while logging the
    console output. Typical use case is to break a boot loader prompt,
    such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    assert success_message
    _console_interaction(test, success_message, failure_message,
                         interrupt_string, True)


def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Waits for messages to appear on the console, while logging the content

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read, defaults to ``test.vm``
    """
    assert success_message
    _console_interaction(test, success_message, failure_message, None, vm=vm)


def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content.

    :param test: a test containing a VM.
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, None, None, command + '\r')


def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    assert success_message
    _console_interaction(test, success_message, failure_message, command + '\r')


def get_qemu_img(test):
    """
    Pick the qemu-img binary to use for @test.

    :param test: the running test; it is skipped via ``test.skipTest()``
                 if no qemu-img binary can be found
    :returns: the path of the qemu-img in the build directory if it
              exists, otherwise the plain name 'qemu-img' when the
              system provides one
    """
    test.log.debug('Looking for and selecting a qemu-img binary')

    # If qemu-img has been built, use it, otherwise the system wide one
    # will be used.
    qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
    if os.path.exists(qemu_img):
        return qemu_img
    (has_system_qemu_img, errmsg) = has_cmd('qemu-img')
    if has_system_qemu_img:
        return 'qemu-img'
    test.skipTest(errmsg)