xref: /openbmc/qemu/tests/functional/test_migration.py (revision 8a2f1f921cc84cae3aa54c29e24e8c1defc9ef34)
1#!/usr/bin/env python3
2#
3# Migration test
4#
5# Copyright (c) 2019 Red Hat, Inc.
6#
7# Authors:
8#  Cleber Rosa <crosa@redhat.com>
9#  Caio Carrara <ccarrara@redhat.com>
10#
11# This work is licensed under the terms of the GNU GPL, version 2 or
12# later.  See the COPYING file in the top-level directory.
13
14
15import tempfile
16import os
17import time
18
19from qemu_test import QemuSystemTest, skipIfMissingCommands
20from qemu_test.ports import Ports
21
class MigrationTest(QemuSystemTest):
    """Migrate a guest between two QEMU instances over several transports.

    Every test launches a destination VM with ``-incoming <uri>`` and a
    source VM, issues the QMP ``migrate`` command on the source and then
    checks the final migration status and run state of both VMs.
    """

    # Upper bound, in seconds, that assert_migration() waits for each VM
    # to report a final migration status.
    timeout = 10

    @staticmethod
    def migration_finished(vm):
        """Return True once *vm* reports a final migration status.

        The 'status' member is read with .get() because the QMP schema
        marks it as optional in the query-migrate reply; a missing key is
        treated as "not finished yet" instead of raising KeyError.
        """
        status = vm.cmd('query-migrate').get('status')
        return status in ('completed', 'failed')

    def _wait_for_migration(self, vm):
        """Poll *vm* until its migration finished or self.timeout elapsed."""
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline and not self.migration_finished(vm):
            time.sleep(0.1)

    def assert_migration(self, src_vm, dst_vm):
        """Wait for the migration to end and verify that it succeeded.

        The source is polled first, then the destination; each wait gets
        its own self.timeout budget.  Running into the timeout is not an
        error by itself: the assertions below fail if either side did not
        reach the expected state.
        """
        self._wait_for_migration(src_vm)
        self._wait_for_migration(dst_vm)

        self.assertEqual(src_vm.cmd('query-migrate').get('status'),
                         'completed')
        self.assertEqual(dst_vm.cmd('query-migrate').get('status'),
                         'completed')
        self.assertEqual(dst_vm.cmd('query-status')['status'], 'running')
        self.assertEqual(src_vm.cmd('query-status')['status'], 'postmigrate')

    def select_machine(self):
        """Select the machine type to use for the current target.

        Looks up self.arch in the table below and passes the result to
        self.set_machine().  Raises KeyError when self.arch has no entry.
        """
        target_machine = {
            'aarch64': 'quanta-gsj',
            'alpha': 'clipper',
            'arm': 'npcm750-evb',
            'i386': 'isapc',
            'ppc': 'sam460ex',
            'ppc64': 'mac99',
            'riscv32': 'spike',
            'riscv64': 'virt',
            'sparc': 'SS-4',
            'sparc64': 'sun4u',
            'x86_64': 'microvm',
        }
        self.set_machine(target_machine[self.arch])

    def do_migrate(self, dest_uri, src_uri=None):
        """Run one migration from a fresh source VM to a fresh destination.

        dest_uri is handed to the destination VM via ``-incoming``.
        src_uri is the URI given to the ``migrate`` command on the source;
        it defaults to dest_uri when the same URI works for both sides.
        """
        self.select_machine()

        # The destination has to be launched first so that it is already
        # waiting for the incoming stream when the source starts to send.
        dest_vm = self.get_vm('-incoming', dest_uri, name="dest-qemu")
        dest_vm.add_args('-nodefaults')
        dest_vm.launch()

        if src_uri is None:
            src_uri = dest_uri

        source_vm = self.get_vm(name="source-qemu")
        source_vm.add_args('-nodefaults')
        source_vm.launch()

        # Use cmd() rather than qmp() so that an error reply to 'migrate'
        # is raised right away instead of being silently discarded and
        # only noticed later as a timeout.
        source_vm.cmd('migrate', uri=src_uri)
        self.assert_migration(source_vm, dest_vm)

    def _get_free_port(self, ports):
        """Return a free port from *ports*, skipping the test if none."""
        port = ports.find_free_port()
        if port is None:
            self.skipTest('Failed to find a free port')
        return port

    def test_migration_with_tcp_localhost(self):
        """Migrate over a TCP connection to localhost."""
        with Ports() as ports:
            dest_uri = 'tcp:localhost:%u' % self._get_free_port(ports)
            self.do_migrate(dest_uri)

    def test_migration_with_unix(self):
        """Migrate over a UNIX socket placed in a temporary directory."""
        with tempfile.TemporaryDirectory(prefix='socket_') as socket_path:
            dest_uri = 'unix:%s/qemu-test.sock' % socket_path
            self.do_migrate(dest_uri)

    @skipIfMissingCommands('nc')
    def test_migration_with_exec(self):
        """Migrate through 'exec:' URIs that pipe the stream via netcat.

        The test works for both netcat-traditional and netcat-openbsd
        packages.
        """
        with Ports() as ports:
            free_port = self._get_free_port(ports)
            # Destination and source need different commands (listener
            # vs. client), hence the separate src_uri.
            dest_uri = 'exec:nc -l localhost %u' % free_port
            src_uri = 'exec:nc localhost %u' % free_port
            self.do_migrate(dest_uri, src_uri)
98
# Allow this test file to be executed directly as a script.
if __name__ == "__main__":
    QemuSystemTest.main()
101