1# SPDX-License-Identifier: GPL-2.0 2# 3# Parses test results from a kernel dmesg log. 4# 5# Copyright (C) 2019, Google LLC. 6# Author: Felix Guo <felixguoxiuping@gmail.com> 7# Author: Brendan Higgins <brendanhiggins@google.com> 8 9import re 10 11from collections import namedtuple 12from datetime import datetime 13from enum import Enum, auto 14from functools import reduce 15from typing import Iterable, Iterator, List, Optional, Tuple 16 17TestResult = namedtuple('TestResult', ['status','suites','log']) 18 19class TestSuite(object): 20 def __init__(self) -> None: 21 self.status = TestStatus.SUCCESS 22 self.name = '' 23 self.cases = [] # type: List[TestCase] 24 25 def __str__(self) -> str: 26 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')' 27 28 def __repr__(self) -> str: 29 return str(self) 30 31class TestCase(object): 32 def __init__(self) -> None: 33 self.status = TestStatus.SUCCESS 34 self.name = '' 35 self.log = [] # type: List[str] 36 37 def __str__(self) -> str: 38 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')' 39 40 def __repr__(self) -> str: 41 return str(self) 42 43class TestStatus(Enum): 44 SUCCESS = auto() 45 FAILURE = auto() 46 SKIPPED = auto() 47 TEST_CRASHED = auto() 48 NO_TESTS = auto() 49 FAILURE_TO_PARSE_TESTS = auto() 50 51class LineStream: 52 """Provides a peek()/pop() interface over an iterator of (line#, text).""" 53 _lines: Iterator[Tuple[int, str]] 54 _next: Tuple[int, str] 55 _done: bool 56 57 def __init__(self, lines: Iterator[Tuple[int, str]]): 58 self._lines = lines 59 self._done = False 60 self._next = (0, '') 61 self._get_next() 62 63 def _get_next(self) -> None: 64 try: 65 self._next = next(self._lines) 66 except StopIteration: 67 self._done = True 68 69 def peek(self) -> str: 70 return self._next[1] 71 72 def pop(self) -> str: 73 n = self._next 74 self._get_next() 75 return n[1] 76 77 def __bool__(self) -> bool: 78 return not self._done 79 80 # Only used by kunit_tool_test.py. 81 def __iter__(self) -> Iterator[str]: 82 while bool(self): 83 yield self.pop() 84 85 def line_number(self) -> int: 86 return self._next[0] 87 88kunit_start_re = re.compile(r'TAP version [0-9]+$') 89kunit_end_re = re.compile('(List of all partitions:|' 90 'Kernel panic - not syncing: VFS:|reboot: System halted)') 91 92def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: 93 def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]: 94 line_num = 0 95 started = False 96 for line in kernel_output: 97 line_num += 1 98 line = line.rstrip() # line always has a trailing \n 99 if kunit_start_re.search(line): 100 prefix_len = len(line.split('TAP version')[0]) 101 started = True 102 yield line_num, line[prefix_len:] 103 elif kunit_end_re.search(line): 104 break 105 elif started: 106 yield line_num, line[prefix_len:] 107 return LineStream(lines=isolate_kunit_output(kernel_output)) 108 109def raw_output(kernel_output) -> None: 110 for line in kernel_output: 111 print(line.rstrip()) 112 113DIVIDER = '=' * 60 114 115RESET = '\033[0;0m' 116 117def red(text) -> str: 118 return '\033[1;31m' + text + RESET 119 120def yellow(text) -> str: 121 return '\033[1;33m' + text + RESET 122 123def green(text) -> str: 124 return '\033[1;32m' + text + RESET 125 126def print_with_timestamp(message) -> None: 127 print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message)) 128 129def format_suite_divider(message) -> str: 130 return '======== ' + message + ' ========' 131 132def print_suite_divider(message) -> None: 133 print_with_timestamp(DIVIDER) 134 print_with_timestamp(format_suite_divider(message)) 135 136def print_log(log) -> None: 137 for m in log: 138 print_with_timestamp(m) 139 140TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$') 141 142def consume_non_diagnostic(lines: LineStream) -> None: 143 while lines and not TAP_ENTRIES.match(lines.peek()): 144 lines.pop() 145 146def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None: 147 while lines and not TAP_ENTRIES.match(lines.peek()): 148 test_case.log.append(lines.peek()) 149 lines.pop() 150 151OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text']) 152 153OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$') 154 155OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$') 156 157OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$') 158 159def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool: 160 save_non_diagnostic(lines, test_case) 161 if not lines: 162 test_case.status = TestStatus.TEST_CRASHED 163 return True 164 line = lines.peek() 165 match = OK_NOT_OK_SUBTEST.match(line) 166 while not match and lines: 167 line = lines.pop() 168 match = OK_NOT_OK_SUBTEST.match(line) 169 if match: 170 test_case.log.append(lines.pop()) 171 test_case.name = match.group(2) 172 skip_match = OK_NOT_OK_SKIP.match(line) 173 if skip_match: 174 test_case.status = TestStatus.SKIPPED 175 return True 176 if test_case.status == TestStatus.TEST_CRASHED: 177 return True 178 if match.group(1) == 'ok': 179 test_case.status = TestStatus.SUCCESS 180 else: 181 test_case.status = TestStatus.FAILURE 182 return True 183 else: 184 return False 185 186SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$') 187DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$') 188 189def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool: 190 save_non_diagnostic(lines, test_case) 191 if not lines: 192 return False 193 line = lines.peek() 194 match = SUBTEST_DIAGNOSTIC.match(line) 195 if match: 196 test_case.log.append(lines.pop()) 197 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line) 198 if crash_match: 199 test_case.status = TestStatus.TEST_CRASHED 200 return True 201 else: 202 return False 203 204def parse_test_case(lines: LineStream) -> Optional[TestCase]: 205 test_case = TestCase() 206 save_non_diagnostic(lines, test_case) 207 while parse_diagnostic(lines, test_case): 208 pass 209 if parse_ok_not_ok_test_case(lines, test_case): 210 return test_case 211 else: 212 return None 213 214SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$') 215 216def parse_subtest_header(lines: LineStream) -> Optional[str]: 217 consume_non_diagnostic(lines) 218 if not lines: 219 return None 220 match = SUBTEST_HEADER.match(lines.peek()) 221 if match: 222 lines.pop() 223 return match.group(1) 224 else: 225 return None 226 227SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)') 228 229def parse_subtest_plan(lines: LineStream) -> Optional[int]: 230 consume_non_diagnostic(lines) 231 match = SUBTEST_PLAN.match(lines.peek()) 232 if match: 233 lines.pop() 234 return int(match.group(1)) 235 else: 236 return None 237 238def max_status(left: TestStatus, right: TestStatus) -> TestStatus: 239 if left == right: 240 return left 241 elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED: 242 return TestStatus.TEST_CRASHED 243 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE: 244 return TestStatus.FAILURE 245 elif left == TestStatus.SKIPPED: 246 return right 247 else: 248 return left 249 250def parse_ok_not_ok_test_suite(lines: LineStream, 251 test_suite: TestSuite, 252 expected_suite_index: int) -> bool: 253 consume_non_diagnostic(lines) 254 if not lines: 255 test_suite.status = TestStatus.TEST_CRASHED 256 return False 257 line = lines.peek() 258 match = OK_NOT_OK_MODULE.match(line) 259 if match: 260 lines.pop() 261 if match.group(1) == 'ok': 262 test_suite.status = TestStatus.SUCCESS 263 else: 264 test_suite.status = TestStatus.FAILURE 265 skip_match = OK_NOT_OK_SKIP.match(line) 266 if skip_match: 267 test_suite.status = TestStatus.SKIPPED 268 suite_index = int(match.group(2)) 269 if suite_index != expected_suite_index: 270 print_with_timestamp( 271 red('[ERROR] ') + 'expected_suite_index ' + 272 str(expected_suite_index) + ', but got ' + 273 str(suite_index)) 274 return True 275 else: 276 return False 277 278def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus: 279 return reduce(max_status, status_list, TestStatus.SKIPPED) 280 281def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus: 282 max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases) 283 return max_status(max_test_case_status, test_suite.status) 284 285def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]: 286 if not lines: 287 return None 288 consume_non_diagnostic(lines) 289 test_suite = TestSuite() 290 test_suite.status = TestStatus.SUCCESS 291 name = parse_subtest_header(lines) 292 if not name: 293 return None 294 test_suite.name = name 295 expected_test_case_num = parse_subtest_plan(lines) 296 if expected_test_case_num is None: 297 return None 298 while expected_test_case_num > 0: 299 test_case = parse_test_case(lines) 300 if not test_case: 301 break 302 test_suite.cases.append(test_case) 303 expected_test_case_num -= 1 304 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index): 305 test_suite.status = bubble_up_test_case_errors(test_suite) 306 return test_suite 307 elif not lines: 308 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token') 309 return test_suite 310 else: 311 print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}') 312 return None 313 314TAP_HEADER = re.compile(r'^TAP version 14$') 315 316def parse_tap_header(lines: LineStream) -> bool: 317 consume_non_diagnostic(lines) 318 if TAP_HEADER.match(lines.peek()): 319 lines.pop() 320 return True 321 else: 322 return False 323 324TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)') 325 326def parse_test_plan(lines: LineStream) -> Optional[int]: 327 consume_non_diagnostic(lines) 328 match = TEST_PLAN.match(lines.peek()) 329 if match: 330 lines.pop() 331 return int(match.group(1)) 332 else: 333 return None 334 335def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus: 336 return bubble_up_errors(x.status for x in test_suites) 337 338def parse_test_result(lines: LineStream) -> TestResult: 339 consume_non_diagnostic(lines) 340 if not lines or not parse_tap_header(lines): 341 return TestResult(TestStatus.NO_TESTS, [], lines) 342 expected_test_suite_num = parse_test_plan(lines) 343 if not expected_test_suite_num: 344 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines) 345 test_suites = [] 346 for i in range(1, expected_test_suite_num + 1): 347 test_suite = parse_test_suite(lines, i) 348 if test_suite: 349 test_suites.append(test_suite) 350 else: 351 print_with_timestamp( 352 red('[ERROR] ') + ' expected ' + 353 str(expected_test_suite_num) + 354 ' test suites, but got ' + str(i - 2)) 355 break 356 test_suite = parse_test_suite(lines, -1) 357 if test_suite: 358 print_with_timestamp(red('[ERROR] ') + 359 'got unexpected test suite: ' + test_suite.name) 360 if test_suites: 361 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines) 362 else: 363 return TestResult(TestStatus.NO_TESTS, [], lines) 364 365class TestCounts: 366 passed: int 367 failed: int 368 crashed: int 369 skipped: int 370 371 def __init__(self): 372 self.passed = 0 373 self.failed = 0 374 self.crashed = 0 375 self.skipped = 0 376 377 def total(self) -> int: 378 return self.passed + self.failed + self.crashed + self.skipped 379 380def print_and_count_results(test_result: TestResult) -> TestCounts: 381 counts = TestCounts() 382 for test_suite in test_result.suites: 383 if test_suite.status == TestStatus.SUCCESS: 384 print_suite_divider(green('[PASSED] ') + test_suite.name) 385 elif test_suite.status == TestStatus.SKIPPED: 386 print_suite_divider(yellow('[SKIPPED] ') + test_suite.name) 387 elif test_suite.status == TestStatus.TEST_CRASHED: 388 print_suite_divider(red('[CRASHED] ' + test_suite.name)) 389 else: 390 print_suite_divider(red('[FAILED] ') + test_suite.name) 391 for test_case in test_suite.cases: 392 if test_case.status == TestStatus.SUCCESS: 393 counts.passed += 1 394 print_with_timestamp(green('[PASSED] ') + test_case.name) 395 elif test_case.status == TestStatus.SKIPPED: 396 counts.skipped += 1 397 print_with_timestamp(yellow('[SKIPPED] ') + test_case.name) 398 elif test_case.status == TestStatus.TEST_CRASHED: 399 counts.crashed += 1 400 print_with_timestamp(red('[CRASHED] ' + test_case.name)) 401 print_log(map(yellow, test_case.log)) 402 print_with_timestamp('') 403 else: 404 counts.failed += 1 405 print_with_timestamp(red('[FAILED] ') + test_case.name) 406 print_log(map(yellow, test_case.log)) 407 print_with_timestamp('') 408 return counts 409 410def parse_run_tests(kernel_output: Iterable[str]) -> TestResult: 411 counts = TestCounts() 412 lines = extract_tap_lines(kernel_output) 413 test_result = parse_test_result(lines) 414 if test_result.status == TestStatus.NO_TESTS: 415 print(red('[ERROR] ') + yellow('no tests run!')) 416 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS: 417 print(red('[ERROR] ') + yellow('could not parse test results!')) 418 else: 419 counts = print_and_count_results(test_result) 420 print_with_timestamp(DIVIDER) 421 if test_result.status == TestStatus.SUCCESS: 422 fmt = green 423 elif test_result.status == TestStatus.SKIPPED: 424 fmt = yellow 425 else: 426 fmt =red 427 print_with_timestamp( 428 fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' % 429 (counts.total(), counts.failed, counts.crashed, counts.skipped))) 430 return test_result 431