xref: /openbmc/qemu/tests/functional/qemu_test/cmd.py (revision cdad03b7)
1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import os.path
17import subprocess
18
19from .config import BUILD_DIR
20
21
def has_cmd(name, args=None):
    """
    Probe whether an external command is usable.

    The result is shaped for a @skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human-readable name of the requirement, used in the
                 error text (and as the argument to 'which' by default)
    :param args: command line (sequence of strings) to run as the probe;
                 when None, ('which', name) is run instead
    :returns: (True, '') if the probe exits with status 0, otherwise
              (False, <explanation>)
    """
    probe = ('which', name) if args is None else args

    # Any failure to even launch the probe counts as "not available";
    # the exception text takes the place of the command's stderr.
    try:
        _, err_output, status = run_cmd(probe)
    except Exception as exc:
        status, err_output = -1, str(exc)

    if status == 0:
        return (True, '')

    cmd_line = ' '.join(probe)
    reason = f'{name} required, but "{cmd_line}" failed: {err_output.strip()}'
    return (False, reason)
46
def has_cmds(*cmds):
    """
    Check several commands at once, for use in a @skipUnless decorator:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    Each entry is either a plain command name, or a tuple holding the
    positional arguments to pass to has_cmd().

    :returns: (True, '') when every check passes, otherwise
              (False, <error of the first failing check>)
    """
    for spec in cmds:
        # A bare string is shorthand for has_cmd(spec)
        call_args = (spec,) if isinstance(spec, str) else spec

        available, reason = has_cmd(*call_args)
        if not available:
            return (False, reason)

    return (True, '')
67
def run_cmd(args):
    """
    Run a command to completion and capture its output as text.

    :param args: the command line, as accepted by the subprocess module
    :returns: tuple of (stdout, stderr, exit status)
    """
    completed = subprocess.run(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)

    return (completed.stdout, completed.stderr, completed.returncode)
77
def is_readable_executable_file(path):
    """
    Tell whether path is a regular file that the current process is
    allowed to both read and execute.
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
80
81# @test: functional test to fail if @failure is seen
82# @vm: the VM whose console to process
83# @success: a non-None string to look for
84# @failure: a string to look for that triggers test failure, or None
85#
86# Read up to 1 line of text from @vm, looking for @success
87# and optionally @failure.
88#
89# If @success or @failure are seen, immediately return True,
90# even if end of line is not yet seen. ie remainder of the
91# line is left unread.
92#
93# If end of line is seen, with neither @success or @failure
94# return False
95#
96# If @failure is seen, then mark @test as failed
97def _console_read_line_until_match(test, vm, success, failure):
98    msg = bytes([])
99    done = False
100    while True:
101        c = vm.console_socket.recv(1)
102        if c is None:
103            done = True
104            test.fail(
105                f"EOF in console, expected '{success}'")
106            break
107        msg += c
108
109        if success in msg:
110            done = True
111            break
112        if failure and failure in msg:
113            done = True
114            vm.console_socket.close()
115            test.fail(
116                f"'{failure}' found in console, expected '{success}'")
117
118        if c == b'\n':
119            break
120
121    console_logger = logging.getLogger('console')
122    try:
123        console_logger.debug(msg.decode().strip())
124    except:
125        console_logger.debug(msg)
126
127    return done
128
129def _console_interaction(test, success_message, failure_message,
130                         send_string, keep_sending=False, vm=None):
131    assert not keep_sending or send_string
132    assert success_message or send_string
133
134    if vm is None:
135        vm = test.vm
136
137    test.log.debug(
138        f"Console interaction: success_msg='{success_message}' " +
139        f"failure_msg='{failure_message}' send_string='{send_string}'")
140
141    # We'll process console in bytes, to avoid having to
142    # deal with unicode decode errors from receiving
143    # partial utf8 byte sequences
144    success_message_b = None
145    if success_message is not None:
146        success_message_b = success_message.encode()
147
148    failure_message_b = None
149    if failure_message is not None:
150        failure_message_b = failure_message.encode()
151
152    while True:
153        if send_string:
154            vm.console_socket.sendall(send_string.encode())
155            if not keep_sending:
156                send_string = None # send only once
157
158        # Only consume console output if waiting for something
159        if success_message is None:
160            if send_string is None:
161                break
162            continue
163
164        if _console_read_line_until_match(test, vm,
165                                          success_message_b,
166                                          failure_message_b):
167            break
168
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to interrupt a console prompt, logging the
    console output along the way, until the expected pattern shows up.
    The typical use case is breaking into a boot loader prompt such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    assert success_message
    _console_interaction(test, success_message, failure_message,
                         send_string=interrupt_string, keep_sending=True)
195
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Block until a message appears on the console, logging the console
    content while waiting. Nothing is sent to the console.

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, the console
               interaction falls back to test.vm
    """
    assert success_message
    _console_interaction(test, success_message, failure_message,
                         send_string=None, vm=vm)
209
def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return), while
    logging the content. No console output is waited for.

    :param test: a test containing a VM.
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    command_line = command + '\r'
    _console_interaction(test, None, None, command_line)
221
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return), then wait
    for success_message to appear on the console, while logging the
    content. Mark the test as failed if failure_message is found instead.

    :param test: a test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`qemu_test.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    assert success_message
    command_line = command + '\r'
    _console_interaction(test, success_message, failure_message,
                         command_line)
238
def get_qemu_img(test):
    """
    Pick the qemu-img binary a test should use.

    A qemu-img found in the build directory wins; otherwise the one
    located by has_cmd('qemu-img') on the system is used. When neither
    is available, test.skipTest() is called with the reason.

    :param test: test object providing 'log' and 'skipTest'
    :returns: path of the built qemu-img, or the bare name 'qemu-img'
              for the system-wide one
    """
    test.log.debug('Looking for and selecting a qemu-img binary')

    built_qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
    if os.path.exists(built_qemu_img):
        return built_qemu_img

    # Nothing in the build tree: fall back to the system wide binary
    found, reason = has_cmd('qemu-img')
    if not found:
        test.skipTest(reason)
        return None
    return 'qemu-img'
251