1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# 4# A thin wrapper on top of the KUnit Kernel 5# 6# Copyright (C) 2019, Google LLC. 7# Author: Felix Guo <felixguoxiuping@gmail.com> 8# Author: Brendan Higgins <brendanhiggins@google.com> 9 10import argparse 11import os 12import re 13import shlex 14import sys 15import time 16 17assert sys.version_info >= (3, 7), "Python version is too old" 18 19from dataclasses import dataclass 20from enum import Enum, auto 21from typing import Iterable, List, Optional, Sequence, Tuple 22 23import kunit_json 24import kunit_kernel 25import kunit_parser 26from kunit_printer import stdout 27 28class KunitStatus(Enum): 29 SUCCESS = auto() 30 CONFIG_FAILURE = auto() 31 BUILD_FAILURE = auto() 32 TEST_FAILURE = auto() 33 34@dataclass 35class KunitResult: 36 status: KunitStatus 37 elapsed_time: float 38 39@dataclass 40class KunitConfigRequest: 41 build_dir: str 42 make_options: Optional[List[str]] 43 44@dataclass 45class KunitBuildRequest(KunitConfigRequest): 46 jobs: int 47 alltests: bool 48 49@dataclass 50class KunitParseRequest: 51 raw_output: Optional[str] 52 json: Optional[str] 53 54@dataclass 55class KunitExecRequest(KunitParseRequest): 56 build_dir: str 57 timeout: int 58 alltests: bool 59 filter_glob: str 60 kernel_args: Optional[List[str]] 61 run_isolated: Optional[str] 62 63@dataclass 64class KunitRequest(KunitExecRequest, KunitBuildRequest): 65 pass 66 67 68def get_kernel_root_path() -> str: 69 path = sys.argv[0] if not __file__ else __file__ 70 parts = os.path.realpath(path).split('tools/testing/kunit') 71 if len(parts) != 2: 72 sys.exit(1) 73 return parts[0] 74 75def config_tests(linux: kunit_kernel.LinuxSourceTree, 76 request: KunitConfigRequest) -> KunitResult: 77 stdout.print_with_timestamp('Configuring KUnit Kernel ...') 78 79 config_start = time.time() 80 success = linux.build_reconfig(request.build_dir, request.make_options) 81 config_end = time.time() 82 if not success: 83 return 
KunitResult(KunitStatus.CONFIG_FAILURE, 84 config_end - config_start) 85 return KunitResult(KunitStatus.SUCCESS, 86 config_end - config_start) 87 88def build_tests(linux: kunit_kernel.LinuxSourceTree, 89 request: KunitBuildRequest) -> KunitResult: 90 stdout.print_with_timestamp('Building KUnit Kernel ...') 91 92 build_start = time.time() 93 success = linux.build_kernel(request.alltests, 94 request.jobs, 95 request.build_dir, 96 request.make_options) 97 build_end = time.time() 98 if not success: 99 return KunitResult(KunitStatus.BUILD_FAILURE, 100 build_end - build_start) 101 if not success: 102 return KunitResult(KunitStatus.BUILD_FAILURE, 103 build_end - build_start) 104 return KunitResult(KunitStatus.SUCCESS, 105 build_end - build_start) 106 107def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, 108 request: KunitBuildRequest) -> KunitResult: 109 config_result = config_tests(linux, request) 110 if config_result.status != KunitStatus.SUCCESS: 111 return config_result 112 113 return build_tests(linux, request) 114 115def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: 116 args = ['kunit.action=list'] 117 if request.kernel_args: 118 args.extend(request.kernel_args) 119 120 output = linux.run_kernel(args=args, 121 timeout=None if request.alltests else request.timeout, 122 filter_glob=request.filter_glob, 123 build_dir=request.build_dir) 124 lines = kunit_parser.extract_tap_lines(output) 125 # Hack! Drop the dummy TAP version header that the executor prints out. 126 lines.pop() 127 128 # Filter out any extraneous non-test output that might have gotten mixed in. 
129 return [l for l in lines if re.match(r'^[^\s.]+\.[^\s.]+$', l)] 130 131def _suites_from_test_list(tests: List[str]) -> List[str]: 132 """Extracts all the suites from an ordered list of tests.""" 133 suites = [] # type: List[str] 134 for t in tests: 135 parts = t.split('.', maxsplit=2) 136 if len(parts) != 2: 137 raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"') 138 suite, case = parts 139 if not suites or suites[-1] != suite: 140 suites.append(suite) 141 return suites 142 143 144 145def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: 146 filter_globs = [request.filter_glob] 147 if request.run_isolated: 148 tests = _list_tests(linux, request) 149 if request.run_isolated == 'test': 150 filter_globs = tests 151 if request.run_isolated == 'suite': 152 filter_globs = _suites_from_test_list(tests) 153 # Apply the test-part of the user's glob, if present. 154 if '.' in request.filter_glob: 155 test_glob = request.filter_glob.split('.', maxsplit=2)[1] 156 filter_globs = [g + '.'+ test_glob for g in filter_globs] 157 158 metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig') 159 160 test_counts = kunit_parser.TestCounts() 161 exec_time = 0.0 162 for i, filter_glob in enumerate(filter_globs): 163 stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) 164 165 test_start = time.time() 166 run_result = linux.run_kernel( 167 args=request.kernel_args, 168 timeout=None if request.alltests else request.timeout, 169 filter_glob=filter_glob, 170 build_dir=request.build_dir) 171 172 _, test_result = parse_tests(request, metadata, run_result) 173 # run_kernel() doesn't block on the kernel exiting. 174 # That only happens after we get the last line of output from `run_result`. 175 # So exec_time here actually contains parsing + execution time, which is fine. 
176 test_end = time.time() 177 exec_time += test_end - test_start 178 179 test_counts.add_subtest_counts(test_result.counts) 180 181 if len(filter_globs) == 1 and test_counts.crashed > 0: 182 bd = request.build_dir 183 print('The kernel seems to have crashed; you can decode the stack traces with:') 184 print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( 185 bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) 186 187 kunit_status = _map_to_overall_status(test_counts.get_status()) 188 return KunitResult(status=kunit_status, elapsed_time=exec_time) 189 190def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: 191 if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): 192 return KunitStatus.SUCCESS 193 return KunitStatus.TEST_FAILURE 194 195def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]: 196 parse_start = time.time() 197 198 test_result = kunit_parser.Test() 199 200 if request.raw_output: 201 # Treat unparsed results as one passing test. 
202 test_result.status = kunit_parser.TestStatus.SUCCESS 203 test_result.counts.passed = 1 204 205 output: Iterable[str] = input_data 206 if request.raw_output == 'all': 207 pass 208 elif request.raw_output == 'kunit': 209 output = kunit_parser.extract_tap_lines(output) 210 for line in output: 211 print(line.rstrip()) 212 213 else: 214 test_result = kunit_parser.parse_run_tests(input_data) 215 parse_end = time.time() 216 217 if request.json: 218 json_str = kunit_json.get_json_result( 219 test=test_result, 220 metadata=metadata) 221 if request.json == 'stdout': 222 print(json_str) 223 else: 224 with open(request.json, 'w') as f: 225 f.write(json_str) 226 stdout.print_with_timestamp("Test results stored in %s" % 227 os.path.abspath(request.json)) 228 229 if test_result.status != kunit_parser.TestStatus.SUCCESS: 230 return KunitResult(KunitStatus.TEST_FAILURE, parse_end - parse_start), test_result 231 232 return KunitResult(KunitStatus.SUCCESS, parse_end - parse_start), test_result 233 234def run_tests(linux: kunit_kernel.LinuxSourceTree, 235 request: KunitRequest) -> KunitResult: 236 run_start = time.time() 237 238 config_result = config_tests(linux, request) 239 if config_result.status != KunitStatus.SUCCESS: 240 return config_result 241 242 build_result = build_tests(linux, request) 243 if build_result.status != KunitStatus.SUCCESS: 244 return build_result 245 246 exec_result = exec_tests(linux, request) 247 248 run_end = time.time() 249 250 stdout.print_with_timestamp(( 251 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 252 'building, %.3fs running\n') % ( 253 run_end - run_start, 254 config_result.elapsed_time, 255 build_result.elapsed_time, 256 exec_result.elapsed_time)) 257 return exec_result 258 259# Problem: 260# $ kunit.py run --json 261# works as one would expect and prints the parsed test results as JSON. 262# $ kunit.py run --json suite_name 263# would *not* pass suite_name as the filter_glob and print as json. 
# argparse will consider it to be another way of writing
#   $ kunit.py run --json=suite_name
# i.e. it would run all tests, and dump the json to a `suite_name` file.
# So we hackily automatically rewrite --json => --json=stdout
pseudo_bool_flag_defaults = {
    '--json': 'stdout',
    '--raw_output': 'kunit',
}
def massage_argv(argv: Sequence[str]) -> Sequence[str]:
    """Rewrites bare pseudo-boolean flags to their explicit `--flag=value` form.

    Only arguments that exactly equal a key of pseudo_bool_flag_defaults are
    rewritten; everything else (including `--json=foo`) is passed through.
    """
    def massage_arg(arg: str) -> str:
        if arg not in pseudo_bool_flag_defaults:
            return arg
        return f'{arg}={pseudo_bool_flag_defaults[arg]}'
    return list(map(massage_arg, argv))

