1# SPDX-License-Identifier: GPL-2.0 2# 3# Parses KTAP test results from a kernel dmesg log and incrementally prints 4# results with reader-friendly format. Stores and returns test results in a 5# Test object. 6# 7# Copyright (C) 2019, Google LLC. 8# Author: Felix Guo <felixguoxiuping@gmail.com> 9# Author: Brendan Higgins <brendanhiggins@google.com> 10# Author: Rae Moar <rmoar@google.com> 11 12from __future__ import annotations 13import re 14import sys 15 16from enum import Enum, auto 17from typing import Iterable, Iterator, List, Optional, Tuple 18 19from kunit_printer import stdout 20 21class Test: 22 """ 23 A class to represent a test parsed from KTAP results. All KTAP 24 results within a test log are stored in a main Test object as 25 subtests. 26 27 Attributes: 28 status : TestStatus - status of the test 29 name : str - name of the test 30 expected_count : int - expected number of subtests (0 if single 31 test case and None if unknown expected number of subtests) 32 subtests : List[Test] - list of subtests 33 log : List[str] - log of KTAP lines that correspond to the test 34 counts : TestCounts - counts of the test statuses and errors of 35 subtests or of the test itself if the test is a single 36 test case. 37 """ 38 def __init__(self) -> None: 39 """Creates Test object with default attributes.""" 40 self.status = TestStatus.TEST_CRASHED 41 self.name = '' 42 self.expected_count = 0 # type: Optional[int] 43 self.subtests = [] # type: List[Test] 44 self.log = [] # type: List[str] 45 self.counts = TestCounts() 46 47 def __str__(self) -> str: 48 """Returns string representation of a Test class object.""" 49 return (f'Test({self.status}, {self.name}, {self.expected_count}, ' 50 f'{self.subtests}, {self.log}, {self.counts})') 51 52 def __repr__(self) -> str: 53 """Returns string representation of a Test class object.""" 54 return str(self) 55 56 def add_error(self, error_message: str) -> None: 57 """Records an error that occurred while parsing this test.""" 58 self.counts.errors += 1 59 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') 60 61class TestStatus(Enum): 62 """An enumeration class to represent the status of a test.""" 63 SUCCESS = auto() 64 FAILURE = auto() 65 SKIPPED = auto() 66 TEST_CRASHED = auto() 67 NO_TESTS = auto() 68 FAILURE_TO_PARSE_TESTS = auto() 69 70class TestCounts: 71 """ 72 Tracks the counts of statuses of all test cases and any errors within 73 a Test. 74 75 Attributes: 76 passed : int - the number of tests that have passed 77 failed : int - the number of tests that have failed 78 crashed : int - the number of tests that have crashed 79 skipped : int - the number of tests that have skipped 80 errors : int - the number of errors in the test and subtests 81 """ 82 def __init__(self): 83 """Creates TestCounts object with counts of all test 84 statuses and test errors set to 0. 85 """ 86 self.passed = 0 87 self.failed = 0 88 self.crashed = 0 89 self.skipped = 0 90 self.errors = 0 91 92 def __str__(self) -> str: 93 """Returns the string representation of a TestCounts object.""" 94 statuses = [('passed', self.passed), ('failed', self.failed), 95 ('crashed', self.crashed), ('skipped', self.skipped), 96 ('errors', self.errors)] 97 return f'Ran {self.total()} tests: ' + \ 98 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) 99 100 def total(self) -> int: 101 """Returns the total number of test cases within a test 102 object, where a test case is a test with no subtests. 103 """ 104 return (self.passed + self.failed + self.crashed + 105 self.skipped) 106 107 def add_subtest_counts(self, counts: TestCounts) -> None: 108 """ 109 Adds the counts of another TestCounts object to the current 110 TestCounts object. Used to add the counts of a subtest to the 111 parent test. 112 113 Parameters: 114 counts - a different TestCounts object whose counts 115 will be added to the counts of the TestCounts object 116 """ 117 self.passed += counts.passed 118 self.failed += counts.failed 119 self.crashed += counts.crashed 120 self.skipped += counts.skipped 121 self.errors += counts.errors 122 123 def get_status(self) -> TestStatus: 124 """Returns the aggregated status of a Test using test 125 counts. 126 """ 127 if self.total() == 0: 128 return TestStatus.NO_TESTS 129 if self.crashed: 130 # Crashes should take priority. 131 return TestStatus.TEST_CRASHED 132 if self.failed: 133 return TestStatus.FAILURE 134 if self.passed: 135 # No failures or crashes, looks good! 136 return TestStatus.SUCCESS 137 # We have only skipped tests. 138 return TestStatus.SKIPPED 139 140 def add_status(self, status: TestStatus) -> None: 141 """Increments the count for `status`.""" 142 if status == TestStatus.SUCCESS: 143 self.passed += 1 144 elif status == TestStatus.FAILURE: 145 self.failed += 1 146 elif status == TestStatus.SKIPPED: 147 self.skipped += 1 148 elif status != TestStatus.NO_TESTS: 149 self.crashed += 1 150 151class LineStream: 152 """ 153 A class to represent the lines of kernel output. 154 Provides a lazy peek()/pop() interface over an iterator of 155 (line#, text). 156 """ 157 _lines: Iterator[Tuple[int, str]] 158 _next: Tuple[int, str] 159 _need_next: bool 160 _done: bool 161 162 def __init__(self, lines: Iterator[Tuple[int, str]]): 163 """Creates a new LineStream that wraps the given iterator.""" 164 self._lines = lines 165 self._done = False 166 self._need_next = True 167 self._next = (0, '') 168 169 def _get_next(self) -> None: 170 """Advances the LineSteam to the next line, if necessary.""" 171 if not self._need_next: 172 return 173 try: 174 self._next = next(self._lines) 175 except StopIteration: 176 self._done = True 177 finally: 178 self._need_next = False 179 180 def peek(self) -> str: 181 """Returns the current line, without advancing the LineStream. 182 """ 183 self._get_next() 184 return self._next[1] 185 186 def pop(self) -> str: 187 """Returns the current line and advances the LineStream to 188 the next line. 189 """ 190 s = self.peek() 191 if self._done: 192 raise ValueError(f'LineStream: going past EOF, last line was {s}') 193 self._need_next = True 194 return s 195 196 def __bool__(self) -> bool: 197 """Returns True if stream has more lines.""" 198 self._get_next() 199 return not self._done 200 201 # Only used by kunit_tool_test.py. 202 def __iter__(self) -> Iterator[str]: 203 """Empties all lines stored in LineStream object into 204 Iterator object and returns the Iterator object. 205 """ 206 while bool(self): 207 yield self.pop() 208 209 def line_number(self) -> int: 210 """Returns the line number of the current line.""" 211 self._get_next() 212 return self._next[0] 213 214# Parsing helper methods: 215 216KTAP_START = re.compile(r'KTAP version ([0-9]+)$') 217TAP_START = re.compile(r'TAP version ([0-9]+)$') 218KTAP_END = re.compile('(List of all partitions:|' 219 'Kernel panic - not syncing: VFS:|reboot: System halted)') 220 221def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: 222 """Extracts KTAP lines from the kernel output.""" 223 def isolate_ktap_output(kernel_output: Iterable[str]) \ 224 -> Iterator[Tuple[int, str]]: 225 line_num = 0 226 started = False 227 for line in kernel_output: 228 line_num += 1 229 line = line.rstrip() # remove trailing \n 230 if not started and KTAP_START.search(line): 231 # start extracting KTAP lines and set prefix 232 # to number of characters before version line 233 prefix_len = len( 234 line.split('KTAP version')[0]) 235 started = True 236 yield line_num, line[prefix_len:] 237 elif not started and TAP_START.search(line): 238 # start extracting KTAP lines and set prefix 239 # to number of characters before version line 240 prefix_len = len(line.split('TAP version')[0]) 241 started = True 242 yield line_num, line[prefix_len:] 243 elif started and KTAP_END.search(line): 244 # stop extracting KTAP lines 245 break 246 elif started: 247 # remove prefix and any indention and yield 248 # line with line number 249 line = line[prefix_len:].lstrip() 250 yield line_num, line 251 return LineStream(lines=isolate_ktap_output(kernel_output)) 252 253KTAP_VERSIONS = [1] 254TAP_VERSIONS = [13, 14] 255 256def check_version(version_num: int, accepted_versions: List[int], 257 version_type: str, test: Test) -> None: 258 """ 259 Adds error to test object if version number is too high or too 260 low. 261 262 Parameters: 263 version_num - The inputted version number from the parsed KTAP or TAP 264 header line 265 accepted_version - List of accepted KTAP or TAP versions 266 version_type - 'KTAP' or 'TAP' depending on the type of 267 version line. 268 test - Test object for current test being parsed 269 """ 270 if version_num < min(accepted_versions): 271 test.add_error(f'{version_type} version lower than expected!') 272 elif version_num > max(accepted_versions): 273 test.add_error(f'{version_type} version higer than expected!') 274 275def parse_ktap_header(lines: LineStream, test: Test) -> bool: 276 """ 277 Parses KTAP/TAP header line and checks version number. 278 Returns False if fails to parse KTAP/TAP header line. 279 280 Accepted formats: 281 - 'KTAP version [version number]' 282 - 'TAP version [version number]' 283 284 Parameters: 285 lines - LineStream of KTAP output to parse 286 test - Test object for current test being parsed 287 288 Return: 289 True if successfully parsed KTAP/TAP header line 290 """ 291 ktap_match = KTAP_START.match(lines.peek()) 292 tap_match = TAP_START.match(lines.peek()) 293 if ktap_match: 294 version_num = int(ktap_match.group(1)) 295 check_version(version_num, KTAP_VERSIONS, 'KTAP', test) 296 elif tap_match: 297 version_num = int(tap_match.group(1)) 298 check_version(version_num, TAP_VERSIONS, 'TAP', test) 299 else: 300 return False 301 test.log.append(lines.pop()) 302 return True 303 304TEST_HEADER = re.compile(r'^# Subtest: (.*)$') 305 306def parse_test_header(lines: LineStream, test: Test) -> bool: 307 """ 308 Parses test header and stores test name in test object. 309 Returns False if fails to parse test header line. 310 311 Accepted format: 312 - '# Subtest: [test name]' 313 314 Parameters: 315 lines - LineStream of KTAP output to parse 316 test - Test object for current test being parsed 317 318 Return: 319 True if successfully parsed test header line 320 """ 321 match = TEST_HEADER.match(lines.peek()) 322 if not match: 323 return False 324 test.log.append(lines.pop()) 325 test.name = match.group(1) 326 return True 327 328TEST_PLAN = re.compile(r'1\.\.([0-9]+)') 329 330def parse_test_plan(lines: LineStream, test: Test) -> bool: 331 """ 332 Parses test plan line and stores the expected number of subtests in 333 test object. Reports an error if expected count is 0. 334 Returns False and sets expected_count to None if there is no valid test 335 plan. 336 337 Accepted format: 338 - '1..[number of subtests]' 339 340 Parameters: 341 lines - LineStream of KTAP output to parse 342 test - Test object for current test being parsed 343 344 Return: 345 True if successfully parsed test plan line 346 """ 347 match = TEST_PLAN.match(lines.peek()) 348 if not match: 349 test.expected_count = None 350 return False 351 test.log.append(lines.pop()) 352 expected_count = int(match.group(1)) 353 test.expected_count = expected_count 354 return True 355 356TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') 357 358TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') 359 360def peek_test_name_match(lines: LineStream, test: Test) -> bool: 361 """ 362 Matches current line with the format of a test result line and checks 363 if the name matches the name of the current test. 364 Returns False if fails to match format or name. 365 366 Accepted format: 367 - '[ok|not ok] [test number] [-] [test name] [optional skip 368 directive]' 369 370 Parameters: 371 lines - LineStream of KTAP output to parse 372 test - Test object for current test being parsed 373 374 Return: 375 True if matched a test result line and the name matching the 376 expected test name 377 """ 378 line = lines.peek() 379 match = TEST_RESULT.match(line) 380 if not match: 381 return False 382 name = match.group(4) 383 return name == test.name 384 385def parse_test_result(lines: LineStream, test: Test, 386 expected_num: int) -> bool: 387 """ 388 Parses test result line and stores the status and name in the test 389 object. Reports an error if the test number does not match expected 390 test number. 391 Returns False if fails to parse test result line. 392 393 Note that the SKIP directive is the only direction that causes a 394 change in status. 395 396 Accepted format: 397 - '[ok|not ok] [test number] [-] [test name] [optional skip 398 directive]' 399 400 Parameters: 401 lines - LineStream of KTAP output to parse 402 test - Test object for current test being parsed 403 expected_num - expected test number for current test 404 405 Return: 406 True if successfully parsed a test result line. 407 """ 408 line = lines.peek() 409 match = TEST_RESULT.match(line) 410 skip_match = TEST_RESULT_SKIP.match(line) 411 412 # Check if line matches test result line format 413 if not match: 414 return False 415 test.log.append(lines.pop()) 416 417 # Set name of test object 418 if skip_match: 419 test.name = skip_match.group(4) 420 else: 421 test.name = match.group(4) 422 423 # Check test num 424 num = int(match.group(2)) 425 if num != expected_num: 426 test.add_error(f'Expected test number {expected_num} but found {num}') 427 428 # Set status of test object 429 status = match.group(1) 430 if skip_match: 431 test.status = TestStatus.SKIPPED 432 elif status == 'ok': 433 test.status = TestStatus.SUCCESS 434 else: 435 test.status = TestStatus.FAILURE 436 return True 437 438def parse_diagnostic(lines: LineStream) -> List[str]: 439 """ 440 Parse lines that do not match the format of a test result line or 441 test header line and returns them in list. 442 443 Line formats that are not parsed: 444 - '# Subtest: [test name]' 445 - '[ok|not ok] [test number] [-] [test name] [optional skip 446 directive]' 447 448 Parameters: 449 lines - LineStream of KTAP output to parse 450 451 Return: 452 Log of diagnostic lines 453 """ 454 log = [] # type: List[str] 455 while lines and not TEST_RESULT.match(lines.peek()) and not \ 456 TEST_HEADER.match(lines.peek()): 457 log.append(lines.pop()) 458 return log 459 460 461# Printing helper methods: 462 463DIVIDER = '=' * 60 464 465def format_test_divider(message: str, len_message: int) -> str: 466 """ 467 Returns string with message centered in fixed width divider. 468 469 Example: 470 '===================== message example =====================' 471 472 Parameters: 473 message - message to be centered in divider line 474 len_message - length of the message to be printed such that 475 any characters of the color codes are not counted 476 477 Return: 478 String containing message centered in fixed width divider 479 """ 480 default_count = 3 # default number of dashes 481 len_1 = default_count 482 len_2 = default_count 483 difference = len(DIVIDER) - len_message - 2 # 2 spaces added 484 if difference > 0: 485 # calculate number of dashes for each side of the divider 486 len_1 = int(difference / 2) 487 len_2 = difference - len_1 488 return ('=' * len_1) + f' {message} ' + ('=' * len_2) 489 490def print_test_header(test: Test) -> None: 491 """ 492 Prints test header with test name and optionally the expected number 493 of subtests. 494 495 Example: 496 '=================== example (2 subtests) ===================' 497 498 Parameters: 499 test - Test object representing current test being printed 500 """ 501 message = test.name 502 if test.expected_count: 503 if test.expected_count == 1: 504 message += ' (1 subtest)' 505 else: 506 message += f' ({test.expected_count} subtests)' 507 stdout.print_with_timestamp(format_test_divider(message, len(message))) 508 509def print_log(log: Iterable[str]) -> None: 510 """Prints all strings in saved log for test in yellow.""" 511 for m in log: 512 stdout.print_with_timestamp(stdout.yellow(m)) 513 514def format_test_result(test: Test) -> str: 515 """ 516 Returns string with formatted test result with colored status and test 517 name. 518 519 Example: 520 '[PASSED] example' 521 522 Parameters: 523 test - Test object representing current test being printed 524 525 Return: 526 String containing formatted test result 527 """ 528 if test.status == TestStatus.SUCCESS: 529 return stdout.green('[PASSED] ') + test.name 530 if test.status == TestStatus.SKIPPED: 531 return stdout.yellow('[SKIPPED] ') + test.name 532 if test.status == TestStatus.NO_TESTS: 533 return stdout.yellow('[NO TESTS RUN] ') + test.name 534 if test.status == TestStatus.TEST_CRASHED: 535 print_log(test.log) 536 return stdout.red('[CRASHED] ') + test.name 537 print_log(test.log) 538 return stdout.red('[FAILED] ') + test.name 539 540def print_test_result(test: Test) -> None: 541 """ 542 Prints result line with status of test. 543 544 Example: 545 '[PASSED] example' 546 547 Parameters: 548 test - Test object representing current test being printed 549 """ 550 stdout.print_with_timestamp(format_test_result(test)) 551 552def print_test_footer(test: Test) -> None: 553 """ 554 Prints test footer with status of test. 555 556 Example: 557 '===================== [PASSED] example =====================' 558 559 Parameters: 560 test - Test object representing current test being printed 561 """ 562 message = format_test_result(test) 563 stdout.print_with_timestamp(format_test_divider(message, 564 len(message) - stdout.color_len())) 565 566def print_summary_line(test: Test) -> None: 567 """ 568 Prints summary line of test object. Color of line is dependent on 569 status of test. Color is green if test passes, yellow if test is 570 skipped, and red if the test fails or crashes. Summary line contains 571 counts of the statuses of the tests subtests or the test itself if it 572 has no subtests. 573 574 Example: 575 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, 576 Errors: 0" 577 578 test - Test object representing current test being printed 579 """ 580 if test.status == TestStatus.SUCCESS: 581 color = stdout.green 582 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS): 583 color = stdout.yellow 584 else: 585 color = stdout.red 586 stdout.print_with_timestamp(color(f'Testing complete. {test.counts}')) 587 588# Other methods: 589 590def bubble_up_test_results(test: Test) -> None: 591 """ 592 If the test has subtests, add the test counts of the subtests to the 593 test and check if any of the tests crashed and if so set the test 594 status to crashed. Otherwise if the test has no subtests add the 595 status of the test to the test counts. 596 597 Parameters: 598 test - Test object for current test being parsed 599 """ 600 subtests = test.subtests 601 counts = test.counts 602 status = test.status 603 for t in subtests: 604 counts.add_subtest_counts(t.counts) 605 if counts.total() == 0: 606 counts.add_status(status) 607 elif test.counts.get_status() == TestStatus.TEST_CRASHED: 608 test.status = TestStatus.TEST_CRASHED 609 610def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: 611 """ 612 Finds next test to parse in LineStream, creates new Test object, 613 parses any subtests of the test, populates Test object with all 614 information (status, name) about the test and the Test objects for 615 any subtests, and then returns the Test object. The method accepts 616 three formats of tests: 617 618 Accepted test formats: 619 620 - Main KTAP/TAP header 621 622 Example: 623 624 KTAP version 1 625 1..4 626 [subtests] 627 628 - Subtest header line 629 630 Example: 631 632 # Subtest: name 633 1..3 634 [subtests] 635 ok 1 name 636 637 - Test result line 638 639 Example: 640 641 ok 1 - test 642 643 Parameters: 644 lines - LineStream of KTAP output to parse 645 expected_num - expected test number for test to be parsed 646 log - list of strings containing any preceding diagnostic lines 647 corresponding to the current test 648 649 Return: 650 Test object populated with characteristics and any subtests 651 """ 652 test = Test() 653 test.log.extend(log) 654 parent_test = False 655 main = parse_ktap_header(lines, test) 656 if main: 657 # If KTAP/TAP header is found, attempt to parse 658 # test plan 659 test.name = "main" 660 parse_test_plan(lines, test) 661 parent_test = True 662 else: 663 # If KTAP/TAP header is not found, test must be subtest 664 # header or test result line so parse attempt to parser 665 # subtest header 666 parent_test = parse_test_header(lines, test) 667 if parent_test: 668 # If subtest header is found, attempt to parse 669 # test plan and print header 670 parse_test_plan(lines, test) 671 print_test_header(test) 672 expected_count = test.expected_count 673 subtests = [] 674 test_num = 1 675 while parent_test and (expected_count is None or test_num <= expected_count): 676 # Loop to parse any subtests. 677 # Break after parsing expected number of tests or 678 # if expected number of tests is unknown break when test 679 # result line with matching name to subtest header is found 680 # or no more lines in stream. 681 sub_log = parse_diagnostic(lines) 682 sub_test = Test() 683 if not lines or (peek_test_name_match(lines, test) and 684 not main): 685 if expected_count and test_num <= expected_count: 686 # If parser reaches end of test before 687 # parsing expected number of subtests, print 688 # crashed subtest and record error 689 test.add_error('missing expected subtest!') 690 sub_test.log.extend(sub_log) 691 test.counts.add_status( 692 TestStatus.TEST_CRASHED) 693 print_test_result(sub_test) 694 else: 695 test.log.extend(sub_log) 696 break 697 else: 698 sub_test = parse_test(lines, test_num, sub_log) 699 subtests.append(sub_test) 700 test_num += 1 701 test.subtests = subtests 702 if not main: 703 # If not main test, look for test result line 704 test.log.extend(parse_diagnostic(lines)) 705 if (parent_test and peek_test_name_match(lines, test)) or \ 706 not parent_test: 707 parse_test_result(lines, test, expected_num) 708 else: 709 test.add_error('missing subtest result line!') 710 711 # Check for there being no tests 712 if parent_test and len(subtests) == 0: 713 # Don't override a bad status if this test had one reported. 714 # Assumption: no subtests means CRASHED is from Test.__init__() 715 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): 716 test.status = TestStatus.NO_TESTS 717 test.add_error('0 tests run!') 718 719 # Add statuses to TestCounts attribute in Test object 720 bubble_up_test_results(test) 721 if parent_test and not main: 722 # If test has subtests and is not the main test object, print 723 # footer. 724 print_test_footer(test) 725 elif not main: 726 print_test_result(test) 727 return test 728 729def parse_run_tests(kernel_output: Iterable[str]) -> Test: 730 """ 731 Using kernel output, extract KTAP lines, parse the lines for test 732 results and print condensed test results and summary line. 733 734 Parameters: 735 kernel_output - Iterable object contains lines of kernel output 736 737 Return: 738 Test - the main test object with all subtests. 739 """ 740 stdout.print_with_timestamp(DIVIDER) 741 lines = extract_tap_lines(kernel_output) 742 test = Test() 743 if not lines: 744 test.name = '<missing>' 745 test.add_error('could not find any KTAP output!') 746 test.status = TestStatus.FAILURE_TO_PARSE_TESTS 747 else: 748 test = parse_test(lines, 0, []) 749 if test.status != TestStatus.NO_TESTS: 750 test.status = test.counts.get_status() 751 stdout.print_with_timestamp(DIVIDER) 752 print_summary_line(test) 753 return test 754