1# Test class and utilities for functional tests
2#
3# Copyright 2018, 2024 Red Hat, Inc.
4#
5# Original Author (Avocado-based tests):
6#  Cleber Rosa <crosa@redhat.com>
7#
8# Adaption for standalone version:
9#  Thomas Huth <thuth@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14import logging
15import os
16import pycotap
17import sys
18import unittest
19import uuid
20
21from qemu.machine import QEMUMachine
22from qemu.utils import kvm_available, tcg_available
23
24from .asset import Asset
25from .cmd import run_cmd
26from .config import BUILD_DIR
27
28
29class QemuBaseTest(unittest.TestCase):
30
31    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
32    arch = None
33
34    workdir = None
35    log = None
36    logdir = None
37
38    def setUp(self, bin_prefix):
39        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
40        self.arch = self.qemu_bin.split('-')[-1]
41
42        self.workdir = os.path.join(BUILD_DIR, 'tests/functional', self.arch,
43                                    self.id())
44        os.makedirs(self.workdir, exist_ok=True)
45
46        self.logdir = self.workdir
47        self.log = logging.getLogger('qemu-test')
48        self.log.setLevel(logging.DEBUG)
49        self._log_fh = logging.FileHandler(os.path.join(self.logdir,
50                                                        'base.log'), mode='w')
51        self._log_fh.setLevel(logging.DEBUG)
52        fileFormatter = logging.Formatter(
53            '%(asctime)s - %(levelname)s: %(message)s')
54        self._log_fh.setFormatter(fileFormatter)
55        self.log.addHandler(self._log_fh)
56
57    def tearDown(self):
58        self.log.removeHandler(self._log_fh)
59
60    def main():
61        path = os.path.basename(sys.argv[0])[:-3]
62
63        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
64        if cache is not None:
65            Asset.precache_suites(path, cache)
66            return
67
68        tr = pycotap.TAPTestRunner(message_log = pycotap.LogMode.LogToError,
69                                   test_output_log = pycotap.LogMode.LogToError)
70        unittest.main(module = None, testRunner = tr, argv=["__dummy__", path])
71
72
73class QemuSystemTest(QemuBaseTest):
74    """Facilitates system emulation tests."""
75
76    cpu = None
77    machine = None
78    _machinehelp = None
79
80    def setUp(self):
81        self._vms = {}
82
83        super().setUp('qemu-system-')
84
85        console_log = logging.getLogger('console')
86        console_log.setLevel(logging.DEBUG)
87        self._console_log_fh = logging.FileHandler(os.path.join(self.workdir,
88                                                   'console.log'), mode='w')
89        self._console_log_fh.setLevel(logging.DEBUG)
90        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
91        self._console_log_fh.setFormatter(fileFormatter)
92        console_log.addHandler(self._console_log_fh)
93
94    def set_machine(self, machinename):
95        # TODO: We should use QMP to get the list of available machines
96        if not self._machinehelp:
97            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0];
98        if self._machinehelp.find(machinename) < 0:
99            self.skipTest('no support for machine ' + machinename)
100        self.machine = machinename
101
102    def require_accelerator(self, accelerator):
103        """
104        Requires an accelerator to be available for the test to continue
105
106        It takes into account the currently set qemu binary.
107
108        If the check fails, the test is canceled.  If the check itself
109        for the given accelerator is not available, the test is also
110        canceled.
111
112        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
113        :type accelerator: str
114        """
115        checker = {'tcg': tcg_available,
116                   'kvm': kvm_available}.get(accelerator)
117        if checker is None:
118            self.skipTest("Don't know how to check for the presence "
119                          "of accelerator %s" % accelerator)
120        if not checker(qemu_bin=self.qemu_bin):
121            self.skipTest("%s accelerator does not seem to be "
122                          "available" % accelerator)
123
124    def require_netdev(self, netdevname):
125        netdevhelp = run_cmd([self.qemu_bin,
126                             '-M', 'none', '-netdev', 'help'])[0];
127        if netdevhelp.find('\n' + netdevname + '\n') < 0:
128            self.skipTest('no support for " + netdevname + " networking')
129
130    def require_device(self, devicename):
131        devhelp = run_cmd([self.qemu_bin,
132                           '-M', 'none', '-device', 'help'])[0];
133        if devhelp.find(devicename) < 0:
134            self.skipTest('no support for device ' + devicename)
135
136    def _new_vm(self, name, *args):
137        vm = QEMUMachine(self.qemu_bin, base_temp_dir=self.workdir)
138        self.log.debug('QEMUMachine "%s" created', name)
139        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)
140        self.log.debug('QEMUMachine "%s" log_dir: %s', name, vm.log_dir)
141        if args:
142            vm.add_args(*args)
143        return vm
144
145    @property
146    def vm(self):
147        return self.get_vm(name='default')
148
149    def get_vm(self, *args, name=None):
150        if not name:
151            name = str(uuid.uuid4())
152        if self._vms.get(name) is None:
153            self._vms[name] = self._new_vm(name, *args)
154            if self.cpu is not None:
155                self._vms[name].add_args('-cpu', self.cpu)
156            if self.machine is not None:
157                self._vms[name].set_machine(self.machine)
158        return self._vms[name]
159
160    def set_vm_arg(self, arg, value):
161        """
162        Set an argument to list of extra arguments to be given to the QEMU
163        binary. If the argument already exists then its value is replaced.
164
165        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
166        :type arg: str
167        :param value: the argument value, such as "host" in "-cpu host"
168        :type value: str
169        """
170        if not arg or not value:
171            return
172        if arg not in self.vm.args:
173            self.vm.args.extend([arg, value])
174        else:
175            idx = self.vm.args.index(arg) + 1
176            if idx < len(self.vm.args):
177                self.vm.args[idx] = value
178            else:
179                self.vm.args.append(value)
180
181    def tearDown(self):
182        for vm in self._vms.values():
183            vm.shutdown()
184        logging.getLogger('console').removeHandler(self._console_log_fh)
185        super().tearDown()
186