1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import threading
18from typing import Iterator, List, Optional, Tuple
19from types import FrameType
20
21import kunit_config
22import qemu_config
23
24KCONFIG_PATH = '.config'
25KUNITCONFIG_PATH = '.kunitconfig'
26OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
27DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
28ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
29UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
30OUTFILE_PATH = 'test.log'
31ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
32QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
34class ConfigError(Exception):
35	"""Represents an error trying to configure the Linux kernel."""
36
37
38class BuildError(Exception):
39	"""Represents an error trying to build the Linux kernel."""
40
41
42class LinuxSourceTreeOperations:
43	"""An abstraction over command line operations performed on a source tree."""
44
45	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
46		self._linux_arch = linux_arch
47		self._cross_compile = cross_compile
48
49	def make_mrproper(self) -> None:
50		try:
51			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
52		except OSError as e:
53			raise ConfigError('Could not call make command: ' + str(e))
54		except subprocess.CalledProcessError as e:
55			raise ConfigError(e.output.decode())
56
57	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
58		return base_kunitconfig
59
60	def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
61		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
62		if self._cross_compile:
63			command += ['CROSS_COMPILE=' + self._cross_compile]
64		if make_options:
65			command.extend(make_options)
66		print('Populating config with:\n$', ' '.join(command))
67		try:
68			subprocess.check_output(command, stderr=subprocess.STDOUT)
69		except OSError as e:
70			raise ConfigError('Could not call make command: ' + str(e))
71		except subprocess.CalledProcessError as e:
72			raise ConfigError(e.output.decode())
73
74	def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
75		command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
76		if make_options:
77			command.extend(make_options)
78		if self._cross_compile:
79			command += ['CROSS_COMPILE=' + self._cross_compile]
80		print('Building with:\n$', ' '.join(command))
81		try:
82			proc = subprocess.Popen(command,
83						stderr=subprocess.PIPE,
84						stdout=subprocess.DEVNULL)
85		except OSError as e:
86			raise BuildError('Could not call execute make: ' + str(e))
87		except subprocess.CalledProcessError as e:
88			raise BuildError(e.output)
89		_, stderr = proc.communicate()
90		if proc.returncode != 0:
91			raise BuildError(stderr.decode())
92		if stderr:  # likely only due to build warnings
93			print(stderr.decode())
94
95	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
96		raise RuntimeError('not implemented!')
97
98
99class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
100
101	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
102		super().__init__(linux_arch=qemu_arch_params.linux_arch,
103				 cross_compile=cross_compile)
104		self._kconfig = qemu_arch_params.kconfig
105		self._qemu_arch = qemu_arch_params.qemu_arch
106		self._kernel_path = qemu_arch_params.kernel_path
107		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
108		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
109		self._serial = qemu_arch_params.serial
110
111	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
112		kconfig = kunit_config.parse_from_string(self._kconfig)
113		kconfig.merge_in_entries(base_kunitconfig)
114		return kconfig
115
116	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
117		kernel_path = os.path.join(build_dir, self._kernel_path)
118		qemu_command = ['qemu-system-' + self._qemu_arch,
119				'-nodefaults',
120				'-m', '1024',
121				'-kernel', kernel_path,
122				'-append', ' '.join(params + [self._kernel_command_line]),
123				'-no-reboot',
124				'-nographic',
125				'-serial', self._serial] + self._extra_qemu_params
126		# Note: shlex.join() does what we want, but requires python 3.8+.
127		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
128		return subprocess.Popen(qemu_command,
129					stdin=subprocess.PIPE,
130					stdout=subprocess.PIPE,
131					stderr=subprocess.STDOUT,
132					text=True, errors='backslashreplace')
133
134class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
135	"""An abstraction over command line operations performed on a source tree."""
136
137	def __init__(self, cross_compile: Optional[str]=None):
138		super().__init__(linux_arch='um', cross_compile=cross_compile)
139
140	def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
141		kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
142		kconfig.merge_in_entries(base_kunitconfig)
143		return kconfig
144
145	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
146		"""Runs the Linux UML binary. Must be named 'linux'."""
147		linux_bin = os.path.join(build_dir, 'linux')
148		params.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
149		return subprocess.Popen([linux_bin] + params,
150					   stdin=subprocess.PIPE,
151					   stdout=subprocess.PIPE,
152					   stderr=subprocess.STDOUT,
153					   text=True, errors='backslashreplace')
154
155def get_kconfig_path(build_dir: str) -> str:
156	return os.path.join(build_dir, KCONFIG_PATH)
157
158def get_kunitconfig_path(build_dir: str) -> str:
159	return os.path.join(build_dir, KUNITCONFIG_PATH)
160
161def get_old_kunitconfig_path(build_dir: str) -> str:
162	return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
163
164def get_parsed_kunitconfig(build_dir: str,
165			   kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
166	if not kunitconfig_paths:
167		path = get_kunitconfig_path(build_dir)
168		if not os.path.exists(path):
169			shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
170		return kunit_config.parse_file(path)
171
172	merged = kunit_config.Kconfig()
173
174	for path in kunitconfig_paths:
175		if os.path.isdir(path):
176			path = os.path.join(path, KUNITCONFIG_PATH)
177		if not os.path.exists(path):
178			raise ConfigError(f'Specified kunitconfig ({path}) does not exist')
179
180		partial = kunit_config.parse_file(path)
181		diff = merged.conflicting_options(partial)
182		if diff:
183			diff_str = '\n\n'.join(f'{a}\n  vs from {path}\n{b}' for a, b in diff)
184			raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
185		merged.merge_in_entries(partial)
186	return merged
187
188def get_outfile_path(build_dir: str) -> str:
189	return os.path.join(build_dir, OUTFILE_PATH)
190
191def _default_qemu_config_path(arch: str) -> str:
192	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
193	if os.path.isfile(config_path):
194		return config_path
195
196	options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
197	raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
198
199def _get_qemu_ops(config_path: str,
200		  extra_qemu_args: Optional[List[str]],
201		  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
202	# The module name/path has very little to do with where the actual file
203	# exists (I learned this through experimentation and could not find it
204	# anywhere in the Python documentation).
205	#
206	# Bascially, we completely ignore the actual file location of the config
207	# we are loading and just tell Python that the module lives in the
208	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
209	# exists as a file.
210	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
211	spec = importlib.util.spec_from_file_location(module_path, config_path)
212	assert spec is not None
213	config = importlib.util.module_from_spec(spec)
214	# See https://github.com/python/typeshed/pull/2626 for context.
215	assert isinstance(spec.loader, importlib.abc.Loader)
216	spec.loader.exec_module(config)
217
218	if not hasattr(config, 'QEMU_ARCH'):
219		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
220	params: qemu_config.QemuArchParams = config.QEMU_ARCH
221	if extra_qemu_args:
222		params.extra_qemu_params.extend(extra_qemu_args)
223	return params.linux_arch, LinuxSourceTreeOperationsQemu(
224			params, cross_compile=cross_compile)
225
226class LinuxSourceTree:
227	"""Represents a Linux kernel source tree with KUnit tests."""
228
229	def __init__(
230	      self,
231	      build_dir: str,
232	      kunitconfig_paths: Optional[List[str]]=None,
233	      kconfig_add: Optional[List[str]]=None,
234	      arch: Optional[str]=None,
235	      cross_compile: Optional[str]=None,
236	      qemu_config_path: Optional[str]=None,
237	      extra_qemu_args: Optional[List[str]]=None) -> None:
238		signal.signal(signal.SIGINT, self.signal_handler)
239		if qemu_config_path:
240			self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
241		else:
242			self._arch = 'um' if arch is None else arch
243			if self._arch == 'um':
244				self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
245			else:
246				qemu_config_path = _default_qemu_config_path(self._arch)
247				_, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
248
249		self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
250		if kconfig_add:
251			kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
252			self._kconfig.merge_in_entries(kconfig)
253
254	def arch(self) -> str:
255		return self._arch
256
257	def clean(self) -> bool:
258		try:
259			self._ops.make_mrproper()
260		except ConfigError as e:
261			logging.error(e)
262			return False
263		return True
264
265	def validate_config(self, build_dir: str) -> bool:
266		kconfig_path = get_kconfig_path(build_dir)
267		validated_kconfig = kunit_config.parse_file(kconfig_path)
268		if self._kconfig.is_subset_of(validated_kconfig):
269			return True
270		missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
271		message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
272			  'This is probably due to unsatisfied dependencies.\n' \
273			  'Missing: ' + ', '.join(str(e) for e in missing)
274		if self._arch == 'um':
275			message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
276				   'on a different architecture with something like "--arch=x86_64".'
277		logging.error(message)
278		return False
279
280	def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
281		kconfig_path = get_kconfig_path(build_dir)
282		if build_dir and not os.path.exists(build_dir):
283			os.mkdir(build_dir)
284		try:
285			self._kconfig = self._ops.make_arch_config(self._kconfig)
286			self._kconfig.write_to_file(kconfig_path)
287			self._ops.make_olddefconfig(build_dir, make_options)
288		except ConfigError as e:
289			logging.error(e)
290			return False
291		if not self.validate_config(build_dir):
292			return False
293
294		old_path = get_old_kunitconfig_path(build_dir)
295		if os.path.exists(old_path):
296			os.remove(old_path)  # write_to_file appends to the file
297		self._kconfig.write_to_file(old_path)
298		return True
299
300	def _kunitconfig_changed(self, build_dir: str) -> bool:
301		old_path = get_old_kunitconfig_path(build_dir)
302		if not os.path.exists(old_path):
303			return True
304
305		old_kconfig = kunit_config.parse_file(old_path)
306		return old_kconfig != self._kconfig
307
308	def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
309		"""Creates a new .config if it is not a subset of the .kunitconfig."""
310		kconfig_path = get_kconfig_path(build_dir)
311		if not os.path.exists(kconfig_path):
312			print('Generating .config ...')
313			return self.build_config(build_dir, make_options)
314
315		existing_kconfig = kunit_config.parse_file(kconfig_path)
316		self._kconfig = self._ops.make_arch_config(self._kconfig)
317
318		if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
319			return True
320		print('Regenerating .config ...')
321		os.remove(kconfig_path)
322		return self.build_config(build_dir, make_options)
323
324	def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
325		try:
326			self._ops.make_olddefconfig(build_dir, make_options)
327			self._ops.make(jobs, build_dir, make_options)
328		except (ConfigError, BuildError) as e:
329			logging.error(e)
330			return False
331		return self.validate_config(build_dir)
332
333	def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
334		if not args:
335			args = []
336		if filter_glob:
337			args.append('kunit.filter_glob=' + filter_glob)
338		if filter:
339			args.append('kunit.filter="' + filter + '"')
340		if filter_action:
341			args.append('kunit.filter_action=' + filter_action)
342		args.append('kunit.enable=1')
343
344		process = self._ops.start(args, build_dir)
345		assert process.stdout is not None  # tell mypy it's set
346
347		# Enforce the timeout in a background thread.
348		def _wait_proc() -> None:
349			try:
350				process.wait(timeout=timeout)
351			except Exception as e:
352				print(e)
353				process.terminate()
354				process.wait()
355		waiter = threading.Thread(target=_wait_proc)
356		waiter.start()
357
358		output = open(get_outfile_path(build_dir), 'w')
359		try:
360			# Tee the output to the file and to our caller in real time.
361			for line in process.stdout:
362				output.write(line)
363				yield line
364		# This runs even if our caller doesn't consume every line.
365		finally:
366			# Flush any leftover output to the file
367			output.write(process.stdout.read())
368			output.close()
369			process.stdout.close()
370
371			waiter.join()
372			subprocess.call(['stty', 'sane'])
373
374	def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
375		logging.error('Build interruption occurred. Cleaning console.')
376		subprocess.call(['stty', 'sane'])
377