def get_default_jobs() -> int:
    """Returns the default for --jobs: the number of CPUs usable by this process.

    os.sched_getaffinity() is not available on every platform, so fall back
    to os.cpu_count() (or 1 if that is undetermined) where it is missing.
    """
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return os.cpu_count() or 1

def add_common_opts(parser) -> None:
    """Adds the options shared by the run, config, build and exec subcommands."""
    parser.add_argument('--build_dir',
                        help='As in the make command, it specifies the build '
                        'directory.',
                        type=str, default='.kunit', metavar='DIR')
    parser.add_argument('--make_options',
                        help='X=Y make option, can be repeated.',
                        action='append', metavar='X=Y')
    parser.add_argument('--alltests',
                        help='Run all KUnit tests through allyesconfig',
                        action='store_true')
    parser.add_argument('--kunitconfig',
                        help='Path to Kconfig fragment that enables KUnit tests.'
                        ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
                        'will get automatically appended. If repeated, the files '
                        'blindly concatenated, which might not work in all cases.',
                        action='append', metavar='PATHS')
    parser.add_argument('--kconfig_add',
                        help='Additional Kconfig options to append to the '
                        '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
                        action='append', metavar='CONFIG_X=Y')

    parser.add_argument('--arch',
                        help=('Specifies the architecture to run tests under. '
                              'The architecture specified here must match the '
                              'string passed to the ARCH make param, '
                              'e.g. i386, x86_64, arm, um, etc. Non-UML '
                              'architectures run on QEMU.'),
                        type=str, default='um', metavar='ARCH')

    parser.add_argument('--cross_compile',
                        help=('Sets make\'s CROSS_COMPILE variable; it should '
                              'be set to a toolchain path prefix (the prefix '
                              'of gcc and other tools in your toolchain, for '
                              'example `sparc64-linux-gnu-` if you have the '
                              'sparc toolchain installed on your system, or '
                              '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
                              'if you have downloaded the microblaze toolchain '
                              'from the 0-day website to a directory in your '
                              'home directory called `toolchains`).'),
                        metavar='PREFIX')

    parser.add_argument('--qemu_config',
                        help=('Takes a path to a path to a file containing '
                              'a QemuArchParams object.'),
                        type=str, metavar='FILE')

    parser.add_argument('--qemu_args',
                        help='Additional QEMU arguments, e.g. "-smp 8"',
                        action='append', metavar='')

