# TestEnv class to manage test environment variables.
#
# Copyright (c) 2020-2021 Virtuozzo International GmbH
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import sys
import tempfile
from pathlib import Path
import shutil
import collections
import random
import subprocess
import glob
from typing import List, Dict, Any, Optional, ContextManager


def isxfile(path: str) -> bool:
    """Return True if *path* is a regular file that we may execute."""
    return os.path.isfile(path) and os.access(path, os.X_OK)


def get_default_machine(qemu_prog: str) -> str:
    """
    Return the name of the default machine type of *qemu_prog*.

    Runs ``qemu_prog -machine help`` and looks for the line marked
    ' (default)'.  If some other line is an alias of that machine, the
    alias name is returned instead.  Returns '' when no default machine
    is reported.

    Raises subprocess.CalledProcessError if the binary exits with a
    non-zero status.
    """
    outp = subprocess.run([qemu_prog, '-machine', 'help'], check=True,
                          universal_newlines=True,
                          stdout=subprocess.PIPE).stdout

    machines = outp.split('\n')
    try:
        default_machine = next(m for m in machines if m.endswith(' (default)'))
    except StopIteration:
        return ''
    # The machine name is the first space-separated word of the line.
    default_machine = default_machine.split(' ', 1)[0]

    alias_suf = ' (alias of {})'.format(default_machine)
    alias = next((m for m in machines if m.endswith(alias_suf)), None)
    if alias is not None:
        default_machine = alias.split(' ', 1)[0]

    return default_machine


