# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import re

from collections import namedtuple
from datetime import datetime
from enum import Enum, auto
from functools import reduce
from typing import Iterable, Iterator, List, Optional, Tuple

# Overall outcome of a parse: aggregate status, the parsed suites, and the
# LineStream the results were read from.
TestResult = namedtuple('TestResult', ['status','suites','log'])

class TestSuite(object):
    """A parsed test suite: a status, a name and its test cases."""

    def __init__(self) -> None:
        self.status = TestStatus.SUCCESS
        self.name = ''
        self.cases = []  # type: List[TestCase]

    def __str__(self) -> str:
        return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'

    def __repr__(self) -> str:
        return str(self)

class TestCase(object):
    """A parsed test case: a status, a name and the log lines seen for it."""

    def __init__(self) -> None:
        self.status = TestStatus.SUCCESS
        self.name = ''
        self.log = []  # type: List[str]

    def __str__(self) -> str:
        return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'

    def __repr__(self) -> str:
        return str(self)

class TestStatus(Enum):
    """Possible outcomes of a test case, a test suite or a whole run."""
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()

class LineStream:
    """Provides a peek()/pop() interface over an iterator of (line#, text).

    One line is always read ahead.  Once the underlying iterator is
    exhausted the stream becomes falsy, but peek() and pop() keep returning
    the last line that was read, so callers must test the stream's
    truthiness before trusting either of them.
    """
    _lines: Iterator[Tuple[int, str]]
    _next: Tuple[int, str]
    _done: bool

    def __init__(self, lines: Iterator[Tuple[int, str]]):
        self._lines = lines
        self._done = False
        self._next = (0, '')
        self._get_next()

    def _get_next(self) -> None:
        """Read one line ahead, or mark the stream as done."""
        try:
            self._next = next(self._lines)
        except StopIteration:
            # self._next deliberately keeps its previous value here.
            self._done = True

    def peek(self) -> str:
        """Return the text of the current line without consuming it."""
        return self._next[1]

    def pop(self) -> str:
        """Return the text of the current line and advance the stream."""
        n = self._next
        self._get_next()
        return n[1]

    def __bool__(self) -> bool:
        """Return True while there is still an unconsumed line."""
        return not self._done

    # Only used by kunit_tool_test.py.
    def __iter__(self) -> Iterator[str]:
        while bool(self):
            yield self.pop()

    def line_number(self) -> int:
        """Return the line number attached to the current line."""
        return self._next[0]

kunit_start_re = re.compile(r'TAP version [0-9]+$')
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:|reboot: System halted)')

def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """Isolate the TAP output embedded in kernel_output.

    Lines before the first 'TAP version N' line are dropped, and reading
    stops at the first line matching kunit_end_re.  Whatever text precedes
    'TAP version' on the header line determines how many leading characters
    are cut from every emitted line.

    Returns a LineStream of (1-based line number in kernel_output, text).
    """
    def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
        line_num = 0
        started = False
        # Only meaningful once started is True; bound here so that it is
        # never referenced before assignment.
        prefix_len = 0
        for line in kernel_output:
            line_num += 1
            line = line.rstrip()  # line always has a trailing \n
            if kunit_start_re.search(line):
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif kunit_end_re.search(line):
                break
            elif started:
                yield line_num, line[prefix_len:]
    return LineStream(lines=isolate_kunit_output(kernel_output))

DIVIDER = '=' * 60

RESET = '\033[0;0m'

def red(text: str) -> str:
    """Wrap text in the bold-red ANSI escape sequence."""
    return '\033[1;31m' + text + RESET

def yellow(text: str) -> str:
    """Wrap text in the bold-yellow ANSI escape sequence."""
    return '\033[1;33m' + text + RESET

def green(text: str) -> str:
    """Wrap text in the bold-green ANSI escape sequence."""
    return '\033[1;32m' + text + RESET

def print_with_timestamp(message: str) -> None:
    """Print message prefixed with the current local time as [HH:MM:SS]."""
    print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))

