1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9from __future__ import annotations
10import importlib.util
11import logging
12import subprocess
13import os
14import shutil
15import signal
16from typing import Iterator
17from typing import Optional
18
19from contextlib import ExitStack
20
21from collections import namedtuple
22
23import kunit_config
24import kunit_parser
25import qemu_config
26
27KCONFIG_PATH = '.config'
28KUNITCONFIG_PATH = '.kunitconfig'
29DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
30BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
31OUTFILE_PATH = 'test.log'
32ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
33QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
34
35def get_file_path(build_dir, default):
36	if build_dir:
37		default = os.path.join(build_dir, default)
38	return default
39
40class ConfigError(Exception):
41	"""Represents an error trying to configure the Linux kernel."""
42
43
44class BuildError(Exception):
45	"""Represents an error trying to build the Linux kernel."""
46
47
48class LinuxSourceTreeOperations(object):
49	"""An abstraction over command line operations performed on a source tree."""
50
51	def __init__(self, linux_arch: str, cross_compile: Optional[str]):
52		self._linux_arch = linux_arch
53		self._cross_compile = cross_compile
54
55	def make_mrproper(self) -> None:
56		try:
57			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
58		except OSError as e:
59			raise ConfigError('Could not call make command: ' + str(e))
60		except subprocess.CalledProcessError as e:
61			raise ConfigError(e.output.decode())
62
63	def make_arch_qemuconfig(self, kconfig: kunit_config.Kconfig) -> None:
64		pass
65
66	def make_allyesconfig(self, build_dir, make_options) -> None:
67		raise ConfigError('Only the "um" arch is supported for alltests')
68
69	def make_olddefconfig(self, build_dir, make_options) -> None:
70		command = ['make', 'ARCH=' + self._linux_arch, 'olddefconfig']
71		if self._cross_compile:
72			command += ['CROSS_COMPILE=' + self._cross_compile]
73		if make_options:
74			command.extend(make_options)
75		if build_dir:
76			command += ['O=' + build_dir]
77		print('Populating config with:\n$', ' '.join(command))
78		try:
79			subprocess.check_output(command, stderr=subprocess.STDOUT)
80		except OSError as e:
81			raise ConfigError('Could not call make command: ' + str(e))
82		except subprocess.CalledProcessError as e:
83			raise ConfigError(e.output.decode())
84
85	def make(self, jobs, build_dir, make_options) -> None:
86		command = ['make', 'ARCH=' + self._linux_arch, '--jobs=' + str(jobs)]
87		if make_options:
88			command.extend(make_options)
89		if self._cross_compile:
90			command += ['CROSS_COMPILE=' + self._cross_compile]
91		if build_dir:
92			command += ['O=' + build_dir]
93		print('Building with:\n$', ' '.join(command))
94		try:
95			proc = subprocess.Popen(command,
96						stderr=subprocess.PIPE,
97						stdout=subprocess.DEVNULL)
98		except OSError as e:
99			raise BuildError('Could not call execute make: ' + str(e))
100		except subprocess.CalledProcessError as e:
101			raise BuildError(e.output)
102		_, stderr = proc.communicate()
103		if proc.returncode != 0:
104			raise BuildError(stderr.decode())
105		if stderr:  # likely only due to build warnings
106			print(stderr.decode())
107
108	def run(self, params, timeout, build_dir, outfile) -> None:
109		pass
110
111
112class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
113
114	def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
115		super().__init__(linux_arch=qemu_arch_params.linux_arch,
116				 cross_compile=cross_compile)
117		self._kconfig = qemu_arch_params.kconfig
118		self._qemu_arch = qemu_arch_params.qemu_arch
119		self._kernel_path = qemu_arch_params.kernel_path
120		self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
121		self._extra_qemu_params = qemu_arch_params.extra_qemu_params
122
123	def make_arch_qemuconfig(self, base_kunitconfig: kunit_config.Kconfig) -> None:
124		kconfig = kunit_config.Kconfig()
125		kconfig.parse_from_string(self._kconfig)
126		base_kunitconfig.merge_in_entries(kconfig)
127
128	def run(self, params, timeout, build_dir, outfile):
129		kernel_path = os.path.join(build_dir, self._kernel_path)
130		qemu_command = ['qemu-system-' + self._qemu_arch,
131				'-nodefaults',
132				'-m', '1024',
133				'-kernel', kernel_path,
134				'-append', '\'' + ' '.join(params + [self._kernel_command_line]) + '\'',
135				'-no-reboot',
136				'-nographic',
137				'-serial stdio'] + self._extra_qemu_params
138		print('Running tests with:\n$', ' '.join(qemu_command))
139		with open(outfile, 'w') as output:
140			process = subprocess.Popen(' '.join(qemu_command),
141						   stdin=subprocess.PIPE,
142						   stdout=output,
143						   stderr=subprocess.STDOUT,
144						   text=True, shell=True)
145		try:
146			process.wait(timeout=timeout)
147		except Exception as e:
148			print(e)
149			process.terminate()
150		return process
151
152class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
153	"""An abstraction over command line operations performed on a source tree."""
154
155	def __init__(self, cross_compile=None):
156		super().__init__(linux_arch='um', cross_compile=cross_compile)
157
158	def make_allyesconfig(self, build_dir, make_options) -> None:
159		kunit_parser.print_with_timestamp(
160			'Enabling all CONFIGs for UML...')
161		command = ['make', 'ARCH=um', 'allyesconfig']
162		if make_options:
163			command.extend(make_options)
164		if build_dir:
165			command += ['O=' + build_dir]
166		process = subprocess.Popen(
167			command,
168			stdout=subprocess.DEVNULL,
169			stderr=subprocess.STDOUT)
170		process.wait()
171		kunit_parser.print_with_timestamp(
172			'Disabling broken configs to run KUnit tests...')
173		with ExitStack() as es:
174			config = open(get_kconfig_path(build_dir), 'a')
175			disable = open(BROKEN_ALLCONFIG_PATH, 'r').read()
176			config.write(disable)
177		kunit_parser.print_with_timestamp(
178			'Starting Kernel with all configs takes a few minutes...')
179
180	def run(self, params, timeout, build_dir, outfile):
181		"""Runs the Linux UML binary. Must be named 'linux'."""
182		linux_bin = get_file_path(build_dir, 'linux')
183		outfile = get_outfile_path(build_dir)
184		with open(outfile, 'w') as output:
185			process = subprocess.Popen([linux_bin] + params,
186						   stdin=subprocess.PIPE,
187						   stdout=output,
188						   stderr=subprocess.STDOUT,
189						   text=True)
190			process.wait(timeout)
191
192def get_kconfig_path(build_dir) -> str:
193	return get_file_path(build_dir, KCONFIG_PATH)
194
195def get_kunitconfig_path(build_dir) -> str:
196	return get_file_path(build_dir, KUNITCONFIG_PATH)
197
198def get_outfile_path(build_dir) -> str:
199	return get_file_path(build_dir, OUTFILE_PATH)
200
201def get_source_tree_ops(arch: str, cross_compile: Optional[str]) -> LinuxSourceTreeOperations:
202	config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
203	if arch == 'um':
204		return LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
205	elif os.path.isfile(config_path):
206		return get_source_tree_ops_from_qemu_config(config_path, cross_compile)[1]
207	else:
208		raise ConfigError(arch + ' is not a valid arch')
209
210def get_source_tree_ops_from_qemu_config(config_path: str,
211					 cross_compile: Optional[str]) -> tuple[
212							 str, LinuxSourceTreeOperations]:
213	# The module name/path has very little to do with where the actual file
214	# exists (I learned this through experimentation and could not find it
215	# anywhere in the Python documentation).
216	#
217	# Bascially, we completely ignore the actual file location of the config
218	# we are loading and just tell Python that the module lives in the
219	# QEMU_CONFIGS_DIR for import purposes regardless of where it actually
220	# exists as a file.
221	module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
222	spec = importlib.util.spec_from_file_location(module_path, config_path)
223	config = importlib.util.module_from_spec(spec)
224	# TODO(brendanhiggins@google.com): I looked this up and apparently other
225	# Python projects have noted that pytype complains that "No attribute
226	# 'exec_module' on _importlib_modulespec._Loader". Disabling for now.
227	spec.loader.exec_module(config) # pytype: disable=attribute-error
228	return config.QEMU_ARCH.linux_arch, LinuxSourceTreeOperationsQemu(
229			config.QEMU_ARCH, cross_compile=cross_compile)
230
231class LinuxSourceTree(object):
232	"""Represents a Linux kernel source tree with KUnit tests."""
233
234	def __init__(
235	      self,
236	      build_dir: str,
237	      load_config=True,
238	      kunitconfig_path='',
239	      arch=None,
240	      cross_compile=None,
241	      qemu_config_path=None) -> None:
242		signal.signal(signal.SIGINT, self.signal_handler)
243		if qemu_config_path:
244			self._arch, self._ops = get_source_tree_ops_from_qemu_config(
245					qemu_config_path, cross_compile)
246		else:
247			self._arch = 'um' if arch is None else arch
248			self._ops = get_source_tree_ops(self._arch, cross_compile)
249
250		if not load_config:
251			return
252
253		if kunitconfig_path:
254			if os.path.isdir(kunitconfig_path):
255				kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH)
256			if not os.path.exists(kunitconfig_path):
257				raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
258		else:
259			kunitconfig_path = get_kunitconfig_path(build_dir)
260			if not os.path.exists(kunitconfig_path):
261				shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)
262
263		self._kconfig = kunit_config.Kconfig()
264		self._kconfig.read_from_file(kunitconfig_path)
265
266	def clean(self) -> bool:
267		try:
268			self._ops.make_mrproper()
269		except ConfigError as e:
270			logging.error(e)
271			return False
272		return True
273
274	def validate_config(self, build_dir) -> bool:
275		kconfig_path = get_kconfig_path(build_dir)
276		validated_kconfig = kunit_config.Kconfig()
277		validated_kconfig.read_from_file(kconfig_path)
278		if not self._kconfig.is_subset_of(validated_kconfig):
279			invalid = self._kconfig.entries() - validated_kconfig.entries()
280			message = 'Provided Kconfig is not contained in validated .config. Following fields found in kunitconfig, ' \
281					  'but not in .config: %s' % (
282					', '.join([str(e) for e in invalid])
283			)
284			logging.error(message)
285			return False
286		return True
287
288	def build_config(self, build_dir, make_options) -> bool:
289		kconfig_path = get_kconfig_path(build_dir)
290		if build_dir and not os.path.exists(build_dir):
291			os.mkdir(build_dir)
292		try:
293			self._ops.make_arch_qemuconfig(self._kconfig)
294			self._kconfig.write_to_file(kconfig_path)
295			self._ops.make_olddefconfig(build_dir, make_options)
296		except ConfigError as e:
297			logging.error(e)
298			return False
299		return self.validate_config(build_dir)
300
301	def build_reconfig(self, build_dir, make_options) -> bool:
302		"""Creates a new .config if it is not a subset of the .kunitconfig."""
303		kconfig_path = get_kconfig_path(build_dir)
304		if os.path.exists(kconfig_path):
305			existing_kconfig = kunit_config.Kconfig()
306			existing_kconfig.read_from_file(kconfig_path)
307			self._ops.make_arch_qemuconfig(self._kconfig)
308			if not self._kconfig.is_subset_of(existing_kconfig):
309				print('Regenerating .config ...')
310				os.remove(kconfig_path)
311				return self.build_config(build_dir, make_options)
312			else:
313				return True
314		else:
315			print('Generating .config ...')
316			return self.build_config(build_dir, make_options)
317
318	def build_kernel(self, alltests, jobs, build_dir, make_options) -> bool:
319		try:
320			if alltests:
321				self._ops.make_allyesconfig(build_dir, make_options)
322			self._ops.make_olddefconfig(build_dir, make_options)
323			self._ops.make(jobs, build_dir, make_options)
324		except (ConfigError, BuildError) as e:
325			logging.error(e)
326			return False
327		return self.validate_config(build_dir)
328
329	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
330		if not args:
331			args = []
332		args.extend(['mem=1G', 'console=tty', 'kunit_shutdown=halt'])
333		if filter_glob:
334			args.append('kunit.filter_glob='+filter_glob)
335		outfile = get_outfile_path(build_dir)
336		self._ops.run(args, timeout, build_dir, outfile)
337		subprocess.call(['stty', 'sane'])
338		with open(outfile, 'r') as file:
339			for line in file:
340				yield line
341
342	def signal_handler(self, sig, frame) -> None:
343		logging.error('Build interruption occurred. Cleaning console.')
344		subprocess.call(['stty', 'sane'])
345