xref: /openbmc/linux/tools/testing/kunit/kunit.py (revision 83775e158a3d2dc437132ab357ed6c9214ef0ae9)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3#
4# A thin wrapper on top of the KUnit Kernel
5#
6# Copyright (C) 2019, Google LLC.
7# Author: Felix Guo <felixguoxiuping@gmail.com>
8# Author: Brendan Higgins <brendanhiggins@google.com>
9
10import argparse
11import os
12import re
13import shlex
14import sys
15import time
16
17assert sys.version_info >= (3, 7), "Python version is too old"
18
19from dataclasses import dataclass
20from enum import Enum, auto
21from typing import Iterable, List, Optional, Sequence, Tuple
22
23import kunit_json
24import kunit_kernel
25import kunit_parser
26from kunit_printer import stdout
27
class KunitStatus(Enum):
	"""Overall outcome of one kunit.py stage (configure, build, or test run)."""
	# The stage finished without a problem.
	SUCCESS = auto()
	# Used by config_tests() when build_reconfig() reports failure.
	CONFIG_FAILURE = auto()
	# Used by build_tests() when build_kernel() reports failure.
	BUILD_FAILURE = auto()
	# Used when the test results do not map to a success.
	TEST_FAILURE = auto()
33
@dataclass
class KunitResult:
	"""Outcome of a stage together with how long that stage took."""
	# Whether the stage succeeded or, if not, which kind of failure it was.
	status: KunitStatus
	# Duration of the stage in seconds, computed from time.time() deltas.
	elapsed_time: float
38
@dataclass
class KunitConfigRequest:
	"""Inputs needed to (re)configure the kernel."""
	# Build directory, forwarded to LinuxSourceTree.build_reconfig().
	build_dir: str
	# Values collected from --make_options; None when the flag was not used.
	make_options: Optional[List[str]]
43
@dataclass
class KunitBuildRequest(KunitConfigRequest):
	"""Inputs needed to build the kernel: the config inputs plus a job count."""
	# Value of --jobs, forwarded to LinuxSourceTree.build_kernel().
	jobs: int
47
@dataclass
class KunitParseRequest:
	"""Inputs that control how kernel output is parsed and reported."""
	# 'all' or 'kunit' to print the output unparsed; None to parse it.
	raw_output: Optional[str]
	# 'stdout', a file path to write JSON results to, or None for no JSON.
	json: Optional[str]
52
@dataclass
class KunitExecRequest(KunitParseRequest):
	"""Inputs needed to run the tests: the parse inputs plus run settings."""
	# Build directory, forwarded to run_kernel() and recorded in the metadata.
	build_dir: str
	# Value of --timeout, forwarded to run_kernel().
	timeout: int
	# Glob selecting which suites/tests to run; '' when none was given.
	filter_glob: str
	# Values collected from --kernel_args; None when the flag was not used.
	kernel_args: Optional[List[str]]
	# 'suite' or 'test' to run each one in its own kernel run; None otherwise.
	run_isolated: Optional[str]
60
@dataclass
class KunitRequest(KunitExecRequest, KunitBuildRequest):
	"""Everything `kunit.py run` needs: the build and exec requests combined."""
64
65
def get_kernel_root_path() -> str:
	"""Returns the kernel source root, derived from this script's location.

	The script is expected to live under <root>/tools/testing/kunit. The
	result is everything in the script's real path (symlinks resolved) that
	precedes the last occurrence of that directory, so it keeps its trailing
	path separator.

	Exits with status 1 and a message on stderr if the script is not located
	under tools/testing/kunit.
	"""
	marker = 'tools/testing/kunit'
	path = sys.argv[0] if not __file__ else __file__
	real_path = os.path.realpath(path)
	# Split on the *last* occurrence so that a checkout which itself sits
	# below a directory named tools/testing/kunit still resolves correctly.
	root, found, _ = real_path.rpartition(marker)
	if not found:
		# sys.exit() with a string prints it to stderr and exits with status 1.
		sys.exit(f'kunit.py: cannot locate the kernel root: "{real_path}" '
			 f'is not under {marker}')
	return root