def add_build_opts(parser) -> None:
    """Adds the options specific to building (--jobs)."""
    parser.add_argument('--jobs',
                        help='As in the make command, "Specifies the number of '
                        'jobs (commands) to run simultaneously."',
                        type=int, default=get_default_jobs(), metavar='N')

def add_exec_opts(parser) -> None:
    """Adds the options specific to executing tests, incl. the filter_glob positional."""
    parser.add_argument('--timeout',
                        help='maximum number of seconds to allow for all tests '
                        'to run. This does not include time taken to build the '
                        'tests.',
                        type=int,
                        default=300,
                        metavar='SECONDS')
    parser.add_argument('filter_glob',
                        help='Filter which KUnit test suites/tests run at '
                        'boot-time, e.g. list* or list*.*del_test',
                        type=str,
                        nargs='?',
                        default='',
                        metavar='filter_glob')
    parser.add_argument('--kernel_args',
                        help='Kernel command-line parameters. Maybe be repeated',
                        action='append', metavar='')
    parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
                        'individual suite/test. This is can be useful for debugging '
                        'a non-hermetic test, one that might pass/fail based on '
                        'what ran before it.',
                        type=str,
                        choices=['suite', 'test'])

def add_parse_opts(parser) -> None:
    """Adds the options controlling how kernel output is parsed and reported."""
    parser.add_argument('--raw_output', help='If set don\'t format output from kernel. '
                        'If set to --raw_output=kunit, filters to just KUnit output.',
                        type=str, nargs='?', const='all', default=None, choices=['all', 'kunit'])
    parser.add_argument('--json',
                        nargs='?',
                        help='Stores test results in a JSON, and either '
                        'prints to stdout or saves to file if a '
                        'filename is specified',
                        type=str, const='stdout', default=None, metavar='FILE')


