1#!/usr/bin/env python3 2# SPDX-License-Identifier: GPL-2.0 3# 4# A thin wrapper on top of the KUnit Kernel 5# 6# Copyright (C) 2019, Google LLC. 7# Author: Felix Guo <felixguoxiuping@gmail.com> 8# Author: Brendan Higgins <brendanhiggins@google.com> 9 10import argparse 11import os 12import re 13import sys 14import time 15 16assert sys.version_info >= (3, 7), "Python version is too old" 17 18from dataclasses import dataclass 19from enum import Enum, auto 20from typing import Any, Iterable, Sequence, List, Optional 21 22import kunit_json 23import kunit_kernel 24import kunit_parser 25 26class KunitStatus(Enum): 27 SUCCESS = auto() 28 CONFIG_FAILURE = auto() 29 BUILD_FAILURE = auto() 30 TEST_FAILURE = auto() 31 32@dataclass 33class KunitResult: 34 status: KunitStatus 35 result: Any 36 elapsed_time: float 37 38@dataclass 39class KunitConfigRequest: 40 build_dir: str 41 make_options: Optional[List[str]] 42 43@dataclass 44class KunitBuildRequest(KunitConfigRequest): 45 jobs: int 46 alltests: bool 47 48@dataclass 49class KunitParseRequest: 50 raw_output: Optional[str] 51 build_dir: str 52 json: Optional[str] 53 54@dataclass 55class KunitExecRequest(KunitParseRequest): 56 timeout: int 57 alltests: bool 58 filter_glob: str 59 kernel_args: Optional[List[str]] 60 run_isolated: Optional[str] 61 62@dataclass 63class KunitRequest(KunitExecRequest, KunitBuildRequest): 64 pass 65 66 67KernelDirectoryPath = sys.argv[0].split('tools/testing/kunit/')[0] 68 69def get_kernel_root_path() -> str: 70 path = sys.argv[0] if not __file__ else __file__ 71 parts = os.path.realpath(path).split('tools/testing/kunit') 72 if len(parts) != 2: 73 sys.exit(1) 74 return parts[0] 75 76def config_tests(linux: kunit_kernel.LinuxSourceTree, 77 request: KunitConfigRequest) -> KunitResult: 78 kunit_parser.print_with_timestamp('Configuring KUnit Kernel ...') 79 80 config_start = time.time() 81 success = linux.build_reconfig(request.build_dir, request.make_options) 82 config_end = time.time() 83 if not success: 84 return KunitResult(KunitStatus.CONFIG_FAILURE, 85 'could not configure kernel', 86 config_end - config_start) 87 return KunitResult(KunitStatus.SUCCESS, 88 'configured kernel successfully', 89 config_end - config_start) 90 91def build_tests(linux: kunit_kernel.LinuxSourceTree, 92 request: KunitBuildRequest) -> KunitResult: 93 kunit_parser.print_with_timestamp('Building KUnit Kernel ...') 94 95 build_start = time.time() 96 success = linux.build_kernel(request.alltests, 97 request.jobs, 98 request.build_dir, 99 request.make_options) 100 build_end = time.time() 101 if not success: 102 return KunitResult(KunitStatus.BUILD_FAILURE, 103 'could not build kernel', 104 build_end - build_start) 105 if not success: 106 return KunitResult(KunitStatus.BUILD_FAILURE, 107 'could not build kernel', 108 build_end - build_start) 109 return KunitResult(KunitStatus.SUCCESS, 110 'built kernel successfully', 111 build_end - build_start) 112 113def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree, 114 request: KunitBuildRequest) -> KunitResult: 115 config_result = config_tests(linux, request) 116 if config_result.status != KunitStatus.SUCCESS: 117 return config_result 118 119 return build_tests(linux, request) 120 121def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]: 122 args = ['kunit.action=list'] 123 if request.kernel_args: 124 args.extend(request.kernel_args) 125 126 output = linux.run_kernel(args=args, 127 timeout=None if request.alltests else request.timeout, 128 filter_glob=request.filter_glob, 129 build_dir=request.build_dir) 130 lines = kunit_parser.extract_tap_lines(output) 131 # Hack! Drop the dummy TAP version header that the executor prints out. 132 lines.pop() 133 134 # Filter out any extraneous non-test output that might have gotten mixed in. 135 return [l for l in lines if re.match('^[^\s.]+\.[^\s.]+$', l)] 136 137def _suites_from_test_list(tests: List[str]) -> List[str]: 138 """Extracts all the suites from an ordered list of tests.""" 139 suites = [] # type: List[str] 140 for t in tests: 141 parts = t.split('.', maxsplit=2) 142 if len(parts) != 2: 143 raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"') 144 suite, case = parts 145 if not suites or suites[-1] != suite: 146 suites.append(suite) 147 return suites 148 149 150 151def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult: 152 filter_globs = [request.filter_glob] 153 if request.run_isolated: 154 tests = _list_tests(linux, request) 155 if request.run_isolated == 'test': 156 filter_globs = tests 157 if request.run_isolated == 'suite': 158 filter_globs = _suites_from_test_list(tests) 159 # Apply the test-part of the user's glob, if present. 160 if '.' in request.filter_glob: 161 test_glob = request.filter_glob.split('.', maxsplit=2)[1] 162 filter_globs = [g + '.'+ test_glob for g in filter_globs] 163 164 test_counts = kunit_parser.TestCounts() 165 exec_time = 0.0 166 for i, filter_glob in enumerate(filter_globs): 167 kunit_parser.print_with_timestamp('Starting KUnit Kernel ({}/{})...'.format(i+1, len(filter_globs))) 168 169 test_start = time.time() 170 run_result = linux.run_kernel( 171 args=request.kernel_args, 172 timeout=None if request.alltests else request.timeout, 173 filter_glob=filter_glob, 174 build_dir=request.build_dir) 175 176 result = parse_tests(request, run_result) 177 # run_kernel() doesn't block on the kernel exiting. 178 # That only happens after we get the last line of output from `run_result`. 179 # So exec_time here actually contains parsing + execution time, which is fine. 180 test_end = time.time() 181 exec_time += test_end - test_start 182 183 test_counts.add_subtest_counts(result.result.counts) 184 185 if len(filter_globs) == 1 and test_counts.crashed > 0: 186 bd = request.build_dir 187 print('The kernel seems to have crashed; you can decode the stack traces with:') 188 print('$ scripts/decode_stacktrace.sh {}/vmlinux {} < {} | tee {}/decoded.log | {} parse'.format( 189 bd, bd, kunit_kernel.get_outfile_path(bd), bd, sys.argv[0])) 190 191 kunit_status = _map_to_overall_status(test_counts.get_status()) 192 return KunitResult(status=kunit_status, result=result, elapsed_time=exec_time) 193 194def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus: 195 if test_status in (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED): 196 return KunitStatus.SUCCESS 197 else: 198 return KunitStatus.TEST_FAILURE 199 200def parse_tests(request: KunitParseRequest, input_data: Iterable[str]) -> KunitResult: 201 parse_start = time.time() 202 203 test_result = kunit_parser.Test() 204 205 if request.raw_output: 206 # Treat unparsed results as one passing test. 207 test_result.status = kunit_parser.TestStatus.SUCCESS 208 test_result.counts.passed = 1 209 210 output: Iterable[str] = input_data 211 if request.raw_output == 'all': 212 pass 213 elif request.raw_output == 'kunit': 214 output = kunit_parser.extract_tap_lines(output) 215 else: 216 print(f'Unknown --raw_output option "{request.raw_output}"', file=sys.stderr) 217 for line in output: 218 print(line.rstrip()) 219 220 else: 221 test_result = kunit_parser.parse_run_tests(input_data) 222 parse_end = time.time() 223 224 if request.json: 225 json_obj = kunit_json.get_json_result( 226 test=test_result, 227 def_config='kunit_defconfig', 228 build_dir=request.build_dir, 229 json_path=request.json) 230 if request.json == 'stdout': 231 print(json_obj) 232 233 if test_result.status != kunit_parser.TestStatus.SUCCESS: 234 return KunitResult(KunitStatus.TEST_FAILURE, test_result, 235 parse_end - parse_start) 236 237 return KunitResult(KunitStatus.SUCCESS, test_result, 238 parse_end - parse_start) 239 240def run_tests(linux: kunit_kernel.LinuxSourceTree, 241 request: KunitRequest) -> KunitResult: 242 run_start = time.time() 243 244 config_result = config_tests(linux, request) 245 if config_result.status != KunitStatus.SUCCESS: 246 return config_result 247 248 build_result = build_tests(linux, request) 249 if build_result.status != KunitStatus.SUCCESS: 250 return build_result 251 252 exec_result = exec_tests(linux, request) 253 254 run_end = time.time() 255 256 kunit_parser.print_with_timestamp(( 257 'Elapsed time: %.3fs total, %.3fs configuring, %.3fs ' + 258 'building, %.3fs running\n') % ( 259 run_end - run_start, 260 config_result.elapsed_time, 261 build_result.elapsed_time, 262 exec_result.elapsed_time)) 263 return exec_result 264 265# Problem: 266# $ kunit.py run --json 267# works as one would expect and prints the parsed test results as JSON. 268# $ kunit.py run --json suite_name 269# would *not* pass suite_name as the filter_glob and print as json. 270# argparse will consider it to be another way of writing 271# $ kunit.py run --json=suite_name 272# i.e. it would run all tests, and dump the json to a `suite_name` file. 273# So we hackily automatically rewrite --json => --json=stdout 274pseudo_bool_flag_defaults = { 275 '--json': 'stdout', 276 '--raw_output': 'kunit', 277} 278def massage_argv(argv: Sequence[str]) -> Sequence[str]: 279 def massage_arg(arg: str) -> str: 280 if arg not in pseudo_bool_flag_defaults: 281 return arg 282 return f'{arg}={pseudo_bool_flag_defaults[arg]}' 283 return list(map(massage_arg, argv)) 284 285def get_default_jobs() -> int: 286 return len(os.sched_getaffinity(0)) 287 288def add_common_opts(parser) -> None: 289 parser.add_argument('--build_dir', 290 help='As in the make command, it specifies the build ' 291 'directory.', 292 type=str, default='.kunit', metavar='build_dir') 293 parser.add_argument('--make_options', 294 help='X=Y make option, can be repeated.', 295 action='append') 296 parser.add_argument('--alltests', 297 help='Run all KUnit tests through allyesconfig', 298 action='store_true') 299 parser.add_argument('--kunitconfig', 300 help='Path to Kconfig fragment that enables KUnit tests.' 301 ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" ' 302 'will get automatically appended.', 303 metavar='kunitconfig') 304 parser.add_argument('--kconfig_add', 305 help='Additional Kconfig options to append to the ' 306 '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.', 307 action='append') 308 309 parser.add_argument('--arch', 310 help=('Specifies the architecture to run tests under. ' 311 'The architecture specified here must match the ' 312 'string passed to the ARCH make param, ' 313 'e.g. i386, x86_64, arm, um, etc. Non-UML ' 314 'architectures run on QEMU.'), 315 type=str, default='um', metavar='arch') 316 317 parser.add_argument('--cross_compile', 318 help=('Sets make\'s CROSS_COMPILE variable; it should ' 319 'be set to a toolchain path prefix (the prefix ' 320 'of gcc and other tools in your toolchain, for ' 321 'example `sparc64-linux-gnu-` if you have the ' 322 'sparc toolchain installed on your system, or ' 323 '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` ' 324 'if you have downloaded the microblaze toolchain ' 325 'from the 0-day website to a directory in your ' 326 'home directory called `toolchains`).'), 327 metavar='cross_compile') 328 329 parser.add_argument('--qemu_config', 330 help=('Takes a path to a path to a file containing ' 331 'a QemuArchParams object.'), 332 type=str, metavar='qemu_config') 333 334def add_build_opts(parser) -> None: 335 parser.add_argument('--jobs', 336 help='As in the make command, "Specifies the number of ' 337 'jobs (commands) to run simultaneously."', 338 type=int, default=get_default_jobs(), metavar='jobs') 339 340def add_exec_opts(parser) -> None: 341 parser.add_argument('--timeout', 342 help='maximum number of seconds to allow for all tests ' 343 'to run. This does not include time taken to build the ' 344 'tests.', 345 type=int, 346 default=300, 347 metavar='timeout') 348 parser.add_argument('filter_glob', 349 help='Filter which KUnit test suites/tests run at ' 350 'boot-time, e.g. list* or list*.*del_test', 351 type=str, 352 nargs='?', 353 default='', 354 metavar='filter_glob') 355 parser.add_argument('--kernel_args', 356 help='Kernel command-line parameters. Maybe be repeated', 357 action='append') 358 parser.add_argument('--run_isolated', help='If set, boot the kernel for each ' 359 'individual suite/test. This is can be useful for debugging ' 360 'a non-hermetic test, one that might pass/fail based on ' 361 'what ran before it.', 362 type=str, 363 choices=['suite', 'test']), 364 365def add_parse_opts(parser) -> None: 366 parser.add_argument('--raw_output', help='If set don\'t format output from kernel. ' 367 'If set to --raw_output=kunit, filters to just KUnit output.', 368 type=str, nargs='?', const='all', default=None) 369 parser.add_argument('--json', 370 nargs='?', 371 help='Stores test results in a JSON, and either ' 372 'prints to stdout or saves to file if a ' 373 'filename is specified', 374 type=str, const='stdout', default=None) 375 376def main(argv, linux=None): 377 parser = argparse.ArgumentParser( 378 description='Helps writing and running KUnit tests.') 379 subparser = parser.add_subparsers(dest='subcommand') 380 381 # The 'run' command will config, build, exec, and parse in one go. 382 run_parser = subparser.add_parser('run', help='Runs KUnit tests.') 383 add_common_opts(run_parser) 384 add_build_opts(run_parser) 385 add_exec_opts(run_parser) 386 add_parse_opts(run_parser) 387 388 config_parser = subparser.add_parser('config', 389 help='Ensures that .config contains all of ' 390 'the options in .kunitconfig') 391 add_common_opts(config_parser) 392 393 build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests') 394 add_common_opts(build_parser) 395 add_build_opts(build_parser) 396 397 exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests') 398 add_common_opts(exec_parser) 399 add_exec_opts(exec_parser) 400 add_parse_opts(exec_parser) 401 402 # The 'parse' option is special, as it doesn't need the kernel source 403 # (therefore there is no need for a build_dir, hence no add_common_opts) 404 # and the '--file' argument is not relevant to 'run', so isn't in 405 # add_parse_opts() 406 parse_parser = subparser.add_parser('parse', 407 help='Parses KUnit results from a file, ' 408 'and parses formatted results.') 409 add_parse_opts(parse_parser) 410 parse_parser.add_argument('file', 411 help='Specifies the file to read results from.', 412 type=str, nargs='?', metavar='input_file') 413 414 cli_args = parser.parse_args(massage_argv(argv)) 415 416 if get_kernel_root_path(): 417 os.chdir(get_kernel_root_path()) 418 419 if cli_args.subcommand == 'run': 420 if not os.path.exists(cli_args.build_dir): 421 os.mkdir(cli_args.build_dir) 422 423 if not linux: 424 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 425 kunitconfig_path=cli_args.kunitconfig, 426 kconfig_add=cli_args.kconfig_add, 427 arch=cli_args.arch, 428 cross_compile=cli_args.cross_compile, 429 qemu_config_path=cli_args.qemu_config) 430 431 request = KunitRequest(build_dir=cli_args.build_dir, 432 make_options=cli_args.make_options, 433 jobs=cli_args.jobs, 434 alltests=cli_args.alltests, 435 raw_output=cli_args.raw_output, 436 json=cli_args.json, 437 timeout=cli_args.timeout, 438 filter_glob=cli_args.filter_glob, 439 kernel_args=cli_args.kernel_args, 440 run_isolated=cli_args.run_isolated) 441 result = run_tests(linux, request) 442 if result.status != KunitStatus.SUCCESS: 443 sys.exit(1) 444 elif cli_args.subcommand == 'config': 445 if cli_args.build_dir and ( 446 not os.path.exists(cli_args.build_dir)): 447 os.mkdir(cli_args.build_dir) 448 449 if not linux: 450 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 451 kunitconfig_path=cli_args.kunitconfig, 452 kconfig_add=cli_args.kconfig_add, 453 arch=cli_args.arch, 454 cross_compile=cli_args.cross_compile, 455 qemu_config_path=cli_args.qemu_config) 456 457 request = KunitConfigRequest(build_dir=cli_args.build_dir, 458 make_options=cli_args.make_options) 459 result = config_tests(linux, request) 460 kunit_parser.print_with_timestamp(( 461 'Elapsed time: %.3fs\n') % ( 462 result.elapsed_time)) 463 if result.status != KunitStatus.SUCCESS: 464 sys.exit(1) 465 elif cli_args.subcommand == 'build': 466 if not linux: 467 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 468 kunitconfig_path=cli_args.kunitconfig, 469 kconfig_add=cli_args.kconfig_add, 470 arch=cli_args.arch, 471 cross_compile=cli_args.cross_compile, 472 qemu_config_path=cli_args.qemu_config) 473 474 request = KunitBuildRequest(build_dir=cli_args.build_dir, 475 make_options=cli_args.make_options, 476 jobs=cli_args.jobs, 477 alltests=cli_args.alltests) 478 result = config_and_build_tests(linux, request) 479 kunit_parser.print_with_timestamp(( 480 'Elapsed time: %.3fs\n') % ( 481 result.elapsed_time)) 482 if result.status != KunitStatus.SUCCESS: 483 sys.exit(1) 484 elif cli_args.subcommand == 'exec': 485 if not linux: 486 linux = kunit_kernel.LinuxSourceTree(cli_args.build_dir, 487 kunitconfig_path=cli_args.kunitconfig, 488 kconfig_add=cli_args.kconfig_add, 489 arch=cli_args.arch, 490 cross_compile=cli_args.cross_compile, 491 qemu_config_path=cli_args.qemu_config) 492 493 exec_request = KunitExecRequest(raw_output=cli_args.raw_output, 494 build_dir=cli_args.build_dir, 495 json=cli_args.json, 496 timeout=cli_args.timeout, 497 alltests=cli_args.alltests, 498 filter_glob=cli_args.filter_glob, 499 kernel_args=cli_args.kernel_args, 500 run_isolated=cli_args.run_isolated) 501 result = exec_tests(linux, exec_request) 502 kunit_parser.print_with_timestamp(( 503 'Elapsed time: %.3fs\n') % (result.elapsed_time)) 504 if result.status != KunitStatus.SUCCESS: 505 sys.exit(1) 506 elif cli_args.subcommand == 'parse': 507 if cli_args.file == None: 508 sys.stdin.reconfigure(errors='backslashreplace') # pytype: disable=attribute-error 509 kunit_output = sys.stdin 510 else: 511 with open(cli_args.file, 'r', errors='backslashreplace') as f: 512 kunit_output = f.read().splitlines() 513 request = KunitParseRequest(raw_output=cli_args.raw_output, 514 build_dir='', 515 json=cli_args.json) 516 result = parse_tests(request, kunit_output) 517 if result.status != KunitStatus.SUCCESS: 518 sys.exit(1) 519 else: 520 parser.print_help() 521 522if __name__ == '__main__': 523 main(sys.argv[1:]) 524