72
def config_tests(linux: kunit_kernel.LinuxSourceTree,
		 request: KunitConfigRequest) -> KunitResult:
	"""Reconfigures the kernel for request.build_dir and times the operation.

	Returns a KunitResult whose status is SUCCESS if build_reconfig()
	reported success and CONFIG_FAILURE otherwise.
	"""
	stdout.print_with_timestamp('Configuring KUnit Kernel ...')

	started = time.time()
	ok = linux.build_reconfig(request.build_dir, request.make_options)
	elapsed = time.time() - started

	if ok:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.CONFIG_FAILURE, elapsed)
82
def build_tests(linux: kunit_kernel.LinuxSourceTree,
		request: KunitBuildRequest) -> KunitResult:
	"""Builds the kernel with request.jobs jobs and times the operation.

	Returns a KunitResult whose status is SUCCESS if build_kernel() reported
	success and BUILD_FAILURE otherwise.
	"""
	stdout.print_with_timestamp('Building KUnit Kernel ...')

	started = time.time()
	ok = linux.build_kernel(request.jobs, request.build_dir, request.make_options)
	elapsed = time.time() - started

	if ok:
		return KunitResult(KunitStatus.SUCCESS, elapsed)
	return KunitResult(KunitStatus.BUILD_FAILURE, elapsed)
94
def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
			   request: KunitBuildRequest) -> KunitResult:
	"""Configures and then builds the kernel.

	The build only happens if configuring succeeded; otherwise the failed
	config result is returned as-is.
	"""
	result = config_tests(linux, request)
	if result.status == KunitStatus.SUCCESS:
		result = build_tests(linux, request)
	return result
102
103def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
104	args = ['kunit.action=list']
105	if request.kernel_args:
106		args.extend(request.kernel_args)
107
108	output = linux.run_kernel(args=args,
109			   timeout=request.timeout,
110			   filter_glob=request.filter_glob,
111			   build_dir=request.build_dir)
112	lines = kunit_parser.extract_tap_lines(output)
113	# Hack! Drop the dummy TAP version header that the executor prints out.
114	lines.pop()
115
116	# Filter out any extraneous non-test output that might have gotten mixed in.
117	return [l for l in lines if re.match(r'^[^\s.]+\.[^\s.]+$', l)]
118
119def _suites_from_test_list(tests: List[str]) -> List[str]:
120	"""Extracts all the suites from an ordered list of tests."""
121	suites = []  # type: List[str]
122	for t in tests:
123		parts = t.split('.', maxsplit=2)
124		if len(parts) != 2:
125			raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
126		suite, _ = parts
127		if not suites or suites[-1] != suite:
128			suites.append(suite)
129	return suites
130
131
132
def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
	"""Runs the kernel once per filter glob and aggregates the test results.

	Without --run_isolated there is a single run using request.filter_glob.
	With it, the tests are listed first and each test (or each suite) gets
	its own run. The returned elapsed_time is the sum over all runs.
	"""
	filter_globs = [request.filter_glob]
	isolation = request.run_isolated
	if isolation:
		tests = _list_tests(linux, request)
		if isolation == 'test':
			filter_globs = tests
		elif isolation == 'suite':
			suite_globs = _suites_from_test_list(tests)
			# Apply the test-part of the user's glob, if present.
			if '.' in request.filter_glob:
				test_glob = request.filter_glob.split('.', maxsplit=2)[1]
				suite_globs = [f'{suite}.{test_glob}' for suite in suite_globs]
			filter_globs = suite_globs

	metadata = kunit_json.Metadata(
		arch=linux.arch(),
		build_dir=request.build_dir,
		def_config='kunit_defconfig')

	total_counts = kunit_parser.TestCounts()
	exec_time = 0.0
	run_count = len(filter_globs)
	for run_number, glob in enumerate(filter_globs, start=1):
		stdout.print_with_timestamp(f'Starting KUnit Kernel ({run_number}/{run_count})...')

		started = time.time()
		run_result = linux.run_kernel(
			args=request.kernel_args,
			timeout=request.timeout,
			filter_glob=glob,
			build_dir=request.build_dir)

		_, test_result = parse_tests(request, metadata, run_result)
		# run_kernel() doesn't block on the kernel exiting.
		# That only happens after we get the last line of output from `run_result`.
		# So exec_time here actually contains parsing + execution time, which is fine.
		exec_time += time.time() - started

		total_counts.add_subtest_counts(test_result.counts)

	# The decoding hint is only printed when there was a single kernel run.
	if run_count == 1 and total_counts.crashed > 0:
		bd = request.build_dir
		print('The kernel seems to have crashed; you can decode the stack traces with:')
		print(f'$ scripts/decode_stacktrace.sh {bd}/vmlinux {bd} < {kunit_kernel.get_outfile_path(bd)} | tee {bd}/decoded.log | {sys.argv[0]} parse')

	overall = _map_to_overall_status(total_counts.get_status())
	return KunitResult(status=overall, elapsed_time=exec_time)
