1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import subprocess
14import sys
15import tempfile
16import time
17import uuid
18
19import avocado
20from avocado.utils import ssh
21from avocado.utils.path import find_command
22
23from qemu.machine import QEMUMachine
24from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
25                        tcg_available)
26
27
#: The QEMU build root directory.  It may also be the source directory
#: if building from the source dir, but it's safer to use BUILD_DIR for
#: that purpose.  Be aware that if this code is moved outside of a source
#: and build tree, it will not be accurate.
#:
#: The value is computed purely from the location of this file: the first
#: dirname() yields the directory containing this file, and the remaining
#: three walk up three more directory levels from there.
BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
33
34
def has_cmd(name, args=None):
    """
    Check whether a command can be run successfully.

    Meant to be unpacked into a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: human readable name of the requirement, used in the
                 error message and, when no args are given, as the
                 argument to ``which``
    :param args: the command line to execute as a sequence of strings;
                 defaults to ``('which', name)``
    :returns: ``(True, '')`` if the command exited with status 0,
              otherwise ``(False, reason)``
    :rtype: tuple
    """
    argv = ('which', name) if args is None else args

    try:
        _, err_text, status = run_cmd(argv)
    except Exception as exc:
        # Failing to even launch the command counts as "not available"
        status, err_text = -1, str(exc)

    if status == 0:
        return (True, '')

    cmd_line = ' '.join(argv)
    reason = f'{name} required, but "{cmd_line}" failed: {err_text.strip()}'
    return (False, reason)
59
def has_cmds(*cmds):
    """
    Check the availability of several commands at once.

    Meant to be unpacked into a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each entry is either a command name (str) or a tuple
                 of positional arguments for :func:`has_cmd`
    :returns: ``(True, '')`` if every check passes, otherwise the
              ``(False, reason)`` result of the first failing check
    :rtype: tuple
    """
    for spec in cmds:
        # A bare name is shorthand for has_cmd(name)
        call_args = (spec,) if isinstance(spec, str) else spec

        found, reason = has_cmd(*call_args)
        if not found:
            return (False, reason)

    return (True, '')
80
def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command and its arguments, passed unchanged to
                 :mod:`subprocess`
    :returns: a ``(stdout, stderr, returncode)`` tuple
    :rtype: tuple
    """
    completed = subprocess.run(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    return (completed.stdout, completed.stderr, completed.returncode)
90
def is_readable_executable_file(path):
    """
    Tell whether path is a regular file that is both readable and
    executable.

    :param path: the filesystem path to check
    :rtype: bool
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
93
94
def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, looking first in the current working
    directory, then in BUILD_DIR and finally in BUILD_DIR/build.

    :param bin_prefix: the part of the binary name that precedes the arch,
                       such as "qemu-system-"
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    target = os.uname()[4] if arch is None else arch
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in target:
        target = 'ppc64'
    binary = bin_prefix + target
    candidates = (
        os.path.join(".", binary),
        os.path.join(BUILD_DIR, binary),
        os.path.join(BUILD_DIR, "build", binary),
    )
    # First usable candidate wins; None when none of them qualifies
    return next((candidate for candidate in candidates
                 if is_readable_executable_file(candidate)), None)
124
125
def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Optionally send a string to a VM console and/or read console lines
    until a success or failure message shows up.

    :param test: the test object; its ``vm`` attribute is used when no vm
                 is given, and its ``fail()`` method is called when the
                 failure message is found
    :param success_message: stop as soon as a non-empty console line
                            contains this text.  If None while a failure
                            message is given, the first non-empty line
                            ends the wait.
    :param failure_message: if a console line contains this text, the
                            console file is closed and the test is failed
    :param send_string: text to write to the console socket, or None
    :param keep_sending: if True, send_string is written again before
                         every read attempt instead of only once
    :param vm: the VM whose console is used, defaults to ``test.vm``
    """
    assert not keep_sending or send_string
    machine = test.vm if vm is None else vm
    console_file = machine.console_file
    log = logging.getLogger('console')
    # Only consume console output if waiting for something
    expecting_output = not (success_message is None
                            and failure_message is None)
    pending = send_string
    while True:
        if pending:
            machine.console_socket.sendall(pending.encode())
            if not keep_sending:
                pending = None  # send only once

        if not expecting_output:
            if pending is None:
                break
            continue

        try:
            line = console_file.readline().decode().strip()
        except UnicodeDecodeError:
            # Lines that cannot be decoded are skipped altogether
            line = None
        if not line:
            continue
        log.debug(line)
        if success_message is None or success_message in line:
            break
        if failure_message and failure_message in line:
            console_file.close()
            test.fail('Failure message found in console: "%s". Expected: "%s"'
                      % (failure_message, success_message))
159
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to the console to interrupt a prompt, logging
    the console output, until the success or failure message is seen.
    A typical use is breaking into a boot loader countdown such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=interrupt_string, keep_sending=True)
185
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Read and log console output until a given message appears, without
    sending anything to the console.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, the test's own
               ``vm`` is used
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=None, vm=vm)
198
def exec_command(test, command):
    """
    Send a command to the test VM's console, terminated by a carriage
    return, without waiting for any console output.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    command_line = command + '\r'
    _console_interaction(test, None, None, send_string=command_line)