def format_suite_divider(message: str) -> str:
    """Return message framed by '========' markers."""
    return '======== ' + message + ' ========'

def print_suite_divider(message: str) -> None:
    """Print a divider line followed by the framed message."""
    print_with_timestamp(DIVIDER)
    print_with_timestamp(format_suite_divider(message))

def print_log(log: Iterable[str]) -> None:
    """Print every entry of log with a timestamp."""
    for m in log:
        print_with_timestamp(m)

TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*# (Subtest:|.*: kunit test case crashed!)).*$')

def consume_non_diagnostic(lines: LineStream) -> None:
    """Discard lines until the next one matching TAP_ENTRIES (or the end)."""
    while lines and not TAP_ENTRIES.match(lines.peek()):
        lines.pop()

def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
    """Like consume_non_diagnostic(), but append the lines to test_case.log."""
    while lines and not TAP_ENTRIES.match(lines.peek()):
        test_case.log.append(lines.peek())
        lines.pop()

OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')

OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')

def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
    """Parse the indented 'ok'/'not ok' result line of a test case.

    Non-TAP lines in front of the result are saved to test_case.log.  If the
    stream is exhausted the case is marked TEST_CRASHED.  If the next TAP
    entry is not a test case result, lines are discarded until one is found
    or the stream runs out.

    On success the result line is appended to test_case.log and the name and
    status of test_case are filled in; a TEST_CRASHED status that was set
    earlier is preserved.

    Returns True if a result was recorded, False if no result line was found.
    """
    save_non_diagnostic(lines, test_case)
    if not lines:
        test_case.status = TestStatus.TEST_CRASHED
        return True
    line = lines.peek()
    match = OK_NOT_OK_SUBTEST.match(line)
    # Tracks whether `line` has already been removed from the stream, so the
    # matched line is consumed exactly once below.
    consumed = False
    while not match and lines:
        line = lines.pop()
        consumed = True
        match = OK_NOT_OK_SUBTEST.match(line)
    if match:
        if not consumed:
            lines.pop()
        test_case.log.append(line)
        test_case.name = match.group(2)
        skip_match = OK_NOT_OK_SKIP.match(line)
        if skip_match:
            test_case.status = TestStatus.SKIPPED
            return True
        if test_case.status == TestStatus.TEST_CRASHED:
            return True
        if match.group(1) == 'ok':
            test_case.status = TestStatus.SUCCESS
        else:
            test_case.status = TestStatus.FAILURE
        return True
    else:
        return False

SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')

def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
    """Parse one indented '# ...' diagnostic line into test_case.log.

    A diagnostic matching DIAGNOSTIC_CRASH_MESSAGE marks the test case as
    TEST_CRASHED.

    Returns True if a diagnostic line was consumed, False otherwise.
    """
    save_non_diagnostic(lines, test_case)
    if not lines:
        return False
    line = lines.peek()
    match = SUBTEST_DIAGNOSTIC.match(line)
    if match:
        test_case.log.append(lines.pop())
        crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
        if crash_match:
            test_case.status = TestStatus.TEST_CRASHED
        return True
    else:
        return False

def parse_test_case(lines: LineStream) -> Optional[TestCase]:
    """Parse one test case: any diagnostics followed by its result line.

    Returns the TestCase, or None if no result line could be found.
    """
    test_case = TestCase()
    save_non_diagnostic(lines, test_case)
    while parse_diagnostic(lines, test_case):
        pass
    if parse_ok_not_ok_test_case(lines, test_case):
        return test_case
    else:
        return None

SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: LineStream) -> Optional[str]:
    """Parse an indented '# Subtest: <name>' line.

    Returns the suite name, or None if the next TAP entry is not a header.
    """
    consume_non_diagnostic(lines)
    if not lines:
        return None
    match = SUBTEST_HEADER.match(lines.peek())
    if match:
        lines.pop()
        return match.group(1)
    else:
        return None

SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: LineStream) -> Optional[int]:
    """Parse an indented 'M..N' plan line of a suite.

    Returns N as an int, or None if the next TAP entry is not such a plan
    or the stream is exhausted.
    """
    consume_non_diagnostic(lines)
    # An exhausted stream still returns its last line from peek(); do not
    # parse that stale line a second time.
    if not lines:
        return None
    match = SUBTEST_PLAN.match(lines.peek())
    if match:
        lines.pop()
        return int(match.group(1))
    else:
        return None

def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
    """Combine two statuses, letting the more severe one win.

    TEST_CRASHED beats everything, then FAILURE.  Otherwise a SKIPPED left
    operand yields right, and any other left operand is returned as is.
    """
    if left == right:
        return left
    elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
        return TestStatus.TEST_CRASHED
    elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
        return TestStatus.FAILURE
    elif left == TestStatus.SKIPPED:
        return right
    else:
        return left

def parse_ok_not_ok_test_suite(lines: LineStream,
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
    """Parse the unindented 'ok'/'not ok' line that ends a suite.

    Sets test_suite.status from the line (SKIPPED if it carries '# SKIP'),
    or to TEST_CRASHED if the stream is exhausted.  A suite index that
    differs from expected_suite_index is reported but not treated as fatal.

    Returns True if the result line was found and consumed.
    """
    consume_non_diagnostic(lines)
    if not lines:
        test_suite.status = TestStatus.TEST_CRASHED
        return False
    line = lines.peek()
    match = OK_NOT_OK_MODULE.match(line)
    if match:
        lines.pop()
        if match.group(1) == 'ok':
            test_suite.status = TestStatus.SUCCESS
        else:
            test_suite.status = TestStatus.FAILURE
        skip_match = OK_NOT_OK_SKIP.match(line)
        if skip_match:
            test_suite.status = TestStatus.SKIPPED
        suite_index = int(match.group(2))
        if suite_index != expected_suite_index:
            print_with_timestamp(
                red('[ERROR] ') + 'expected_suite_index ' +
                str(expected_suite_index) + ', but got ' +
                str(suite_index))
        return True
    else:
        return False

def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
    """Fold status_list with max_status(), starting from SKIPPED."""
    return reduce(max_status, status_list, TestStatus.SKIPPED)

def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine the statuses of a suite's cases with the suite's own status."""
    max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
    return max_status(max_test_case_status, test_suite.status)

def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
    """Parse one suite: header, plan, test cases and the closing result line.

    Returns the TestSuite, or None if the header, the plan or the closing
    line cannot be parsed.  If the stream runs out before the closing line,
    the partially parsed suite is returned after reporting an error.
    """
    if not lines:
        return None
    consume_non_diagnostic(lines)
    test_suite = TestSuite()
    test_suite.status = TestStatus.SUCCESS
    name = parse_subtest_header(lines)
    if not name:
        return None
    test_suite.name = name
    expected_test_case_num = parse_subtest_plan(lines)
    if expected_test_case_num is None:
        return None
    while expected_test_case_num > 0:
        test_case = parse_test_case(lines)
        if not test_case:
            break
        test_suite.cases.append(test_case)
        expected_test_case_num -= 1
    if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
        test_suite.status = bubble_up_test_case_errors(test_suite)
        return test_suite
    elif not lines:
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return test_suite
    else:
        print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
        return None

TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: LineStream) -> bool:
    """Consume the 'TAP version 14' header; return whether it was found."""
    consume_non_diagnostic(lines)
    # Never match against the stale line of an exhausted stream.
    if not lines:
        return False
    if TAP_HEADER.match(lines.peek()):
        lines.pop()
        return True
    else:
        return False

TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: LineStream) -> Optional[int]:
    """Parse the unindented 'M..N' plan line.

    Returns N as an int, or None if the next TAP entry is not such a plan
    or the stream is exhausted.
    """
    consume_non_diagnostic(lines)
    # Never match against the stale line of an exhausted stream.
    if not lines:
        return None
    match = TEST_PLAN.match(lines.peek())
    if match:
        lines.pop()
        return int(match.group(1))
    else:
        return None

def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
    """Return the combined status of all test_suites."""
    return bubble_up_errors(x.status for x in test_suites)

def parse_test_result(lines: LineStream) -> TestResult:
    """Parse a complete TAP stream into a TestResult.

    The status is FAILURE_TO_PARSE_TESTS if the header or the plan is
    missing, NO_TESTS if the plan announces zero suites or no suite could be
    parsed, and the combined status of the parsed suites otherwise.
    """
    consume_non_diagnostic(lines)
    if not lines or not parse_tap_header(lines):
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    expected_test_suite_num = parse_test_plan(lines)
    if expected_test_suite_num == 0:
        return TestResult(TestStatus.NO_TESTS, [], lines)
    elif expected_test_suite_num is None:
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    test_suites = []
    for i in range(1, expected_test_suite_num + 1):
        test_suite = parse_test_suite(lines, i)
        if test_suite:
            test_suites.append(test_suite)
        else:
            # Report how many suites were actually parsed before this one
            # failed.
            print_with_timestamp(
                red('[ERROR] ') + ' expected ' +
                str(expected_test_suite_num) +
                ' test suites, but got ' + str(len(test_suites)))
            break
    # Anything that still parses as a suite is beyond what the plan announced.
    test_suite = parse_test_suite(lines, -1)
    if test_suite:
        print_with_timestamp(red('[ERROR] ') +
            'got unexpected test suite: ' + test_suite.name)
    if test_suites:
        return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
    else:
        return TestResult(TestStatus.NO_TESTS, [], lines)

class TestCounts:
    """Number of test cases per outcome."""
    passed: int
    failed: int
    crashed: int
    skipped: int

    def __init__(self) -> None:
        self.passed = 0
        self.failed = 0
        self.crashed = 0
        self.skipped = 0

    def total(self) -> int:
        """Return the number of test cases counted so far."""
        return self.passed + self.failed + self.crashed + self.skipped

def print_and_count_results(test_result: TestResult) -> TestCounts:
    """Print a coloured report for every suite and case and tally the cases.

    The saved log of crashed and failed test cases is printed as well.
    """
    counts = TestCounts()
    for test_suite in test_result.suites:
        if test_suite.status == TestStatus.SUCCESS:
            print_suite_divider(green('[PASSED] ') + test_suite.name)
        elif test_suite.status == TestStatus.SKIPPED:
            print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
        elif test_suite.status == TestStatus.TEST_CRASHED:
            print_suite_divider(red('[CRASHED] ' + test_suite.name))
        else:
            print_suite_divider(red('[FAILED] ') + test_suite.name)
        for test_case in test_suite.cases:
            if test_case.status == TestStatus.SUCCESS:
                counts.passed += 1
                print_with_timestamp(green('[PASSED] ') + test_case.name)
            elif test_case.status == TestStatus.SKIPPED:
                counts.skipped += 1
                print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
            elif test_case.status == TestStatus.TEST_CRASHED:
                counts.crashed += 1
                print_with_timestamp(red('[CRASHED] ' + test_case.name))
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
            else:
                counts.failed += 1
                print_with_timestamp(red('[FAILED] ') + test_case.name)
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
    return counts

def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
    """Extract, parse and report the test results found in kernel_output.

    Prints the per-suite report (or an error if nothing could be parsed)
    followed by a one-line summary, and returns the parsed TestResult.
    """
    counts = TestCounts()
    lines = extract_tap_lines(kernel_output)
    test_result = parse_test_result(lines)
    if test_result.status == TestStatus.NO_TESTS:
        print(red('[ERROR] ') + yellow('no tests run!'))
    elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
        print(red('[ERROR] ') + yellow('could not parse test results!'))
    else:
        counts = print_and_count_results(test_result)
    print_with_timestamp(DIVIDER)
    if test_result.status == TestStatus.SUCCESS:
        fmt = green
    elif test_result.status == TestStatus.SKIPPED:
        fmt = yellow
    else:
        fmt = red
    print_with_timestamp(
        fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
            (counts.total(), counts.failed, counts.crashed, counts.skipped)))
    return test_result