177
def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
	"""Collapses a parser test status into SUCCESS or TEST_FAILURE.

	Both SUCCESS and SKIPPED count as success; anything else is a failure.
	"""
	passing = (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED)
	return KunitStatus.SUCCESS if test_status in passing else KunitStatus.TEST_FAILURE
182
def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]:
	"""Parses (or just prints) kernel output and reports the outcome.

	With request.raw_output set, the output is printed unparsed ('kunit'
	first filters it through extract_tap_lines()) and a fake passing test is
	returned. Otherwise the output is parsed and, if request.json is set,
	the results are also emitted as JSON to stdout or to the named file.

	Returns the KunitResult (with the time spent here) and the Test.
	"""
	parse_start = time.time()

	if request.raw_output:
		# Treat unparsed results as one passing test.
		fake_test = kunit_parser.Test()
		fake_test.status = kunit_parser.TestStatus.SUCCESS
		fake_test.counts.passed = 1

		# 'all' prints everything as-is; 'kunit' keeps only the TAP lines.
		lines: Iterable[str] = input_data
		if request.raw_output == 'kunit':
			lines = kunit_parser.extract_tap_lines(lines)
		for line in lines:
			print(line.rstrip())
		return KunitResult(KunitStatus.SUCCESS, time.time() - parse_start), fake_test

	# Actually parse the test results.
	test = kunit_parser.parse_run_tests(input_data)
	parse_time = time.time() - parse_start

	json_dest = request.json
	if json_dest:
		json_str = kunit_json.get_json_result(test=test, metadata=metadata)
		if json_dest == 'stdout':
			print(json_str)
		else:
			with open(json_dest, 'w') as f:
				f.write(json_str)
			stdout.print_with_timestamp(f'Test results stored in {os.path.abspath(json_dest)}')

	# NOTE(review): only SUCCESS counts as passing here, whereas
	# _map_to_overall_status() also accepts SKIPPED -- confirm whether the
	# difference is intended.
	passed = test.status == kunit_parser.TestStatus.SUCCESS
	status = KunitStatus.SUCCESS if passed else KunitStatus.TEST_FAILURE
	return KunitResult(status, parse_time), test
223
def run_tests(linux: kunit_kernel.LinuxSourceTree,
	      request: KunitRequest) -> KunitResult:
	"""Configures, builds and runs the tests, then prints a timing summary.

	Stops at the first stage that fails and returns that stage's result;
	in that case no timing summary is printed.
	"""
	run_start = time.time()

	config_result = config_tests(linux, request)
	if config_result.status != KunitStatus.SUCCESS:
		return config_result

	build_result = build_tests(linux, request)
	if build_result.status != KunitStatus.SUCCESS:
		return build_result

	exec_result = exec_tests(linux, request)

	total_time = time.time() - run_start
	stdout.print_with_timestamp(
		f'Elapsed time: {total_time:.3f}s total, '
		f'{config_result.elapsed_time:.3f}s configuring, '
		f'{build_result.elapsed_time:.3f}s building, '
		f'{exec_result.elapsed_time:.3f}s running\n')
	return exec_result
248
# Why argv gets rewritten before argparse sees it:
#   $ kunit.py run --json
# works as one would expect and prints the parsed test results as JSON, but
#   $ kunit.py run --json suite_name
# does *not* pass suite_name as the filter_glob. argparse reads it as
#   $ kunit.py run --json=suite_name
# i.e. it would run all tests and dump the JSON to a `suite_name` file.
# So a bare flag listed here is hackily rewritten to --flag=<default>,
# e.g. --json => --json=stdout.
pseudo_bool_flag_defaults = {
	'--json': 'stdout',
	'--raw_output': 'kunit',
}

