1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# 4# A thin wrapper on top of the KUnit Kernel 5# 6# Copyright (C) 2019, Google LLC. 7# Author: Felix Guo <felixguoxiuping@gmail.com> 8# Author: Brendan Higgins <brendanhiggins@google.com> 9 10import argparse 11import os 12import re 13import shlex 14import sys 15import time 16 17assert sys.version_info >= (3, 7), "Python version is too old" 18 19from dataclasses import dataclass 20from enum import Enum, auto 21from typing import Iterable, List, Optional, Sequence, Tuple 22 23import kunit_json 24import kunit_kernel 25import kunit_parser 26from kunit_printer import stdout 27 28class KunitStatus(Enum): 29 SUCCESS = auto() 30 CONFIG_FAILURE = auto() 31 BUILD_FAILURE = auto() 32 TEST_FAILURE = auto() 33 34@dataclass 35class KunitResult: 36 status: KunitStatus 37 elapsed_time: float 38 39@dataclass 40class KunitConfigRequest: 41 build_dir: str 42 make_options: Optional[List[str]] 43 44@dataclass 45class KunitBuildRequest(KunitConfigRequest): 46 jobs: int 47 48@dataclass 49class KunitParseRequest: 50 raw_output: Optional[str] 51 json: Optional[str] 52 53@dataclass 54class KunitExecRequest(KunitParseRequest): 55 build_dir: str 56 timeout: int 57 filter_glob: str 58 filter: str 59 filter_action: Optional[str] 60 kernel_args: Optional[List[str]] 61 run_isolated: Optional[str] 62 list_tests: bool 63 list_tests_attr: bool 64 65@dataclass 66class KunitRequest(KunitExecRequest, KunitBuildRequest): 67 pass 68 69 70def get_kernel_root_path() -> str: 71 path = sys.argv[0] if not __file__ else __file__ 72 parts = os.path.realpath(path).split('tools/testing/kunit') 73 if len(parts) != 2: 74 sys.exit(1) 75 return parts[0] 76 77def config_tests(linux: kunit_kernel.LinuxSourceTree, 78 request: KunitConfigRequest) -> KunitResult: 79 stdout.print_with_timestamp('Configuring KUnit Kernel ...') 80 81 config_start = time.time() 82 success = linux.build_reconfig(request.build_dir, request.make_options) 83 config_end = time.time() 84 status = KunitStatus.SUCCESS if success else KunitStatus.CONFIG_FAILURE 85 return KunitResult(status, config_end - config_start) 86 87def build_tests(linux: kunit_kernel.LinuxSourceTree, 88 request: KunitBuildRequest) -> KunitResult: 89 stdout.print_with_timestamp('Building KUnit Kernel ...') 90 91 build_start = time.time() 92 success = linux.build_kernel(request.jobs, 93 request.build_dir, 94 request.make_options) 95 build_end = time.time() 96 status = KunitStatus.SUCCESS if success else KunitStatus.BUILD_FAILURE 97 return KunitResult(status, build_end - build_start) 98 99def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, 100 request: KunitBuildRequest) -> KunitResult: 101 config_result = config_tests(linux, request) 102 if config_result.status != KunitStatus.SUCCESS: 103 return config_result 104 105 return build_tests(linux, request) 106 107def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: 108 args = ['kunit.action=list'] 109 110 if request.kernel_args: 111 args.extend(request.kernel_args) 112 113 output = linux.run_kernel(args=args, 114 timeout=request.timeout, 115 filter_glob=request.filter_glob, 116 filter=request.filter, 117 filter_action=request.filter_action, 118 build_dir=request.build_dir) 119 lines = kunit_parser.extract_tap_lines(output) 120 # Hack! Drop the dummy TAP version header that the executor prints out. 121 lines.pop() 122 123 # Filter out any extraneous non-test output that might have gotten mixed in. 124 return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)] 125 126def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]: 127 args = ['kunit.action=list_attr'] 128 129 if request.kernel_args: 130 args.extend(request.kernel_args) 131 132 output = linux.run_kernel(args=args, 133 timeout=request.timeout, 134 filter_glob=request.filter_glob, 135 filter=request.filter, 136 filter_action=request.filter_action, 137 build_dir=request.build_dir) 138 lines = kunit_parser.extract_tap_lines(output) 139 # Hack! Drop the dummy TAP version header that the executor prints out. 140 lines.pop() 141 142 # Filter out any extraneous non-test output that might have gotten mixed in. 143 return lines 144 145def _suites_from_test_list(tests: List[str]) -> List[str]: 146 """Extracts all the suites from an ordered list of tests.""" 147 suites = [] # type: List[str] 148 for t in tests: 149 parts = t.split('.', maxsplit=2) 150 if len(parts) != 2: 151 raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"') 152 suite, _ = parts 153 if not suites or suites[-1] != suite: 154 suites.append(suite) 155 return suites 156 157def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: 158 filter_globs = [request.filter_glob] 159 if request.list_tests: 160 output = _list_tests(linux, request) 161 for line in output: 162 print(line.rstrip()) 163 return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) 164 if request.list_tests_attr: 165 attr_output = _list_tests_attr(linux, request) 166 for line in attr_output: 167 print(line.rstrip()) 168 return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0) 169 if request.run_isolated: 170 tests = _list_tests(linux, request) 171 if request.run_isolated == 'test': 172 filter_globs = tests 173 elif request.run_isolated == 'suite': 174 filter_globs = _suites_from_test_list(tests) 175 # Apply the test-part of the user's glob, if present. 176 if '.' in request.filter_glob: 177 test_glob = request.filter_glob.split('.', maxsplit=2)[1] 178 filter_globs = [g + '.'+ test_glob for g in filter_globs] 179 180 metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig') 181 182 test_counts = kunit_parser.TestCounts() 183 exec_time = 0.0 184 for i, filter_glob in enumerate(filter_globs): 185 stdout.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) 186 187 test_start = time.time() 188 run_result = linux.run_kernel( 189 args=request.kernel_args, 190 timeout=request.timeout, 191 filter_glob=filter_glob, 192 filter=request.filter, 193 filter_action=request.filter_action, 194 build_dir=request.build_dir) 195 196 _, test_result = parse_tests(request, metadata, run_result) 197 # run_kernel() doesn't block on the kernel exiting. 198 # That only happens after we get the last line of output from `run_result`. 199 # So exec_time here actually contains parsing + execution time, which is fine. 200 test_end = time.time() 201 exec_time += test_end - test_start 202 203 test_counts.add_subtest_counts(test_result.counts) 204 205 if len(filter_globs) == 1 and test_counts.crashed > 0: 206 bd = request.build_dir 207 print('The kernel seems to have crashed; you can decode the stack traces with:') 208 print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( 209 bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) 210 211 kunit_status = _map_to_overall_status(test_counts.get_status()) 212 return KunitResult(status=kunit_status, elapsed_time=exec_time) 213 214def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: 215 if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): 216 return KunitStatus.SUCCESS 217 return KunitStatus.TEST_FAILURE 218 219def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]: 220 parse_start = time.time() 221 222 if request.raw_output: 223 # Treat unparsed results as one passing test. 224 fake_test = kunit_parser.Test() 225 fake_test.status = kunit_parser.TestStatus.SUCCESS 226 fake_test.counts.passed = 1 227 228 output: Iterable[str] = input_data 229 if request.raw_output == 'all': 230 pass 231 elif request.raw_output == 'kunit': 232 output = kunit_parser.extract_tap_lines(output) 233 for line in output: 234 print(line.rstrip()) 235 parse_time = time.time() - parse_start 236 return KunitResult(KunitStatus.SUCCESS, parse_time), fake_test 237 238 239 # Actually parse the test results. 240 test = kunit_parser.parse_run_tests(input_data) 241 parse_time = time.time() - parse_start 242 243 if request.json: 244 json_str = kunit_json.get_json_result( 245 test=test, 246 metadata=metadata) 247 if request.json == 'stdout': 248 print(json_str) 249 else: 250 with open(request.json, 'w') as f: 251 f.write(json_str) 252 stdout.print_with_timestamp("Test results stored in %s" % 253 os.path.abspath(request.json)) 254 255 if test.status != kunit_parser.TestStatus.SUCCESS: 256 return KunitResult(KunitStatus.TEST_FAILURE, parse_time), test 257 258 return KunitResult(KunitStatus.SUCCESS, parse_time), test 259 260def run_tests(linux: kunit_kernel.LinuxSourceTree, 261 request: KunitRequest) -> KunitResult: 262 run_start = time.time() 263 264 config_result = config_tests(linux, request) 265 if config_result.status != KunitStatus.SUCCESS: 266 return config_result 267 268 build_result = build_tests(linux, request) 269 if build_result.status != KunitStatus.SUCCESS: 270 return build_result 271 272 exec_result = exec_tests(linux, request) 273 274 run_end = time.time() 275 276 stdout.print_with_timestamp(( 277 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 278 'building, %.3fs running\n') % ( 279 run_end - run_start, 280 config_result.elapsed_time, 281 build_result.elapsed_time, 282 exec_result.elapsed_time)) 283 return exec_result 284 285# Problem: 286# $ kunit.py run --json 287# works as one would expect and prints the parsed test results as JSON. 288# $ kunit.py run --json suite_name 289# would *not* pass suite_name as the filter_glob and print as json. 290# argparse will consider it to be another way of writing 291# $ kunit.py run --json=suite_name 292# i.e. it would run all tests, and dump the json to a `suite_name` file. 293# So we hackily automatically rewrite --json => --json=stdout 294pseudo_bool_flag_defaults = { 295 '--json': 'stdout', 296 '--raw_output': 'kunit', 297} 298def massage_argv(argv: Sequence[str]) -> Sequence[str]: 299 def massage_arg(arg: str) -> str: 300 if arg not in pseudo_bool_flag_defaults: 301 return arg 302 return f'{arg}={pseudo_bool_flag_defaults[arg]}' 303 return list(map(massage_arg, argv)) 304 305def get_default_jobs() -> int: 306 return len(os.sched_getaffinity(0)) 307 308def add_common_opts(parser: argparse.ArgumentParser) -> None: 309 parser.add_argument('--build_dir', 310 help='As in the make command, it specifies the build ' 311 'directory.', 312 type=str, default='.kunit', metavar='DIR') 313 parser.add_argument('--make_options', 314 help='X=Y make option, can be repeated.', 315 action='append', metavar='X=Y') 316 parser.add_argument('--alltests', 317 help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config', 318 action='store_true') 319 parser.add_argument('--kunitconfig', 320 help='Path to Kconfig fragment that enables KUnit tests.' 321 ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" ' 322 'will get automatically appended. If repeated, the files ' 323 'blindly concatenated, which might not work in all cases.', 324 action='append', metavar='PATHS') 325 parser.add_argument('--kconfig_add', 326 help='Additional Kconfig options to append to the ' 327 '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.', 328 action='append', metavar='CONFIG_X=Y') 329 330 parser.add_argument('--arch', 331 help=('Specifies the architecture to run tests under. ' 332 'The architecture specified here must match the ' 333 'string passed to the ARCH make param, ' 334 'e.g. i386, x86_64, arm, um, etc. Non-UML ' 335 'architectures run on QEMU.'), 336 type=str, default='um', metavar='ARCH') 337 338 parser.add_argument('--cross_compile', 339 help=('Sets make\'s CROSS_COMPILE variable; it should ' 340 'be set to a toolchain path prefix (the prefix ' 341 'of gcc and other tools in your toolchain, for ' 342 'example `sparc64-linux-gnu-` if you have the ' 343 'sparc toolchain installed on your system, or ' 344 '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` ' 345 'if you have downloaded the microblaze toolchain ' 346 'from the 0-day website to a directory in your ' 347 'home directory called `toolchains`).'), 348 metavar='PREFIX') 349 350 parser.add_argument('--qemu_config', 351 help=('Takes a path to a path to a file containing ' 352 'a QemuArchParams object.'), 353 type=str, metavar='FILE') 354 355 parser.add_argument('--qemu_args', 356 help='Additional QEMU arguments, e.g. "-smp 8"', 357 action='append', metavar='') 358 359def add_build_opts(parser: argparse.ArgumentParser) -> None: 360 parser.add_argument('--jobs', 361 help='As in the make command, "Specifies the number of ' 362 'jobs (commands) to run simultaneously."', 363 type=int, default=get_default_jobs(), metavar='N') 364 365def add_exec_opts(parser: argparse.ArgumentParser) -> None: 366 parser.add_argument('--timeout', 367 help='maximum number of seconds to allow for all tests ' 368 'to run. This does not include time taken to build the ' 369 'tests.', 370 type=int, 371 default=300, 372 metavar='SECONDS') 373 parser.add_argument('filter_glob', 374 help='Filter which KUnit test suites/tests run at ' 375 'boot-time, e.g. list* or list*.*del_test', 376 type=str, 377 nargs='?', 378 default='', 379 metavar='filter_glob') 380 parser.add_argument('--filter', 381 help='Filter KUnit tests with attributes, ' 382 'e.g. module=example or speed>slow', 383 type=str, 384 default='') 385 parser.add_argument('--filter_action', 386 help='If set to skip, filtered tests will be skipped, ' 387 'e.g. --filter_action=skip. Otherwise they will not run.', 388 type=str, 389 choices=['skip']) 390 parser.add_argument('--kernel_args', 391 help='Kernel command-line parameters. Maybe be repeated', 392 action='append', metavar='') 393 parser.add_argument('--run_isolated', help='If set, boot the kernel for each ' 394 'individual suite/test. This is can be useful for debugging ' 395 'a non-hermetic test, one that might pass/fail based on ' 396 'what ran before it.', 397 type=str, 398 choices=['suite', 'test']) 399 parser.add_argument('--list_tests', help='If set, list all tests that will be ' 400 'run.', 401 action='store_true') 402 parser.add_argument('--list_tests_attr', help='If set, list all tests and test ' 403 'attributes.', 404 action='store_true') 405 406def add_parse_opts(parser: argparse.ArgumentParser) -> None: 407 parser.add_argument('--raw_output', help='If set don\'t parse output from kernel. ' 408 'By default, filters to just KUnit output. Use ' 409 '--raw_output=all to show everything', 410 type=str, nargs='?', const='all', default=None, choices=['all', 'kunit']) 411 parser.add_argument('--json', 412 nargs='?', 413 help='Prints parsed test results as JSON to stdout or a file if ' 414 'a filename is specified. Does nothing if --raw_output is set.', 415 type=str, const='stdout', default=None, metavar='FILE') 416 417 418def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree: 419 """Returns a LinuxSourceTree based on the user's arguments.""" 420 # Allow users to specify multiple arguments in one string, e.g. '-smp 8' 421 qemu_args: List[str] = [] 422 if cli_args.qemu_args: 423 for arg in cli_args.qemu_args: 424 qemu_args.extend(shlex.split(arg)) 425 426 kunitconfigs = cli_args.kunitconfig if cli_args.kunitconfig else [] 427 if cli_args.alltests: 428 # Prepend so user-specified options take prio if we ever allow 429 # --kunitconfig options to have differing options. 430 kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH] + kunitconfigs 431 432 return kunit_kernel.LinuxSourceTree(cli_args.build_dir, 433 kunitconfig_paths=kunitconfigs, 434 kconfig_add=cli_args.kconfig_add, 435 arch=cli_args.arch, 436 cross_compile=cli_args.cross_compile, 437 qemu_config_path=cli_args.qemu_config, 438 extra_qemu_args=qemu_args) 439 440 441def run_handler(cli_args: argparse.Namespace) -> None: 442 if not os.path.exists(cli_args.build_dir): 443 os.mkdir(cli_args.build_dir) 444 445 linux = tree_from_args(cli_args) 446 request = KunitRequest(build_dir=cli_args.build_dir, 447 make_options=cli_args.make_options, 448 jobs=cli_args.jobs, 449 raw_output=cli_args.raw_output, 450 json=cli_args.json, 451 timeout=cli_args.timeout, 452 filter_glob=cli_args.filter_glob, 453 filter=cli_args.filter, 454 filter_action=cli_args.filter_action, 455 kernel_args=cli_args.kernel_args, 456 run_isolated=cli_args.run_isolated, 457 list_tests=cli_args.list_tests, 458 list_tests_attr=cli_args.list_tests_attr) 459 result = run_tests(linux, request) 460 if result.status != KunitStatus.SUCCESS: 461 sys.exit(1) 462 463 464def config_handler(cli_args: argparse.Namespace) -> None: 465 if cli_args.build_dir and ( 466 not os.path.exists(cli_args.build_dir)): 467 os.mkdir(cli_args.build_dir) 468 469 linux = tree_from_args(cli_args) 470 request = KunitConfigRequest(build_dir=cli_args.build_dir, 471 make_options=cli_args.make_options) 472 result = config_tests(linux, request) 473 stdout.print_with_timestamp(( 474 'Elapsed time: %.3fs\n') % ( 475 result.elapsed_time)) 476 if result.status != KunitStatus.SUCCESS: 477 sys.exit(1) 478 479 480def build_handler(cli_args: argparse.Namespace) -> None: 481 linux = tree_from_args(cli_args) 482 request = KunitBuildRequest(build_dir=cli_args.build_dir, 483 make_options=cli_args.make_options, 484 jobs=cli_args.jobs) 485 result = config_and_build_tests(linux, request) 486 stdout.print_with_timestamp(( 487 'Elapsed time: %.3fs\n') % ( 488 result.elapsed_time)) 489 if result.status != KunitStatus.SUCCESS: 490 sys.exit(1) 491 492 493def exec_handler(cli_args: argparse.Namespace) -> None: 494 linux = tree_from_args(cli_args) 495 exec_request = KunitExecRequest(raw_output=cli_args.raw_output, 496 build_dir=cli_args.build_dir, 497 json=cli_args.json, 498 timeout=cli_args.timeout, 499 filter_glob=cli_args.filter_glob, 500 filter=cli_args.filter, 501 filter_action=cli_args.filter_action, 502 kernel_args=cli_args.kernel_args, 503 run_isolated=cli_args.run_isolated, 504 list_tests=cli_args.list_tests, 505 list_tests_attr=cli_args.list_tests_attr) 506 result = exec_tests(linux, exec_request) 507 stdout.print_with_timestamp(( 508 'Elapsed time: %.3fs\n') % (result.elapsed_time)) 509 if result.status != KunitStatus.SUCCESS: 510 sys.exit(1) 511 512 513def parse_handler(cli_args: argparse.Namespace) -> None: 514 if cli_args.file is None: 515 sys.stdin.reconfigure(errors='backslashreplace') # type: ignore 516 kunit_output = sys.stdin # type: Iterable[str] 517 else: 518 with open(cli_args.file, 'r', errors='backslashreplace') as f: 519 kunit_output = f.read().splitlines() 520 # We know nothing about how the result was created! 521 metadata = kunit_json.Metadata() 522 request = KunitParseRequest(raw_output=cli_args.raw_output, 523 json=cli_args.json) 524 result, _ = parse_tests(request, metadata, kunit_output) 525 if result.status != KunitStatus.SUCCESS: 526 sys.exit(1) 527 528 529subcommand_handlers_map = { 530 'run': run_handler, 531 'config': config_handler, 532 'build': build_handler, 533 'exec': exec_handler, 534 'parse': parse_handler 535} 536 537 538def main(argv: Sequence[str]) -> None: 539 parser = argparse.ArgumentParser( 540 description='Helps writing and running KUnit tests.') 541 subparser = parser.add_subparsers(dest='subcommand') 542 543 # The 'run' command will config, build, exec, and parse in one go. 544 run_parser = subparser.add_parser('run', help='Runs KUnit tests.') 545 add_common_opts(run_parser) 546 add_build_opts(run_parser) 547 add_exec_opts(run_parser) 548 add_parse_opts(run_parser) 549 550 config_parser = subparser.add_parser('config', 551 help='Ensures that .config contains all of ' 552 'the options in .kunitconfig') 553 add_common_opts(config_parser) 554 555 build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests') 556 add_common_opts(build_parser) 557 add_build_opts(build_parser) 558 559 exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests') 560 add_common_opts(exec_parser) 561 add_exec_opts(exec_parser) 562 add_parse_opts(exec_parser) 563 564 # The 'parse' option is special, as it doesn't need the kernel source 565 # (therefore there is no need for a build_dir, hence no add_common_opts) 566 # and the '--file' argument is not relevant to 'run', so isn't in 567 # add_parse_opts() 568 parse_parser = subparser.add_parser('parse', 569 help='Parses KUnit results from a file, ' 570 'and parses formatted results.') 571 add_parse_opts(parse_parser) 572 parse_parser.add_argument('file', 573 help='Specifies the file to read results from.', 574 type=str, nargs='?', metavar='input_file') 575 576 cli_args = parser.parse_args(massage_argv(argv)) 577 578 if get_kernel_root_path(): 579 os.chdir(get_kernel_root_path()) 580 581 subcomand_handler = subcommand_handlers_map.get(cli_args.subcommand, None) 582 583 if subcomand_handler is None: 584 parser.print_help() 585 return 586 587 subcomand_handler(cli_args) 588 589 590if __name__ == '__main__': 591 main(sys.argv[1:]) 592