1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import shutil
14import subprocess
15import sys
16import tempfile
17import time
18import uuid
19
20import avocado
21from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
22from avocado.utils.path import find_command
23
24from qemu.machine import QEMUMachine
25from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
26                        tcg_available)
27
28
# Directory two levels above this file.  It is the avocado tests dir and,
# for out-of-tree builds, may be a symbolic link into the source tree.
_avocado_tests_dir = os.path.dirname(os.path.dirname(__file__))

#: The QEMU build root directory, derived from the location of this file
#: (four directory levels up).  It may also be the source directory when
#: building from the source dir, but BUILD_DIR is the safer name to use
#: for build artifacts.  If this code is moved outside of a source and
#: build tree, the value will not be accurate.
BUILD_DIR = os.path.dirname(os.path.dirname(_avocado_tests_dir))

if not os.path.islink(_avocado_tests_dir):
    #: The QEMU root source directory (identical to the build directory)
    SOURCE_DIR = BUILD_DIR
else:
    # The link to the avocado tests dir in the source code directory
    lnk = _avocado_tests_dir
    #: The QEMU root source directory, two levels above the link target
    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
42
43
def has_cmd(name, args=None):
    """
    Check whether a command can be run successfully.

    The returned tuple unpacks directly into a @avocado.skipUnless
    decorator, e.g.:

        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
        def test_something_that_needs_sudo(self):
            ...

    :param name: name of the requirement; used in the failure message
                 and, when ``args`` is not given, looked up with ``which``
    :type name: str
    :param args: command line to execute as the check; defaults to
                 ``('which', name)``
    :type args: sequence of str
    :returns: ``(True, '')`` if the command exits with status 0, otherwise
              ``(False, message)`` where message describes the failure
    :rtype: tuple
    """
    probe = ('which', name) if args is None else args

    try:
        _, stderr, exitcode = run_cmd(probe)
    except Exception as e:
        # Failing to even start the command counts as "not available"
        exitcode, stderr = -1, str(e)

    if exitcode == 0:
        return (True, '')

    cmd_line = ' '.join(probe)
    err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
    return (False, err)
68
def has_cmds(*cmds):
    """
    Check the availability of several commands at once.

    Like has_cmd(), the result is meant to be unpacked into a
    @avocado.skipUnless decorator, e.g.:

        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
                              'cmd2', 'cmd3'))
        def test_something_that_needs_cmd1_and_cmd2(self):
            ...

    :param cmds: each entry is either a command name (str) or a tuple of
                 positional arguments to pass on to has_cmd()
    :returns: ``(True, '')`` if every check passes, otherwise the
              ``(False, message)`` result of the first failing check
    :rtype: tuple
    """
    for entry in cmds:
        # A bare string is shorthand for has_cmd(entry)
        spec = (entry,) if isinstance(entry, str) else entry
        ok, errstr = has_cmd(*spec)
        if not ok:
            return (False, errstr)

    return (True, '')
