1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import shutil
14import subprocess
15import sys
16import tempfile
17import time
18import uuid
19
20import avocado
21from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
22from avocado.utils.path import find_command
23
24from qemu.machine import QEMUMachine
25from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
26                        tcg_available)
27
28
29#: The QEMU build root directory.  It may also be the source directory
30#: if building from the source dir, but it's safer to use BUILD_DIR for
31#: that purpose.  Be aware that if this code is moved outside of a source
32#: and build tree, it will not be accurate.
33BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
34
35if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
36    # The link to the avocado tests dir in the source code directory
37    lnk = os.path.dirname(os.path.dirname(__file__))
38    #: The QEMU root source directory
39    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
40else:
41    SOURCE_DIR = BUILD_DIR
42
43
44def has_cmd(name, args=None):
45    """
46    This function is for use in a @avocado.skipUnless decorator, e.g.:
47
48        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
49        def test_something_that_needs_sudo(self):
50            ...
51    """
52
53    if args is None:
54        args = ('which', name)
55
56    try:
57        _, stderr, exitcode = run_cmd(args)
58    except Exception as e:
59        exitcode = -1
60        stderr = str(e)
61
62    if exitcode != 0:
63        cmd_line = ' '.join(args)
64        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
65        return (False, err)
66    else:
67        return (True, '')
68
69def has_cmds(*cmds):
70    """
71    This function is for use in a @avocado.skipUnless decorator and
72    allows checking for the availability of multiple commands, e.g.:
73
74        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
75                              'cmd2', 'cmd3'))
76        def test_something_that_needs_cmd1_and_cmd2(self):
77            ...
78    """
79
80    for cmd in cmds:
81        if isinstance(cmd, str):
82            cmd = (cmd,)
83
84        ok, errstr = has_cmd(*cmd)
85        if not ok:
86            return (False, errstr)
87
88    return (True, '')
89
90def run_cmd(args):
91    subp = subprocess.Popen(args,
92                            stdout=subprocess.PIPE,
93                            stderr=subprocess.PIPE,
94                            universal_newlines=True)
95    stdout, stderr = subp.communicate()
96    ret = subp.returncode
97
98    return (stdout, stderr, ret)
99
100def is_readable_executable_file(path):
101    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
102
103
104def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
105    """
106    Picks the path of a QEMU binary, starting either in the current working
107    directory or in the source tree root directory.
108
109    :param arch: the arch to use when looking for a QEMU binary (the target
110                 will match the arch given).  If None (the default), arch
111                 will be the current host system arch (as given by
112                 :func:`os.uname`).
113    :type arch: str
114    :returns: the path to the default QEMU binary or None if one could not
115              be found
116    :rtype: str or None
117    """
118    if arch is None:
119        arch = os.uname()[4]
120    # qemu binary path does not match arch for powerpc, handle it
121    if 'ppc64le' in arch:
122        arch = 'ppc64'
123    qemu_bin_name = bin_prefix + arch
124    qemu_bin_paths = [
125        os.path.join(".", qemu_bin_name),
126        os.path.join(BUILD_DIR, qemu_bin_name),
127        os.path.join(BUILD_DIR, "build", qemu_bin_name),
128    ]
129    for path in qemu_bin_paths:
130        if is_readable_executable_file(path):
131            return path
132    return None
133
134
135def _console_interaction(test, success_message, failure_message,
136                         send_string, keep_sending=False, vm=None):
137    assert not keep_sending or send_string
138    if vm is None:
139        vm = test.vm
140    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
141    console_logger = logging.getLogger('console')
142    while True:
143        if send_string:
144            vm.console_socket.sendall(send_string.encode())
145            if not keep_sending:
146                send_string = None # send only once
147        try:
148            msg = console.readline().decode().strip()
149        except UnicodeDecodeError:
150            msg = None
151        if not msg:
152            continue
153        console_logger.debug(msg)
154        if success_message is None or success_message in msg:
155            break
156        if failure_message and failure_message in msg:
157            console.close()
158            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
159                    (failure_message, success_message)
160            test.fail(fail)
161
162def interrupt_interactive_console_until_pattern(test, success_message,
163                                                failure_message=None,
164                                                interrupt_string='\r'):
165    """
166    Keep sending a string to interrupt a console prompt, while logging the
167    console output. Typical use case is to break a boot loader prompt, such:
168
169        Press a key within 5 seconds to interrupt boot process.
170        5
171        4
172        3
173        2
174        1
175        Booting default image...
176
177    :param test: an Avocado test containing a VM that will have its console
178                 read and probed for a success or failure message
179    :type test: :class:`avocado_qemu.QemuSystemTest`
180    :param success_message: if this message appears, test succeeds
181    :param failure_message: if this message appears, test fails
182    :param interrupt_string: a string to send to the console before trying
183                             to read a new line
184    """
185    _console_interaction(test, success_message, failure_message,
186                         interrupt_string, True)
187
188def wait_for_console_pattern(test, success_message, failure_message=None,
189                             vm=None):
190    """
191    Waits for messages to appear on the console, while logging the content
192
193    :param test: an Avocado test containing a VM that will have its console
194                 read and probed for a success or failure message
195    :type test: :class:`avocado_qemu.QemuSystemTest`
196    :param success_message: if this message appears, test succeeds
197    :param failure_message: if this message appears, test fails
198    """
199    _console_interaction(test, success_message, failure_message, None, vm=vm)
200
201def exec_command(test, command):
202    """
203    Send a command to a console (appending CRLF characters), while logging
204    the content.
205
206    :param test: an Avocado test containing a VM.
207    :type test: :class:`avocado_qemu.QemuSystemTest`
208    :param command: the command to send
209    :type command: str
210    """
211    _console_interaction(test, None, None, command + '\r')
212
213def exec_command_and_wait_for_pattern(test, command,
214                                      success_message, failure_message=None):
215    """
216    Send a command to a console (appending CRLF characters), then wait
217    for success_message to appear on the console, while logging the.
218    content. Mark the test as failed if failure_message is found instead.
219
220    :param test: an Avocado test containing a VM that will have its console
221                 read and probed for a success or failure message
222    :type test: :class:`avocado_qemu.QemuSystemTest`
223    :param command: the command to send
224    :param success_message: if this message appears, test succeeds
225    :param failure_message: if this message appears, test fails
226    """
227    _console_interaction(test, success_message, failure_message, command + '\r')
228
229class QemuBaseTest(avocado.Test):
230
231    # default timeout for all tests, can be overridden
232    timeout = 900
233
234    def _get_unique_tag_val(self, tag_name):
235        """
236        Gets a tag value, if unique for a key
237        """
238        vals = self.tags.get(tag_name, [])
239        if len(vals) == 1:
240            return vals.pop()
241        return None
242
243    def setUp(self, bin_prefix):
244        self.arch = self.params.get('arch',
245                                    default=self._get_unique_tag_val('arch'))
246
247        self.cpu = self.params.get('cpu',
248                                   default=self._get_unique_tag_val('cpu'))
249
250        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
251        self.qemu_bin = self.params.get('qemu_bin',
252                                        default=default_qemu_bin)
253        if self.qemu_bin is None:
254            self.cancel("No QEMU binary defined or found in the build tree")
255
256    def fetch_asset(self, name,
257                    asset_hash=None, algorithm=None,
258                    locations=None, expire=None,
259                    find_only=False, cancel_on_missing=True):
260        return super().fetch_asset(name,
261                        asset_hash=asset_hash,
262                        algorithm=algorithm,
263                        locations=locations,
264                        expire=expire,
265                        find_only=find_only,
266                        cancel_on_missing=cancel_on_missing)
267
268
269class QemuSystemTest(QemuBaseTest):
270    """Facilitates system emulation tests."""
271
272    def setUp(self):
273        self._vms = {}
274
275        super().setUp('qemu-system-')
276
277        self.machine = self.params.get('machine',
278                                       default=self._get_unique_tag_val('machine'))
279
280    def require_accelerator(self, accelerator):
281        """
282        Requires an accelerator to be available for the test to continue
283
284        It takes into account the currently set qemu binary.
285
286        If the check fails, the test is canceled.  If the check itself
287        for the given accelerator is not available, the test is also
288        canceled.
289
290        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
291        :type accelerator: str
292        """
293        checker = {'tcg': tcg_available,
294                   'kvm': kvm_available}.get(accelerator)
295        if checker is None:
296            self.cancel("Don't know how to check for the presence "
297                        "of accelerator %s" % accelerator)
298        if not checker(qemu_bin=self.qemu_bin):
299            self.cancel("%s accelerator does not seem to be "
300                        "available" % accelerator)
301
302    def require_netdev(self, netdevname):
303        netdevhelp = run_cmd([self.qemu_bin,
304                             '-M', 'none', '-netdev', 'help'])[0];
305        if netdevhelp.find('\n' + netdevname + '\n') < 0:
306            self.cancel('no support for user networking')
307
308    def _new_vm(self, name, *args):
309        self._sd = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_")
310        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
311                         sock_dir=self._sd.name, log_dir=self.logdir)
312        self.log.debug('QEMUMachine "%s" created', name)
313        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
314        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
315        if args:
316            vm.add_args(*args)
317        return vm
318
319    @property
320    def vm(self):
321        return self.get_vm(name='default')
322
323    def get_vm(self, *args, name=None):
324        if not name:
325            name = str(uuid.uuid4())
326        if self._vms.get(name) is None:
327            self._vms[name] = self._new_vm(name, *args)
328            if self.cpu is not None:
329                self._vms[name].add_args('-cpu', self.cpu)
330            if self.machine is not None:
331                self._vms[name].set_machine(self.machine)
332        return self._vms[name]
333
334    def set_vm_arg(self, arg, value):
335        """
336        Set an argument to list of extra arguments to be given to the QEMU
337        binary. If the argument already exists then its value is replaced.
338
339        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
340        :type arg: str
341        :param value: the argument value, such as "host" in "-cpu host"
342        :type value: str
343        """
344        if not arg or not value:
345            return
346        if arg not in self.vm.args:
347            self.vm.args.extend([arg, value])
348        else:
349            idx = self.vm.args.index(arg) + 1
350            if idx < len(self.vm.args):
351                self.vm.args[idx] = value
352            else:
353                self.vm.args.append(value)
354
355    def tearDown(self):
356        for vm in self._vms.values():
357            vm.shutdown()
358        self._sd = None
359        super().tearDown()
360
361
362class QemuUserTest(QemuBaseTest):
363    """Facilitates user-mode emulation tests."""
364
365    def setUp(self):
366        self._ldpath = []
367        super().setUp('qemu-')
368
369    def add_ldpath(self, ldpath):
370        self._ldpath.append(os.path.abspath(ldpath))
371
372    def run(self, bin_path, args=[]):
373        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
374        bin_args = " ".join(args)
375        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
376                                            bin_path, bin_args))
377
378
379class LinuxSSHMixIn:
380    """Contains utility methods for interacting with a guest via SSH."""
381
382    def ssh_connect(self, username, credential, credential_is_key=True):
383        self.ssh_logger = logging.getLogger('ssh')
384        res = self.vm.command('human-monitor-command',
385                              command_line='info usernet')
386        port = get_info_usernet_hostfwd_port(res)
387        self.assertIsNotNone(port)
388        self.assertGreater(port, 0)
389        self.log.debug('sshd listening on port: %d', port)
390        if credential_is_key:
391            self.ssh_session = ssh.Session('127.0.0.1', port=port,
392                                           user=username, key=credential)
393        else:
394            self.ssh_session = ssh.Session('127.0.0.1', port=port,
395                                           user=username, password=credential)
396        for i in range(10):
397            try:
398                self.ssh_session.connect()
399                return
400            except:
401                time.sleep(i)
402        self.fail('ssh connection timeout')
403
404    def ssh_command(self, command):
405        self.ssh_logger.info(command)
406        result = self.ssh_session.cmd(command)
407        stdout_lines = [line.rstrip() for line
408                        in result.stdout_text.splitlines()]
409        for line in stdout_lines:
410            self.ssh_logger.info(line)
411        stderr_lines = [line.rstrip() for line
412                        in result.stderr_text.splitlines()]
413        for line in stderr_lines:
414            self.ssh_logger.warning(line)
415
416        self.assertEqual(result.exit_status, 0,
417                         f'Guest command failed: {command}')
418        return stdout_lines, stderr_lines
419
420class LinuxDistro:
421    """Represents a Linux distribution
422
423    Holds information of known distros.
424    """
425    #: A collection of known distros and their respective image checksum
426    KNOWN_DISTROS = {
427        'fedora': {
428            '31': {
429                'x86_64':
430                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
431                              '8a309c2d46215d8fc026954f3c5c27a0'),
432                 'pxeboot_url': ('https://archives.fedoraproject.org/'
433                                 'pub/archive/fedora/linux/releases/31/'
434                                 'Everything/x86_64/os/images/pxeboot/'),
435                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
436                                   '08a96687f73c ro no_timer_check '
437                                   'net.ifnames=0 console=tty1 '
438                                   'console=ttyS0,115200n8'),
439                },
440                'aarch64':
441                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
442                              'd2af0ad0329383d5639c997fdf16fe49'),
443                'pxeboot_url': 'https://archives.fedoraproject.org/'
444                               'pub/archive/fedora/linux/releases/31/'
445                               'Everything/aarch64/os/images/pxeboot/',
446                'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
447                                  '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
448                                  ' ignore_loglevel no_timer_check'
449                                  ' printk.time=1 rd_NO_PLYMOUTH'
450                                  ' console=ttyAMA0'),
451                },
452                'ppc64':
453                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
454                              '3f991c506f2cc390dc4efa2026ad2f58')},
455                's390x':
456                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
457                              '1e354f834d355069ca982fdcaf5a122d')},
458            },
459            '32': {
460                'aarch64':
461                {'checksum': ('b367755c664a2d7a26955bbfff985855'
462                              'adfa2ca15e908baf15b4b176d68d3967'),
463                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
464                                'releases/32/Server/aarch64/os/images/'
465                                'pxeboot/'),
466                'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
467                                  '14d95c0e90c5 ro no_timer_check net.ifnames=0'
468                                  ' console=tty1 console=ttyS0,115200n8'),
469                },
470            },
471            '33': {
472                'aarch64':
473                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
474                              'a81f386a17f969c1d1c7c87031008a6b'),
475                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
476                                'releases/33/Server/aarch64/os/images/'
477                                'pxeboot/'),
478                'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
479                                  '1126a0208f8a ro no_timer_check net.ifnames=0'
480                                  ' console=tty1 console=ttyS0,115200n8'
481                                  ' console=tty0'),
482                 },
483            },
484        }
485    }
486
487    def __init__(self, name, version, arch):
488        self.name = name
489        self.version = version
490        self.arch = arch
491        try:
492            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
493        except AttributeError:
494            # Unknown distro
495            info = None
496        self._info = info or {}
497
498    @property
499    def checksum(self):
500        """Gets the cloud-image file checksum"""
501        return self._info.get('checksum', None)
502
503    @checksum.setter
504    def checksum(self, value):
505        self._info['checksum'] = value
506
507    @property
508    def pxeboot_url(self):
509        """Gets the repository url where pxeboot files can be found"""
510        return self._info.get('pxeboot_url', None)
511
512    @property
513    def default_kernel_params(self):
514        """Gets the default kernel parameters"""
515        return self._info.get('kernel_params', None)
516
517
518class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
519    """Facilitates having a cloud-image Linux based available.
520
521    For tests that intend to interact with guests, this is a better choice
522    to start with than the more vanilla `QemuSystemTest` class.
523    """
524
525    distro = None
526    username = 'root'
527    password = 'password'
528    smp = '2'
529    memory = '1024'
530
531    def _set_distro(self):
532        distro_name = self.params.get(
533            'distro',
534            default=self._get_unique_tag_val('distro'))
535        if not distro_name:
536            distro_name = 'fedora'
537
538        distro_version = self.params.get(
539            'distro_version',
540            default=self._get_unique_tag_val('distro_version'))
541        if not distro_version:
542            distro_version = '31'
543
544        self.distro = LinuxDistro(distro_name, distro_version, self.arch)
545
546        # The distro checksum behaves differently than distro name and
547        # version. First, it does not respect a tag with the same
548        # name, given that it's not expected to be used for filtering
549        # (distro name versions are the natural choice).  Second, the
550        # order of precedence is: parameter, attribute and then value
551        # from KNOWN_DISTROS.
552        distro_checksum = self.params.get('distro_checksum',
553                                          default=None)
554        if distro_checksum:
555            self.distro.checksum = distro_checksum
556
557    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
558        super().setUp()
559        self.require_netdev('user')
560        self._set_distro()
561        self.vm.add_args('-smp', self.smp)
562        self.vm.add_args('-m', self.memory)
563        # The following network device allows for SSH connections
564        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
565                         '-device', '%s,netdev=vnet' % network_device_type)
566        self.set_up_boot()
567        if ssh_pubkey is None:
568            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
569        self.set_up_cloudinit(ssh_pubkey)
570
571    def set_up_existing_ssh_keys(self):
572        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
573        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
574        ssh_dir = os.path.join(self.workdir, '.ssh')
575        os.mkdir(ssh_dir, mode=0o700)
576        ssh_private_key = os.path.join(ssh_dir,
577                                       os.path.basename(source_private_key))
578        shutil.copyfile(source_private_key, ssh_private_key)
579        os.chmod(ssh_private_key, 0o600)
580        return (ssh_public_key, ssh_private_key)
581
582    def download_boot(self):
583        self.log.debug('Looking for and selecting a qemu-img binary to be '
584                       'used to create the bootable snapshot image')
585        # If qemu-img has been built, use it, otherwise the system wide one
586        # will be used.  If none is available, the test will cancel.
587        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
588        if not os.path.exists(qemu_img):
589            qemu_img = find_command('qemu-img', False)
590        if qemu_img is False:
591            self.cancel('Could not find "qemu-img", which is required to '
592                        'create the bootable image')
593        vmimage.QEMU_IMG = qemu_img
594
595        self.log.info('Downloading/preparing boot image')
596        # Fedora 31 only provides ppc64le images
597        image_arch = self.arch
598        if self.distro.name == 'fedora':
599            if image_arch == 'ppc64':
600                image_arch = 'ppc64le'
601
602        try:
603            boot = vmimage.get(
604                self.distro.name, arch=image_arch, version=self.distro.version,
605                checksum=self.distro.checksum,
606                algorithm='sha256',
607                cache_dir=self.cache_dirs[0],
608                snapshot_dir=self.workdir)
609        except:
610            self.cancel('Failed to download/prepare boot image')
611        return boot.path
612
613    def prepare_cloudinit(self, ssh_pubkey=None):
614        self.log.info('Preparing cloudinit image')
615        try:
616            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
617            pubkey_content = None
618            if ssh_pubkey:
619                with open(ssh_pubkey) as pubkey:
620                    pubkey_content = pubkey.read()
621            cloudinit.iso(cloudinit_iso, self.name,
622                          username=self.username,
623                          password=self.password,
624                          # QEMU's hard coded usermode router address
625                          phone_home_host='10.0.2.2',
626                          phone_home_port=self.phone_server.server_port,
627                          authorized_key=pubkey_content)
628        except Exception:
629            self.cancel('Failed to prepare the cloudinit image')
630        return cloudinit_iso
631
632    def set_up_boot(self):
633        path = self.download_boot()
634        self.vm.add_args('-drive', 'file=%s' % path)
635
636    def set_up_cloudinit(self, ssh_pubkey=None):
637        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
638                                                      self.name)
639        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
640        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)
641
642    def launch_and_wait(self, set_up_ssh_connection=True):
643        self.vm.set_console()
644        self.vm.launch()
645        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
646                                                 logger=self.log.getChild('console'))
647        console_drainer.start()
648        self.log.info('VM launched, waiting for boot confirmation from guest')
649        while not self.phone_server.instance_phoned_back:
650            self.phone_server.handle_request()
651
652        if set_up_ssh_connection:
653            self.log.info('Setting up the SSH connection')
654            self.ssh_connect(self.username, self.ssh_key)
655