1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import subprocess
14import sys
15import tempfile
16import time
17import uuid
18
19import avocado
20from avocado.utils import ssh
21from avocado.utils.path import find_command
22
23from qemu.machine import QEMUMachine
24from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
25                        tcg_available)
26
27
#: The QEMU build root directory, computed as the directory four levels
#: above this file.  It may coincide with the source directory when
#: building in-tree, but BUILD_DIR is the safer name to rely on for that
#: purpose.  Be aware that if this code is moved outside of a source and
#: build tree, the value will no longer be accurate.
BUILD_DIR = __file__
for _ in range(4):
    # Strip one trailing path component per iteration.
    BUILD_DIR = os.path.dirname(BUILD_DIR)
33
34
def has_cmd(name, args=None):
    """
    Report whether a command can be run successfully.

    This function is for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: name of the requirement, used in the error message and,
                 when ``args`` is not given, as the argument to ``which``
    :type name: str
    :param args: the command line to execute as the check; defaults to
                 ``('which', name)``
    :type args: sequence of str or None
    :returns: ``(True, '')`` if the check command exits with status 0,
              otherwise ``(False, message)`` where message describes the
              failure
    :rtype: tuple
    """
    probe = ('which', name) if args is None else args

    try:
        _, stderr, exitcode = run_cmd(probe)
    except Exception as e:
        # Failing to even launch the command counts as "not available".
        stderr, exitcode = str(e), -1

    if exitcode == 0:
        return (True, '')

    cmd_line = ' '.join(probe)
    err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
    return (False, err)
59
def has_cmds(*cmds):
    """
    Report whether every one of several commands can be run successfully.

    This function is for use in a @avocado.skipUnless decorator and
    allows checking for the availability of multiple commands, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each item is either a command name (str) or a tuple of
                 positional arguments to pass on to :func:`has_cmd`
    :returns: ``(True, '')`` if all checks pass, otherwise
              ``(False, message)`` for the first check that fails
    :rtype: tuple
    """
    for entry in cmds:
        # A bare string is shorthand for has_cmd(name).
        has_cmd_args = (entry,) if isinstance(entry, str) else entry

        available, reason = has_cmd(*has_cmd_args)
        if not available:
            return (False, reason)

    return (True, '')
80
def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line, as accepted by :mod:`subprocess`
    :returns: a ``(stdout, stderr, returncode)`` tuple
    :rtype: tuple
    """
    completed = subprocess.run(args,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True,
                               check=False)
    return (completed.stdout, completed.stderr, completed.returncode)
90
def is_readable_executable_file(path):
    """
    Tell whether ``path`` is a regular file that the current process may
    both read and execute.

    :param path: the filesystem path to check
    :type path: str
    :rtype: bool
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
93
94
def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, looking in the current working
    directory first, then in BUILD_DIR, then in BUILD_DIR/build.

    :param bin_prefix: the part of the binary name that precedes the arch
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname().machine
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    qemu_bin_name = bin_prefix + arch

    search_dirs = (".", BUILD_DIR, os.path.join(BUILD_DIR, "build"))
    candidates = (os.path.join(directory, qemu_bin_name)
                  for directory in search_dirs)
    # First usable candidate wins; None when there is none at all.
    return next((candidate for candidate in candidates
                 if is_readable_executable_file(candidate)), None)
124
125
def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Optionally send a string to a VM console and/or read console lines
    until a success or failure message shows up.

    :param test: the test object; its ``vm`` attribute is used when ``vm``
                 is None, and its ``fail()`` method is called when
                 ``failure_message`` is seen
    :param success_message: stop reading once a console line contains
                            this string
    :param failure_message: fail the test once a console line contains
                            this string
    :param send_string: string written to the console socket, or None to
                        send nothing
    :param keep_sending: when True, ``send_string`` is sent again before
                         every read instead of only once
    :param vm: the VM whose console is used; defaults to ``test.vm``
    """
    assert not keep_sending or send_string
    target_vm = test.vm if vm is None else vm
    console = target_vm.console_file
    console_logger = logging.getLogger('console')
    # Only consume console output if waiting for something
    send_only = success_message is None and failure_message is None

    while True:
        if send_string:
            target_vm.console_socket.sendall(send_string.encode())
            if not keep_sending:
                send_string = None # send only once

        if send_only:
            if send_string is None:
                return
            continue

        try:
            line = console.readline().decode().strip()
        except UnicodeDecodeError:
            # Undecodable output is skipped like an empty line.
            continue
        if not line:
            continue

        console_logger.debug(line)
        if success_message is None or success_message in line:
            return
        if failure_message and failure_message in line:
            console.close()
            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
                    (failure_message, success_message)
            test.fail(fail)
159
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to the console to interrupt a prompt, logging
    the console output along the way. The typical use case is breaking
    into a boot loader prompt such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=interrupt_string, keep_sending=True)
185
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Reads the console, logging its content, until one of the expected
    messages appears. Nothing is sent to the console.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, the test's own VM
               is used
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=None, vm=vm)
198
def exec_command(test, command):
    """
    Send a command to the console of the test's VM, terminated with a
    carriage return, without waiting for any console output.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    terminated_command = command + '\r'
    _console_interaction(test, success_message=None, failure_message=None,
                         send_string=terminated_command)
210
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to the console of the test's VM, terminated with a
    carriage return, then wait for success_message to appear on the
    console, while logging the content. Mark the test as failed if
    failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    terminated_command = command + '\r'
    _console_interaction(test, success_message, failure_message,
                         send_string=terminated_command)