89
def run_cmd(args):
    """
    Run a command to completion, capturing its output as text.

    :param args: the command line, as accepted by :class:`subprocess.Popen`
    :returns: a ``(stdout, stderr, returncode)`` tuple
    :rtype: tuple
    """
    proc = subprocess.Popen(args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    # communicate() waits for the process to exit, so returncode is set
    out, err = proc.communicate()
    return (out, err, proc.returncode)
99
def is_readable_executable_file(path):
    """
    Tell whether ``path`` is a regular file that the current process may
    both read and execute.

    :param path: the file system path to check
    :returns: True if the path is a file with read and execute access
    :rtype: bool
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.R_OK | os.X_OK)
102
103
def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
    """
    Locate a QEMU binary for the given architecture.

    The binary name is ``bin_prefix`` followed by the arch.  It is searched
    for, in order, in the current working directory, in BUILD_DIR and in
    the "build" directory below BUILD_DIR.

    :param bin_prefix: the leading part of the binary name
    :type bin_prefix: str
    :param arch: the arch to use when looking for a QEMU binary (the target
                 will match the arch given).  If None (the default), arch
                 will be the current host system arch (as given by
                 :func:`os.uname`).
    :type arch: str
    :returns: the path to the default QEMU binary or None if one could not
              be found
    :rtype: str or None
    """
    if arch is None:
        arch = os.uname().machine
    # qemu binary path does not match arch for powerpc, handle it
    if 'ppc64le' in arch:
        arch = 'ppc64'
    binary = bin_prefix + arch
    search_dirs = (".", BUILD_DIR, os.path.join(BUILD_DIR, "build"))
    for directory in search_dirs:
        candidate = os.path.join(directory, binary)
        if is_readable_executable_file(candidate):
            return candidate
    return None
133
134
def _console_interaction(test, success_message, failure_message,
                         send_string, keep_sending=False, vm=None):
    """
    Drive a VM console: optionally send a string, then read lines until a
    success or failure condition is met, logging every non-empty line.

    :param test: the test whose ``vm`` is used when ``vm`` is not given,
                 and whose ``fail()`` is called on a failure message
    :param success_message: text that ends the interaction when found in a
                            line; if None, the first non-empty line ends it
    :param failure_message: text that, when found in a line, closes the
                            console file and fails the test
    :param send_string: string written to the console socket before
                        reading, or None to only read
    :param keep_sending: if True, ``send_string`` is re-sent before every
                         read instead of just once
    :param vm: the VM to interact with; defaults to ``test.vm``
    """
    assert not keep_sending or send_string
    machine = test.vm if vm is None else vm
    console = machine.console_file
    console_logger = logging.getLogger('console')
    pending = send_string
    while True:
        if pending:
            machine.console_socket.sendall(pending.encode())
            if not keep_sending:
                pending = None  # send only once
        try:
            line = console.readline().decode().strip()
        except UnicodeDecodeError:
            # Undecodable output is skipped, just like an empty read
            continue
        if not line:
            # Nothing useful was read; keep polling the console
            continue
        console_logger.debug(line)
        if success_message is None or success_message in line:
            return
        if failure_message and failure_message in line:
            console.close()
            test.fail('Failure message found in console: "%s". Expected: "%s"'
                      % (failure_message, success_message))
161
def interrupt_interactive_console_until_pattern(test, success_message,
                                                failure_message=None,
                                                interrupt_string='\r'):
    """
    Repeatedly send a string to the console until a message shows up,
    logging the console output along the way.

    The string is sent before every attempt to read a line.  The typical
    use case is breaking out of a boot loader prompt, such as:

        Press a key within 5 seconds to interrupt boot process.
        5
        4
        3
        2
        1
        Booting default image...

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param interrupt_string: a string to send to the console before trying
                             to read a new line
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=interrupt_string, keep_sending=True)
187
def wait_for_console_pattern(test, success_message, failure_message=None,
                             vm=None):
    """
    Read the console, logging its content, until a message appears.

    Nothing is sent to the console.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    :param vm: the VM whose console is read; defaults to ``test.vm``
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=None, vm=vm)
200
def exec_command(test, command):
    """
    Send a command to the console of the test's VM, logging the content.

    A carriage return is appended to the command.  No particular pattern
    is waited for: the interaction ends at the first non-empty line read
    back from the console.

    :param test: an Avocado test containing a VM.
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :type command: str
    """
    _console_interaction(test, success_message=None, failure_message=None,
                         send_string=command + '\r')
212
def exec_command_and_wait_for_pattern(test, command,
                                      success_message, failure_message=None):
    """
    Send a command to the console of the test's VM and wait for a reply.

    A carriage return is appended to the command, which is sent once.  The
    console is then read, and logged, until success_message appears.  The
    test is marked as failed if failure_message is found instead.

    :param test: an Avocado test containing a VM that will have its console
                 read and probed for a success or failure message
    :type test: :class:`avocado_qemu.QemuSystemTest`
    :param command: the command to send
    :param success_message: if this message appears, test succeeds
    :param failure_message: if this message appears, test fails
    """
    _console_interaction(test, success_message, failure_message,
                         send_string=command + '\r')
228
class QemuBaseTest(avocado.Test):
    """Base class for QEMU tests: resolves arch, cpu and the QEMU binary."""

    # default timeout for all tests, can be overridden
    timeout = 120

    def _get_unique_tag_val(self, tag_name):
        """
        Gets a tag value, if unique for a key

        The value is read without being removed from the tag container, so
        asking for the same tag more than once gives the same answer.

        :param tag_name: the tag key to look up in ``self.tags``
        :returns: the single value of the tag, or None if the tag is
                  absent, has no value or has more than one value
        """
        # "or ()" also covers a tag key that is present but holds no
        # values (presumably None for flat tags -- confirm against avocado)
        vals = self.tags.get(tag_name) or ()
        if len(vals) == 1:
            # Do not pop(): that would mutate the test's own tags
            return next(iter(vals))
        return None

    def setUp(self, bin_prefix):
        """
        Resolve ``self.arch``, ``self.cpu`` and ``self.qemu_bin``.

        Each value comes from the test parameter of the same name, falling
        back to the unique tag value (arch, cpu) or to the binary found by
        pick_default_qemu_bin() (qemu_bin).  The test is canceled if no
        QEMU binary ends up being defined.

        :param bin_prefix: binary name prefix given to
                           pick_default_qemu_bin()
        """
        self.arch = self.params.get('arch',
                                    default=self._get_unique_tag_val('arch'))

        self.cpu = self.params.get('cpu',
                                   default=self._get_unique_tag_val('cpu'))

        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
        self.qemu_bin = self.params.get('qemu_bin',
                                        default=default_qemu_bin)
        if self.qemu_bin is None:
            self.cancel("No QEMU binary defined or found in the build tree")

    def fetch_asset(self, name,
                    asset_hash, algorithm=None,
                    locations=None, expire=None,
                    find_only=False, cancel_on_missing=True):
        """
        Forward to the parent class fetch_asset(), passing every argument
        by keyword.  Here ``asset_hash`` is required and
        ``cancel_on_missing`` defaults to True.
        """
        return super().fetch_asset(name,
                        asset_hash=asset_hash,
                        algorithm=algorithm,
                        locations=locations,
                        expire=expire,
                        find_only=find_only,
                        cancel_on_missing=cancel_on_missing)
267
268
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    def setUp(self):
        """
        Prepare the VM registry, pick the qemu-system-* binary, enforce the
        accelerator requested through the "accel" tag (if any) and resolve
        ``self.machine`` from the "machine" parameter or tag.
        """
        self._vms = {}

        super().setUp('qemu-system-')

        accel_required = self._get_unique_tag_val('accel')
        if accel_required:
            self.require_accelerator(accel_required)

        self.machine = self.params.get('machine',
                                       default=self._get_unique_tag_val('machine'))

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is canceled.  If the check itself
        for the given accelerator is not available, the test is also
        canceled.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.cancel("Don't know how to check for the presence "
                        "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.cancel("%s accelerator does not seem to be "
                        "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Requires a network backend to be supported by the QEMU binary.

        The output of "-netdev help" is searched for a line consisting of
        exactly the backend name.  If it is not there, the test is
        canceled.

        :param netdevname: name of the network backend, such as "user"
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if '\n' + netdevname + '\n' not in netdevhelp:
            # Name the backend that was actually asked for
            self.cancel('no support for %s networking' % netdevname)

    def require_multiprocess(self):
        """
        Test for the presence of the x-pci-proxy-dev which is required
        to support multiprocess.
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if 'x-pci-proxy-dev' not in devhelp:
            self.cancel('no support for multiprocess device emulation')

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine for the current QEMU binary.

        :param name: name of the VM, used in the debug log messages
        :param args: extra command line arguments added to the VM
        :returns: the new, not yet launched, machine
        """
        # NOTE(review): this temporary directory is not referenced anywhere
        # else in this class besides being dropped in tearDown(); confirm
        # whether it is still needed.
        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
        if args:
            vm.add_args(*args)
        return vm

    def get_qemu_img(self):
        """
        Select a qemu-img binary, canceling the test if none is found.

        :returns: the path to the qemu-img binary
        """
        self.log.debug('Looking for and selecting a qemu-img binary')

        # If qemu-img has been built, use it, otherwise the system wide one
        # will be used.
        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
        if not os.path.exists(qemu_img):
            # False is the value requested for the "not found" case
            qemu_img = find_command('qemu-img', False)
        if qemu_img is False:
            self.cancel('Could not find "qemu-img"')

        return qemu_img

    @property
    def vm(self):
        """The VM named "default", created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Get the VM registered under ``name``, creating it if needed.

        A newly created VM gets ``args`` as extra command line arguments,
        plus "-cpu" and the machine type when ``self.cpu`` and
        ``self.machine`` are set.  ``args`` is ignored for a VM that
        already exists.

        :param args: extra command line arguments for a new VM
        :param name: the VM name; a random UUID is used when not given
        :returns: the machine registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            self._vms[name] = self._new_vm(name, *args)
            if self.cpu is not None:
                self._vms[name].add_args('-cpu', self.cpu)
            if self.machine is not None:
                self._vms[name].set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        Nothing is done when either ``arg`` or ``value`` is empty.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        if arg not in self.vm.args:
            self.vm.args.extend([arg, value])
        else:
            # The value is expected right after the first occurrence of arg
            idx = self.vm.args.index(arg) + 1
            if idx < len(self.vm.args):
                self.vm.args[idx] = value
            else:
                self.vm.args.append(value)

    def tearDown(self):
        """Shut down every VM created through get_vm()."""
        for vm in self._vms.values():
            vm.shutdown()
        self._sd = None
        super().tearDown()