def massage_argv(argv: Sequence[str]) -> Sequence[str]:
	"""Returns argv as a list with bare pseudo-bool flags given their default.

	Only arguments that exactly match a key of pseudo_bool_flag_defaults are
	rewritten; e.g. `--json=file` and every other argument pass through.
	"""
	return [
		f'{arg}={pseudo_bool_flag_defaults[arg]}' if arg in pseudo_bool_flag_defaults else arg
		for arg in argv
	]
268
def get_default_jobs() -> int:
	"""Returns the default for --jobs: how many CPUs this process may use.

	Uses the process's CPU affinity mask where the platform provides it and
	falls back to the total CPU count (or 1 if even that is unknown).
	"""
	if hasattr(os, 'sched_getaffinity'):
		return len(os.sched_getaffinity(0))
	# os.sched_getaffinity() is not available on every platform; without the
	# fallback, merely building the argument parser would raise AttributeError.
	return os.cpu_count() or 1
271
def add_common_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options shared by the run, config, build and exec commands.

	They cover where to build (--build_dir), extra make options
	(--make_options), which Kconfig fragments/options to use (--alltests,
	--kunitconfig, --kconfig_add) and the architecture, toolchain and QEMU
	setup (--arch, --cross_compile, --qemu_config, --qemu_args).
	"""
	parser.add_argument('--build_dir',
			    help='As in the make command, it specifies the build '
			    'directory.',
			    type=str, default='.kunit', metavar='DIR')
	parser.add_argument('--make_options',
			    help='X=Y make option, can be repeated.',
			    action='append', metavar='X=Y')
	parser.add_argument('--alltests',
			    help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config',
			    action='store_true')
	parser.add_argument('--kunitconfig',
			    help='Path to Kconfig fragment that enables KUnit tests.'
			    ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
			    'will get automatically appended. If repeated, the files '
			    'are blindly concatenated, which might not work in all cases.',
			    action='append', metavar='PATHS')
	parser.add_argument('--kconfig_add',
			    help='Additional Kconfig options to append to the '
			    '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
			    action='append', metavar='CONFIG_X=Y')

	parser.add_argument('--arch',
			    help=('Specifies the architecture to run tests under. '
				  'The architecture specified here must match the '
				  'string passed to the ARCH make param, '
				  'e.g. i386, x86_64, arm, um, etc. Non-UML '
				  'architectures run on QEMU.'),
			    type=str, default='um', metavar='ARCH')

	parser.add_argument('--cross_compile',
			    help=('Sets make\'s CROSS_COMPILE variable; it should '
				  'be set to a toolchain path prefix (the prefix '
				  'of gcc and other tools in your toolchain, for '
				  'example `sparc64-linux-gnu-` if you have the '
				  'sparc toolchain installed on your system, or '
				  '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
				  'if you have downloaded the microblaze toolchain '
				  'from the 0-day website to a directory in your '
				  'home directory called `toolchains`).'),
			    metavar='PREFIX')

	parser.add_argument('--qemu_config',
			    help=('Takes a path to a file containing '
				  'a QemuArchParams object.'),
			    type=str, metavar='FILE')

	# Each value may hold several QEMU arguments; tree_from_args() splits them.
	parser.add_argument('--qemu_args',
			    help='Additional QEMU arguments, e.g. "-smp 8"',
			    action='append', metavar='')
322
def add_build_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that only apply when building: --jobs."""
	jobs_help = ('As in the make command, "Specifies  the number of '
		     'jobs (commands) to run simultaneously."')
	# The default is computed when the parser is set up, not when it parses.
	parser.add_argument('--jobs', type=int, metavar='N',
			    default=get_default_jobs(), help=jobs_help)