226
class QemuBaseTest(avocado.Test):
    """Common setup shared by the system and user-mode emulation tests."""

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        The tag collection itself is left untouched, so looking up the
        same tag more than once keeps giving the same answer.

        :param tag_name: the tag key to look up in ``self.tags``
        :type tag_name: str
        :returns: the single value of the tag, or None when the tag is
                  absent or does not have exactly one value
        """
        # "or ()" tolerates a tag that is present but stored without any
        # values (NOTE(review): avocado appears to use None for such
        # tags -- confirm against the avocado version in use).
        vals = self.tags.get(tag_name) or ()
        if len(vals) == 1:
            # Read the value without removing it: pop() would mutate the
            # tags of the test and make later lookups return None.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolve the ``arch``, ``cpu`` and ``qemu_bin`` attributes from the
        test parameters, falling back to the test tags (for arch and cpu)
        and to :func:`pick_default_qemu_bin` (for the binary).

        The test is canceled when no QEMU binary can be determined.

        :param bin_prefix: binary name prefix handed to
                           :func:`pick_default_qemu_bin`
        :type bin_prefix: str
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forward to the parent class ``fetch_asset()``, with the difference
        that ``asset_hash`` is a required argument here and
        ``cancel_on_missing`` defaults to True.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)
265
266
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Prepare the VM registry, pick the QEMU system binary, enforce the
        accelerator requested through the ``accel`` tag (if any) and
        resolve the machine type from the parameters or the ``machine``
        tag.
        """
        # Must exist before anything can cancel the test, because
        # tearDown() iterates over it.
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Requires a network backend to be listed by ``-netdev help`` of the
        currently set qemu binary; the test is canceled otherwise.

        :param netdevname: name of the netdev backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        # Match a whole line so that a backend name which is merely a
        # substring of another one is not accepted.
        if '\n' + netdevname + '\n' not in netdevhelp:
            self.cancel('no support for %s networking' % netdevname)

    def require_multiprocess(self):
        """
        Test for the presence of the x-pci-proxy-dev which is required
        to support multiprocess.
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if 'x-pci-proxy-dev' not in devhelp:
            self.cancel('no support for multiprocess device emulation')

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the currently set qemu binary.

        :param name: label used in the debug log messages
        :param args: extra command line arguments added to the machine
        :returns: the new, not yet launched, machine
        """
        # NOTE(review): nothing in this class reads this directory; it is
        # only kept referenced until tearDown() (or the next _new_vm()).
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Select a qemu-img binary, canceling the test if none is found.

        :returns: the path to the qemu-img binary
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            # False is passed as the value to get back when the lookup
            # finds nothing.
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under ``name``, creating it if needed.

        A newly created VM gets ``-cpu`` and the machine type applied when
        the test has them set.  Without a name, a random one is generated,
        so every such call yields a new VM.

        :param args: extra command line arguments for a newly created VM
        :param name: the key under which the VM is registered
        :type name: str or None
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing happens when either ``arg`` or ``value`` is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right after the first occurrence of arg.
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down every VM handed out by get_vm()."""
        for vm in self._vms.values():
            vm.shutdown()
        self._sd = None
        super().tearDown()
385
386
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Start with no library paths and pick the user-mode binary."""
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """
        Register a path to be passed to QEMU with ``-L`` by :meth:`run`.

        :param ldpath: the path; it is stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=()):
        """
        Run a program under the user-mode QEMU binary.

        :param bin_path: path of the program to run
        :type bin_path: str
        :param args: arguments for the program
        :type args: sequence of str
        :returns: whatever ``avocado.utils.process.run()`` returns
        """
        # Imported here because the module-level imports of this file do
        # not bring "process" into scope; without it this method fails
        # with a NameError.
        from avocado.utils import process

        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))
402
403
404class LinuxSSHMixIn:
405    """Contains utility methods for interacting with a guest via SSH."""
406
407    def ssh_connect(self, username, credential, credential_is_key=True):
408        self.ssh_logger = logging.getLogger('ssh')
409        res = self.vm.cmd('human-monitor-command',
410                          command_line='info usernet')
411        port = get_info_usernet_hostfwd_port(res)
412        self.assertIsNotNone(port)
413        self.assertGreater(port, 0)
414        self.log.debug('sshd listening on port: %d', port)
415        if credential_is_key:
416            self.ssh_session = ssh.Session('127.0.0.1', port=port,
417                                           user=username, key=credential)
418        else:
419            self.ssh_session = ssh.Session('127.0.0.1', port=port,
420                                           user=username, password=credential)
421        for i in range(10):
422            try:
423                self.ssh_session.connect()
424                return
425            except:
426                time.sleep(i)
427        self.fail('ssh connection timeout')
428
429    def ssh_command(self, command):
430        self.ssh_logger.info(command)
431        result = self.ssh_session.cmd(command)
432        stdout_lines = [line.rstrip() for line
433                        in result.stdout_text.splitlines()]
434        for line in stdout_lines:
435            self.ssh_logger.info(line)
436        stderr_lines = [line.rstrip() for line
437                        in result.stderr_text.splitlines()]
438        for line in stderr_lines:
439            self.ssh_logger.warning(line)
440
441        self.assertEqual(result.exit_status, 0,
442                         f'Guest command failed: {command}')
443        return stdout_lines, stderr_lines
444
445    def ssh_command_output_contains(self, cmd, exp):
446        stdout, _ = self.ssh_command(cmd)
447        for line in stdout:
448            if exp in line:
449                break
450        else:
451            self.fail('"%s" output does not contain "%s"' % (cmd, exp))
452