387
388
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Start with no library paths and pick the qemu-* user binary."""
        self._ldpath = []
        super().setUp('qemu-')

    def add_ldpath(self, ldpath):
        """
        Register a directory to be passed to QEMU with "-L".

        :param ldpath: the directory; stored as an absolute path
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run(self, bin_path, args=()):
        """
        Run a binary under the user-mode QEMU binary.

        :param bin_path: path of the binary to execute
        :param args: command line arguments (strings) for the binary
        :returns: the result of :func:`avocado.utils.process.run`
        """
        # The default for args is an immutable tuple rather than a list
        # shared between calls.
        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
        bin_args = " ".join(args)
        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
                                            bin_path, bin_args))
404
405
class LinuxSSHMixIn:
    """Contains utility methods for interacting with a guest via SSH."""

    def ssh_connect(self, username, credential, credential_is_key=True):
        """
        Open an SSH session to the guest and store it in ``self.ssh_session``.

        The host port is taken from the "info usernet" monitor command of
        ``self.vm``.  The connection is attempted up to 10 times, sleeping
        for an increasing number of seconds after each failure; the test
        fails if none of the attempts succeeds.

        :param username: the user to log in as
        :param credential: a key or a password, see ``credential_is_key``
        :param credential_is_key: if True ``credential`` is passed to the
                                  session as the key, otherwise as the
                                  password
        """
        self.ssh_logger = logging.getLogger('ssh')
        res = self.vm.cmd('human-monitor-command',
                          command_line='info usernet')
        port = get_info_usernet_hostfwd_port(res)
        self.assertIsNotNone(port)
        self.assertGreater(port, 0)
        self.log.debug('sshd listening on port: %d', port)
        if credential_is_key:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, key=credential)
        else:
            self.ssh_session = ssh.Session('127.0.0.1', port=port,
                                           user=username, password=credential)
        for i in range(10):
            try:
                self.ssh_session.connect()
                return
            except Exception:
                # Only retry on ordinary errors: KeyboardInterrupt and
                # SystemExit must not be swallowed by the retry loop.
                time.sleep(i)
        self.fail('ssh connection timeout')

    def ssh_command(self, command):
        """
        Run a command in the guest over the SSH session, logging the
        command and its output.

        The exit status of the command is asserted to be 0.

        :param command: the command to run
        :returns: a ``(stdout_lines, stderr_lines)`` tuple of lists, with
                  trailing whitespace removed from every line
        """
        self.ssh_logger.info(command)
        result = self.ssh_session.cmd(command)
        stdout_lines = [line.rstrip() for line
                        in result.stdout_text.splitlines()]
        for line in stdout_lines:
            self.ssh_logger.info(line)
        stderr_lines = [line.rstrip() for line
                        in result.stderr_text.splitlines()]
        for line in stderr_lines:
            self.ssh_logger.warning(line)

        self.assertEqual(result.exit_status, 0,
                         f'Guest command failed: {command}')
        return stdout_lines, stderr_lines

    def ssh_command_output_contains(self, cmd, exp):
        """
        Run a command in the guest and fail the test unless some line of
        its standard output contains the expected text.

        :param cmd: the command to run
        :param exp: the text expected in one of the output lines
        """
        stdout, _ = self.ssh_command(cmd)
        if not any(exp in line for line in stdout):
            self.fail('"%s" output does not contain "%s"' % (cmd, exp))
