1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.util
10import logging
11import subprocess
12import os
13import shutil
14import signal
15from typing import Iterator, Optional, Tuple
16
17from contextlib import ExitStack
18
19from collections import namedtuple
20
21import kunit_config
22import kunit_parser
23import qemu_config
24
# File names below are joined onto the build directory (when one is given)
# by the get_*_path() helpers in this file.

# The kernel configuration that is written, read back and validated.
KCONFIG_PATH = '.config'
# The KUnit-specific config fragment the kernel .config must contain.
KUNITCONFIG_PATH = '.kunitconfig'
# Copied into place as the kunitconfig when none exists in the build directory.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
# Appended to .config after a UML allyesconfig.
BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
# File the running kernel's output is written to and later read back from.
OUTFILE_PATH = 'test.log'
# Absolute path of the directory containing this file.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
# Directory searched for '<arch>.py' QEMU configuration files.
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
32
def get_file_path(build_dir, default):
	"""Returns `default` located under `build_dir`.

	A falsy `build_dir` (e.g. '' or None) means no build directory was
	given, in which case `default` is returned unchanged.
	"""
	if not build_dir:
		return default
	return os.path.join(build_dir, default)
37
class ConfigError(Exception):
	"""Raised when configuring the Linux kernel fails."""
40
41
class BuildError(Exception):
	"""Raised when building the Linux kernel fails."""
44
45
class LinuxSourceTreeOperations(object):
	"""An abstraction over command line operations performed on a source tree.

	Wraps the `make` invocations used to clean, configure and build a
	kernel for one architecture. make_arch_qemuconfig() and run() do
	nothing here and make_allyesconfig() always fails; subclasses override
	them as needed.
	"""

	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
		"""Stores the ARCH= value and the optional CROSS_COMPILE= value given to make."""
		self._linux_arch = linux_arch
		self._cross_compile = cross_compile

	def make_mrproper(self) -> None:
		"""Runs `make mrproper`.

		Raises:
			ConfigError: if make cannot be started, or exits non-zero (the
				message is then make's combined stdout/stderr).
		"""
		try:
			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make_arch_qemuconfig(self, kconfig: kunit_config.Kconfig) -> None:
		"""Hook for adding arch-specific entries to `kconfig`; a no-op here."""
		pass

	def make_allyesconfig(self, build_dir, make_options) -> None:
		"""Always raises ConfigError; only the UML subclass implements this."""
		raise ConfigError('Only the "um" arch is supported for alltests')

	def make_olddefconfig(self, build_dir, make_options) -> None:
		"""Runs `make olddefconfig` for this architecture.

		Args:
			build_dir: passed to make as O=<build_dir> when truthy.
			make_options: extra arguments appended to the make command line,
				or a falsy value for none.

		Raises:
			ConfigError: if make cannot be started, or exits non-zero (the
				message is then make's combined stdout/stderr).
		"""
		command = ['make', 'ARCH=' + self._linux_arch, 'olddefconfig']
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if make_options:
			command.extend(make_options)
		if build_dir:
			command += ['O=' + build_dir]
		print('Populating config with:\n$', ' '.join(command))
		try:
			subprocess.check_output(command, stderr=subprocess.STDOUT)
		except OSError as e:
			raise ConfigError('Could not call make command: ' + str(e)) from e
		except subprocess.CalledProcessError as e:
			raise ConfigError(e.output.decode()) from e

	def make(self, jobs, build_dir, make_options) -> None:
		"""Builds the kernel with `make --jobs=<jobs>`.

		make's stdout is discarded. Its stderr is captured: it becomes the
		error message on failure and is printed on success.

		Args:
			jobs: value for make's --jobs option.
			build_dir: passed to make as O=<build_dir> when truthy.
			make_options: extra arguments appended to the make command line,
				or a falsy value for none.

		Raises:
			BuildError: if make cannot be started or exits non-zero.
		"""
		command = ['make', 'ARCH=' + self._linux_arch, '--jobs=' + str(jobs)]
		if make_options:
			command.extend(make_options)
		if self._cross_compile:
			command += ['CROSS_COMPILE=' + self._cross_compile]
		if build_dir:
			command += ['O=' + build_dir]
		print('Building with:\n$', ' '.join(command))
		# Popen only reports failure to start the program (OSError); a failed
		# build is detected below from the return code.
		try:
			proc = subprocess.Popen(command,
						stderr=subprocess.PIPE,
						stdout=subprocess.DEVNULL)
		except OSError as e:
			raise BuildError('Could not call execute make: ' + str(e)) from e
		_, stderr = proc.communicate()
		if proc.returncode != 0:
			raise BuildError(stderr.decode())
		if stderr:  # likely only due to build warnings
			print(stderr.decode())

	def run(self, params, timeout, build_dir, outfile) -> None:
		"""Hook for running the built kernel; a no-op here."""
		pass
