1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import subprocess
14import sys
15import tempfile
16import time
17import uuid
18
19import avocado
20from avocado.utils import ssh
21from avocado.utils.path import find_command
22
23from qemu.machine import QEMUMachine
24from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
25                        tcg_available)
26
27
#: Root of the QEMU build tree, obtained by stripping four trailing path
#: components from this file's path.  When QEMU is built inside the source
#: directory this is also the source root, but BUILD_DIR is the safer name
#: to rely on for build artifacts.  The value is only meaningful while this
#: file sits inside a source and build tree; if the code is moved elsewhere
#: it will not be accurate.
BUILD_DIR = os.path.dirname(
    os.path.dirname(
        os.path.dirname(
            os.path.dirname(__file__))))
33
34
def has_cmd(name, args=None):
    """
    Probe for a command, for use in a @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: label of the requirement; used in the error message and,
                 when args is None, as the argument given to "which"
    :param args: sequence of strings forming the probe command line;
                 defaults to ('which', name)
    :returns: (True, '') if the probe exits with status 0, otherwise
              (False, <message describing the failure>)
    :rtype: tuple
    """
    probe = ('which', name) if args is None else args

    try:
        _, stderr, exitcode = run_cmd(probe)
    except Exception as exc:
        # Failing to even run the probe counts as "command not available"
        exitcode, stderr = -1, str(exc)

    if exitcode == 0:
        return (True, '')

    cmd_line = ' '.join(probe)
    return (False,
            f'{name} required, but "{cmd_line}" failed: {stderr.strip()}')
59
def has_cmds(*cmds):
    """
    Probe for several commands at once, for use in a @avocado.skipUnless
    decorator, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each item is either a command name (str) or a tuple of
                 positional arguments for :func:`has_cmd`
    :returns: the result of the first failing :func:`has_cmd` check, or
              (True, '') if every check passes
    :rtype: tuple
    """
    for spec in cmds:
        # A bare string is shorthand for has_cmd(name)
        call_args = (spec,) if isinstance(spec, str) else spec
        available, reason = has_cmd(*call_args)
        if not available:
            return (False, reason)

    return (True, '')
80
def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line, as accepted by :class:`subprocess.Popen`
    :returns: a (stdout, stderr, exit status) tuple
    :rtype: tuple
    """
    proc = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    # communicate() drains both pipes and waits for the process to exit
    out, err = proc.communicate()
    return (out, err, proc.returncode)
90
def is_readable_executable_file(path):
    """
    Tell whether path is a regular file that can be both read and executed.

    :param path: the filesystem path to check
    :rtype: bool
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
93
94
def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Picks the path of a QEMU binary, looking in the current working
    directory, then in BUILD_DIR, then in BUILD_DIR/build.

    :param bin_prefix: the binary name prefix that the arch is appended to
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    target = os.uname()[4] if arch is None else arch
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in target:
        target = 'ppc64'
    binary = bin_prefix + target

    search_dirs = (".", BUILD_DIR, os.path.join(BUILD_DIR, "build"))
    for directory in search_dirs:
        candidate = os.path.join(directory, binary)
        if is_readable_executable_file(candidate):
            return candidate
    return None
124
125
126def _console_interaction(test, success_message, failure_message,
127                         send_string, keep_sending=False, vm=None):
128    assert not keep_sending or send_string
129    if vm is None:
130        vm = test.vm
131    console = vm.console_file
132    console_logger = logging.getLogger('console')
133    while True:
134        if send_string:
135            vm.console_socket.sendall(send_string.encode())
136            if not keep_sending:
137                send_string = None # send only once
138        try:
139            msg = console.readline().decode().strip()
140        except UnicodeDecodeError:
141            msg = None
142        if not msg:
143            continue
144        console_logger.debug(msg)
145        if success_message is None or success_message in msg:
146            break
147        if failure_message and failure_message in msg:
148            console.close()
149            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
150                    (failure_message, success_message)
151            test.fail(fail)
152
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to interrupt a console prompt, while logging
    the console output. A typical use case is breaking into a boot loader
    prompt such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test,
                         success_message,
                         failure_message,
                         interrupt_string,
                         keep_sending=True)
178
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Wait for a message to appear on the console, logging the console
    content in the meantime. Nothing is sent to the console.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; when None, the test's own VM
               is used
    """
    _console_interaction(test,
                         success_message,
                         failure_message,
                         send_string=None,
                         vm=vm)
191
def exec_command(test, command):
    """
    Send a command to a console (appending a carriage return character),
    while logging the content.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    console_input = command + '\r'
    # No success or failure message: nothing specific is waited for
    _console_interaction(test, None, None, console_input)
203
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to a console (appending a carriage return character),
    then wait for success_message to appear on the console, while logging
    the content. Mark the test as failed if failure_message is found
    instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    console_input = command + '\r'
    _console_interaction(test, success_message, failure_message,
                         console_input)