210
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to the test VM's console, terminated by a carriage
    return, then wait for success_message to appear on the console while
    logging the content.  Mark the test as failed if failure_message is
    found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    command_line = command + '\r'
    _console_interaction(test, success_message, failure_message,
                         send_string=command_line)
226
class QemuBaseTest(avocado.Test):
    """Common base class resolving the arch, cpu and QEMU binary to use."""

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the key to look up in ``self.tags``
        :returns: the only value recorded for the key, or None when the
                  key is absent or does not have exactly one value
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Read the value without pop(): pop() would remove it from the
            # collection held in self.tags, so that a later lookup of the
            # same tag would find nothing.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolve ``self.arch``, ``self.cpu`` and ``self.qemu_bin``.

        Each value comes from the test parameter of the same name, falling
        back to the matching unique tag (arch, cpu) or to the binary found
        by :func:`pick_default_qemu_bin` (qemu_bin).  The test is canceled
        if no QEMU binary ends up being selected.

        :param bin_prefix: prefix of the QEMU binary name, such as
                           "qemu-system-"
        :type bin_prefix: str
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forward to the parent class' ``fetch_asset()``, passing every
        argument through by keyword.  In this override ``cancel_on_missing``
        defaults to True.

        :returns: whatever the parent class' ``fetch_asset()`` returns
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)
265
266
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Prepare the VM registry, select the ``qemu-system-*`` binary, check
        the accelerator required by a unique "accel" tag (if any) and
        resolve ``self.machine`` from the "machine" parameter or tag.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Requires a network backend to be supported by the QEMU binary

        The output of ``-M none -netdev help`` is searched for a line
        consisting exactly of the given name.  If there is none, the test
        is canceled.

        :param netdevname: name of the netdev backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                             '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            # Name the backend that is actually missing instead of always
            # reporting "user"
            self.cancel(f'no support for {netdevname} networking')

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the selected QEMU binary.

        :param name: label used in the debug log messages
        :param args: extra command line arguments added to the machine
        :returns: the newly created machine
        """
        # NOTE(review): this directory is not handed to QEMUMachine here;
        # presumably kept for code outside this file -- confirm.
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Select the qemu-img binary to use.

        :returns: the path to qemu-img in BUILD_DIR if it exists there,
                  otherwise the one located by ``find_command``; the test
                  is canceled if neither is available
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM registered under the name "default"."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under name, creating it on first use.

        :param args: extra QEMU arguments, only applied when the VM is
                     created by this call
        :param name: the name of the VM; if not given, a random one is
                     generated, so a new VM is created
        :returns: the machine registered under the name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.
        Nothing is done if either arg or value is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """
        Shut down every VM created through this test, remove the temporary
        directory and run the parent class' tearDown.
        """
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Runs even if a shutdown failed, so the temporary directory
            # is removed explicitly (instead of relying on garbage
            # collection) and the parent tearDown is never skipped.
            scratch_dir = getattr(self, '_sd', None)
            if scratch_dir is not None:
                scratch_dir.cleanup()
            self._sd = None
            super().tearDown()
375
376
class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Open an SSH session to the guest and store it in
        ``self.ssh_session``.

        The host port is obtained from the output of the "info usernet"
        monitor command, and the session targets 127.0.0.1 on that port.
        Up to 10 connection attempts are made, sleeping for an increasing
        number of seconds after each failed one; if all of them raise, the
        test is failed.

        :param username: the user to log in as
        :param credential: the key or the password to authenticate with
        :param credential_is_key: whether credential is passed on as a
                                  key (True) or as a password (False)
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.cmd('human-monitor-command',
                          command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                self.ssh_session.connect()
                return
            except Exception as exc:
                # Not a bare "except:", so that KeyboardInterrupt and
                # SystemExit still interrupt the retry loop.
                self.ssh_logger.debug('ssh connection attempt %d failed: %s',
                                      i + 1, exc)
                time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Run a command in the guest over the established SSH session.

        The command and its stdout lines are logged at info level, its
        stderr lines at warning level.  An assertion failure is raised if
        the command's exit status is not 0.

        :param command: the command line to run
        :returns: a ``(stdout_lines, stderr_lines)`` tuple of lists, with
                  trailing whitespace removed from every line
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines

    def ssh_command_output_contains(self, cmd, exp):
        """
        Run a command in the guest and fail the test unless at least one
        line of its stdout contains the expected text.

        :param cmd: the command line to run
        :param exp: the text expected in one of the stdout lines
        """
        stdout, _ = self.ssh_command(cmd)
        if not any(exp in line for line in stdout):
            self.fail('"%s" output does not contain "%s"' % (cmd, exp))
425