108
109
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
	"""Source tree operations for a kernel that is run under QEMU."""

	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
		"""Copies the architecture description out of `qemu_arch_params`.

		' kunit_shutdown=reboot' is appended to the configured kernel
		command line.
		"""
		super().__init__(linux_arch=qemu_arch_params.linux_arch,
				 cross_compile=cross_compile)
		self._kconfig = qemu_arch_params.kconfig
		self._qemu_arch = qemu_arch_params.qemu_arch
		self._kernel_path = qemu_arch_params.kernel_path
		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
		self._extra_qemu_params = qemu_arch_params.extra_qemu_params

	def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None:
		"""Merges this architecture's Kconfig entries into `base_kunitconfig`."""
		kconfig = kunit_config.Kconfig()
		kconfig.parse_from_string(self._kconfig)
		base_kunitconfig.merge_in_entries(kconfig)

	def run(self, params, timeout, build_dir, outfile):
		"""Boots the built kernel under qemu-system-<qemu_arch>.

		QEMU's stdout and stderr are written to `outfile`. If waiting fails
		(normally because `timeout` expired) the error is printed and QEMU
		is terminated.

		Args:
			params: list of kernel command line arguments.
			timeout: seconds to wait for QEMU to exit, or None for no limit.
			build_dir: directory the kernel image path is relative to.
			outfile: path of the file to write QEMU's output to.

		Returns:
			The subprocess.Popen object of the finished QEMU process.

		Raises:
			OSError: if the QEMU binary cannot be executed.
		"""
		# Imported here because nothing else in the file needs shlex.
		import shlex

		kernel_path = os.path.join(build_dir, self._kernel_path)
		qemu_command = ['qemu-system-' + self._qemu_arch,
				'-nodefaults',
				'-m', '1024',
				'-kernel', kernel_path,
				'-append', ' '.join(params + [self._kernel_command_line]),
				'-no-reboot',
				'-nographic',
				'-serial', 'stdio']
		# Entries may hold several words (e.g. '-machine virt') or be empty,
		# so split them the way a shell would instead of passing them verbatim.
		qemu_command += shlex.split(' '.join(self._extra_qemu_params))
		print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
		with open(outfile, 'w') as output:
			# Run QEMU directly rather than through a shell: kernel arguments
			# are then never interpreted by the shell, and terminate() below
			# signals QEMU itself instead of an intermediate shell.
			process = subprocess.Popen(qemu_command,
						   stdin=subprocess.PIPE,
						   stdout=output,
						   stderr=subprocess.STDOUT,
						   text=True)
			try:
				process.wait(timeout=timeout)
			except Exception as e:
				print(e)
				process.terminate()
				# Reap the process so that it has stopped writing to outfile
				# before the caller reads it.
				process.wait()
		return process