class TestEnv(ContextManager['TestEnv']):
    """
    Manage system environment for running tests

    The following variables are supported/provided. They are represented by
    lower-cased TestEnv attributes.

    The object is a context manager: leaving the ``with`` block calls
    close(), which removes the socket directory if it was created as a
    temporary directory by this object.
    """

    # We store environment variables as instance attributes, and there are a
    # lot of them. Silence pylint:
    # pylint: disable=too-many-instance-attributes

    env_variables = ['PYTHONPATH', 'TEST_DIR', 'SOCK_DIR', 'SAMPLE_IMG_DIR',
                     'OUTPUT_DIR', 'PYTHON', 'QEMU_PROG', 'QEMU_IMG_PROG',
                     'QEMU_IO_PROG', 'QEMU_NBD_PROG', 'QSD_PROG',
                     'SOCKET_SCM_HELPER', 'QEMU_OPTIONS', 'QEMU_IMG_OPTIONS',
                     'QEMU_IO_OPTIONS', 'QEMU_IO_OPTIONS_NO_FMT',
                     'QEMU_NBD_OPTIONS', 'IMGOPTS', 'IMGFMT', 'IMGPROTO',
                     'AIOMODE', 'CACHEMODE', 'VALGRIND_QEMU',
                     'CACHEMODE_IS_DEFAULT', 'IMGFMT_GENERIC', 'IMGOPTSSYNTAX',
                     'IMGKEYSECRET', 'QEMU_DEFAULT_MACHINE', 'MALLOC_PERTURB_']

    def prepare_subprocess(self, args: List[str]) -> Dict[str, str]:
        """
        Prepare the command line and environment for running one test.

        NOTE: *args* is modified in place: '-d' is appended when debugging
        is enabled, and the python interpreter is inserted in front when
        the first line of the test file (args[0]) is exactly
        '#!/usr/bin/env python3'.

        Returns a copy of os.environ updated with the variables from
        get_env().
        """
        if self.debug:
            args.append('-d')

        with open(args[0], encoding="utf-8") as f:
            try:
                if f.readline().rstrip() == '#!/usr/bin/env python3':
                    args.insert(0, self.python)
            except UnicodeDecodeError:  # binary test? for future.
                pass

        os_env = os.environ.copy()
        os_env.update(self.get_env())
        return os_env

    def get_env(self) -> Dict[str, str]:
        """
        Return the test environment variables as a dictionary.

        Only variables from env_variables whose lower-cased attribute
        exists on the object and is not None are included.
        """
        env: Dict[str, str] = {}
        for v in self.env_variables:
            val = getattr(self, v.lower(), None)
            if val is not None:
                env[v] = val

        return env

    def init_directories(self) -> None:
        """Init directory variables:
             PYTHONPATH
             TEST_DIR
             SOCK_DIR
             SAMPLE_IMG_DIR
             OUTPUT_DIR
        """
        self.pythonpath = os.getenv('PYTHONPATH')
        if self.pythonpath:
            self.pythonpath = self.source_iotests + os.pathsep + \
                self.pythonpath
        else:
            self.pythonpath = self.source_iotests

        self.test_dir = os.getenv('TEST_DIR',
                                  os.path.join(os.getcwd(), 'scratch'))
        Path(self.test_dir).mkdir(parents=True, exist_ok=True)

        # tmp_sock_dir records whether we own the socket directory and
        # therefore have to remove it in close().
        try:
            self.sock_dir = os.environ['SOCK_DIR']
        except KeyError:
            self.sock_dir = tempfile.mkdtemp()
            self.tmp_sock_dir = True
        else:
            self.tmp_sock_dir = False
            Path(self.sock_dir).mkdir(parents=True, exist_ok=True)

        self.sample_img_dir = os.getenv('SAMPLE_IMG_DIR',
                                        os.path.join(self.source_iotests,
                                                     'sample_images'))

        self.output_dir = os.getcwd()  # OUTPUT_DIR

    def init_binaries(self) -> None:
        """Init binary path variables:
             PYTHON (for bash tests)
             QEMU_PROG, QEMU_IMG_PROG, QEMU_IO_PROG, QEMU_NBD_PROG, QSD_PROG
             SOCKET_SCM_HELPER

        Exits the program (sys.exit) if a required binary is missing or is
        not executable.
        """
        self.python = sys.executable

        def root(*names: str) -> str:
            return os.path.join(self.build_root, *names)

        arch = os.uname().machine
        if 'ppc64' in arch:
            arch = 'ppc64'

        self.qemu_prog = os.getenv('QEMU_PROG', root(f'qemu-system-{arch}'))
        if not os.path.exists(self.qemu_prog):
            # Fall back to the first (in sorted order) executable system
            # emulator found in the build root.
            pattern = root('qemu-system-*')
            try:
                progs = sorted(glob.iglob(pattern))
                self.qemu_prog = next(p for p in progs if isxfile(p))
            except StopIteration:
                sys.exit("Not found any Qemu executable binary by pattern "
                         f"'{pattern}'")

        self.qemu_img_prog = os.getenv('QEMU_IMG_PROG', root('qemu-img'))
        self.qemu_io_prog = os.getenv('QEMU_IO_PROG', root('qemu-io'))
        self.qemu_nbd_prog = os.getenv('QEMU_NBD_PROG', root('qemu-nbd'))
        self.qsd_prog = os.getenv('QSD_PROG', root('storage-daemon',
                                                   'qemu-storage-daemon'))

        for b in [self.qemu_img_prog, self.qemu_io_prog, self.qemu_nbd_prog,
                  self.qemu_prog, self.qsd_prog]:
            if not os.path.exists(b):
                sys.exit('No such file: ' + b)
            if not isxfile(b):
                sys.exit('Not executable: ' + b)

        # The helper is optional: the attribute stays unset when it is
        # not available.
        helper_path = os.path.join(self.build_iotests, 'socket_scm_helper')
        if isxfile(helper_path):
            self.socket_scm_helper = helper_path  # SOCKET_SCM_HELPER

    def __init__(self, imgfmt: str, imgproto: str, aiomode: str,
                 cachemode: Optional[str] = None,
                 imgopts: Optional[str] = None,
                 misalign: bool = False,
                 debug: bool = False,
                 valgrind: bool = False) -> None:
        """
        Set up the whole test environment.

        Creates the test directory and, unless SOCK_DIR is set in the
        environment, a temporary socket directory.  If initialization
        fails after that point, the temporary socket directory is removed
        again before the error is propagated.
        """
        self.imgfmt = imgfmt
        self.imgproto = imgproto
        self.aiomode = aiomode
        self.imgopts = imgopts
        self.misalign = misalign
        self.debug = debug

        if valgrind:
            self.valgrind_qemu = 'y'

        if cachemode is None:
            self.cachemode_is_default = 'true'
            self.cachemode = 'writeback'
        else:
            self.cachemode_is_default = 'false'
            self.cachemode = cachemode

        # Initialize generic paths: build_root, build_iotests, source_iotests,
        # which are needed to initialize some environment variables. They are
        # used by init_*() functions as well.

        if os.path.islink(sys.argv[0]):
            # called from the build tree
            self.source_iotests = os.path.dirname(os.readlink(sys.argv[0]))
            self.build_iotests = os.path.dirname(os.path.abspath(sys.argv[0]))
        else:
            # called from the source tree
            self.source_iotests = os.getcwd()
            self.build_iotests = self.source_iotests

        self.build_root = os.path.join(self.build_iotests, '..', '..')

        self.init_directories()

        # From here on we may own a temporary socket directory.  The caller
        # never gets the object if we fail, so __exit__() cannot clean up
        # for us.  BaseException is caught because init_binaries() reports
        # errors through sys.exit(), i.e. SystemExit.
        try:
            self.init_binaries()
            self._init_options()
        except BaseException:
            self.close()
            raise

    def _init_options(self) -> None:
        """Init option variables which depend on the binaries and on the
        constructor arguments (MALLOC_PERTURB_, QEMU_OPTIONS,
        QEMU_DEFAULT_MACHINE, QEMU_IMG_OPTIONS, QEMU_NBD_OPTIONS,
        IMGFMT_GENERIC, QEMU_IO_OPTIONS, QEMU_IO_OPTIONS_NO_FMT,
        IMGOPTSSYNTAX, IMGKEYSECRET, IMGOPTS).
        """
        self.malloc_perturb_ = os.getenv('MALLOC_PERTURB_',
                                         str(random.randrange(1, 255)))

        # QEMU_OPTIONS
        self.qemu_options = '-nodefaults -display none -accel qtest'
        machine_map = (
            ('arm', 'virt'),
            ('aarch64', 'virt'),
            ('avr', 'mega2560'),
            ('m68k', 'virt'),
            ('rx', 'gdbsim-r5f562n8'),
            ('tricore', 'tricore_testboard')
        )
        for suffix, machine in machine_map:
            if self.qemu_prog.endswith(f'qemu-system-{suffix}'):
                self.qemu_options += f' -machine {machine}'

        # QEMU_DEFAULT_MACHINE
        self.qemu_default_machine = get_default_machine(self.qemu_prog)

        self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
        self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')

        is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
        self.imgfmt_generic = 'true' if is_generic else 'false'

        self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
        if self.misalign:
            self.qemu_io_options += ' --misalign'

        # Snapshot taken before the format option may be appended below.
        self.qemu_io_options_no_fmt = self.qemu_io_options

        if self.imgfmt == 'luks':
            self.imgoptssyntax = 'true'
            self.imgkeysecret = '123456'
            if not self.imgopts:
                self.imgopts = 'iter-time=10'
            elif 'iter-time=' not in self.imgopts:
                self.imgopts += ',iter-time=10'
        else:
            self.imgoptssyntax = 'false'
            self.qemu_io_options += ' -f ' + self.imgfmt

        if self.imgfmt == 'vmdk':
            if not self.imgopts:
                self.imgopts = 'zeroed_grain=on'
            elif 'zeroed_grain=' not in self.imgopts:
                self.imgopts += ',zeroed_grain=on'

    def close(self) -> None:
        """
        Remove the socket directory if it is a temporary one created by
        this object.  Safe to call more than once and on a partially
        initialized object.
        """
        if getattr(self, 'tmp_sock_dir', False):
            shutil.rmtree(self.sock_dir)
            # Do not try to remove the directory again on a repeated call.
            self.tmp_sock_dir = False

    def __enter__(self) -> 'TestEnv':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self.close()

    def print_env(self) -> None:
        """Print a summary of the test environment to stdout."""
        template = """\
QEMU -- "{QEMU_PROG}" {QEMU_OPTIONS}
QEMU_IMG -- "{QEMU_IMG_PROG}" {QEMU_IMG_OPTIONS}
QEMU_IO -- "{QEMU_IO_PROG}" {QEMU_IO_OPTIONS}
QEMU_NBD -- "{QEMU_NBD_PROG}" {QEMU_NBD_OPTIONS}
IMGFMT -- {IMGFMT}{imgopts}
IMGPROTO -- {IMGPROTO}
PLATFORM -- {platform}
TEST_DIR -- {TEST_DIR}
SOCK_DIR -- {SOCK_DIR}
SOCKET_SCM_HELPER -- {SOCKET_SCM_HELPER}
"""

        # defaultdict(str): variables which are not set are printed as ''.
        args = collections.defaultdict(str, self.get_env())

        if 'IMGOPTS' in args:
            args['imgopts'] = f" ({args['IMGOPTS']})"

        u = os.uname()
        args['platform'] = f'{u.sysname}/{u.machine} {u.nodename} {u.release}'

        print(template.format_map(args))