# Test class and utilities for functional tests
#
# Copyright 2018, 2024 Red Hat, Inc.
#
# Original Author (Avocado-based tests):
#  Cleber Rosa <crosa@redhat.com>
#
# Adaption for standalone version:
#  Thomas Huth <thuth@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later.  See the COPYING file in the top-level directory.
13fa32a634SThomas Huth
14fa32a634SThomas Huthimport logging
15fa32a634SThomas Huthimport os
16fa32a634SThomas Huthimport pycotap
17dbaaef7dSDaniel P. Berrangéimport shutil
18dbaaef7dSDaniel P. Berrangéimport subprocess
19fa32a634SThomas Huthimport sys
20fa32a634SThomas Huthimport unittest
21fa32a634SThomas Huthimport uuid
22fa32a634SThomas Huth
23fa32a634SThomas Huthfrom qemu.machine import QEMUMachine
24fa32a634SThomas Huthfrom qemu.utils import kvm_available, tcg_available
25fa32a634SThomas Huth
26f57213f8SDaniel P. Berrangéfrom .asset import Asset
27fa32a634SThomas Huthfrom .cmd import run_cmd
28fa32a634SThomas Huthfrom .config import BUILD_DIR
29fa32a634SThomas Huth
30fa32a634SThomas Huth
class QemuBaseTest(unittest.TestCase):
    """
    Common base class for the QEMU functional tests.

    For every test it creates an output directory below BUILD_DIR that
    holds a scratch work directory and a 'base.log' file which receives
    the records of the 'qemu-test' and 'qemu.machine' loggers.
    """

    # Path of the QEMU binary under test, read once from the environment
    qemu_bin = os.getenv('QEMU_TEST_QEMU_BINARY')
    # Last '-'-separated component of qemu_bin, filled in by setUp()
    arch = None

    workdir = None
    log = None
    logdir = None

    def setUp(self, bin_prefix):
        """
        Create the per-test directories and attach the log file handler.

        :param bin_prefix: binary name prefix handed in by the subclasses;
                           it is not used by this implementation
        :type bin_prefix: str
        """
        self.assertIsNotNone(self.qemu_bin, 'QEMU_TEST_QEMU_BINARY must be set')
        self.arch = self.qemu_bin.split('-')[-1]

        self.outputdir = os.path.join(BUILD_DIR, 'tests', 'functional',
                                      self.arch, self.id())
        self.workdir = os.path.join(self.outputdir, 'scratch')
        os.makedirs(self.workdir, exist_ok=True)

        self.logdir = self.outputdir
        self.log_filename = os.path.join(self.logdir, 'base.log')
        self.log = logging.getLogger('qemu-test')
        self.log.setLevel(logging.DEBUG)
        self._log_fh = logging.FileHandler(self.log_filename, mode='w')
        self._log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter(
            '%(asctime)s - %(levelname)s: %(message)s')
        self._log_fh.setFormatter(fileFormatter)
        self.log.addHandler(self._log_fh)

        # Capture QEMUMachine logging
        self.machinelog = logging.getLogger('qemu.machine')
        self.machinelog.setLevel(logging.DEBUG)
        self.machinelog.addHandler(self._log_fh)

    def tearDown(self):
        """
        Remove the scratch directory and detach the log file handler.

        The scratch directory is kept when QEMU_TEST_KEEP_SCRATCH is
        present in the environment.
        """
        try:
            if "QEMU_TEST_KEEP_SCRATCH" not in os.environ:
                shutil.rmtree(self.workdir)
        finally:
            # Always detach the handler, even if the removal above failed,
            # so that it does not pile up on the shared loggers
            self.machinelog.removeHandler(self._log_fh)
            self.log.removeHandler(self._log_fh)
            # Removing a handler does not close its file, do it explicitly
            self._log_fh.close()

    @staticmethod
    def main():
        """
        Entry point for running a test script standalone.

        If QEMU_TEST_PRECACHE is set, only the assets of the test are
        pre-cached.  Otherwise the tests are run with a TAP test runner,
        the log file locations of failed tests are printed to stderr, and
        the process exits with a non-zero status if any test failed.
        """
        # Drop the last three characters of the script name (the ".py"
        # suffix) to get the name that is handed to unittest
        path = os.path.basename(sys.argv[0])[:-3]

        cache = os.environ.get("QEMU_TEST_PRECACHE", None)
        if cache is not None:
            Asset.precache_suites(path, cache)
            return

        tr = pycotap.TAPTestRunner(message_log=pycotap.LogMode.LogToError,
                                   test_output_log=pycotap.LogMode.LogToError)
        res = unittest.main(module=None, testRunner=tr, exit=False,
                            argv=["__dummy__", path])
        for (test, message) in res.result.errors + res.result.failures:
            # Entries without a log_filename attribute have no log to
            # point at, so they are skipped here
            if hasattr(test, "log_filename"):
                print(f'More information on {test.id()} could be found here:'
                      f'\n {test.log_filename}', file=sys.stderr)
                if hasattr(test, 'console_log_name'):
                    print(' %s' % test.console_log_name, file=sys.stderr)
        sys.exit(not res.result.wasSuccessful())
