1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.util
10import logging
11import subprocess
12import os
13import shutil
14import signal
15import threading
16from typing import Iterator, List, Optional, Tuple
17
18import kunit_config
19import kunit_parser
20import qemu_config
21
22KCONFIG_PATH = '.config'
23KUNITCONFIG_PATH = '.kunitconfig'
24DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
25BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
26OUTFILE_PATH = 'test.log'
27ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
28QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
29
30def get_file_path(build_dir, default):
31	if build_dir:
32		default = os.path.join(build_dir, default)
33	return default
34
35class ConfigError(Exception):
36	"""Represents an error trying to configure the Linux kernel."""
37
38
39class BuildError(Exception):
40	"""Represents an error trying to build the Linux kernel."""
41
42
43class LinuxSourceTreeOperations(object):
44	"""An abstraction over command line operations performed on a source tree."""
45
46	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
47		self._linux_arch = linux_arch
48		self._cross_compile = cross_compile
49
50	def make_mrproper(self) -> None:
51		try:
52			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
53		except OSError as e:
54			raise ConfigError('Could not call make command: ' + str(e))
55		except subprocess.CalledProcessError as e:
56			raise ConfigError(e.output.decode())
57
58	def make_arch_qemuconfig(self, kconfig: kunit_config.Kconfig) -> None:
59		pass
60
61	def make_allyesconfig(self, build_dir, make_options) -> None:
62		raise ConfigError('Only the "um" arch is supported for alltests')
63
64	def make_olddefconfig(self, build_dir, make_options) -> None:
65		command = ['make', 'ARCH=' + self._linux_arch, 'olddefconfig']
66		if self._cross_compile:
67			command += ['CROSS_COMPILE=' + self._cross_compile]
68		if make_options:
69			command.extend(make_options)
70		if build_dir:
71			command += ['O=' + build_dir]
72		print('Populating config with:\n$', ' '.join(command))
73		try:
74			subprocess.check_output(command, stderr=subprocess.STDOUT)
75		except OSError as e:
76			raise ConfigError('Could not call make command: ' + str(e))
77		except subprocess.CalledProcessError as e:
78			raise ConfigError(e.output.decode())
79
80	def make(self, jobs, build_dir, make_options) -> None:
81		command = ['make', 'ARCH=' + self._linux_arch, '--jobs=' + str(jobs)]
82		if make_options:
83			command.extend(make_options)
84		if self._cross_compile:
85			command += ['CROSS_COMPILE=' + self._cross_compile]
86		if build_dir:
87			command += ['O=' + build_dir]
88		print('Building with:\n$', ' '.join(command))
89		try:
90			proc = subprocess.Popen(command,
91						stderr=subprocess.PIPE,
92						stdout=subprocess.DEVNULL)
93		except OSError as e:
94			raise BuildError('Could not call execute make: ' + str(e))
95		except subprocess.CalledProcessError as e:
96			raise BuildError(e.output)
97		_, stderr = proc.communicate()
98		if proc.returncode != 0:
99			raise BuildError(stderr.decode())
100		if stderr:  # likely only due to build warnings
101			print(stderr.decode())
102
103	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
104		raise RuntimeError('not implemented!')
105
106
107class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
108
109	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
110		super().__init__(linux_arch=qemu_arch_params.linux_arch,
111				 cross_compile=cross_compile)
112		self._kconfig = qemu_arch_params.kconfig
113		self._qemu_arch = qemu_arch_params.qemu_arch
114		self._kernel_path = qemu_arch_params.kernel_path
115		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
116		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
117
118	def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None:
119		kconfig = kunit_config.Kconfig()
120		kconfig.parse_from_string(self._kconfig)
121		base_kunitconfig.merge_in_entries(kconfig)
122
123	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
124		kernel_path = os.path.join(build_dir, self._kernel_path)
125		qemu_command = ['qemu-system-' + self._qemu_arch,
126				'-nodefaults',
127				'-m', '1024',
128				'-kernel', kernel_path,
129				'-append', '\'' + ' '.join(params + [self._kernel_command_line]) + '\'',
130				'-no-reboot',
131				'-nographic',
132				'-serial stdio'] + self._extra_qemu_params
133		print('Running tests with:\n$', ' '.join(qemu_command))
134		return subprocess.Popen(' '.join(qemu_command),
135					   stdin=subprocess.PIPE,
136					   stdout=subprocess.PIPE,
137					   stderr=subprocess.STDOUT,
138					   text=True, shell=True, errors='backslashreplace')
139
140class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
141	"""An abstraction over command line operations performed on a source tree."""
142
143	def __init__(self, cross_compile=None):
144		super().__init__(linux_arch='um', cross_compile=cross_compile)
145
146	def make_allyesconfig(self, build_dir, make_options) -> None:
147		kunit_parser.print_with_timestamp(
148			'Enabling all CONFIGs for UML...')
149		command = ['make', 'ARCH=um', 'allyesconfig']
150		if make_options:
151			command.extend(make_options)
152		if build_dir:
153			command += ['O=' + build_dir]
154		process = subprocess.Popen(
155			command,
156			stdout=subprocess.DEVNULL,
157			stderr=subprocess.STDOUT)
158		process.wait()
159		kunit_parser.print_with_timestamp(
160			'Disabling broken configs to run KUnit tests...')
161
162		with open(get_kconfig_path(build_dir), 'a') as config:
163			with open(BROKEN_ALLCONFIG_PATH, 'r') as disable:
164				config.write(disable.read())
165		kunit_parser.print_with_timestamp(
166			'Starting Kernel with all configs takes a few minutes...')
167
168	def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
169		"""Runs the Linux UML binary. Must be named 'linux'."""
170		linux_bin = get_file_path(build_dir, 'linux')
171		return subprocess.Popen([linux_bin] + params,
172					   stdin=subprocess.PIPE,
173					   stdout=subprocess.PIPE,
174					   stderr=subprocess.STDOUT,
175					   text=True, errors='backslashreplace')
176
177def get_kconfig_path(build_dir) -> str:
178	return get_file_path(build_dir, KCONFIG_PATH)
179
180def get_kunitconfig_path(build_dir) -> str:
181	return get_file_path(build_dir, KUNITCONFIG_PATH)
182
183def get_outfile_path(build_dir) -> str:
184	return get_file_path(build_dir, OUTFILE_PATH)
185
186def get_source_tree_ops(arch: str, cross_compile: Optional[str]) -> LinuxSourceTreeOperations:
187	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
188	if arch == 'um':
189		return LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
190	elif os.path.isfile(config_path):
191		return get_source_tree_ops_from_qemu_config(config_path, cross_compile)[1]
192
193	options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
194	raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))
195
196def get_source_tree_ops_from_qemu_config(config_path: str,
197					 cross_compile: Optional[str]) -> Tuple[
198							 str, LinuxSourceTreeOperations]:
199	# The module name/path has very little to do with where the actual file
200	# exists (I learned this through experimentation and could not find it
201	# anywhere in the Python documentation).
202	#
203	# Bascially, we completely ignore the actual file location of the config
204	# we are loading and just tell Python that the module lives in the
205	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
206	# exists as a file.
207	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
208	spec = importlib.util.spec_from_file_location(module_path, config_path)
209	config = importlib.util.module_from_spec(spec)
210	# See https://github.com/python/typeshed/pull/2626 for context.
211	assert isinstance(spec.loader, importlib.abc.Loader)
212	spec.loader.exec_module(config)
213
214	if not hasattr(config, 'QEMU_ARCH'):
215		raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
216	params: qemu_config.QemuArchParams = config.QEMU_ARCH  # type: ignore
217	return params.linux_arch, LinuxSourceTreeOperationsQemu(
218			params, cross_compile=cross_compile)
219
220class LinuxSourceTree(object):
221	"""Represents a Linux kernel source tree with KUnit tests."""
222
223	def __init__(
224	      self,
225	      build_dir: str,
226	      load_config=True,
227	      kunitconfig_path='',
228	      arch=None,
229	      cross_compile=None,
230	      qemu_config_path=None) -> None:
231		signal.signal(signal.SIGINT, self.signal_handler)
232		if qemu_config_path:
233			self._arch, self._ops = get_source_tree_ops_from_qemu_config(
234					qemu_config_path, cross_compile)
235		else:
236			self._arch = 'um' if arch is None else arch
237			self._ops = get_source_tree_ops(self._arch, cross_compile)
238
239		if not load_config:
240			return
241
242		if kunitconfig_path:
243			if os.path.isdir(kunitconfig_path):
244				kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH)
245			if not os.path.exists(kunitconfig_path):
246				raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
247		else:
248			kunitconfig_path = get_kunitconfig_path(build_dir)
249			if not os.path.exists(kunitconfig_path):
250				shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)
251
252		self._kconfig = kunit_config.Kconfig()
253		self._kconfig.read_from_file(kunitconfig_path)
254
255	def clean(self) -> bool:
256		try:
257			self._ops.make_mrproper()
258		except ConfigError as e:
259			logging.error(e)
260			return False
261		return True
262
263	def validate_config(self, build_dir) -> bool:
264		kconfig_path = get_kconfig_path(build_dir)
265		validated_kconfig = kunit_config.Kconfig()
266		validated_kconfig.read_from_file(kconfig_path)
267		if not self._kconfig.is_subset_of(validated_kconfig):
268			invalid = self._kconfig.entries() - validated_kconfig.entries()
269			message = 'Provided Kconfig is not contained in validated .config. Following fields found in kunitconfig, ' \
270					  'but not in .config: %s' % (
271					', '.join([str(e) for e in invalid])
272			)
273			logging.error(message)
274			return False
275		return True
276
277	def build_config(self, build_dir, make_options) -> bool:
278		kconfig_path = get_kconfig_path(build_dir)
279		if build_dir and not os.path.exists(build_dir):
280			os.mkdir(build_dir)
281		try:
282			self._ops.make_arch_qemuconfig(self._kconfig)
283			self._kconfig.write_to_file(kconfig_path)
284			self._ops.make_olddefconfig(build_dir, make_options)
285		except ConfigError as e:
286			logging.error(e)
287			return False
288		return self.validate_config(build_dir)
289
290	def build_reconfig(self, build_dir, make_options) -> bool:
291		"""Creates a new .config if it is not a subset of the .kunitconfig."""
292		kconfig_path = get_kconfig_path(build_dir)
293		if os.path.exists(kconfig_path):
294			existing_kconfig = kunit_config.Kconfig()
295			existing_kconfig.read_from_file(kconfig_path)
296			self._ops.make_arch_qemuconfig(self._kconfig)
297			if not self._kconfig.is_subset_of(existing_kconfig):
298				print('Regenerating .config ...')
299				os.remove(kconfig_path)
300				return self.build_config(build_dir, make_options)
301			else:
302				return True
303		else:
304			print('Generating .config ...')
305			return self.build_config(build_dir, make_options)
306
307	def build_kernel(self, alltests, jobs, build_dir, make_options) -> bool:
308		try:
309			if alltests:
310				self._ops.make_allyesconfig(build_dir, make_options)
311			self._ops.make_olddefconfig(build_dir, make_options)
312			self._ops.make(jobs, build_dir, make_options)
313		except (ConfigError, BuildError) as e:
314			logging.error(e)
315			return False
316		return self.validate_config(build_dir)
317
318	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
319		if not args:
320			args = []
321		args.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
322		if filter_glob:
323			args.append('kunit.filter_glob='+filter_glob)
324
325		process = self._ops.start(args, build_dir)
326		assert process.stdout is not None  # tell mypy it's set
327
328		# Enforce the timeout in a background thread.
329		def _wait_proc():
330			try:
331				process.wait(timeout=timeout)
332			except Exception as e:
333				print(e)
334				process.terminate()
335				process.wait()
336		waiter = threading.Thread(target=_wait_proc)
337		waiter.start()
338
339		output = open(get_outfile_path(build_dir), 'w')
340		try:
341			# Tee the output to the file and to our caller in real time.
342			for line in process.stdout:
343				output.write(line)
344				yield line
345		# This runs even if our caller doesn't consume every line.
346		finally:
347			# Flush any leftover output to the file
348			output.write(process.stdout.read())
349			output.close()
350			process.stdout.close()
351
352			waiter.join()
353			subprocess.call(['stty', 'sane'])
354
355	def signal_handler(self, sig, frame) -> None:
356		logging.error('Build interruption occurred. Cleaning console.')
357		subprocess.call(['stty', 'sane'])
358