1# SPDX-License-Identifier: GPL-2.0 2# 3# Runs UML kernel, collects output, and handles errors. 4# 5# Copyright (C) 2019, Google LLC. 6# Author: Felix Guo <felixguoxiuping@gmail.com> 7# Author: Brendan Higgins <brendanhiggins@google.com> 8 9import importlib.abc 10import importlib.util 11import logging 12import subprocess 13import os 14import shlex 15import shutil 16import signal 17import threading 18from typing import Iterator, List, Optional, Tuple 19from types import FrameType 20 21import kunit_config 22import qemu_config 23 24KCONFIG_PATH = '.config' 25KUNITCONFIG_PATH = '.kunitconfig' 26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig' 27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config' 28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config' 29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config' 30OUTFILE_PATH = 'test.log' 31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__)) 32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs') 33 34class ConfigError(Exception): 35 """Represents an error trying to configure the Linux kernel.""" 36 37 38class BuildError(Exception): 39 """Represents an error trying to build the Linux kernel.""" 40 41 42class LinuxSourceTreeOperations: 43 """An abstraction over command line operations performed on a source tree.""" 44 45 def __init__(self, linux_arch: str, cross_compile: Optional[str]): 46 self._linux_arch = linux_arch 47 self._cross_compile = cross_compile 48 49 def make_mrproper(self) -> None: 50 try: 51 subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT) 52 except OSError as e: 53 raise ConfigError('Could not call make command: ' + str(e)) 54 except subprocess.CalledProcessError as e: 55 raise ConfigError(e.output.decode()) 56 57 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 58 return base_kunitconfig 59 60 def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None: 61 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig'] 62 if self._cross_compile: 63 command += ['CROSS_COMPILE=' + self._cross_compile] 64 if make_options: 65 command.extend(make_options) 66 print('Populating config with:\n$', ' '.join(command)) 67 try: 68 subprocess.check_output(command, stderr=subprocess.STDOUT) 69 except OSError as e: 70 raise ConfigError('Could not call make command: ' + str(e)) 71 except subprocess.CalledProcessError as e: 72 raise ConfigError(e.output.decode()) 73 74 def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None: 75 command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)] 76 if make_options: 77 command.extend(make_options) 78 if self._cross_compile: 79 command += ['CROSS_COMPILE=' + self._cross_compile] 80 print('Building with:\n$', ' '.join(command)) 81 try: 82 proc = subprocess.Popen(command, 83 stderr=subprocess.PIPE, 84 stdout=subprocess.DEVNULL) 85 except OSError as e: 86 raise BuildError('Could not call execute make: ' + str(e)) 87 except subprocess.CalledProcessError as e: 88 raise BuildError(e.output) 89 _, stderr = proc.communicate() 90 if proc.returncode != 0: 91 raise BuildError(stderr.decode()) 92 if stderr: # likely only due to build warnings 93 print(stderr.decode()) 94 95 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 96 raise RuntimeError('not implemented!') 97 98 99class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations): 100 101 def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]): 102 super().__init__(linux_arch=qemu_arch_params.linux_arch, 103 cross_compile=cross_compile) 104 self._kconfig = qemu_arch_params.kconfig 105 self._qemu_arch = qemu_arch_params.qemu_arch 106 self._kernel_path = qemu_arch_params.kernel_path 107 self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot' 108 self._extra_qemu_params = qemu_arch_params.extra_qemu_params 109 self._serial = qemu_arch_params.serial 110 111 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 112 kconfig = kunit_config.parse_from_string(self._kconfig) 113 kconfig.merge_in_entries(base_kunitconfig) 114 return kconfig 115 116 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 117 kernel_path = os.path.join(build_dir, self._kernel_path) 118 qemu_command = ['qemu-system-' + self._qemu_arch, 119 '-nodefaults', 120 '-m', '1024', 121 '-kernel', kernel_path, 122 '-append', ' '.join(params + [self._kernel_command_line]), 123 '-no-reboot', 124 '-nographic', 125 '-serial', self._serial] + self._extra_qemu_params 126 # Note: shlex.join() does what we want, but requires python 3.8+. 127 print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command)) 128 return subprocess.Popen(qemu_command, 129 stdin=subprocess.PIPE, 130 stdout=subprocess.PIPE, 131 stderr=subprocess.STDOUT, 132 text=True, errors='backslashreplace') 133 134class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations): 135 """An abstraction over command line operations performed on a source tree.""" 136 137 def __init__(self, cross_compile: Optional[str]=None): 138 super().__init__(linux_arch='um', cross_compile=cross_compile) 139 140 def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig: 141 kconfig = kunit_config.parse_file(UML_KCONFIG_PATH) 142 kconfig.merge_in_entries(base_kunitconfig) 143 return kconfig 144 145 def start(self, params: List[str], build_dir: str) -> subprocess.Popen[str]: 146 """Runs the Linux UML binary. Must be named 'linux'.""" 147 linux_bin = os.path.join(build_dir, 'linux') 148 params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt']) 149 return subprocess.Popen([linux_bin] + params, 150 stdin=subprocess.PIPE, 151 stdout=subprocess.PIPE, 152 stderr=subprocess.STDOUT, 153 text=True, errors='backslashreplace') 154 155def get_kconfig_path(build_dir: str) -> str: 156 return os.path.join(build_dir, KCONFIG_PATH) 157 158def get_kunitconfig_path(build_dir: str) -> str: 159 return os.path.join(build_dir, KUNITCONFIG_PATH) 160 161def get_old_kunitconfig_path(build_dir: str) -> str: 162 return os.path.join(build_dir, OLD_KUNITCONFIG_PATH) 163 164def get_parsed_kunitconfig(build_dir: str, 165 kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig: 166 if not kunitconfig_paths: 167 path = get_kunitconfig_path(build_dir) 168 if not os.path.exists(path): 169 shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path) 170 return kunit_config.parse_file(path) 171 172 merged = kunit_config.Kconfig() 173 174 for path in kunitconfig_paths: 175 if os.path.isdir(path): 176 path = os.path.join(path, KUNITCONFIG_PATH) 177 if not os.path.exists(path): 178 raise ConfigError(f'Specified kunitconfig ({path}) does not exist') 179 180 partial = kunit_config.parse_file(path) 181 diff = merged.conflicting_options(partial) 182 if diff: 183 diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff) 184 raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}') 185 merged.merge_in_entries(partial) 186 return merged 187 188def get_outfile_path(build_dir: str) -> str: 189 return os.path.join(build_dir, OUTFILE_PATH) 190 191def _default_qemu_config_path(arch: str) -> str: 192 config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py') 193 if os.path.isfile(config_path): 194 return config_path 195 196 options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')] 197 raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options))) 198 199def _get_qemu_ops(config_path: str, 200 extra_qemu_args: Optional[List[str]], 201 cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]: 202 # The module name/path has very little to do with where the actual file 203 # exists (I learned this through experimentation and could not find it 204 # anywhere in the Python documentation). 205 # 206 # Bascially, we completely ignore the actual file location of the config 207 # we are loading and just tell Python that the module lives in the 208 # QEMU_CONFIGS_DIR for import purposes regardless of where it actually 209 # exists as a file. 210 module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path)) 211 spec = importlib.util.spec_from_file_location(module_path, config_path) 212 assert spec is not None 213 config = importlib.util.module_from_spec(spec) 214 # See https://github.com/python/typeshed/pull/2626 for context. 215 assert isinstance(spec.loader, importlib.abc.Loader) 216 spec.loader.exec_module(config) 217 218 if not hasattr(config, 'QEMU_ARCH'): 219 raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path) 220 params: qemu_config.QemuArchParams = config.QEMU_ARCH 221 if extra_qemu_args: 222 params.extra_qemu_params.extend(extra_qemu_args) 223 return params.linux_arch, LinuxSourceTreeOperationsQemu( 224 params, cross_compile=cross_compile) 225 226class LinuxSourceTree: 227 """Represents a Linux kernel source tree with KUnit tests.""" 228 229 def __init__( 230 self, 231 build_dir: str, 232 kunitconfig_paths: Optional[List[str]]=None, 233 kconfig_add: Optional[List[str]]=None, 234 arch: Optional[str]=None, 235 cross_compile: Optional[str]=None, 236 qemu_config_path: Optional[str]=None, 237 extra_qemu_args: Optional[List[str]]=None) -> None: 238 signal.signal(signal.SIGINT, self.signal_handler) 239 if qemu_config_path: 240 self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 241 else: 242 self._arch = 'um' if arch is None else arch 243 if self._arch == 'um': 244 self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile) 245 else: 246 qemu_config_path = _default_qemu_config_path(self._arch) 247 _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile) 248 249 self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths) 250 if kconfig_add: 251 kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add)) 252 self._kconfig.merge_in_entries(kconfig) 253 254 def arch(self) -> str: 255 return self._arch 256 257 def clean(self) -> bool: 258 try: 259 self._ops.make_mrproper() 260 except ConfigError as e: 261 logging.error(e) 262 return False 263 return True 264 265 def validate_config(self, build_dir: str) -> bool: 266 kconfig_path = get_kconfig_path(build_dir) 267 validated_kconfig = kunit_config.parse_file(kconfig_path) 268 if self._kconfig.is_subset_of(validated_kconfig): 269 return True 270 missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries()) 271 message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \ 272 'This is probably due to unsatisfied dependencies.\n' \ 273 'Missing: ' + ', '.join(str(e) for e in missing) 274 if self._arch == 'um': 275 message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \ 276 'on a different architecture with something like "--arch=x86_64".' 277 logging.error(message) 278 return False 279 280 def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool: 281 kconfig_path = get_kconfig_path(build_dir) 282 if build_dir and not os.path.exists(build_dir): 283 os.mkdir(build_dir) 284 try: 285 self._kconfig = self._ops.make_arch_config(self._kconfig) 286 self._kconfig.write_to_file(kconfig_path) 287 self._ops.make_olddefconfig(build_dir, make_options) 288 except ConfigError as e: 289 logging.error(e) 290 return False 291 if not self.validate_config(build_dir): 292 return False 293 294 old_path = get_old_kunitconfig_path(build_dir) 295 if os.path.exists(old_path): 296 os.remove(old_path) # write_to_file appends to the file 297 self._kconfig.write_to_file(old_path) 298 return True 299 300 def _kunitconfig_changed(self, build_dir: str) -> bool: 301 old_path = get_old_kunitconfig_path(build_dir) 302 if not os.path.exists(old_path): 303 return True 304 305 old_kconfig = kunit_config.parse_file(old_path) 306 return old_kconfig != self._kconfig 307 308 def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool: 309 """Creates a new .config if it is not a subset of the .kunitconfig.""" 310 kconfig_path = get_kconfig_path(build_dir) 311 if not os.path.exists(kconfig_path): 312 print('Generating .config ...') 313 return self.build_config(build_dir, make_options) 314 315 existing_kconfig = kunit_config.parse_file(kconfig_path) 316 self._kconfig = self._ops.make_arch_config(self._kconfig) 317 318 if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir): 319 return True 320 print('Regenerating .config ...') 321 os.remove(kconfig_path) 322 return self.build_config(build_dir, make_options) 323 324 def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool: 325 try: 326 self._ops.make_olddefconfig(build_dir, make_options) 327 self._ops.make(jobs, build_dir, make_options) 328 except (ConfigError, BuildError) as e: 329 logging.error(e) 330 return False 331 return self.validate_config(build_dir) 332 333 def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', timeout: Optional[int]=None) -> Iterator[str]: 334 if not args: 335 args = [] 336 if filter_glob: 337 args.append('kunit.filter_glob='+filter_glob) 338 args.append('kunit.enable=1') 339 340 process = self._ops.start(args, build_dir) 341 assert process.stdout is not None # tell mypy it's set 342 343 # Enforce the timeout in a background thread. 344 def _wait_proc() -> None: 345 try: 346 process.wait(timeout=timeout) 347 except Exception as e: 348 print(e) 349 process.terminate() 350 process.wait() 351 waiter = threading.Thread(target=_wait_proc) 352 waiter.start() 353 354 output = open(get_outfile_path(build_dir), 'w') 355 try: 356 # Tee the output to the file and to our caller in real time. 357 for line in process.stdout: 358 output.write(line) 359 yield line 360 # This runs even if our caller doesn't consume every line. 361 finally: 362 # Flush any leftover output to the file 363 output.write(process.stdout.read()) 364 output.close() 365 process.stdout.close() 366 367 waiter.join() 368 subprocess.call(['stty', 'sane']) 369 370 def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None: 371 logging.error('Build interruption occurred. Cleaning console.') 372 subprocess.call(['stty', 'sane']) 373