149
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
	"""Source tree operations for a User Mode Linux (ARCH=um) kernel."""

	def __init__(self, cross_compile=None):
		super().__init__(linux_arch='um', cross_compile=cross_compile)

	def make_allyesconfig(self, build_dir, make_options) -> None:
		"""Runs `make ARCH=um allyesconfig`, then appends BROKEN_ALLCONFIG_PATH to .config.

		Args:
			build_dir: passed to make as O=<build_dir> when truthy; also
				determines where .config is looked up.
			make_options: extra arguments appended to the make command line,
				or a falsy value for none.
		"""
		kunit_parser.print_with_timestamp(
			'Enabling all CONFIGs for UML...')
		command = ['make', 'ARCH=um', 'allyesconfig']
		if make_options:
			command.extend(make_options)
		if build_dir:
			command += ['O=' + build_dir]
		process = subprocess.Popen(
			command,
			stdout=subprocess.DEVNULL,
			stderr=subprocess.STDOUT)
		# NOTE(review): make's exit status is not checked here.
		process.wait()
		kunit_parser.print_with_timestamp(
			'Disabling broken configs to run KUnit tests...')
		# Use context managers so both files are closed (and the appended
		# config flushed) before returning.
		with open(BROKEN_ALLCONFIG_PATH, 'r') as broken:
			disable = broken.read()
		with open(get_kconfig_path(build_dir), 'a') as config:
			config.write(disable)
		kunit_parser.print_with_timestamp(
			'Starting Kernel with all configs takes a few minutes...')

	def run(self, params, timeout, build_dir, outfile):
		"""Runs the Linux UML binary. Must be named 'linux'.

		The kernel's stdout and stderr are written to `outfile`.

		Args:
			params: list of kernel command line arguments.
			timeout: seconds to wait for the kernel to exit, or None for no
				limit.
			build_dir: directory containing the 'linux' binary.
			outfile: path of the file to write the output to; when falsy,
				the default log path inside `build_dir` is used.

		Raises:
			subprocess.TimeoutExpired: if the kernel does not exit within
				`timeout`; it is terminated before the exception propagates.
		"""
		linux_bin = get_file_path(build_dir, 'linux')
		if not outfile:
			outfile = get_outfile_path(build_dir)
		with open(outfile, 'w') as output:
			process = subprocess.Popen([linux_bin] + params,
						   stdin=subprocess.PIPE,
						   stdout=output,
						   stderr=subprocess.STDOUT,
						   text=True)
			try:
				process.wait(timeout)
			except subprocess.TimeoutExpired:
				# Do not leave the kernel running (and still writing to
				# the log) once we have given up waiting for it.
				process.terminate()
				process.wait()
				raise
189
def get_kconfig_path(build_dir) -> str:
	"""Returns the path of the kernel .config for `build_dir`."""
	kconfig_path = get_file_path(build_dir, KCONFIG_PATH)
	return kconfig_path
192
def get_kunitconfig_path(build_dir) -> str:
	"""Returns the path of the .kunitconfig for `build_dir`."""
	kunitconfig_path = get_file_path(build_dir, KUNITCONFIG_PATH)
	return kunitconfig_path
195
def get_outfile_path(build_dir) -> str:
	"""Returns the path of the kernel output log for `build_dir`."""
	outfile_path = get_file_path(build_dir, OUTFILE_PATH)
	return outfile_path
198
def get_source_tree_ops(arch: str, cross_compile: Optional[str]) -> LinuxSourceTreeOperations:
	"""Returns the operations object for `arch`.

	'um' selects the UML operations; any other value must name a
	'<arch>.py' file in QEMU_CONFIGS_DIR, which is loaded as a QEMU config.

	Raises:
		ConfigError: if `arch` is neither 'um' nor a known QEMU config.
	"""
	qemu_config_file = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
	if arch == 'um':
		return LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
	if not os.path.isfile(qemu_config_file):
		raise ConfigError(arch + ' is not a valid arch')
	_, ops = get_source_tree_ops_from_qemu_config(qemu_config_file, cross_compile)
	return ops