def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree:
    """Returns a LinuxSourceTree based on the user's arguments."""
    # Allow users to specify multiple arguments in one string, e.g. '-smp 8'
    qemu_args: List[str] = []
    if cli_args.qemu_args:
        for arg in cli_args.qemu_args:
            qemu_args.extend(shlex.split(arg))

    return kunit_kernel.LinuxSourceTree(cli_args.build_dir,
            kunitconfig_paths=cli_args.kunitconfig,
            kconfig_add=cli_args.kconfig_add,
            arch=cli_args.arch,
            cross_compile=cli_args.cross_compile,
            qemu_config_path=cli_args.qemu_config,
            extra_qemu_args=qemu_args)


def main(argv: Sequence[str]) -> None:
    """Parses argv and dispatches to the selected subcommand.

    Exits the process with status 1 when the selected subcommand does not
    finish with KunitStatus.SUCCESS; prints the help text when no
    subcommand is given.
    """
    parser = argparse.ArgumentParser(
            description='Helps writing and running KUnit tests.')
    subparser = parser.add_subparsers(dest='subcommand')

    # The 'run' command will config, build, exec, and parse in one go.
    run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
    add_common_opts(run_parser)
    add_build_opts(run_parser)
    add_exec_opts(run_parser)
    add_parse_opts(run_parser)

    config_parser = subparser.add_parser('config',
                                         help='Ensures that .config contains all of '
                                         'the options in .kunitconfig')
    add_common_opts(config_parser)

    build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
    add_common_opts(build_parser)
    add_build_opts(build_parser)

    exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
    add_common_opts(exec_parser)
    add_exec_opts(exec_parser)
    add_parse_opts(exec_parser)

    # The 'parse' option is special, as it doesn't need the kernel source
    # (therefore there is no need for a build_dir, hence no add_common_opts)
    # and the '--file' argument is not relevant to 'run', so isn't in
    # add_parse_opts()
    parse_parser = subparser.add_parser('parse',
                                        help='Parses KUnit results from a file, '
                                        'and parses formatted results.')
    add_parse_opts(parse_parser)
    parse_parser.add_argument('file',
                              help='Specifies the file to read results from.',
                              type=str, nargs='?', metavar='input_file')

    cli_args = parser.parse_args(massage_argv(argv))

    # Resolve the kernel root once; relative paths below are taken from there.
    kernel_root = get_kernel_root_path()
    if kernel_root:
        os.chdir(kernel_root)

    if cli_args.subcommand == 'run':
        if not os.path.exists(cli_args.build_dir):
            os.mkdir(cli_args.build_dir)

        linux = tree_from_args(cli_args)
        request = KunitRequest(build_dir=cli_args.build_dir,
                               make_options=cli_args.make_options,
                               jobs=cli_args.jobs,
                               alltests=cli_args.alltests,
                               raw_output=cli_args.raw_output,
                               json=cli_args.json,
                               timeout=cli_args.timeout,
                               filter_glob=cli_args.filter_glob,
                               kernel_args=cli_args.kernel_args,
                               run_isolated=cli_args.run_isolated)
        result = run_tests(linux, request)
        if result.status != KunitStatus.SUCCESS:
            sys.exit(1)
    elif cli_args.subcommand == 'config':
        if cli_args.build_dir and (
                not os.path.exists(cli_args.build_dir)):
            os.mkdir(cli_args.build_dir)

        linux = tree_from_args(cli_args)
        request = KunitConfigRequest(build_dir=cli_args.build_dir,
                                     make_options=cli_args.make_options)
        result = config_tests(linux, request)
        stdout.print_with_timestamp((
            'Elapsed time: %.3fs\n') % (
                result.elapsed_time))
        if result.status != KunitStatus.SUCCESS:
            sys.exit(1)
    elif cli_args.subcommand == 'build':
        linux = tree_from_args(cli_args)
        request = KunitBuildRequest(build_dir=cli_args.build_dir,
                                    make_options=cli_args.make_options,
                                    jobs=cli_args.jobs,
                                    alltests=cli_args.alltests)
        result = config_and_build_tests(linux, request)
        stdout.print_with_timestamp((
            'Elapsed time: %.3fs\n') % (
                result.elapsed_time))
        if result.status != KunitStatus.SUCCESS:
            sys.exit(1)
    elif cli_args.subcommand == 'exec':
        linux = tree_from_args(cli_args)
        exec_request = KunitExecRequest(raw_output=cli_args.raw_output,
                                        build_dir=cli_args.build_dir,
                                        json=cli_args.json,
                                        timeout=cli_args.timeout,
                                        alltests=cli_args.alltests,
                                        filter_glob=cli_args.filter_glob,
                                        kernel_args=cli_args.kernel_args,
                                        run_isolated=cli_args.run_isolated)
        result = exec_tests(linux, exec_request)
        stdout.print_with_timestamp((
            'Elapsed time: %.3fs\n') % (result.elapsed_time))
        if result.status != KunitStatus.SUCCESS:
            sys.exit(1)
    elif cli_args.subcommand == 'parse':
        if cli_args.file is None:
            # Undecodable bytes on stdin are escaped rather than raising.
            sys.stdin.reconfigure(errors='backslashreplace')  # pytype: disable=attribute-error
            kunit_output = sys.stdin
        else:
            with open(cli_args.file, 'r', errors='backslashreplace') as f:
                kunit_output = f.read().splitlines()
        # We know nothing about how the result was created!
        metadata = kunit_json.Metadata()
        request = KunitParseRequest(raw_output=cli_args.raw_output,
                                    json=cli_args.json)
        result, _ = parse_tests(request, metadata, kunit_output)
        if result.status != KunitStatus.SUCCESS:
            sys.exit(1)
    else:
        parser.print_help()

if __name__ == '__main__':
    main(sys.argv[1:])