1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import shutil
14import subprocess
15import sys
16import tempfile
17import time
18import uuid
19
20import avocado
21from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
22from avocado.utils.path import find_command
23
24from qemu.machine import QEMUMachine
25from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
26                        tcg_available)
27
28
29#: The QEMU build root directory.  It may also be the source directory
30#: if building from the source dir, but it's safer to use BUILD_DIR for
31#: that purpose.  Be aware that if this code is moved outside of a source
32#: and build tree, it will not be accurate.
33BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
34
35if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
36    # The link to the avocado tests dir in the source code directory
37    lnk = os.path.dirname(os.path.dirname(__file__))
38    #: The QEMU root source directory
39    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
40else:
41    SOURCE_DIR = BUILD_DIR
42
43
44def has_cmd(name, args=None):
45    """
46    This function is for use in a @avocado.skipUnless decorator, e.g.:
47
48        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
49        def test_something_that_needs_sudo(self):
50            ...
51    """
52
53    if args is None:
54        args = ('which', name)
55
56    try:
57        _, stderr, exitcode = run_cmd(args)
58    except Exception as e:
59        exitcode = -1
60        stderr = str(e)
61
62    if exitcode != 0:
63        cmd_line = ' '.join(args)
64        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
65        return (False, err)
66    else:
67        return (True, '')
68
69def has_cmds(*cmds):
70    """
71    This function is for use in a @avocado.skipUnless decorator and
72    allows checking for the availability of multiple commands, e.g.:
73
74        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
75                              'cmd2', 'cmd3'))
76        def test_something_that_needs_cmd1_and_cmd2(self):
77            ...
78    """
79
80    for cmd in cmds:
81        if isinstance(cmd, str):
82            cmd = (cmd,)
83
84        ok, errstr = has_cmd(*cmd)
85        if not ok:
86            return (False, errstr)
87
88    return (True, '')
89
90def run_cmd(args):
91    subp = subprocess.Popen(args,
92                            stdout=subprocess.PIPE,
93                            stderr=subprocess.PIPE,
94                            universal_newlines=True)
95    stdout, stderr = subp.communicate()
96    ret = subp.returncode
97
98    return (stdout, stderr, ret)
99
100def is_readable_executable_file(path):
101    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
102
103
104def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
105    """
106    Picks the path of a QEMU binary, starting either in the current working
107    directory or in the source tree root directory.
108
109    :param arch: the arch to use when looking for a QEMU binary (the target
110                 will match the arch given).  If None (the default), arch
111                 will be the current host system arch (as given by
112                 :func:`os.uname`).
113    :type arch: str
114    :returns: the path to the default QEMU binary or None if one could not
115              be found
116    :rtype: str or None
117    """
118    if arch is None:
119        arch = os.uname()[4]
120    # qemu binary path does not match arch for powerpc, handle it
121    if 'ppc64le' in arch:
122        arch = 'ppc64'
123    qemu_bin_name = bin_prefix + arch
124    qemu_bin_paths = [
125        os.path.join(".", qemu_bin_name),
126        os.path.join(BUILD_DIR, qemu_bin_name),
127        os.path.join(BUILD_DIR, "build", qemu_bin_name),
128    ]
129    for path in qemu_bin_paths:
130        if is_readable_executable_file(path):
131            return path
132    return None
133
134
135def _console_interaction(test, success_message, failure_message,
136                         send_string, keep_sending=False, vm=None):
137    assert not keep_sending or send_string
138    if vm is None:
139        vm = test.vm
140    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
141    console_logger = logging.getLogger('console')
142    while True:
143        if send_string:
144            vm.console_socket.sendall(send_string.encode())
145            if not keep_sending:
146                send_string = None # send only once
147        try:
148            msg = console.readline().decode().strip()
149        except UnicodeDecodeError:
150            msg = None
151        if not msg:
152            continue
153        console_logger.debug(msg)
154        if success_message is None or success_message in msg:
155            break
156        if failure_message and failure_message in msg:
157            console.close()
158            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
159                    (failure_message, success_message)
160            test.fail(fail)
161
162def interrupt_interactive_console_until_pattern(test, success_message,
163                                                failure_message=None,
164                                                interrupt_string='\r'):
165    """
166    Keep sending a string to interrupt a console prompt, while logging the
167    console output. Typical use case is to break a boot loader prompt, such:
168
169        Press a key within 5 seconds to interrupt boot process.
170        5
171        4
172        3
173        2
174        1
175        Booting default image...
176
177    :param test: an Avocado test containing a VM that will have its console
178                 read and probed for a success or failure message
179    :type test: :class:`avocado_qemu.QemuSystemTest`
180    :param success_message: if this message appears, test succeeds
181    :param failure_message: if this message appears, test fails
182    :param interrupt_string: a string to send to the console before trying
183                             to read a new line
184    """
185    _console_interaction(test, success_message, failure_message,
186                         interrupt_string, True)
187
188def wait_for_console_pattern(test, success_message, failure_message=None,
189                             vm=None):
190    """
191    Waits for messages to appear on the console, while logging the content
192
193    :param test: an Avocado test containing a VM that will have its console
194                 read and probed for a success or failure message
195    :type test: :class:`avocado_qemu.QemuSystemTest`
196    :param success_message: if this message appears, test succeeds
197    :param failure_message: if this message appears, test fails
198    """
199    _console_interaction(test, success_message, failure_message, None, vm=vm)
200
201def exec_command(test, command):
202    """
203    Send a command to a console (appending CRLF characters), while logging
204    the content.
205
206    :param test: an Avocado test containing a VM.
207    :type test: :class:`avocado_qemu.QemuSystemTest`
208    :param command: the command to send
209    :type command: str
210    """
211    _console_interaction(test, None, None, command + '\r')
212
213def exec_command_and_wait_for_pattern(test, command,
214                                      success_message, failure_message=None):
215    """
216    Send a command to a console (appending CRLF characters), then wait
217    for success_message to appear on the console, while logging the.
218    content. Mark the test as failed if failure_message is found instead.
219
220    :param test: an Avocado test containing a VM that will have its console
221                 read and probed for a success or failure message
222    :type test: :class:`avocado_qemu.QemuSystemTest`
223    :param command: the command to send
224    :param success_message: if this message appears, test succeeds
225    :param failure_message: if this message appears, test fails
226    """
227    _console_interaction(test, success_message, failure_message, command + '\r')
228
229class QemuBaseTest(avocado.Test):
230
231    # default timeout for all tests, can be overridden
232    timeout = 120
233
234    def _get_unique_tag_val(self, tag_name):
235        """
236        Gets a tag value, if unique for a key
237        """
238        vals = self.tags.get(tag_name, [])
239        if len(vals) == 1:
240            return vals.pop()
241        return None
242
243    def setUp(self, bin_prefix):
244        self.arch = self.params.get('arch',
245                                    default=self._get_unique_tag_val('arch'))
246
247        self.cpu = self.params.get('cpu',
248                                   default=self._get_unique_tag_val('cpu'))
249
250        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
251        self.qemu_bin = self.params.get('qemu_bin',
252                                        default=default_qemu_bin)
253        if self.qemu_bin is None:
254            self.cancel("No QEMU binary defined or found in the build tree")
255
256    def fetch_asset(self, name,
257                    asset_hash=None, algorithm=None,
258                    locations=None, expire=None,
259                    find_only=False, cancel_on_missing=True):
260        return super().fetch_asset(name,
261                        asset_hash=asset_hash,
262                        algorithm=algorithm,
263                        locations=locations,
264                        expire=expire,
265                        find_only=find_only,
266                        cancel_on_missing=cancel_on_missing)
267
268
269class QemuSystemTest(QemuBaseTest):
270    """Facilitates system emulation tests."""
271
272    def setUp(self):
273        self._vms = {}
274
275        super().setUp('qemu-system-')
276
277        accel_required = self._get_unique_tag_val('accel')
278        if accel_required:
279            self.require_accelerator(accel_required)
280
281        self.machine = self.params.get('machine',
282                                       default=self._get_unique_tag_val('machine'))
283
284    def require_accelerator(self, accelerator):
285        """
286        Requires an accelerator to be available for the test to continue
287
288        It takes into account the currently set qemu binary.
289
290        If the check fails, the test is canceled.  If the check itself
291        for the given accelerator is not available, the test is also
292        canceled.
293
294        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
295        :type accelerator: str
296        """
297        checker = {'tcg': tcg_available,
298                   'kvm': kvm_available}.get(accelerator)
299        if checker is None:
300            self.cancel("Don't know how to check for the presence "
301                        "of accelerator %s" % accelerator)
302        if not checker(qemu_bin=self.qemu_bin):
303            self.cancel("%s accelerator does not seem to be "
304                        "available" % accelerator)
305
306    def require_netdev(self, netdevname):
307        netdevhelp = run_cmd([self.qemu_bin,
308                             '-M', 'none', '-netdev', 'help'])[0];
309        if netdevhelp.find('\n' + netdevname + '\n') < 0:
310            self.cancel('no support for user networking')
311
312    def _new_vm(self, name, *args):
313        self._sd = tempfile.TemporaryDirectory(prefix="qemu_")
314        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
315                         sock_dir=self._sd.name, log_dir=self.logdir)
316        self.log.debug('QEMUMachine "%s" created', name)
317        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
318        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
319        if args:
320            vm.add_args(*args)
321        return vm
322
323    @property
324    def vm(self):
325        return self.get_vm(name='default')
326
327    def get_vm(self, *args, name=None):
328        if not name:
329            name = str(uuid.uuid4())
330        if self._vms.get(name) is None:
331            self._vms[name] = self._new_vm(name, *args)
332            if self.cpu is not None:
333                self._vms[name].add_args('-cpu', self.cpu)
334            if self.machine is not None:
335                self._vms[name].set_machine(self.machine)
336        return self._vms[name]
337
338    def set_vm_arg(self, arg, value):
339        """
340        Set an argument to list of extra arguments to be given to the QEMU
341        binary. If the argument already exists then its value is replaced.
342
343        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
344        :type arg: str
345        :param value: the argument value, such as "host" in "-cpu host"
346        :type value: str
347        """
348        if not arg or not value:
349            return
350        if arg not in self.vm.args:
351            self.vm.args.extend([arg, value])
352        else:
353            idx = self.vm.args.index(arg) + 1
354            if idx < len(self.vm.args):
355                self.vm.args[idx] = value
356            else:
357                self.vm.args.append(value)
358
359    def tearDown(self):
360        for vm in self._vms.values():
361            vm.shutdown()
362        self._sd = None
363        super().tearDown()
364
365
366class QemuUserTest(QemuBaseTest):
367    """Facilitates user-mode emulation tests."""
368
369    def setUp(self):
370        self._ldpath = []
371        super().setUp('qemu-')
372
373    def add_ldpath(self, ldpath):
374        self._ldpath.append(os.path.abspath(ldpath))
375
376    def run(self, bin_path, args=[]):
377        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
378        bin_args = " ".join(args)
379        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
380                                            bin_path, bin_args))
381
382
383class LinuxSSHMixIn:
384    """Contains utility methods for interacting with a guest via SSH."""
385
386    def ssh_connect(self, username, credential, credential_is_key=True):
387        self.ssh_logger = logging.getLogger('ssh')
388        res = self.vm.command('human-monitor-command',
389                              command_line='info usernet')
390        port = get_info_usernet_hostfwd_port(res)
391        self.assertIsNotNone(port)
392        self.assertGreater(port, 0)
393        self.log.debug('sshd listening on port: %d', port)
394        if credential_is_key:
395            self.ssh_session = ssh.Session('127.0.0.1', port=port,
396                                           user=username, key=credential)
397        else:
398            self.ssh_session = ssh.Session('127.0.0.1', port=port,
399                                           user=username, password=credential)
400        for i in range(10):
401            try:
402                self.ssh_session.connect()
403                return
404            except:
405                time.sleep(i)
406        self.fail('ssh connection timeout')
407
408    def ssh_command(self, command):
409        self.ssh_logger.info(command)
410        result = self.ssh_session.cmd(command)
411        stdout_lines = [line.rstrip() for line
412                        in result.stdout_text.splitlines()]
413        for line in stdout_lines:
414            self.ssh_logger.info(line)
415        stderr_lines = [line.rstrip() for line
416                        in result.stderr_text.splitlines()]
417        for line in stderr_lines:
418            self.ssh_logger.warning(line)
419
420        self.assertEqual(result.exit_status, 0,
421                         f'Guest command failed: {command}')
422        return stdout_lines, stderr_lines
423
424class LinuxDistro:
425    """Represents a Linux distribution
426
427    Holds information of known distros.
428    """
429    #: A collection of known distros and their respective image checksum
430    KNOWN_DISTROS = {
431        'fedora': {
432            '31': {
433                'x86_64':
434                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
435                              '8a309c2d46215d8fc026954f3c5c27a0'),
436                 'pxeboot_url': ('https://archives.fedoraproject.org/'
437                                 'pub/archive/fedora/linux/releases/31/'
438                                 'Everything/x86_64/os/images/pxeboot/'),
439                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
440                                   '08a96687f73c ro no_timer_check '
441                                   'net.ifnames=0 console=tty1 '
442                                   'console=ttyS0,115200n8'),
443                },
444                'aarch64':
445                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
446                              'd2af0ad0329383d5639c997fdf16fe49'),
447                'pxeboot_url': 'https://archives.fedoraproject.org/'
448                               'pub/archive/fedora/linux/releases/31/'
449                               'Everything/aarch64/os/images/pxeboot/',
450                'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
451                                  '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
452                                  ' ignore_loglevel no_timer_check'
453                                  ' printk.time=1 rd_NO_PLYMOUTH'
454                                  ' console=ttyAMA0'),
455                },
456                'ppc64':
457                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
458                              '3f991c506f2cc390dc4efa2026ad2f58')},
459                's390x':
460                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
461                              '1e354f834d355069ca982fdcaf5a122d')},
462            },
463            '32': {
464                'aarch64':
465                {'checksum': ('b367755c664a2d7a26955bbfff985855'
466                              'adfa2ca15e908baf15b4b176d68d3967'),
467                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
468                                'releases/32/Server/aarch64/os/images/'
469                                'pxeboot/'),
470                'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
471                                  '14d95c0e90c5 ro no_timer_check net.ifnames=0'
472                                  ' console=tty1 console=ttyS0,115200n8'),
473                },
474            },
475            '33': {
476                'aarch64':
477                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
478                              'a81f386a17f969c1d1c7c87031008a6b'),
479                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
480                                'releases/33/Server/aarch64/os/images/'
481                                'pxeboot/'),
482                'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
483                                  '1126a0208f8a ro no_timer_check net.ifnames=0'
484                                  ' console=tty1 console=ttyS0,115200n8'
485                                  ' console=tty0'),
486                 },
487            },
488        }
489    }
490
491    def __init__(self, name, version, arch):
492        self.name = name
493        self.version = version
494        self.arch = arch
495        try:
496            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
497        except AttributeError:
498            # Unknown distro
499            info = None
500        self._info = info or {}
501
502    @property
503    def checksum(self):
504        """Gets the cloud-image file checksum"""
505        return self._info.get('checksum', None)
506
507    @checksum.setter
508    def checksum(self, value):
509        self._info['checksum'] = value
510
511    @property
512    def pxeboot_url(self):
513        """Gets the repository url where pxeboot files can be found"""
514        return self._info.get('pxeboot_url', None)
515
516    @property
517    def default_kernel_params(self):
518        """Gets the default kernel parameters"""
519        return self._info.get('kernel_params', None)
520
521
522class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
523    """Facilitates having a cloud-image Linux based available.
524
525    For tests that intend to interact with guests, this is a better choice
526    to start with than the more vanilla `QemuSystemTest` class.
527    """
528
529    distro = None
530    username = 'root'
531    password = 'password'
532    smp = '2'
533    memory = '1024'
534
535    def _set_distro(self):
536        distro_name = self.params.get(
537            'distro',
538            default=self._get_unique_tag_val('distro'))
539        if not distro_name:
540            distro_name = 'fedora'
541
542        distro_version = self.params.get(
543            'distro_version',
544            default=self._get_unique_tag_val('distro_version'))
545        if not distro_version:
546            distro_version = '31'
547
548        self.distro = LinuxDistro(distro_name, distro_version, self.arch)
549
550        # The distro checksum behaves differently than distro name and
551        # version. First, it does not respect a tag with the same
552        # name, given that it's not expected to be used for filtering
553        # (distro name versions are the natural choice).  Second, the
554        # order of precedence is: parameter, attribute and then value
555        # from KNOWN_DISTROS.
556        distro_checksum = self.params.get('distro_checksum',
557                                          default=None)
558        if distro_checksum:
559            self.distro.checksum = distro_checksum
560
561    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
562        super().setUp()
563        self.require_netdev('user')
564        self._set_distro()
565        self.vm.add_args('-smp', self.smp)
566        self.vm.add_args('-m', self.memory)
567        # The following network device allows for SSH connections
568        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
569                         '-device', '%s,netdev=vnet' % network_device_type)
570        self.set_up_boot()
571        if ssh_pubkey is None:
572            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
573        self.set_up_cloudinit(ssh_pubkey)
574
575    def set_up_existing_ssh_keys(self):
576        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
577        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
578        ssh_dir = os.path.join(self.workdir, '.ssh')
579        os.mkdir(ssh_dir, mode=0o700)
580        ssh_private_key = os.path.join(ssh_dir,
581                                       os.path.basename(source_private_key))
582        shutil.copyfile(source_private_key, ssh_private_key)
583        os.chmod(ssh_private_key, 0o600)
584        return (ssh_public_key, ssh_private_key)
585
586    def download_boot(self):
587        self.log.debug('Looking for and selecting a qemu-img binary to be '
588                       'used to create the bootable snapshot image')
589        # If qemu-img has been built, use it, otherwise the system wide one
590        # will be used.  If none is available, the test will cancel.
591        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
592        if not os.path.exists(qemu_img):
593            qemu_img = find_command('qemu-img', False)
594        if qemu_img is False:
595            self.cancel('Could not find "qemu-img", which is required to '
596                        'create the bootable image')
597        vmimage.QEMU_IMG = qemu_img
598
599        self.log.info('Downloading/preparing boot image')
600        # Fedora 31 only provides ppc64le images
601        image_arch = self.arch
602        if self.distro.name == 'fedora':
603            if image_arch == 'ppc64':
604                image_arch = 'ppc64le'
605
606        try:
607            boot = vmimage.get(
608                self.distro.name, arch=image_arch, version=self.distro.version,
609                checksum=self.distro.checksum,
610                algorithm='sha256',
611                cache_dir=self.cache_dirs[0],
612                snapshot_dir=self.workdir)
613        except:
614            self.cancel('Failed to download/prepare boot image')
615        return boot.path
616
617    def prepare_cloudinit(self, ssh_pubkey=None):
618        self.log.info('Preparing cloudinit image')
619        try:
620            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
621            pubkey_content = None
622            if ssh_pubkey:
623                with open(ssh_pubkey) as pubkey:
624                    pubkey_content = pubkey.read()
625            cloudinit.iso(cloudinit_iso, self.name,
626                          username=self.username,
627                          password=self.password,
628                          # QEMU's hard coded usermode router address
629                          phone_home_host='10.0.2.2',
630                          phone_home_port=self.phone_server.server_port,
631                          authorized_key=pubkey_content)
632        except Exception:
633            self.cancel('Failed to prepare the cloudinit image')
634        return cloudinit_iso
635
636    def set_up_boot(self):
637        path = self.download_boot()
638        self.vm.add_args('-drive', 'file=%s' % path)
639
640    def set_up_cloudinit(self, ssh_pubkey=None):
641        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
642                                                      self.name)
643        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
644        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)
645
646    def launch_and_wait(self, set_up_ssh_connection=True):
647        self.vm.set_console()
648        self.vm.launch()
649        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
650                                                 logger=self.log.getChild('console'))
651        console_drainer.start()
652        self.log.info('VM launched, waiting for boot confirmation from guest')
653        while not self.phone_server.instance_phoned_back:
654            self.phone_server.handle_request()
655
656        if set_up_ssh_connection:
657            self.log.info('Setting up the SSH connection')
658            self.ssh_connect(self.username, self.ssh_key)
659