207
def get_source_tree_ops_from_qemu_config(config_path: str,
					 cross_compile: Optional[str]) -> Tuple[
							 str, LinuxSourceTreeOperations]:
	"""Loads a QEMU config Python file and builds its operations object.

	The file at `config_path` is executed as a Python module and must
	define a module-level QEMU_ARCH.

	Returns:
		A (linux_arch, operations) tuple, where linux_arch is taken from
		QEMU_ARCH.linux_arch.

	Raises:
		ConfigError: if the file cannot be loaded as a Python module or
			does not define QEMU_ARCH.
	"""
	# The module name/path has very little to do with where the actual file
	# exists (I learned this through experimentation and could not find it
	# anywhere in the Python documentation).
	#
	# Basically, we completely ignore the actual file location of the config
	# we are loading and just tell Python that the module lives in the
	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
	# exists as a file.
	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
	spec = importlib.util.spec_from_file_location(module_path, config_path)
	# No spec (or no loader) is produced when the file name has no suffix
	# Python recognizes as a module, e.g. a config not ending in '.py'.
	if spec is None or spec.loader is None:
		raise ConfigError('Could not load QEMU config as a Python module: ' + config_path)
	config = importlib.util.module_from_spec(spec)
	# TODO(brendanhiggins@google.com): I looked this up and apparently other
	# Python projects have noted that pytype complains that "No attribute
	# 'exec_module' on _importlib_modulespec._Loader". Disabling for now.
	spec.loader.exec_module(config) # pytype: disable=attribute-error
	if not hasattr(config, 'QEMU_ARCH'):
		raise ConfigError('QEMU config does not define QEMU_ARCH: ' + config_path)
	return config.QEMU_ARCH.linux_arch, LinuxSourceTreeOperationsQemu(
			config.QEMU_ARCH, cross_compile=cross_compile)
