xref: /openbmc/qemu/tests/qemu-iotests/testenv.py (revision c2387413)
1# TestEnv class to manage test environment variables.
2#
3# Copyright (c) 2020-2021 Virtuozzo International GmbH
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19import os
20import sys
21import tempfile
22from pathlib import Path
23import shutil
24import collections
25import random
26import subprocess
27import glob
28from typing import Dict, Any, Optional, ContextManager
29
30
def isxfile(path: str) -> bool:
    """Return True if *path* is an existing regular file that is executable.

    Directories and missing paths yield False.
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)
33
34
def get_default_machine(qemu_prog: str) -> str:
    """Return the default machine name reported by *qemu_prog*.

    Runs ``qemu_prog -machine help`` and searches its output for the line
    that ends with ' (default)'; the first space-separated word of that line
    is the machine name. If some other line ends with
    ' (alias of <that name>)', the first word of the alias line is returned
    instead. Returns an empty string when no line is marked as default.

    Raises subprocess.CalledProcessError if the command exits with a
    non-zero status (check=True).
    """
    result = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
                            universal_newlines=True,
                            stdout=subprocess.PIPE)
    lines = result.stdout.split('\n')

    default_line = next((ln for ln in lines if ln.endswith(' (default)')),
                        None)
    if default_line is None:
        return ''
    name = default_line.split(' ', 1)[0]

    # Prefer the alias of the default machine, when one is listed.
    alias_suffix = f' (alias of {name})'
    for ln in lines:
        if ln.endswith(alias_suffix):
            return ln.split(' ', 1)[0]

    return name
53
54
class TestEnv(ContextManager['TestEnv']):
    """
    Manage system environment for running tests

    The following variables are supported/provided. They are represented by
    lower-cased TestEnv attributes.

    The object is a context manager: leaving the ``with`` block calls
    close(), which removes the socket directory if it was created as a
    temporary directory by init_directories().
    """

    # We store environment variables as instance attributes, and there are a
    # lot of them. Silence pylint:
    # pylint: disable=too-many-instance-attributes

    # Names of the environment variables exported by get_env(). Each one is
    # looked up on the instance under its lower-cased name.
    env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
                     'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
                     'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
                     'SOCKET_SCM_HELPER', 'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
                     'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
                     'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
                     'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
                     'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
                     'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_']

    def get_env(self) -> Dict[str, str]:
        """Return a dict of the environment variables held by this object.

        Every name in env_variables is mapped to the value of the
        corresponding lower-cased attribute. Attributes that are missing or
        set to None are left out of the result.
        """
        env = {}
        for v in self.env_variables:
            val = getattr(self, v.lower(), None)
            if val is not None:
                env[v] = val

        return env

    def init_directories(self) -> None:
        """Init directory variables:
             PYTHONPATH
             TEST_DIR
             SOCK_DIR
             SAMPLE_IMG_DIR
             OUTPUT_DIR

        Requires self.source_iotests to be set already. TEST_DIR and a
        user-supplied SOCK_DIR are created if they do not exist. When
        SOCK_DIR is not set in the environment, a temporary directory is
        created and self.tmp_sock_dir is set to True so that close() removes
        it later.
        """
        # Put the iotests source directory in front of any inherited
        # PYTHONPATH.
        self.pythonpath = os.getenv('PYTHONPATH')
        if self.pythonpath:
            self.pythonpath = self.source_iotests + os.pathsep + \
                self.pythonpath
        else:
            self.pythonpath = self.source_iotests

        self.test_dir = os.getenv('TEST_DIR',
                                  os.path.join(os.getcwd(), 'scratch'))
        Path(self.test_dir).mkdir(parents=True, exist_ok=True)

        try:
            self.sock_dir = os.environ['SOCK_DIR']
            self.tmp_sock_dir = False
            # Create the user-supplied socket directory itself (not
            # test_dir, which has already been created above).
            Path(self.sock_dir).mkdir(parents=True, exist_ok=True)
        except KeyError:
            self.sock_dir = tempfile.mkdtemp()
            self.tmp_sock_dir = True

        self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
                                        os.path.join(self.source_iotests,
                                                     'sample_images'))

        self.output_dir = os.getcwd()  # OUTPUT_DIR

    def init_binaries(self) -> None:
        """Init binary path variables:
             PYTHON (for bash tests)
             QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
             SOCKET_SCM_HELPER

        Requires self.build_root and self.build_iotests to be set already.
        Each *_PROG value comes from the environment variable of the same
        name, falling back to a path under the build root. Exits the process
        (sys.exit) if any of the binaries is missing or not executable.
        SOCKET_SCM_HELPER is only set when the helper exists and is
        executable.
        """
        self.python = sys.executable

        def root(*names: str) -> str:
            # Build a path relative to the build root.
            return os.path.join(self.build_root, *names)

        arch = os.uname().machine
        if 'ppc64' in arch:
            # Any machine name containing 'ppc64' uses the qemu-system-ppc64
            # binary name.
            arch = 'ppc64'

        self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
        if not os.path.exists(self.qemu_prog):
            # Fall back to the first (in sorted order) executable
            # qemu-system-* binary in the build root.
            pattern = root('qemu-system-*')
            try:
                progs = sorted(glob.iglob(pattern))
                self.qemu_prog = next(p for p in progs if isxfile(p))
            except StopIteration:
                sys.exit("Not found any Qemu executable binary by pattern "
                         f"'{pattern}'")

        self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
        self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
        self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
        self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
                                                   'qemu-storage-daemon'))

        for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
                  self.qemu_prog, self.qsd_prog]:
            if not os.path.exists(b):
                sys.exit('No such file: ' + b)
            if not isxfile(b):
                sys.exit('Not executable: ' + b)

        helper_path = os.path.join(self.build_iotests, 'socket_scm_helper')
        if isxfile(helper_path):
            self.socket_scm_helper = helper_path  # SOCKET_SCM_HELPER

    def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
                 cachemode: Optional[str] = None,
                 imgopts: Optional[str] = None,
                 misalign: bool = False,
                 debug: bool = False,
                 valgrind: bool = False) -> None:
        """Compute all environment variables for a test run.

        imgfmt:    image format name (IMGFMT)
        imgproto:  image protocol name (IMGPROTO)
        aiomode:   AIO mode (AIOMODE), also passed to qemu-io as --aio
        cachemode: cache mode (CACHEMODE); None selects 'writeback' and
                   marks CACHEMODE_IS_DEFAULT as 'true'
        imgopts:   extra image options (IMGOPTS); format-specific defaults
                   are appended for 'luks' and 'vmdk'
        misalign:  add --misalign to the qemu-io options
        debug:     stored as self.debug
        valgrind:  set VALGRIND_QEMU to 'y'

        May exit the process (via init_binaries()) if a required binary is
        missing, and runs the QEMU binary to query its default machine.
        """
        self.imgfmt = imgfmt
        self.imgproto = imgproto
        self.aiomode = aiomode
        self.imgopts = imgopts
        self.misalign = misalign
        self.debug = debug

        if valgrind:
            self.valgrind_qemu = 'y'

        if cachemode is None:
            self.cachemode_is_default = 'true'
            self.cachemode = 'writeback'
        else:
            self.cachemode_is_default = 'false'
            self.cachemode = cachemode

        # Initialize generic paths: build_root, build_iotests, source_iotests,
        # which are needed to initialize some environment variables. They are
        # used by init_*() functions as well.

        if os.path.islink(sys.argv[0]):
            # called from the build tree
            self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
            self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
        else:
            # called from the source tree
            self.source_iotests = os.getcwd()
            self.build_iotests = self.source_iotests

        self.build_root = os.path.join(self.build_iotests, '..', '..')

        self.init_directories()
        self.init_binaries()

        # Keep a user-supplied MALLOC_PERTURB_; otherwise pick a random
        # value in the range 1..254.
        self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
                                         str(random.randrange(1, 255)))

        # QEMU_OPTIONS
        self.qemu_options = '-nodefaults -display none -accel qtest'
        # Binaries for these targets get an explicit -machine option.
        machine_map = (
            ('arm', 'virt'),
            ('aarch64', 'virt'),
            ('avr', 'mega2560'),
            ('rx', 'gdbsim-r5f562n8'),
            ('tricore', 'tricore_testboard')
        )
        for suffix, machine in machine_map:
            if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
                self.qemu_options += f' -machine {machine}'

        # QEMU_DEFAULT_MACHINE
        self.qemu_default_machine = get_default_machine(self.qemu_prog)

        self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
        self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')

        is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
        self.imgfmt_generic = 'true' if is_generic else 'false'

        self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
        if self.misalign:
            self.qemu_io_options += ' --misalign'

        # Snapshot taken before the format-specific '-f' option is appended.
        self.qemu_io_options_no_fmt = self.qemu_io_options

        if self.imgfmt == 'luks':
            self.imgoptssyntax = 'true'
            self.imgkeysecret = '123456'
            # Default to iter-time=10 unless the caller chose a value.
            if not self.imgopts:
                self.imgopts = 'iter-time=10'
            elif 'iter-time=' not in self.imgopts:
                self.imgopts += ',iter-time=10'
        else:
            self.imgoptssyntax = 'false'
            self.qemu_io_options += ' -f ' + self.imgfmt

        if self.imgfmt == 'vmdk':
            # Default to zeroed_grain=on unless the caller chose a value.
            if not self.imgopts:
                self.imgopts = 'zeroed_grain=on'
            elif 'zeroed_grain=' not in self.imgopts:
                self.imgopts += ',zeroed_grain=on'

    def close(self) -> None:
        """Remove the socket directory if it was created as a temporary one.

        A socket directory supplied through SOCK_DIR is left untouched.
        """
        if self.tmp_sock_dir:
            shutil.rmtree(self.sock_dir)

    def __enter__(self) -> 'TestEnv':
        """Enter the context; returns this object."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        """Leave the context: call close(). Exceptions are not suppressed."""
        self.close()

    def print_env(self) -> None:
        """Print a human-readable summary of the test environment to stdout.

        Variables that are not set are printed as empty strings.
        """
        template = """\
QEMU          -- "{QEMU_PROG}" {QEMU_OPTIONS}
QEMU_IMG      -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
QEMU_IO       -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
QEMU_NBD      -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
IMGFMT        -- {IMGFMT}{imgopts}
IMGPROTO      -- {IMGPROTO}
PLATFORM      -- {platform}
TEST_DIR      -- {TEST_DIR}
SOCK_DIR      -- {SOCK_DIR}
SOCKET_SCM_HELPER -- {SOCKET_SCM_HELPER}"""

        # defaultdict(str) makes any missing placeholder format as ''.
        args = collections.defaultdict(str, self.get_env())

        if 'IMGOPTS' in args:
            args['imgopts'] = f" ({args['IMGOPTS']})"

        u = os.uname()
        args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'

        print(template.format_map(args))
282