454
class LinuxDistro:
    """Represents a Linux distribution

    Holds information of known distros.
    """
    #: A collection of known distros and their respective image checksum,
    #: indexed by distro name, then version, then architecture
    KNOWN_DISTROS = {
        'fedora': {
            '31': {
                'x86_64': {
                    'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
                                 '8a309c2d46215d8fc026954f3c5c27a0'),
                    'pxeboot_url': ('https://archives.fedoraproject.org/'
                                    'pub/archive/fedora/linux/releases/31/'
                                    'Everything/x86_64/os/images/pxeboot/'),
                    'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
                                      '08a96687f73c ro no_timer_check '
                                      'net.ifnames=0 console=tty1 '
                                      'console=ttyS0,115200n8'),
                },
                'aarch64': {
                    'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
                                 'd2af0ad0329383d5639c997fdf16fe49'),
                    'pxeboot_url': ('https://archives.fedoraproject.org/'
                                    'pub/archive/fedora/linux/releases/31/'
                                    'Everything/aarch64/os/images/pxeboot/'),
                    'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
                                      '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
                                      ' ignore_loglevel no_timer_check'
                                      ' printk.time=1 rd_NO_PLYMOUTH'
                                      ' console=ttyAMA0'),
                },
                'ppc64': {
                    'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
                                 '3f991c506f2cc390dc4efa2026ad2f58'),
                },
                's390x': {
                    'checksum': ('4caaab5a434fd4d1079149a072fdc789'
                                 '1e354f834d355069ca982fdcaf5a122d'),
                },
            },
            '32': {
                'aarch64': {
                    'checksum': ('b367755c664a2d7a26955bbfff985855'
                                 'adfa2ca15e908baf15b4b176d68d3967'),
                    'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                    'releases/32/Server/aarch64/os/images/'
                                    'pxeboot/'),
                    'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
                                      '14d95c0e90c5 ro no_timer_check net.ifnames=0'
                                      ' console=tty1 console=ttyS0,115200n8'),
                },
            },
            '33': {
                'aarch64': {
                    'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
                                 'a81f386a17f969c1d1c7c87031008a6b'),
                    'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
                                    'releases/33/Server/aarch64/os/images/'
                                    'pxeboot/'),
                    'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
                                      '1126a0208f8a ro no_timer_check net.ifnames=0'
                                      ' console=tty1 console=ttyS0,115200n8'
                                      ' console=tty0'),
                },
            },
        }
    }

    def __init__(self, name, version, arch):
        """
        Look up the information known about a distro.

        An unknown name, version or arch combination is not an error: the
        instance then simply carries no information.

        :param name: the distro name, such as "fedora"
        :param version: the distro version, such as "31"
        :param arch: the architecture, such as "x86_64"
        """
        self.name = name
        self.version = version
        self.arch = arch
        try:
            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
        except AttributeError:
            # Unknown distro
            info = None
        # Keep a private copy: the checksum setter writes into this dict
        # and must not alter the KNOWN_DISTROS table shared by all
        # instances.
        self._info = dict(info) if info else {}

    @property
    def checksum(self):
        """Gets the cloud-image file checksum"""
        return self._info.get('checksum', None)

    @checksum.setter
    def checksum(self, value):
        """Overrides the cloud-image file checksum for this instance only"""
        self._info['checksum'] = value

    @property
    def pxeboot_url(self):
        """Gets the repository url where pxeboot files can be found"""
        return self._info.get('pxeboot_url', None)

    @property
    def default_kernel_params(self):
        """Gets the default kernel parameters"""
        return self._info.get('kernel_params', None)