228
class LinuxSourceTree(object):
	"""Represents a Linux kernel source tree with KUnit tests."""

	def __init__(
		self,
		build_dir: str,
		load_config=True,
		kunitconfig_path='',
		arch=None,
		cross_compile=None,
		qemu_config_path=None) -> None:
		"""Selects the per-arch operations and, optionally, loads the kunitconfig.

		Also installs signal_handler() as the process's SIGINT handler.

		Args:
			build_dir: build directory; used to locate the default
				kunitconfig.
			load_config: when False, no kunitconfig is read.
			kunitconfig_path: a kunitconfig file, or a directory containing
				a '.kunitconfig'. When empty, the one in `build_dir` is used
				and is first created from DEFAULT_KUNITCONFIG_PATH if it
				does not exist.
			arch: architecture name; 'um' when None. Ignored when
				`qemu_config_path` is given.
			cross_compile: CROSS_COMPILE= value handed to the operations
				object.
			qemu_config_path: path of a QEMU config file to load instead of
				looking one up by `arch`.

		Raises:
			ConfigError: if `arch` is not valid or the given
				`kunitconfig_path` does not exist.
		"""
		signal.signal(signal.SIGINT, self.signal_handler)
		if qemu_config_path:
			self._arch, self._ops = get_source_tree_ops_from_qemu_config(
					qemu_config_path, cross_compile)
		else:
			self._arch = 'um' if arch is None else arch
			self._ops = get_source_tree_ops(self._arch, cross_compile)

		if not load_config:
			return

		if kunitconfig_path:
			if os.path.isdir(kunitconfig_path):
				kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH)
			if not os.path.exists(kunitconfig_path):
				raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
		else:
			kunitconfig_path = get_kunitconfig_path(build_dir)
			if not os.path.exists(kunitconfig_path):
				shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)

		self._kconfig = kunit_config.Kconfig()
		self._kconfig.read_from_file(kunitconfig_path)

	def clean(self) -> bool:
		"""Runs `make mrproper`; logs the error and returns False on failure."""
		try:
			self._ops.make_mrproper()
		except ConfigError as e:
			logging.error(e)
			return False
		return True

	def validate_config(self, build_dir) -> bool:
		"""Checks that the .config in `build_dir` contains the loaded kunitconfig.

		Returns:
			True if the kunitconfig is a subset of .config; otherwise logs
			the missing entries and returns False.
		"""
		kconfig_path = get_kconfig_path(build_dir)
		validated_kconfig = kunit_config.Kconfig()
		validated_kconfig.read_from_file(kconfig_path)
		if not self._kconfig.is_subset_of(validated_kconfig):
			invalid = self._kconfig.entries() - validated_kconfig.entries()
			message = 'Provided Kconfig is not contained in validated .config. Following fields found in kunitconfig, ' \
					  'but not in .config: %s' % (
					', '.join([str(e) for e in invalid])
			)
			logging.error(message)
			return False
		return True

	def build_config(self, build_dir, make_options) -> bool:
		"""Writes the kunitconfig as .config and completes it with olddefconfig.

		Creates `build_dir` first if it is given and does not exist.

		Returns:
			False if configuring fails (the error is logged); otherwise the
			result of validate_config().
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if build_dir and not os.path.exists(build_dir):
			os.mkdir(build_dir)
		try:
			self._ops.make_arch_qemuconfig(self._kconfig)
			self._kconfig.write_to_file(kconfig_path)
			self._ops.make_olddefconfig(build_dir, make_options)
		except ConfigError as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def build_reconfig(self, build_dir, make_options) -> bool:
		"""Regenerates .config unless the existing one already contains the kunitconfig.

		Returns:
			True if the existing .config was kept; otherwise the result of
			build_config().
		"""
		kconfig_path = get_kconfig_path(build_dir)
		if os.path.exists(kconfig_path):
			existing_kconfig = kunit_config.Kconfig()
			existing_kconfig.read_from_file(kconfig_path)
			self._ops.make_arch_qemuconfig(self._kconfig)
			if not self._kconfig.is_subset_of(existing_kconfig):
				print('Regenerating .config ...')
				os.remove(kconfig_path)
				return self.build_config(build_dir, make_options)
			else:
				return True
		else:
			print('Generating .config ...')
			return self.build_config(build_dir, make_options)

	def build_kernel(self, alltests, jobs, build_dir, make_options) -> bool:
		"""Builds the kernel, first running allyesconfig when `alltests` is set.

		Returns:
			False if configuring or building fails (the error is logged);
			otherwise the result of validate_config().
		"""
		try:
			if alltests:
				self._ops.make_allyesconfig(build_dir, make_options)
			self._ops.make_olddefconfig(build_dir, make_options)
			self._ops.make(jobs, build_dir, make_options)
		except (ConfigError, BuildError) as e:
			logging.error(e)
			return False
		return self.validate_config(build_dir)

	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
		"""Runs the kernel and yields the lines of its output log.

		This is a generator: the kernel is only started once iteration
		begins.

		Args:
			args: extra kernel command line arguments; not modified.
			build_dir: build directory holding the kernel and the log.
			filter_glob: when non-empty, passed as kunit.filter_glob=<glob>.
			timeout: seconds to wait for the kernel, or None for no limit.
		"""
		# Work on a copy so the caller's list is not extended on every run.
		args = list(args) if args else []
		args.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
		if filter_glob:
			args.append('kunit.filter_glob='+filter_glob)
		outfile = get_outfile_path(build_dir)
		self._ops.run(args, timeout, build_dir, outfile)
		subprocess.call(['stty', 'sane'])
		with open(outfile, 'r') as file:
			for line in file:
				yield line

	def signal_handler(self, sig, frame) -> None:
		"""SIGINT handler: logs the interruption and runs `stty sane`."""
		logging.error('Build interruption occurred. Cleaning console.')
		subprocess.call(['stty', 'sane'])
343