1# Test class and utilities for functional tests
2#
3# Copyright (c) 2018 Red Hat, Inc.
4#
5# Author:
6#  Cleber Rosa <crosa@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2 or
9# later.  See the COPYING file in the top-level directory.
10
11import logging
12import os
13import shutil
14import subprocess
15import sys
16import tempfile
17import time
18import uuid
19
20import avocado
21from avocado.utils import cloudinit, datadrainer, process, ssh, vmimage
22from avocado.utils.path import find_command
23
24#: The QEMU build root directory.  It may also be the source directory
25#: if building from the source dir, but it's safer to use BUILD_DIR for
26#: that purpose.  Be aware that if this code is moved outside of a source
27#: and build tree, it will not be accurate.
28BUILD_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
29
30if os.path.islink(os.path.dirname(os.path.dirname(__file__))):
31    # The link to the avocado tests dir in the source code directory
32    lnk = os.path.dirname(os.path.dirname(__file__))
33    #: The QEMU root source directory
34    SOURCE_DIR = os.path.dirname(os.path.dirname(os.readlink(lnk)))
35else:
36    SOURCE_DIR = BUILD_DIR
37
38sys.path.append(os.path.join(SOURCE_DIR, 'python'))
39
40from qemu.machine import QEMUMachine
41from qemu.utils import (get_info_usernet_hostfwd_port, kvm_available,
42                        tcg_available)
43
44
45def has_cmd(name, args=None):
46    """
47    This function is for use in a @avocado.skipUnless decorator, e.g.:
48
49        @skipUnless(*has_cmd('sudo -n', ('sudo', '-n', 'true')))
50        def test_something_that_needs_sudo(self):
51            ...
52    """
53
54    if args is None:
55        args = ('which', name)
56
57    try:
58        _, stderr, exitcode = run_cmd(args)
59    except Exception as e:
60        exitcode = -1
61        stderr = str(e)
62
63    if exitcode != 0:
64        cmd_line = ' '.join(args)
65        err = f'{name} required, but "{cmd_line}" failed: {stderr.strip()}'
66        return (False, err)
67    else:
68        return (True, '')
69
70def has_cmds(*cmds):
71    """
72    This function is for use in a @avocado.skipUnless decorator and
73    allows checking for the availability of multiple commands, e.g.:
74
75        @skipUnless(*has_cmds(('cmd1', ('cmd1', '--some-parameter')),
76                              'cmd2', 'cmd3'))
77        def test_something_that_needs_cmd1_and_cmd2(self):
78            ...
79    """
80
81    for cmd in cmds:
82        if isinstance(cmd, str):
83            cmd = (cmd,)
84
85        ok, errstr = has_cmd(*cmd)
86        if not ok:
87            return (False, errstr)
88
89    return (True, '')
90
91def run_cmd(args):
92    subp = subprocess.Popen(args,
93                            stdout=subprocess.PIPE,
94                            stderr=subprocess.PIPE,
95                            universal_newlines=True)
96    stdout, stderr = subp.communicate()
97    ret = subp.returncode
98
99    return (stdout, stderr, ret)
100
101def is_readable_executable_file(path):
102    return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
103
104
105def pick_default_qemu_bin(bin_prefix='qemu-system-', arch=None):
106    """
107    Picks the path of a QEMU binary, starting either in the current working
108    directory or in the source tree root directory.
109
110    :param arch: the arch to use when looking for a QEMU binary (the target
111                 will match the arch given).  If None (the default), arch
112                 will be the current host system arch (as given by
113                 :func:`os.uname`).
114    :type arch: str
115    :returns: the path to the default QEMU binary or None if one could not
116              be found
117    :rtype: str or None
118    """
119    if arch is None:
120        arch = os.uname()[4]
121    # qemu binary path does not match arch for powerpc, handle it
122    if 'ppc64le' in arch:
123        arch = 'ppc64'
124    qemu_bin_relative_path = os.path.join(".", bin_prefix + arch)
125    if is_readable_executable_file(qemu_bin_relative_path):
126        return qemu_bin_relative_path
127
128    qemu_bin_from_bld_dir_path = os.path.join(BUILD_DIR,
129                                              qemu_bin_relative_path)
130    if is_readable_executable_file(qemu_bin_from_bld_dir_path):
131        return qemu_bin_from_bld_dir_path
132    return None
133
134
135def _console_interaction(test, success_message, failure_message,
136                         send_string, keep_sending=False, vm=None):
137    assert not keep_sending or send_string
138    if vm is None:
139        vm = test.vm
140    console = vm.console_socket.makefile(mode='rb', encoding='utf-8')
141    console_logger = logging.getLogger('console')
142    while True:
143        if send_string:
144            vm.console_socket.sendall(send_string.encode())
145            if not keep_sending:
146                send_string = None # send only once
147        try:
148            msg = console.readline().decode().strip()
149        except UnicodeDecodeError:
150            msg = None
151        if not msg:
152            continue
153        console_logger.debug(msg)
154        if success_message is None or success_message in msg:
155            break
156        if failure_message and failure_message in msg:
157            console.close()
158            fail = 'Failure message found in console: "%s". Expected: "%s"' % \
159                    (failure_message, success_message)
160            test.fail(fail)
161
162def interrupt_interactive_console_until_pattern(test, success_message,
163                                                failure_message=None,
164                                                interrupt_string='\r'):
165    """
166    Keep sending a string to interrupt a console prompt, while logging the
167    console output. Typical use case is to break a boot loader prompt, such:
168
169        Press a key within 5 seconds to interrupt boot process.
170        5
171        4
172        3
173        2
174        1
175        Booting default image...
176
177    :param test: an Avocado test containing a VM that will have its console
178                 read and probed for a success or failure message
179    :type test: :class:`avocado_qemu.QemuSystemTest`
180    :param success_message: if this message appears, test succeeds
181    :param failure_message: if this message appears, test fails
182    :param interrupt_string: a string to send to the console before trying
183                             to read a new line
184    """
185    _console_interaction(test, success_message, failure_message,
186                         interrupt_string, True)
187
188def wait_for_console_pattern(test, success_message, failure_message=None,
189                             vm=None):
190    """
191    Waits for messages to appear on the console, while logging the content
192
193    :param test: an Avocado test containing a VM that will have its console
194                 read and probed for a success or failure message
195    :type test: :class:`avocado_qemu.QemuSystemTest`
196    :param success_message: if this message appears, test succeeds
197    :param failure_message: if this message appears, test fails
198    """
199    _console_interaction(test, success_message, failure_message, None, vm=vm)
200
201def exec_command(test, command):
202    """
203    Send a command to a console (appending CRLF characters), while logging
204    the content.
205
206    :param test: an Avocado test containing a VM.
207    :type test: :class:`avocado_qemu.QemuSystemTest`
208    :param command: the command to send
209    :type command: str
210    """
211    _console_interaction(test, None, None, command + '\r')
212
213def exec_command_and_wait_for_pattern(test, command,
214                                      success_message, failure_message=None):
215    """
216    Send a command to a console (appending CRLF characters), then wait
217    for success_message to appear on the console, while logging the.
218    content. Mark the test as failed if failure_message is found instead.
219
220    :param test: an Avocado test containing a VM that will have its console
221                 read and probed for a success or failure message
222    :type test: :class:`avocado_qemu.QemuSystemTest`
223    :param command: the command to send
224    :param success_message: if this message appears, test succeeds
225    :param failure_message: if this message appears, test fails
226    """
227    _console_interaction(test, success_message, failure_message, command + '\r')
228
229class QemuBaseTest(avocado.Test):
230    def _get_unique_tag_val(self, tag_name):
231        """
232        Gets a tag value, if unique for a key
233        """
234        vals = self.tags.get(tag_name, [])
235        if len(vals) == 1:
236            return vals.pop()
237        return None
238
239    def setUp(self, bin_prefix):
240        self.arch = self.params.get('arch',
241                                    default=self._get_unique_tag_val('arch'))
242
243        self.cpu = self.params.get('cpu',
244                                   default=self._get_unique_tag_val('cpu'))
245
246        default_qemu_bin = pick_default_qemu_bin(bin_prefix, arch=self.arch)
247        self.qemu_bin = self.params.get('qemu_bin',
248                                        default=default_qemu_bin)
249        if self.qemu_bin is None:
250            self.cancel("No QEMU binary defined or found in the build tree")
251
252    def fetch_asset(self, name,
253                    asset_hash=None, algorithm=None,
254                    locations=None, expire=None,
255                    find_only=False, cancel_on_missing=True):
256        return super().fetch_asset(name,
257                        asset_hash=asset_hash,
258                        algorithm=algorithm,
259                        locations=locations,
260                        expire=expire,
261                        find_only=find_only,
262                        cancel_on_missing=cancel_on_missing)
263
264
265class QemuSystemTest(QemuBaseTest):
266    """Facilitates system emulation tests."""
267
268    def setUp(self):
269        self._vms = {}
270
271        super().setUp('qemu-system-')
272
273        self.machine = self.params.get('machine',
274                                       default=self._get_unique_tag_val('machine'))
275
276    def require_accelerator(self, accelerator):
277        """
278        Requires an accelerator to be available for the test to continue
279
280        It takes into account the currently set qemu binary.
281
282        If the check fails, the test is canceled.  If the check itself
283        for the given accelerator is not available, the test is also
284        canceled.
285
286        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
287        :type accelerator: str
288        """
289        checker = {'tcg': tcg_available,
290                   'kvm': kvm_available}.get(accelerator)
291        if checker is None:
292            self.cancel("Don't know how to check for the presence "
293                        "of accelerator %s" % accelerator)
294        if not checker(qemu_bin=self.qemu_bin):
295            self.cancel("%s accelerator does not seem to be "
296                        "available" % accelerator)
297
298    def _new_vm(self, name, *args):
299        self._sd = tempfile.TemporaryDirectory(prefix="avo_qemu_sock_")
300        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir,
301                         sock_dir=self._sd.name, log_dir=self.logdir)
302        self.log.debug('QEMUMachine "%s" created', name)
303        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
304        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
305        if args:
306            vm.add_args(*args)
307        return vm
308
309    @property
310    def vm(self):
311        return self.get_vm(name='default')
312
313    def get_vm(self, *args, name=None):
314        if not name:
315            name = str(uuid.uuid4())
316        if self._vms.get(name) is None:
317            self._vms[name] = self._new_vm(name, *args)
318            if self.cpu is not None:
319                self._vms[name].add_args('-cpu', self.cpu)
320            if self.machine is not None:
321                self._vms[name].set_machine(self.machine)
322        return self._vms[name]
323
324    def set_vm_arg(self, arg, value):
325        """
326        Set an argument to list of extra arguments to be given to the QEMU
327        binary. If the argument already exists then its value is replaced.
328
329        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
330        :type arg: str
331        :param value: the argument value, such as "host" in "-cpu host"
332        :type value: str
333        """
334        if not arg or not value:
335            return
336        if arg not in self.vm.args:
337            self.vm.args.extend([arg, value])
338        else:
339            idx = self.vm.args.index(arg) + 1
340            if idx < len(self.vm.args):
341                self.vm.args[idx] = value
342            else:
343                self.vm.args.append(value)
344
345    def tearDown(self):
346        for vm in self._vms.values():
347            vm.shutdown()
348        self._sd = None
349        super().tearDown()
350
351
352class QemuUserTest(QemuBaseTest):
353    """Facilitates user-mode emulation tests."""
354
355    def setUp(self):
356        self._ldpath = []
357        super().setUp('qemu-')
358
359    def add_ldpath(self, ldpath):
360        self._ldpath.append(os.path.abspath(ldpath))
361
362    def run(self, bin_path, args=[]):
363        qemu_args = " ".join(["-L %s" % ldpath for ldpath in self._ldpath])
364        bin_args = " ".join(args)
365        return process.run("%s %s %s %s" % (self.qemu_bin, qemu_args,
366                                            bin_path, bin_args))
367
368
369class LinuxSSHMixIn:
370    """Contains utility methods for interacting with a guest via SSH."""
371
372    def ssh_connect(self, username, credential, credential_is_key=True):
373        self.ssh_logger = logging.getLogger('ssh')
374        res = self.vm.command('human-monitor-command',
375                              command_line='info usernet')
376        port = get_info_usernet_hostfwd_port(res)
377        self.assertIsNotNone(port)
378        self.assertGreater(port, 0)
379        self.log.debug('sshd listening on port: %d', port)
380        if credential_is_key:
381            self.ssh_session = ssh.Session('127.0.0.1', port=port,
382                                           user=username, key=credential)
383        else:
384            self.ssh_session = ssh.Session('127.0.0.1', port=port,
385                                           user=username, password=credential)
386        for i in range(10):
387            try:
388                self.ssh_session.connect()
389                return
390            except:
391                time.sleep(i)
392        self.fail('ssh connection timeout')
393
394    def ssh_command(self, command):
395        self.ssh_logger.info(command)
396        result = self.ssh_session.cmd(command)
397        stdout_lines = [line.rstrip() for line
398                        in result.stdout_text.splitlines()]
399        for line in stdout_lines:
400            self.ssh_logger.info(line)
401        stderr_lines = [line.rstrip() for line
402                        in result.stderr_text.splitlines()]
403        for line in stderr_lines:
404            self.ssh_logger.warning(line)
405
406        self.assertEqual(result.exit_status, 0,
407                         f'Guest command failed: {command}')
408        return stdout_lines, stderr_lines
409
410class LinuxDistro:
411    """Represents a Linux distribution
412
413    Holds information of known distros.
414    """
415    #: A collection of known distros and their respective image checksum
416    KNOWN_DISTROS = {
417        'fedora': {
418            '31': {
419                'x86_64':
420                {'checksum': ('e3c1b309d9203604922d6e255c2c5d09'
421                              '8a309c2d46215d8fc026954f3c5c27a0'),
422                 'pxeboot_url': ('https://archives.fedoraproject.org/'
423                                 'pub/archive/fedora/linux/releases/31/'
424                                 'Everything/x86_64/os/images/pxeboot/'),
425                 'kernel_params': ('root=UUID=b1438b9b-2cab-4065-a99a-'
426                                   '08a96687f73c ro no_timer_check '
427                                   'net.ifnames=0 console=tty1 '
428                                   'console=ttyS0,115200n8'),
429                },
430                'aarch64':
431                {'checksum': ('1e18d9c0cf734940c4b5d5ec592facae'
432                              'd2af0ad0329383d5639c997fdf16fe49'),
433                'pxeboot_url': 'https://archives.fedoraproject.org/'
434                               'pub/archive/fedora/linux/releases/31/'
435                               'Everything/aarch64/os/images/pxeboot/',
436                'kernel_params': ('root=UUID=b6950a44-9f3c-4076-a9c2-'
437                                  '355e8475b0a7 ro earlyprintk=pl011,0x9000000'
438                                  ' ignore_loglevel no_timer_check'
439                                  ' printk.time=1 rd_NO_PLYMOUTH'
440                                  ' console=ttyAMA0'),
441                },
442                'ppc64':
443                {'checksum': ('7c3528b85a3df4b2306e892199a9e1e4'
444                              '3f991c506f2cc390dc4efa2026ad2f58')},
445                's390x':
446                {'checksum': ('4caaab5a434fd4d1079149a072fdc789'
447                              '1e354f834d355069ca982fdcaf5a122d')},
448            },
449            '32': {
450                'aarch64':
451                {'checksum': ('b367755c664a2d7a26955bbfff985855'
452                              'adfa2ca15e908baf15b4b176d68d3967'),
453                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
454                                'releases/32/Server/aarch64/os/images/'
455                                'pxeboot/'),
456                'kernel_params': ('root=UUID=3df75b65-be8d-4db4-8655-'
457                                  '14d95c0e90c5 ro no_timer_check net.ifnames=0'
458                                  ' console=tty1 console=ttyS0,115200n8'),
459                },
460            },
461            '33': {
462                'aarch64':
463                {'checksum': ('e7f75cdfd523fe5ac2ca9eeece68edc1'
464                              'a81f386a17f969c1d1c7c87031008a6b'),
465                'pxeboot_url': ('http://dl.fedoraproject.org/pub/fedora/linux/'
466                                'releases/33/Server/aarch64/os/images/'
467                                'pxeboot/'),
468                'kernel_params': ('root=UUID=d20b3ffa-6397-4a63-a734-'
469                                  '1126a0208f8a ro no_timer_check net.ifnames=0'
470                                  ' console=tty1 console=ttyS0,115200n8'
471                                  ' console=tty0'),
472                 },
473            },
474        }
475    }
476
477    def __init__(self, name, version, arch):
478        self.name = name
479        self.version = version
480        self.arch = arch
481        try:
482            info = self.KNOWN_DISTROS.get(name).get(version).get(arch)
483        except AttributeError:
484            # Unknown distro
485            info = None
486        self._info = info or {}
487
488    @property
489    def checksum(self):
490        """Gets the cloud-image file checksum"""
491        return self._info.get('checksum', None)
492
493    @checksum.setter
494    def checksum(self, value):
495        self._info['checksum'] = value
496
497    @property
498    def pxeboot_url(self):
499        """Gets the repository url where pxeboot files can be found"""
500        return self._info.get('pxeboot_url', None)
501
502    @property
503    def default_kernel_params(self):
504        """Gets the default kernel parameters"""
505        return self._info.get('kernel_params', None)
506
507
508class LinuxTest(LinuxSSHMixIn, QemuSystemTest):
509    """Facilitates having a cloud-image Linux based available.
510
511    For tests that indent to interact with guests, this is a better choice
512    to start with than the more vanilla `QemuSystemTest` class.
513    """
514
515    timeout = 900
516    distro = None
517    username = 'root'
518    password = 'password'
519    smp = '2'
520    memory = '1024'
521
522    def _set_distro(self):
523        distro_name = self.params.get(
524            'distro',
525            default=self._get_unique_tag_val('distro'))
526        if not distro_name:
527            distro_name = 'fedora'
528
529        distro_version = self.params.get(
530            'distro_version',
531            default=self._get_unique_tag_val('distro_version'))
532        if not distro_version:
533            distro_version = '31'
534
535        self.distro = LinuxDistro(distro_name, distro_version, self.arch)
536
537        # The distro checksum behaves differently than distro name and
538        # version. First, it does not respect a tag with the same
539        # name, given that it's not expected to be used for filtering
540        # (distro name versions are the natural choice).  Second, the
541        # order of precedence is: parameter, attribute and then value
542        # from KNOWN_DISTROS.
543        distro_checksum = self.params.get('distro_checksum',
544                                          default=None)
545        if distro_checksum:
546            self.distro.checksum = distro_checksum
547
548    def setUp(self, ssh_pubkey=None, network_device_type='virtio-net'):
549        super().setUp()
550        self._set_distro()
551        self.vm.add_args('-smp', self.smp)
552        self.vm.add_args('-m', self.memory)
553        # The following network device allows for SSH connections
554        self.vm.add_args('-netdev', 'user,id=vnet,hostfwd=:127.0.0.1:0-:22',
555                         '-device', '%s,netdev=vnet' % network_device_type)
556        self.set_up_boot()
557        if ssh_pubkey is None:
558            ssh_pubkey, self.ssh_key = self.set_up_existing_ssh_keys()
559        self.set_up_cloudinit(ssh_pubkey)
560
561    def set_up_existing_ssh_keys(self):
562        ssh_public_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa.pub')
563        source_private_key = os.path.join(SOURCE_DIR, 'tests', 'keys', 'id_rsa')
564        ssh_dir = os.path.join(self.workdir, '.ssh')
565        os.mkdir(ssh_dir, mode=0o700)
566        ssh_private_key = os.path.join(ssh_dir,
567                                       os.path.basename(source_private_key))
568        shutil.copyfile(source_private_key, ssh_private_key)
569        os.chmod(ssh_private_key, 0o600)
570        return (ssh_public_key, ssh_private_key)
571
572    def download_boot(self):
573        self.log.debug('Looking for and selecting a qemu-img binary to be '
574                       'used to create the bootable snapshot image')
575        # If qemu-img has been built, use it, otherwise the system wide one
576        # will be used.  If none is available, the test will cancel.
577        qemu_img = os.path.join(BUILD_DIR, 'qemu-img')
578        if not os.path.exists(qemu_img):
579            qemu_img = find_command('qemu-img', False)
580        if qemu_img is False:
581            self.cancel('Could not find "qemu-img", which is required to '
582                        'create the bootable image')
583        vmimage.QEMU_IMG = qemu_img
584
585        self.log.info('Downloading/preparing boot image')
586        # Fedora 31 only provides ppc64le images
587        image_arch = self.arch
588        if self.distro.name == 'fedora':
589            if image_arch == 'ppc64':
590                image_arch = 'ppc64le'
591
592        try:
593            boot = vmimage.get(
594                self.distro.name, arch=image_arch, version=self.distro.version,
595                checksum=self.distro.checksum,
596                algorithm='sha256',
597                cache_dir=self.cache_dirs[0],
598                snapshot_dir=self.workdir)
599        except:
600            self.cancel('Failed to download/prepare boot image')
601        return boot.path
602
603    def prepare_cloudinit(self, ssh_pubkey=None):
604        self.log.info('Preparing cloudinit image')
605        try:
606            cloudinit_iso = os.path.join(self.workdir, 'cloudinit.iso')
607            pubkey_content = None
608            if ssh_pubkey:
609                with open(ssh_pubkey) as pubkey:
610                    pubkey_content = pubkey.read()
611            cloudinit.iso(cloudinit_iso, self.name,
612                          username=self.username,
613                          password=self.password,
614                          # QEMU's hard coded usermode router address
615                          phone_home_host='10.0.2.2',
616                          phone_home_port=self.phone_server.server_port,
617                          authorized_key=pubkey_content)
618        except Exception:
619            self.cancel('Failed to prepare the cloudinit image')
620        return cloudinit_iso
621
622    def set_up_boot(self):
623        path = self.download_boot()
624        self.vm.add_args('-drive', 'file=%s' % path)
625
626    def set_up_cloudinit(self, ssh_pubkey=None):
627        self.phone_server = cloudinit.PhoneHomeServer(('0.0.0.0', 0),
628                                                      self.name)
629        cloudinit_iso = self.prepare_cloudinit(ssh_pubkey)
630        self.vm.add_args('-drive', 'file=%s,format=raw' % cloudinit_iso)
631
632    def launch_and_wait(self, set_up_ssh_connection=True):
633        self.vm.set_console()
634        self.vm.launch()
635        console_drainer = datadrainer.LineLogger(self.vm.console_socket.fileno(),
636                                                 logger=self.log.getChild('console'))
637        console_drainer.start()
638        self.log.info('VM launched, waiting for boot confirmation from guest')
639        while not self.phone_server.instance_phoned_back:
640            self.phone_server.handle_request()
641
642        if set_up_ssh_connection:
643            self.log.info('Setting up the SSH connection')
644            self.ssh_connect(self.username, self.ssh_key)
645