328
def add_exec_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that control running the tests.

	Adds --timeout, the optional positional filter_glob, --kernel_args and
	--run_isolated.
	"""
	parser.add_argument('--timeout',
			    help='maximum number of seconds to allow for all tests '
			    'to run. This does not include time taken to build the '
			    'tests.',
			    type=int,
			    default=300,
			    metavar='SECONDS')
	# Optional positional: defaults to '' when no glob is given.
	parser.add_argument('filter_glob',
			    help='Filter which KUnit test suites/tests run at '
			    'boot-time, e.g. list* or list*.*del_test',
			    type=str,
			    nargs='?',
			    default='',
			    metavar='filter_glob')
	parser.add_argument('--kernel_args',
			    help='Kernel command-line parameters. May be repeated',
			    action='append', metavar='')
	parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
			    'individual suite/test. This can be useful for debugging '
			    'a non-hermetic test, one that might pass/fail based on '
			    'what ran before it.',
			    type=str,
			    choices=['suite', 'test'])
353
def add_parse_opts(parser: argparse.ArgumentParser) -> None:
	"""Registers the options that control how results are reported.

	Adds --raw_output and --json. Both may be given with or without a value.
	The value used for a bare flag matches pseudo_bool_flag_defaults, which
	massage_argv() applies before argparse sees the arguments.
	"""
	# const is 'kunit' so that a bare --raw_output does what the help text
	# says ("By default, filters to just KUnit output") even if the arguments
	# were not passed through massage_argv() first.
	parser.add_argument('--raw_output', help='If set don\'t parse output from kernel. '
			    'By default, filters to just KUnit output. Use '
			    '--raw_output=all to show everything',
			    type=str, nargs='?', const='kunit', default=None, choices=['all', 'kunit'])
	parser.add_argument('--json',
			    nargs='?',
			    help='Prints parsed test results as JSON to stdout or a file if '
			    'a filename is specified. Does nothing if --raw_output is set.',
			    type=str, const='stdout', default=None, metavar='FILE')
364
365
def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree:
	"""Builds the LinuxSourceTree described by the parsed command line."""
	# Allow users to specify multiple arguments in one string, e.g. '-smp 8'
	qemu_args: List[str] = [
		token
		for raw_arg in (cli_args.qemu_args or [])
		for token in shlex.split(raw_arg)
	]

	kunitconfigs = cli_args.kunitconfig or []
	if cli_args.alltests:
		# Prepend so user-specified options take prio if we ever allow
		# --kunitconfig options to have differing options.
		kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH, *kunitconfigs]

	return kunit_kernel.LinuxSourceTree(
		cli_args.build_dir,
		kunitconfig_paths=kunitconfigs,
		kconfig_add=cli_args.kconfig_add,
		arch=cli_args.arch,
		cross_compile=cli_args.cross_compile,
		qemu_config_path=cli_args.qemu_config,
		extra_qemu_args=qemu_args)
387
388
def run_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py run`: configures, builds and runs the tests.

	Creates the build directory if needed. Exits with status 1 unless the
	overall result is SUCCESS.
	"""
	# makedirs(exist_ok=True) instead of exists() + mkdir(): it also creates
	# missing parent directories (e.g. --build_dir=out/kunit) and does not
	# fail if the directory appears between the check and the creation.
	os.makedirs(cli_args.build_dir, exist_ok=True)

	linux = tree_from_args(cli_args)
	request = KunitRequest(build_dir=cli_args.build_dir,
					make_options=cli_args.make_options,
					jobs=cli_args.jobs,
					raw_output=cli_args.raw_output,
					json=cli_args.json,
					timeout=cli_args.timeout,
					filter_glob=cli_args.filter_glob,
					kernel_args=cli_args.kernel_args,
					run_isolated=cli_args.run_isolated)
	result = run_tests(linux, request)
	if result.status != KunitStatus.SUCCESS:
		sys.exit(1)