219
class QemuBaseTest(avocado.Test):
    """
    Common base for the QEMU test classes: resolves the arch, cpu and QEMU
    binary to use from test parameters and tags.
    """

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        :param tag_name: the tag key to look up in ``self.tags``
        :returns: the single value associated with the tag, or None when
                  the tag is absent or has more than one value
        """
        vals = self.tags.get(tag_name, [])
        if len(vals) == 1:
            # Read the value without removing it: pop() would empty the
            # collection held in self.tags, making any later lookup of the
            # same tag return None.
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolve ``self.arch``, ``self.cpu`` and ``self.qemu_bin``.

        Each value comes from the test parameter of the same name; arch and
        cpu fall back to the unique tag of that name, qemu_bin falls back
        to :func:`pick_default_qemu_bin`.  The test is canceled when no
        QEMU binary can be determined.

        :param bin_prefix: binary name prefix handed to
                           :func:`pick_default_qemu_bin`
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forward to the parent class ``fetch_asset()``, passing every
        argument except name by keyword.

        NOTE(review): presumably this override exists to make asset_hash
        mandatory and to default cancel_on_missing to True; confirm
        against the avocado.Test.fetch_asset signature.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)
258
259
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Prepare the VM registry, resolve the QEMU system binary, enforce
        the accelerator named by a unique 'accel' tag (if any) and resolve
        ``self.machine`` from the 'machine' parameter or tag.
        """
        # VMs handed out by get_vm(), indexed by name
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Cancel the test unless the QEMU binary lists the given network
        backend in the output of "-netdev help".

        :param netdevname: name of the netdev backend, matched against a
                           whole line of the help output
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if ('\n' + netdevname + '\n') not in netdevhelp:
            # Name the backend that was actually requested
            self.cancel('no support for %s networking' % netdevname)

    def require_multiprocess(self):
        """
        Test for the presence of the x-pci-proxy-dev which is required
        to support multiprocess.
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if 'x-pci-proxy-dev' not in devhelp:
            self.cancel('no support for multiprocess device emulation')

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the current QEMU binary.

        :param name: name used in the log messages about the new VM
        :param args: extra command line arguments added to the VM
        :returns: the new, not yet launched, VM object
        """
        # NOTE(review): this temporary directory is replaced on every call
        # and is not referenced anywhere else in this class; confirm
        # whether anything still depends on it.
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Locate a qemu-img binary, canceling the test if none is found.

        :returns: the path to the qemu-img binary to use
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            # False is the value find_command() is asked to return when
            # the command is missing
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM registered under name, creating it if needed.

        :param args: extra command line arguments; only used when the VM
                     is created by this call
        :param name: the VM name; a random UUID is used when not given
        :returns: the VM object
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            # The value is expected right after the first occurrence of arg
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down every VM created through get_vm()."""
        for vm in self._vms.values():
            vm.shutdown()
        self._sd = None
        super().tearDown()
378
379
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Reset the library search paths and resolve the QEMU user binary."""
        # Directories passed to QEMU with "-L", in the order they were added
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with the "-L" option.

        :param ldpath: the directory; stored as an absolute path
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=()):
        """
        Run a binary under the QEMU user-mode emulator.

        :param bin_path: path of the binary to execute
        :param args: sequence of strings given as arguments to the binary
        :returns: whatever ``avocado.utils.process.run()`` returns for the
                  assembled command line
        """
        # "process" is not among this file's module-level imports, so it is
        # brought into scope here; without this the call below raises
        # NameError.
        from avocado.utils import process

        qemu_args = " ".join("-L %s" % ldpath for ldpath in self._ldpath)
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))
395
396
class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Open an SSH session to the guest through the host-forwarded port
        reported by the monitor's "info usernet" command.

        The session is stored in ``self.ssh_session``.  Up to 10 connection
        attempts are made, sleeping progressively longer between them; the
        test fails if none succeeds.

        :param username: the user to log in as
        :param credential: a key or a password, see credential_is_key
        :param credential_is_key: if True credential is passed to the
                                  session as ``key``, otherwise as
                                  ``password``
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.cmd('human-monitor-command',
                          command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                # connect() reports failure through its return value, so a
                # falsy result must trigger a retry just like an exception
                if self.ssh_session.connect():
                    return
            except Exception:
                # Catch Exception rather than everything, so interrupts
                # still propagate; the guest may simply not be ready yet
                self.log.debug('ssh connection attempt %d raised', i + 1,
                               exc_info=True)
            time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Run a command in the guest over the established SSH session,
        logging the command and its output.  The test fails if the command
        exit status is not 0.

        :param command: the command line to run in the guest
        :returns: a (stdout lines, stderr lines) tuple, each a list of
                  strings with trailing whitespace removed
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines

    def ssh_command_output_contains(self, cmd, exp):
        """
        Fail the test unless some line of the command's standard output
        contains the expected text.

        :param cmd: the command line to run in the guest
        :param exp: the substring to look for in each stdout line
        """
        stdout, _ = self.ssh_command(cmd)
        if not any(exp in line for line in stdout):
            self.fail('"%s" output does not contain "%s"' % (cmd, exp))
445