1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import logging
10import subprocess
11import os
12import shutil
13import signal
14from typing import Iterator
15
16from contextlib import ExitStack
17
18import kunit_config
19import kunit_parser
20
21KCONFIG_PATH = '.config'
22KUNITCONFIG_PATH = '.kunitconfig'
23DEFAULT_KUNITCONFIG_PATH = 'arch/um/configs/kunit_defconfig'
24BROKEN_ALLCONFIG_PATH = 'tools/testing/kunit/configs/broken_on_uml.config'
25OUTFILE_PATH = 'test.log'
26
27def get_file_path(build_dir, default):
28	if build_dir:
29		default = os.path.join(build_dir, default)
30	return default
31
32class ConfigError(Exception):
33	"""Represents an error trying to configure the Linux kernel."""
34
35
36class BuildError(Exception):
37	"""Represents an error trying to build the Linux kernel."""
38
39
40class LinuxSourceTreeOperations(object):
41	"""An abstraction over command line operations performed on a source tree."""
42
43	def make_mrproper(self) -> None:
44		try:
45			subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
46		except OSError as e:
47			raise ConfigError('Could not call make command: ' + str(e))
48		except subprocess.CalledProcessError as e:
49			raise ConfigError(e.output.decode())
50
51	def make_olddefconfig(self, build_dir, make_options) -> None:
52		command = ['make', 'ARCH=um', 'olddefconfig']
53		if make_options:
54			command.extend(make_options)
55		if build_dir:
56			command += ['O=' + build_dir]
57		try:
58			subprocess.check_output(command, stderr=subprocess.STDOUT)
59		except OSError as e:
60			raise ConfigError('Could not call make command: ' + str(e))
61		except subprocess.CalledProcessError as e:
62			raise ConfigError(e.output.decode())
63
64	def make_allyesconfig(self, build_dir, make_options) -> None:
65		kunit_parser.print_with_timestamp(
66			'Enabling all CONFIGs for UML...')
67		command = ['make', 'ARCH=um', 'allyesconfig']
68		if make_options:
69			command.extend(make_options)
70		if build_dir:
71			command += ['O=' + build_dir]
72		process = subprocess.Popen(
73			command,
74			stdout=subprocess.DEVNULL,
75			stderr=subprocess.STDOUT)
76		process.wait()
77		kunit_parser.print_with_timestamp(
78			'Disabling broken configs to run KUnit tests...')
79		with ExitStack() as es:
80			config = open(get_kconfig_path(build_dir), 'a')
81			disable = open(BROKEN_ALLCONFIG_PATH, 'r').read()
82			config.write(disable)
83		kunit_parser.print_with_timestamp(
84			'Starting Kernel with all configs takes a few minutes...')
85
86	def make(self, jobs, build_dir, make_options) -> None:
87		command = ['make', 'ARCH=um', '--jobs=' + str(jobs)]
88		if make_options:
89			command.extend(make_options)
90		if build_dir:
91			command += ['O=' + build_dir]
92		try:
93			proc = subprocess.Popen(command,
94						stderr=subprocess.PIPE,
95						stdout=subprocess.DEVNULL)
96		except OSError as e:
97			raise BuildError('Could not call make command: ' + str(e))
98		_, stderr = proc.communicate()
99		if proc.returncode != 0:
100			raise BuildError(stderr.decode())
101		if stderr:  # likely only due to build warnings
102			print(stderr.decode())
103
104	def linux_bin(self, params, timeout, build_dir) -> None:
105		"""Runs the Linux UML binary. Must be named 'linux'."""
106		linux_bin = get_file_path(build_dir, 'linux')
107		outfile = get_outfile_path(build_dir)
108		with open(outfile, 'w') as output:
109			process = subprocess.Popen([linux_bin] + params,
110						   stdout=output,
111						   stderr=subprocess.STDOUT)
112			process.wait(timeout)
113
114def get_kconfig_path(build_dir) -> str:
115	return get_file_path(build_dir, KCONFIG_PATH)
116
117def get_kunitconfig_path(build_dir) -> str:
118	return get_file_path(build_dir, KUNITCONFIG_PATH)
119
120def get_outfile_path(build_dir) -> str:
121	return get_file_path(build_dir, OUTFILE_PATH)
122
123class LinuxSourceTree(object):
124	"""Represents a Linux kernel source tree with KUnit tests."""
125
126	def __init__(self, build_dir: str, load_config=True, kunitconfig_path='') -> None:
127		signal.signal(signal.SIGINT, self.signal_handler)
128
129		self._ops = LinuxSourceTreeOperations()
130
131		if not load_config:
132			return
133
134		if kunitconfig_path:
135			if os.path.isdir(kunitconfig_path):
136				kunitconfig_path = os.path.join(kunitconfig_path, KUNITCONFIG_PATH)
137			if not os.path.exists(kunitconfig_path):
138				raise ConfigError(f'Specified kunitconfig ({kunitconfig_path}) does not exist')
139		else:
140			kunitconfig_path = get_kunitconfig_path(build_dir)
141			if not os.path.exists(kunitconfig_path):
142				shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, kunitconfig_path)
143
144		self._kconfig = kunit_config.Kconfig()
145		self._kconfig.read_from_file(kunitconfig_path)
146
147	def clean(self) -> bool:
148		try:
149			self._ops.make_mrproper()
150		except ConfigError as e:
151			logging.error(e)
152			return False
153		return True
154
155	def validate_config(self, build_dir) -> bool:
156		kconfig_path = get_kconfig_path(build_dir)
157		validated_kconfig = kunit_config.Kconfig()
158		validated_kconfig.read_from_file(kconfig_path)
159		if not self._kconfig.is_subset_of(validated_kconfig):
160			invalid = self._kconfig.entries() - validated_kconfig.entries()
161			message = 'Provided Kconfig is not contained in validated .config. Following fields found in kunitconfig, ' \
162					  'but not in .config: %s' % (
163					', '.join([str(e) for e in invalid])
164			)
165			logging.error(message)
166			return False
167		return True
168
169	def build_config(self, build_dir, make_options) -> bool:
170		kconfig_path = get_kconfig_path(build_dir)
171		if build_dir and not os.path.exists(build_dir):
172			os.mkdir(build_dir)
173		self._kconfig.write_to_file(kconfig_path)
174		try:
175			self._ops.make_olddefconfig(build_dir, make_options)
176		except ConfigError as e:
177			logging.error(e)
178			return False
179		return self.validate_config(build_dir)
180
181	def build_reconfig(self, build_dir, make_options) -> bool:
182		"""Creates a new .config if it is not a subset of the .kunitconfig."""
183		kconfig_path = get_kconfig_path(build_dir)
184		if os.path.exists(kconfig_path):
185			existing_kconfig = kunit_config.Kconfig()
186			existing_kconfig.read_from_file(kconfig_path)
187			if not self._kconfig.is_subset_of(existing_kconfig):
188				print('Regenerating .config ...')
189				os.remove(kconfig_path)
190				return self.build_config(build_dir, make_options)
191			else:
192				return True
193		else:
194			print('Generating .config ...')
195			return self.build_config(build_dir, make_options)
196
197	def build_um_kernel(self, alltests, jobs, build_dir, make_options) -> bool:
198		try:
199			if alltests:
200				self._ops.make_allyesconfig(build_dir, make_options)
201			self._ops.make_olddefconfig(build_dir, make_options)
202			self._ops.make(jobs, build_dir, make_options)
203		except (ConfigError, BuildError) as e:
204			logging.error(e)
205			return False
206		return self.validate_config(build_dir)
207
208	def run_kernel(self, args=None, build_dir='', filter_glob='', timeout=None) -> Iterator[str]:
209		if not args:
210			args = []
211		args.extend(['mem=1G', 'console=tty'])
212		if filter_glob:
213			args.append('kunit.filter_glob='+filter_glob)
214		self._ops.linux_bin(args, timeout, build_dir)
215		outfile = get_outfile_path(build_dir)
216		subprocess.call(['stty', 'sane'])
217		with open(outfile, 'r') as file:
218			for line in file:
219				yield line
220
221	def signal_handler(self, sig, frame) -> None:
222		logging.error('Build interruption occurred. Cleaning console.')
223		subprocess.call(['stty', 'sane'])
224