91fa32a634SThomas Huth
92fa32a634SThomas Huth
class QemuUserTest(QemuBaseTest):
    """Facilitates user-mode emulation tests."""

    def setUp(self):
        """Run the base class setup and start with no library paths."""
        super().setUp('qemu-')
        self._ldpath = []

    def add_ldpath(self, ldpath):
        """
        Register a path that run_cmd() passes to QEMU with "-L".

        :param ldpath: the path, it is stored in its absolute form
        :type ldpath: str
        """
        self._ldpath.append(os.path.abspath(ldpath))

    def run_cmd(self, bin_path, args=None):
        """
        Run a guest binary with the QEMU binary under test.

        :param bin_path: path of the guest program to execute
        :type bin_path: str
        :param args: optional arguments for the guest program
        :type args: list of str
        :returns: the subprocess.CompletedProcess of the run, with stdout
                  and stderr captured as text
        """
        cmd = [self.qemu_bin]
        for ldpath in self._ldpath:
            # The option and its value have to be separate argv entries;
            # a single "-L <path>" string would reach QEMU as one argument
            cmd += ['-L', ldpath]
        cmd.append(bin_path)
        if args:
            cmd.extend(args)
        return subprocess.run(cmd, text=True, capture_output=True)
10899465d3fSPhilippe Mathieu-Daudé
class QemuSystemTest(QemuBaseTest):
    """Facilitates system emulation tests."""

    # Optional "-cpu" value that get_vm() adds to newly created VMs
    cpu = None
    # Optional machine type that get_vm() sets on newly created VMs
    machine = None
    # Output of "<qemu_bin> -M help", fetched on first use by set_machine()
    _machinehelp = None

    def setUp(self):
        """Run the base class setup and attach the console log file."""
        self._vms = {}

        super().setUp('qemu-system-')

        console_log = logging.getLogger('console')
        console_log.setLevel(logging.DEBUG)
        self.console_log_name = os.path.join(self.logdir, 'console.log')
        self._console_log_fh = logging.FileHandler(self.console_log_name,
                                                   mode='w')
        self._console_log_fh.setLevel(logging.DEBUG)
        fileFormatter = logging.Formatter('%(asctime)s: %(message)s')
        self._console_log_fh.setFormatter(fileFormatter)
        console_log.addHandler(self._console_log_fh)

    def set_machine(self, machinename):
        """
        Select the machine type for the VMs of this test.

        The test is skipped if the name does not show up in the output
        of "-M help" of the QEMU binary.

        :param machinename: name of the machine type
        :type machinename: str
        """
        # TODO: We should use QMP to get the list of available machines
        if not self._machinehelp:
            # First element of the run_cmd() result, presumably the
            # stdout text of the command -- confirm against .cmd
            self._machinehelp = run_cmd([self.qemu_bin, '-M', 'help'])[0]
        if self._machinehelp.find(machinename) < 0:
            self.skipTest('no support for machine ' + machinename)
        self.machine = machinename

    def require_accelerator(self, accelerator):
        """
        Requires an accelerator to be available for the test to continue

        It takes into account the currently set qemu binary.

        If the check fails, the test is skipped.  If the check itself
        for the given accelerator is not available, the test is also
        skipped.

        :param accelerator: name of the accelerator, such as "kvm" or "tcg"
        :type accelerator: str
        """
        checker = {'tcg': tcg_available,
                   'kvm': kvm_available}.get(accelerator)
        if checker is None:
            self.skipTest("Don't know how to check for the presence "
                          "of accelerator %s" % accelerator)
        if not checker(qemu_bin=self.qemu_bin):
            self.skipTest("%s accelerator does not seem to be "
                          "available" % accelerator)

    def require_netdev(self, netdevname):
        """
        Skip the test unless the netdev backend is listed by the binary.

        The name has to appear on a line of its own in the output of
        "-netdev help".

        :param netdevname: name of the netdev backend
        :type netdevname: str
        """
        netdevhelp = run_cmd([self.qemu_bin,
                              '-M', 'none', '-netdev', 'help'])[0]
        if netdevhelp.find('\n' + netdevname + '\n') < 0:
            self.skipTest('no support for ' + netdevname + ' networking')

    def require_device(self, devicename):
        """
        Skip the test unless the device shows up in "-device help".

        :param devicename: name of the device
        :type devicename: str
        """
        devhelp = run_cmd([self.qemu_bin,
                           '-M', 'none', '-device', 'help'])[0]
        if devhelp.find(devicename) < 0:
            self.skipTest('no support for device ' + devicename)

    def _new_vm(self, name, *args):
        """
        Create a QEMUMachine that uses the directories of this test.

        If QEMU_TEST_QMP_BACKDOOR is set in the environment, an extra
        QMP monitor on a UNIX socket at that path is added to the VM.

        :param name: name handed to the QEMUMachine
        :type name: str
        :param args: extra command line arguments for the VM
        :returns: the new QEMUMachine object
        """
        vm = QEMUMachine(self.qemu_bin,
                         name=name,
                         base_temp_dir=self.workdir,
                         log_dir=self.logdir)
        self.log.debug('QEMUMachine "%s" created', name)
        self.log.debug('QEMUMachine "%s" temp_dir: %s', name, vm.temp_dir)

        sockpath = os.environ.get("QEMU_TEST_QMP_BACKDOOR", None)
        if sockpath is not None:
            vm.add_args("-chardev",
                        f"socket,id=backdoor,path={sockpath},server=on,wait=off",
                        "-mon", "chardev=backdoor,mode=control")

        if args:
            vm.add_args(*args)
        return vm

    @property
    def vm(self):
        """The VM named 'default', created on first access."""
        return self.get_vm(name='default')

    def get_vm(self, *args, name=None):
        """
        Return the VM with the given name, creating it if necessary.

        :param args: extra command line arguments, they are only used
                     when the VM gets created by this call
        :param name: name of the VM, a random UUID is used if not set
        :type name: str
        :returns: the QEMUMachine object registered under that name
        """
        if not name:
            name = str(uuid.uuid4())
        if self._vms.get(name) is None:
            vm = self._new_vm(name, *args)
            self._vms[name] = vm
            if self.cpu is not None:
                vm.add_args('-cpu', self.cpu)
            if self.machine is not None:
                vm.set_machine(self.machine)
        return self._vms[name]

    def set_vm_arg(self, arg, value):
        """
        Set an argument to list of extra arguments to be given to the QEMU
        binary. If the argument already exists then its value is replaced.

        :param arg: the QEMU argument, such as "-cpu" in "-cpu host"
        :type arg: str
        :param value: the argument value, such as "host" in "-cpu host"
        :type value: str
        """
        if not arg or not value:
            return
        vm_args = self.vm.args
        if arg not in vm_args:
            vm_args.extend([arg, value])
            return
        # The value is expected right behind the first occurrence of arg
        idx = vm_args.index(arg) + 1
        if idx < len(vm_args):
            vm_args[idx] = value
        else:
            vm_args.append(value)

    def tearDown(self):
        """Shut down all VMs and detach the console log file."""
        try:
            for vm in self._vms.values():
                vm.shutdown()
        finally:
            # Clean up the logging and the directories even if a VM
            # failed to shut down properly
            logging.getLogger('console').removeHandler(self._console_log_fh)
            # Removing a handler does not close its file, do it explicitly
            self._console_log_fh.close()
            super().tearDown()
232