406
407
def config_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py config`: (re)configures the kernel.

	Creates the build directory if one was given and it does not exist yet,
	prints the elapsed time, and exits with status 1 unless configuring
	succeeded.
	"""
	if cli_args.build_dir:
		# makedirs(exist_ok=True) instead of exists() + mkdir(): it also
		# creates missing parent directories and does not fail if the
		# directory appears between the check and the creation.
		os.makedirs(cli_args.build_dir, exist_ok=True)

	linux = tree_from_args(cli_args)
	request = KunitConfigRequest(build_dir=cli_args.build_dir,
						make_options=cli_args.make_options)
	result = config_tests(linux, request)
	stdout.print_with_timestamp((
		'Elapsed time: %.3fs\n') % (
			result.elapsed_time))
	if result.status != KunitStatus.SUCCESS:
		sys.exit(1)
422
423
def build_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py build`: configures and then builds the kernel.

	Prints the elapsed time of the returned result and exits with status 1
	unless that result is SUCCESS.
	"""
	tree = tree_from_args(cli_args)
	build_request = KunitBuildRequest(
		build_dir=cli_args.build_dir,
		make_options=cli_args.make_options,
		jobs=cli_args.jobs)
	outcome = config_and_build_tests(tree, build_request)
	stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
435
436
def exec_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py exec`: runs the tests without configuring or building.

	Prints the elapsed time and exits with status 1 unless the overall
	result is SUCCESS.
	"""
	tree = tree_from_args(cli_args)
	request = KunitExecRequest(
		raw_output=cli_args.raw_output,
		json=cli_args.json,
		build_dir=cli_args.build_dir,
		timeout=cli_args.timeout,
		filter_glob=cli_args.filter_glob,
		kernel_args=cli_args.kernel_args,
		run_isolated=cli_args.run_isolated)
	outcome = exec_tests(tree, request)
	stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
451
452
def parse_handler(cli_args: argparse.Namespace) -> None:
	"""Handles `kunit.py parse`: parses results from a file or from stdin.

	Undecodable bytes in the input are backslash-escaped rather than raising.
	Exits with status 1 unless parsing reports SUCCESS.
	"""
	kunit_output: Iterable[str]
	if cli_args.file is not None:
		with open(cli_args.file, 'r', errors='backslashreplace') as f:
			kunit_output = f.read().splitlines()
	else:
		# No input file was given, so read the results from stdin.
		sys.stdin.reconfigure(errors='backslashreplace')  # type: ignore
		kunit_output = sys.stdin

	# We know nothing about how the result was created!
	metadata = kunit_json.Metadata()
	parse_request = KunitParseRequest(raw_output=cli_args.raw_output,
					  json=cli_args.json)
	outcome, _ = parse_tests(parse_request, metadata, kunit_output)
	if outcome.status != KunitStatus.SUCCESS:
		sys.exit(1)
467
468
# Maps each subcommand name accepted on the command line to the function
# that carries it out; main() looks the chosen subcommand up here.
subcommand_handlers_map = {
	# Config, build, exec and parse in one go.
	'run': run_handler,
	# The individual stages.
	'config': config_handler,
	'build': build_handler,
	'exec': exec_handler,
	'parse': parse_handler,
}
476
477
def main(argv: Sequence[str]) -> None:
	"""Parses argv (without the program name) and runs the chosen subcommand.

	The working directory is changed to the kernel root before the handler
	runs. If no subcommand was given, the help text is printed instead.
	"""
	parser = argparse.ArgumentParser(
			description='Helps writing and running KUnit tests.')
	subparser = parser.add_subparsers(dest='subcommand')

	# The 'run' command will config, build, exec, and parse in one go.
	run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
	for add_opts in (add_common_opts, add_build_opts, add_exec_opts, add_parse_opts):
		add_opts(run_parser)

	config_parser = subparser.add_parser(
			'config',
			help='Ensures that .config contains all of '
			'the options in .kunitconfig')
	add_common_opts(config_parser)

	build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_build_opts):
		add_opts(build_parser)

	exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
	for add_opts in (add_common_opts, add_exec_opts, add_parse_opts):
		add_opts(exec_parser)

	# The 'parse' option is special, as it doesn't need the kernel source
	# (therefore there is no need for a build_dir, hence no add_common_opts)
	# and the '--file' argument is not relevant to 'run', so isn't in
	# add_parse_opts()
	parse_parser = subparser.add_parser(
			'parse',
			help='Parses KUnit results from a file, '
			'and parses formatted results.')
	add_parse_opts(parse_parser)
	parse_parser.add_argument('file',
				  help='Specifies the file to read results from.',
				  type=str, nargs='?', metavar='input_file')

	cli_args = parser.parse_args(massage_argv(argv))

	kernel_root = get_kernel_root_path()
	if kernel_root:
		os.chdir(kernel_root)

	handler = subcommand_handlers_map.get(cli_args.subcommand)
	if handler is None:
		parser.print_help()
		return

	handler(cli_args)
528
529
# Run the command-line tool only when executed as a script; the program name
# in argv[0] is dropped because main() expects just the arguments.
if __name__ == '__main__':
	main(sys.argv[1:])
532