1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import logging
10import subprocess
11import os
12import shutil
13import signal
14from typing import Iterator
15
16from contextlib import ExitStack
17
18import kunit_config
19import kunit_parser
20
21KCONFIG_PATH = '.config'
22KUNITCONFIG_PATH = '.kunitconfig'
23DEFAULT_KUNITCONFIG_PATH = 'arch/um/configs/kunit_defconfig'
24BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
25OUTFILE_PATH = 'test.log'
26
27def get_file_path(build_dir, default):
28	if build_dir:
29		default = os.path.join(build_dir, default)
30	return default
31
32class ConfigError(Exception):
33	"""Represents an error trying to configure the Linux kernel."""
34
35
36class BuildError(Exception):
37	"""Represents an error trying to build the Linux kernel."""
38
39
40class LinuxSourceTreeOperations(object):
41	"""An abstraction over command line operations performed on a source tree."""
42
43	def make_mrproper(self) -> None:
44		try:
45			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
46		except OSError as e:
47			raise ConfigError('Could not call make command: ' + str(e))
48		except subprocess.CalledProcessError as e:
49			raise ConfigError(e.output.decode())
50
51	def make_olddefconfig(self, build_dir, make_options) -> None:
52		command = ['make', 'ARCH=um', 'olddefconfig']
53		if make_options:
54			command.extend(make_options)
55		if build_dir:
56			command += ['O=' + build_dir]
57		try:
58			subprocess.check_output(command, stderr=subprocess.STDOUT)
59		except OSError as e:
60			raise ConfigError('Could not call make command: ' + str(e))
61		except subprocess.CalledProcessError as e:
62			raise ConfigError(e.output.decode())
63
64	def make_allyesconfig(self, build_dir, make_options) -> None:
65		kunit_parser.print_with_timestamp(
66			'Enabling all CONFIGs for UML...')
67		command = ['make', 'ARCH=um', 'allyesconfig']
68		if make_options:
69			command.extend(make_options)
70		if build_dir:
71			command += ['O=' + build_dir]
72		process = subprocess.Popen(
73			command,
74			stdout=subprocess.DEVNULL,
75			stderr=subprocess.STDOUT)
76		process.wait()
77		kunit_parser.print_with_timestamp(
78			'Disabling broken configs to run KUnit tests...')
79		with ExitStack() as es:
80			config = open(get_kconfig_path(build_dir), 'a')
81			disable = open(BROKEN_ALLCONFIG_PATH, 'r').read()
82			config.write(disable)
83		kunit_parser.print_with_timestamp(
84			'Starting Kernel with all configs takes a few minutes...')
85
86	def make(self, jobs, build_dir, make_options) -> None:
87		command = ['make', 'ARCH=um', '--jobs=' + str(jobs)]
88		if make_options:
89			command.extend(make_options)
90		if build_dir:
91			command += ['O=' + build_dir]
92		try:
93			proc = subprocess.Popen(command,
94						stderr=subprocess.PIPE,
95						stdout=subprocess.DEVNULL)
96		except OSError as e:
97			raise BuildError('Could not call make command: ' + str(e))
98		_, stderr = proc.communicate()
99		if proc.returncode != 0:
100			raise BuildError(stderr.decode())
101		if stderr:  # likely only due to build warnings
102			print(stderr.decode())
103
104	def linux_bin(self, params, timeout, build_dir) -> None:
105		"""Runs the Linux UML binary. Must be named 'linux'."""
106		linux_bin = get_file_path(build_dir, 'linux')
107		outfile = get_outfile_path(build_dir)
108		with open(outfile, 'w') as output:
109			process = subprocess.Popen([linux_bin] + params,
110						   stdout=output,
111						   stderr=subprocess.STDOUT)
112			process.wait(timeout)
113
114def get_kconfig_path(build_dir) -> str:
115	return get_file_path(build_dir, KCONFIG_PATH)
116
117def get_kunitconfig_path(build_dir) -> str:
118	return get_file_path(build_dir, KUNITCONFIG_PATH)
119
120def get_outfile_path(build_dir) -> str:
121	return get_file_path(build_dir, OUTFILE_PATH)
122
123class LinuxSourceTree(object):
124	"""Represents a Linux kernel source tree with KUnit tests."""
125
126	def __init__(self, build_dir: str, load_config=True, kunitconfig_path='') -> None:
127		signal.signal(signal.SIGINT, self.signal_handler)
128
129		self._ops = LinuxSourceTreeOperations()
130
131		if not load_config:
132			return
133
134		if kunitconfig_path:
135			if not os.path.exists(kunitconfig_path):
136				raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
137		else:
138			kunitconfig_path = get_kunitconfig_path(build_dir)
139			if not os.path.exists(kunitconfig_path):
140				shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)
141
142		self._kconfig = kunit_config.Kconfig()
143		self._kconfig.read_from_file(kunitconfig_path)
144
145	def clean(self) -> bool:
146		try:
147			self._ops.make_mrproper()
148		except ConfigError as e:
149			logging.error(e)
150			return False
151		return True
152
153	def validate_config(self, build_dir) -> bool:
154		kconfig_path = get_kconfig_path(build_dir)
155		validated_kconfig = kunit_config.Kconfig()
156		validated_kconfig.read_from_file(kconfig_path)
157		if not self._kconfig.is_subset_of(validated_kconfig):
158			invalid = self._kconfig.entries() - validated_kconfig.entries()
159			message = 'Provided Kconfig is not contained in validated .config. Following fields found in kunitconfig, ' \
160					  'but not in .config: %s' % (
161					', '.join([str(e) for e in invalid])
162			)
163			logging.error(message)
164			return False
165		return True
166
167	def build_config(self, build_dir, make_options) -> bool:
168		kconfig_path = get_kconfig_path(build_dir)
169		if build_dir and not os.path.exists(build_dir):
170			os.mkdir(build_dir)
171		self._kconfig.write_to_file(kconfig_path)
172		try:
173			self._ops.make_olddefconfig(build_dir, make_options)
174		except ConfigError as e:
175			logging.error(e)
176			return False
177		return self.validate_config(build_dir)
178
179	def build_reconfig(self, build_dir, make_options) -> bool:
180		"""Creates a new .config if it is not a subset of the .kunitconfig."""
181		kconfig_path = get_kconfig_path(build_dir)
182		if os.path.exists(kconfig_path):
183			existing_kconfig = kunit_config.Kconfig()
184			existing_kconfig.read_from_file(kconfig_path)
185			if not self._kconfig.is_subset_of(existing_kconfig):
186				print('Regenerating .config ...')
187				os.remove(kconfig_path)
188				return self.build_config(build_dir, make_options)
189			else:
190				return True
191		else:
192			print('Generating .config ...')
193			return self.build_config(build_dir, make_options)
194
195	def build_um_kernel(self, alltests, jobs, build_dir, make_options) -> bool:
196		try:
197			if alltests:
198				self._ops.make_allyesconfig(build_dir, make_options)
199			self._ops.make_olddefconfig(build_dir, make_options)
200			self._ops.make(jobs, build_dir, make_options)
201		except (ConfigError, BuildError) as e:
202			logging.error(e)
203			return False
204		return self.validate_config(build_dir)
205
206	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
207		if not args:
208			args = []
209		args.extend(['mem=1G', 'console=tty'])
210		if filter_glob:
211			args.append('kunit.filter_glob='+filter_glob)
212		self._ops.linux_bin(args, timeout, build_dir)
213		outfile = get_outfile_path(build_dir)
214		subprocess.call(['stty', 'sane'])
215		with open(outfile, 'r') as file:
216			for line in file:
217				yield line
218
219	def signal_handler(self, sig, frame) -> None:
220		logging.error('Build interruption occurred. Cleaning console.')
221		subprocess.call(['stty', 'sane'])
222