# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
import re
import sys

import datetime
from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple


class Test:
    """
    A class to represent a test parsed from KTAP results. All KTAP
    results within a test log are stored in a main Test object as
    subtests.

    Attributes:
    status : TestStatus - status of the test
    name : str - name of the test
    expected_count : int - expected number of subtests (0 if single
        test case and None if unknown expected number of subtests)
    subtests : List[Test] - list of subtests
    log : List[str] - log of KTAP lines that correspond to the test
    counts : TestCounts - counts of the test statuses and errors of
        subtests or of the test itself if the test is a single
        test case.
    """
    def __init__(self) -> None:
        """Creates Test object with default attributes."""
        # A test is considered crashed until a result line says otherwise.
        self.status = TestStatus.TEST_CRASHED
        self.name = ''
        self.expected_count: Optional[int] = 0
        self.subtests: List[Test] = []
        self.log: List[str] = []
        self.counts = TestCounts()

    def __str__(self) -> str:
        """Returns string representation of a Test class object."""
        return (f'Test({self.status}, {self.name}, {self.expected_count}, '
                f'{self.subtests}, {self.log}, {self.counts})')

    def __repr__(self) -> str:
        """Returns string representation of a Test class object."""
        return str(self)

    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test."""
        self.counts.errors += 1
        print_with_timestamp(
            red('[ERROR]') + f' Test: {self.name}: {error_message}')


class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()


class TestCounts:
    """
    Tracks the counts of statuses of all test cases and any errors within
    a Test.

    Attributes:
    passed : int - the number of tests that have passed
    failed : int - the number of tests that have failed
    crashed : int - the number of tests that have crashed
    skipped : int - the number of tests that have skipped
    errors : int - the number of errors in the test and subtests
    """
    def __init__(self) -> None:
        """Creates TestCounts object with counts of all test
        statuses and test errors set to 0.
        """
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0
        self.errors = 0

    def __str__(self) -> str:
        """Returns the string representation of a TestCounts object.

        Only counters that are greater than zero are listed.
        """
        statuses = [('passed', self.passed), ('failed', self.failed),
                    ('crashed', self.crashed), ('skipped', self.skipped),
                    ('errors', self.errors)]
        return f'Ran {self.total()} tests: ' + \
            ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)

    def total(self) -> int:
        """Returns the total number of test cases within a test
        object, where a test case is a test with no subtests.
        """
        return (self.passed + self.failed + self.crashed +
                self.skipped)

    def add_subtest_counts(self, counts: TestCounts) -> None:
        """
        Adds the counts of another TestCounts object to the current
        TestCounts object. Used to add the counts of a subtest to the
        parent test.

        Parameters:
        counts - a different TestCounts object whose counts
            will be added to the counts of the TestCounts object
        """
        self.passed += counts.passed
        self.failed += counts.failed
        self.crashed += counts.crashed
        self.skipped += counts.skipped
        self.errors += counts.errors

    def get_status(self) -> TestStatus:
        """Returns the aggregated status of a Test using test
        counts.
        """
        if self.total() == 0:
            return TestStatus.NO_TESTS
        if self.crashed:
            # Crashes should take priority.
            return TestStatus.TEST_CRASHED
        if self.failed:
            return TestStatus.FAILURE
        if self.passed:
            # No failures or crashes, looks good!
            return TestStatus.SUCCESS
        # We have only skipped tests.
        return TestStatus.SKIPPED

    def add_status(self, status: TestStatus) -> None:
        """Increments the count for `status`.

        NO_TESTS is not counted; any status other than SUCCESS, FAILURE,
        SKIPPED and NO_TESTS is counted as a crash.
        """
        if status == TestStatus.SUCCESS:
            self.passed += 1
        elif status == TestStatus.FAILURE:
            self.failed += 1
        elif status == TestStatus.SKIPPED:
            self.skipped += 1
        elif status != TestStatus.NO_TESTS:
            self.crashed += 1


class LineStream:
    """
    A class to represent the lines of kernel output.
    Provides a lazy peek()/pop() interface over an iterator of
    (line#, text).
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _need_next: bool
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        """Creates a new LineStream that wraps the given iterator."""
        self._lines = lines
        self._done = False
        self._need_next = True
        self._next = (0, '')

    def _get_next(self) -> None:
        """Advances the LineStream to the next line, if necessary."""
        if not self._need_next:
            return
        try:
            self._next = next(self._lines)
        except StopIteration:
            # Note: self._next keeps the last line that was read, so
            # callers must check the stream's truthiness before trusting
            # peek() at the end of input.
            self._done = True
        finally:
            self._need_next = False

    def peek(self) -> str:
        """Returns the current line, without advancing the LineStream.

        At the end of input this returns the last line that was read.
        """
        self._get_next()
        return self._next[1]

    def pop(self) -> str:
        """Returns the current line and advances the LineStream to
        the next line.

        Raises:
        ValueError - if the stream has no more lines
        """
        s = self.peek()
        if self._done:
            raise ValueError(f'LineStream: going past EOF, last line was {s}')
        self._need_next = True
        return s

    def __bool__(self) -> bool:
        """Returns True if stream has more lines."""
        self._get_next()
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        """Empties all lines stored in LineStream object into
        Iterator object and returns the Iterator object.
        """
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Returns the line number of the current line."""
        self._get_next()
        return self._next[0]


# Parsing helper methods:

KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
TAP_START = re.compile(r'TAP version ([0-9]+)$')
KTAP_END = re.compile('(List of all partitions:|'
                      'Kernel panic - not syncing: VFS:|reboot: System halted)')

# Version-line patterns paired with the literal text used to measure the
# log prefix. KTAP is checked first because TAP_START also matches inside
# a 'KTAP version' line.
_VERSION_LINES = ((KTAP_START, 'KTAP version'), (TAP_START, 'TAP version'))


def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """Extracts KTAP lines from the kernel output.

    Lines before the first KTAP/TAP version line are dropped. Extraction
    stops at the first line matching KTAP_END.

    Parameters:
    kernel_output - Iterable of raw kernel output lines

    Return:
    LineStream of (line number, text) for the extracted lines
    """
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        started = False
        prefix_len = 0
        for line_num, line in enumerate(kernel_output, start=1):
            line = line.rstrip()  # remove trailing \n
            if not started:
                for pattern, marker in _VERSION_LINES:
                    if pattern.search(line):
                        # start extracting KTAP lines and set prefix
                        # to number of characters before version line
                        prefix_len = len(line.split(marker)[0])
                        started = True
                        yield line_num, line[prefix_len:]
                        break
                continue
            if KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            # remove prefix and any indentation and yield
            # line with line number
            yield line_num, line[prefix_len:].lstrip()
    return LineStream(lines=isolate_ktap_output(kernel_output))


KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]


def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    """
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(f'{version_type} version higher than expected!')


def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    line = lines.peek()
    ktap_match = KTAP_START.match(line)
    tap_match = TAP_START.match(line)
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    test.log.append(lines.pop())
    return True


TEST_HEADER = re.compile(r'^# Subtest: (.*)$')


def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    if not match:
        return False
    test.log.append(lines.pop())
    test.name = match.group(1)
    return True


TEST_PLAN = re.compile(r'1\.\.([0-9]+)')


def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object.
    Returns False and sets expected_count to None if there is no valid test
    plan.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        test.expected_count = None
        return False
    test.log.append(lines.pop())
    test.expected_count = int(match.group(1))
    return True


TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')


def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name, or if the stream has
    no more lines.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matching the
        expected test name
    """
    if not lines:
        # At EOF peek() would return the previously consumed line, which
        # must not be mistaken for this test's result line.
        return False
    match = TEST_RESULT.match(lines.peek())
    if not match:
        return False
    return match.group(4) == test.name


def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line, or if the stream
    has no more lines.

    Note that the SKIP directive is the only directive that causes a
    change in status.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    if not lines:
        # Nothing left to parse; avoid popping past EOF on a stale line.
        return False
    line = lines.peek()
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    test.log.append(lines.pop())

    # Set name of test object
    if skip_match:
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test num
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True


def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parse lines that do not match the format of a test result line or
    test header line and returns them in list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
        directive]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log: List[str] = []
    while lines and not TEST_RESULT.match(lines.peek()) and not \
            TEST_HEADER.match(lines.peek()):
        log.append(lines.pop())
    return log


# Printing helper methods:

DIVIDER = '=' * 60

RESET = '\033[0;0m'


def _colorize(color_code: str, text: str) -> str:
    """Wraps text in color_code/RESET, but only when stdout is a tty."""
    if not sys.stdout.isatty():
        return text
    return color_code + text + RESET


def red(text: str) -> str:
    """Returns inputted string with red color code."""
    return _colorize('\033[1;31m', text)


def yellow(text: str) -> str:
    """Returns inputted string with yellow color code."""
    return _colorize('\033[1;33m', text)


def green(text: str) -> str:
    """Returns inputted string with green color code."""
    return _colorize('\033[1;32m', text)


# Number of characters the color codes add to a string (0 when stdout is
# not a tty at import time).
ANSI_LEN = len(red(''))


def print_with_timestamp(message: str) -> None:
    """Prints message with timestamp at beginning."""
    timestamp = datetime.datetime.now().strftime('%H:%M:%S')
    print(f'[{timestamp}] {message}')


def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
        any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of '=' characters on each side
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    if difference > 0:
        # calculate number of '=' characters for each side of the divider
        len_1 = difference // 2
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)


def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name
    if test.expected_count:
        if test.expected_count == 1:
            message += ' (1 subtest)'
        else:
            message += f' ({test.expected_count} subtests)'
    print_with_timestamp(format_test_divider(message, len(message)))


def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    for m in log:
        print_with_timestamp(yellow(m))


def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name. As a side effect, prints the saved log of the test when it
    crashed or failed.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        print_log(test.log)
        return red('[CRASHED] ') + test.name
    print_log(test.log)
    return red('[FAILED] ') + test.name


def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Example:
    '[PASSED] example'

    Parameters:
    test - Test object representing current test being printed
    """
    print_with_timestamp(format_test_result(test))


def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    # Color codes must not count towards the visible width of the message.
    print_with_timestamp(format_test_divider(message,
                                             len(message) - ANSI_LEN))


def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped or no tests were run, and red otherwise. Summary line contains
    counts of the statuses of the tests subtests or the test itself if it
    has no subtests; counts that are zero are omitted.

    Example:
    "Testing complete. Ran 2 tests: passed: 2"

    Parameters:
    test - Test object representing current test being printed
    """
    if test.status == TestStatus.SUCCESS:
        color = green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = yellow
    else:
        color = red
    print_with_timestamp(color(f'Testing complete. {test.counts}'))


# Other methods:

def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and check if any of the tests crashed and if so set the test
    status to crashed. Otherwise if the test has no subtests add the
    status of the test to the test counts.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    counts = test.counts
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED


def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:

    - Main KTAP/TAP header

    Example:

    KTAP version 1
    1..4
    [subtests]

    - Subtest header line

    Example:

    # Subtest: name
    1..3
    [subtests]
    ok 1 name

    - Test result line

    Example:

    ok 1 - test

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
        corresponding to the current test

    Return:
    Test object populated with characteristics and any subtests
    """
    test = Test()
    test.log.extend(log)
    parent_test = False
    main = parse_ktap_header(lines, test)
    if main:
        # If KTAP/TAP header is found, attempt to parse
        # test plan
        test.name = "main"
        parse_test_plan(lines, test)
        parent_test = True
    else:
        # If KTAP/TAP header is not found, test must be subtest
        # header or test result line so attempt to parse
        # subtest header
        parent_test = parse_test_header(lines, test)
        if parent_test:
            # If subtest header is found, attempt to parse
            # test plan and print header
            parse_test_plan(lines, test)
            print_test_header(test)
    expected_count = test.expected_count
    subtests = []
    test_num = 1
    while parent_test and (expected_count is None or
                           test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        sub_test = Test()
        if not lines or (peek_test_name_match(lines, test) and
                         not main):
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            else:
                test.log.extend(sub_log)
                break
        else:
            sub_test = parse_test(lines, test_num, sub_log)
        subtests.append(sub_test)
        test_num += 1
    test.subtests = subtests
    if not main:
        # If not main test, look for test result line
        test.log.extend(parse_diagnostic(lines))
        if (parent_test and peek_test_name_match(lines, test)) or \
                not parent_test:
            parse_test_result(lines, test, expected_num)
        else:
            test.add_error('missing subtest result line!')

    # Check for there being no tests
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')

    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and not main:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    elif not main:
        print_test_result(test)
    return test


def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    test = Test()
    if not lines:
        test.name = '<missing>'
        test.add_error('could not find any KTAP output!')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [])
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test