551
552
class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
    """Facilitates having a cloud-image Linux based available.

    For tests that intend to interact with guests, this is a better choice
    to start with than the more vanilla `QemuSystemTest` class.
    """

    distro = None
    username = 'root'
    password = 'password'
    # Values given to the "-smp" and "-m" QEMU options in setUp()
    smp = '2'
    memory = '1024'

    def _set_distro(self):
        """
        Set ``self.distro`` from the "distro" and "distro_version"
        parameters or tags, defaulting to Fedora 31, and apply the
        "distro_checksum" parameter when given.
        """
        distro_name = self.params.get(
            'distro',
            default=self._get_unique_tag_val('distro'))
        if not distro_name:
            distro_name = 'fedora'

        distro_version = self.params.get(
            'distro_version',
            default=self._get_unique_tag_val('distro_version'))
        if not distro_version:
            distro_version = '31'

        self.distro = LinuxDistro(distro_name, distro_version, self.arch)

        # The distro checksum behaves differently than distro name and
        # version. First, it does not respect a tag with the same
        # name, given that it's not expected to be used for filtering
        # (distro name versions are the natural choice).  Second, the
        # order of precedence is: parameter, attribute and then value
        # from KNOWN_DISTROS.
        distro_checksum = self.params.get('distro_checksum',
                                          default=None)
        if distro_checksum:
            self.distro.checksum = distro_checksum

    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
        """
        Configure the default VM with the boot image, the cloudinit image
        and a user network device that forwards a host port to guest
        port 22.

        :param ssh_pubkey: path to the public key to authorize in the
                           guest; when None, the keys from the source tree
                           are used and ``self.ssh_key`` is set to the
                           matching private key
        :param network_device_type: the QEMU device model of the NIC
        """
        super().setUp()
        self.require_netdev('user')
        self._set_distro()
        self.vm.add_args('-smp', self.smp)
        self.vm.add_args('-m', self.memory)
        # The following network device allows for SSH connections
        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
                         '-device', '%s,netdev=vnet' % network_device_type)
        self.set_up_boot()
        if ssh_pubkey is None:
            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
        self.set_up_cloudinit(ssh_pubkey)

    def set_up_existing_ssh_keys(self):
        """
        Make the SSH keys shipped in the source tree usable by the test.

        The private key is copied into a ".ssh" directory inside the test
        workdir, with owner-only permissions on both the directory and
        the copy.

        :returns: a ``(public_key_path, private_key_path)`` tuple; the
                  public key path points into the source tree and the
                  private key path to the copy
        """
        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
        ssh_dir = os.path.join(self.workdir, '.ssh')
        os.mkdir(ssh_dir, mode=0o700)
        ssh_private_key = os.path.join(ssh_dir,
                                       os.path.basename(source_private_key))
        shutil.copyfile(source_private_key, ssh_private_key)
        os.chmod(ssh_private_key, 0o600)
        return (ssh_public_key, ssh_private_key)

    def download_boot(self):
        """
        Fetch the boot image for ``self.distro``, canceling the test if
        that fails.

        :returns: the path to the boot image
        """
        # Set the qemu-img binary.
        # If none is available, the test will cancel.
        vmimage.QEMU_IMG = super().get_qemu_img()

        self.log.info('Downloading/preparing boot image')
        # Fedora 31 only provides ppc64le images
        image_arch = self.arch
        if self.distro.name == 'fedora':
            if image_arch == 'ppc64':
                image_arch = 'ppc64le'

        try:
            boot = vmimage.get(
                self.distro.name, arch=image_arch, version=self.distro.version,
                checksum=self.distro.checksum,
                algorithm='sha256',
                cache_dir=self.cache_dirs[0],
                snapshot_dir=self.workdir)
        except Exception:
            # Same handling as prepare_cloudinit(): an interrupt or exit
            # request is not turned into a cancellation.
            self.cancel('Failed to download/prepare boot image')
        return boot.path

    def prepare_cloudinit(self, ssh_pubkey=None):
        """
        Create the cloudinit ISO image in the test workdir, canceling the
        test if that fails.

        Reads ``self.phone_server``, which set_up_cloudinit() creates.

        :param ssh_pubkey: path to a public key file whose content is
                           passed as the authorized key, or None
        :returns: the path to the cloudinit ISO image
        """
        self.log.info('Preparing cloudinit image')
        try:
            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
            pubkey_content = None
            if ssh_pubkey:
                with open(ssh_pubkey) as pubkey:
                    pubkey_content = pubkey.read()
            cloudinit.iso(cloudinit_iso, self.name,
                          username=self.username,
                          password=self.password,
                          # QEMU's hard coded usermode router address
                          phone_home_host='10.0.2.2',
                          phone_home_port=self.phone_server.server_port,
                          authorized_key=pubkey_content)
        except Exception:
            self.cancel('Failed to prepare the cloudinit image')
        return cloudinit_iso

    def set_up_boot(self):
        """Add the downloaded boot image to the VM as a drive."""
        path = self.download_boot()
        self.vm.add_args('-drive', 'file=%s' % path)

    def set_up_cloudinit(self, ssh_pubkey=None):
        """
        Create the phone home server and add the cloudinit image to the VM
        as a raw drive.

        :param ssh_pubkey: path to the public key given to
                           prepare_cloudinit()
        """
        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
                                                      self.name)
        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)

    def launch_and_wait(self, set_up_ssh_connection=True):
        """
        Launch the VM, logging its console, and wait until the guest
        phones home.

        :param set_up_ssh_connection: if True, open the SSH session to the
                                      guest once it has phoned home
        """
        self.vm.set_console()
        self.vm.launch()
        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
                                                 logger=self.log.getChild('console'))
        console_drainer.start()
        self.log.info('VM launched, waiting for boot confirmation from guest')
        while not self.phone_server.instance_phoned_back:
            self.phone_server.handle_request()

        if set_up_ssh_connection:
            self.log.info('Setting up the SSH connection')
            # NOTE(review): self.ssh_key is only assigned by setUp() when no
            # ssh_pubkey was passed to it; callers supplying their own
            # public key presumably have to set self.ssh_key themselves
            # before getting here.
            self.ssh_connect(self.username, self.ssh_key)
682