# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import re

from collections import namedtuple
from datetime import datetime
from enum import Enum, auto
from functools import reduce
from typing import Iterable, Iterator, List, Optional

# Overall outcome of a run: aggregated status, parsed suites, leftover lines.
TestResult = namedtuple('TestResult', ['status','suites','log'])


class TestSuite:
    """One parsed test suite: its status, its name and its test cases."""

    def __init__(self):
        self.status = None
        self.name = None
        self.cases = []

    def __str__(self):
        # status is a TestStatus (or None) and name may be None, so every
        # field is converted explicitly instead of concatenated raw.
        return ('TestSuite(' + str(self.status) + ',' + str(self.name) + ',' +
                str(self.cases) + ')')

    def __repr__(self):
        return str(self)


class TestCase:
    """One parsed test case: its status, its name and the lines logged for it."""

    def __init__(self):
        self.status = None
        self.name = ''
        self.log = []

    def __str__(self):
        # See TestSuite.__str__: status is not a str, so convert explicitly.
        return ('TestCase(' + str(self.status) + ',' + str(self.name) + ',' +
                str(self.log) + ')')

    def __repr__(self):
        return str(self)


class TestStatus(Enum):
    """Outcome of a test case, a test suite or a whole run."""
    SUCCESS = auto()
    FAILURE = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()


kunit_start_re = re.compile(r'^TAP version [0-9]+$')
kunit_end_re = re.compile('List of all partitions:')


def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[str]:
    """Yield the lines from the TAP version line up to the end marker.

    Lines before the first line matching kunit_start_re are skipped.
    Iteration stops at the first line matching kunit_end_re (which is not
    yielded), even if no start line has been seen yet.
    """
    started = False
    for line in kernel_output:
        if kunit_start_re.match(line):
            started = True
            yield line
        elif kunit_end_re.match(line):
            break
        elif started:
            yield line


def raw_output(kernel_output: Iterable[str]) -> None:
    """Print every line of kernel_output unmodified."""
    for line in kernel_output:
        print(line)


DIVIDER = '=' * 60

RESET = '\033[0;0m'


def red(text: str) -> str:
    """Wrap text in the bold-red ANSI escape sequence."""
    return '\033[1;31m' + text + RESET


def yellow(text: str) -> str:
    """Wrap text in the bold-yellow ANSI escape sequence."""
    return '\033[1;33m' + text + RESET


def green(text: str) -> str:
    """Wrap text in the bold-green ANSI escape sequence."""
    return '\033[1;32m' + text + RESET


def print_with_timestamp(message: str) -> None:
    """Print message prefixed with the current local time as [HH:MM:SS]."""
    print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))


def format_suite_divider(message: str) -> str:
    """Return message framed by '========' on both sides."""
    return '======== ' + message + ' ========'


def print_suite_divider(message: str) -> None:
    """Print a full-width divider followed by the framed suite message."""
    print_with_timestamp(DIVIDER)
    print_with_timestamp(format_suite_divider(message))


def print_log(log: Iterable[str]) -> None:
    """Print every entry of log with a timestamp."""
    for m in log:
        print_with_timestamp(m)


# Any line the parser treats as TAP: the version line, (sub)test results,
# (sub)test plans and diagnostics. Everything else is unrelated kernel output.
TAP_ENTRIES = re.compile(r'^(TAP|\t?ok|\t?not ok|\t?[0-9]+\.\.[0-9]+|\t?#).*$')


def _leading_non_tap_count(lines: List[str]) -> int:
    """Return how many lines at the front of lines are not TAP entries."""
    for index, line in enumerate(lines):
        if TAP_ENTRIES.match(line):
            return index
    return len(lines)


def consume_non_diagnositic(lines: List[str]) -> None:
    """Drop, in place, the leading lines that are not TAP entries."""
    # One slice deletion instead of repeated pop(0), which is quadratic.
    del lines[:_leading_non_tap_count(lines)]


def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
    """Move the leading non-TAP lines from lines into test_case.log."""
    count = _leading_non_tap_count(lines)
    test_case.log.extend(lines[:count])
    del lines[:count]


OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

OK_NOT_OK_SUBTEST = re.compile(r'^\t(ok|not ok) [0-9]+ - (.*)$')

OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')


def parse_ok_not_ok_test_case(lines: List[str],
                              test_case: TestCase,
                              expecting_test_case: bool) -> bool:
    """Parse the 'ok'/'not ok' result line of a test case.

    Leading non-TAP lines are moved into test_case.log first. Returns True
    when a test case was recognised, False otherwise. If the input ends while
    a test case is still expected, the case is marked TEST_CRASHED and True is
    returned. A status of TEST_CRASHED set earlier is never overwritten.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        if expecting_test_case:
            test_case.status = TestStatus.TEST_CRASHED
            return True
        return False
    match = OK_NOT_OK_SUBTEST.match(lines[0])
    if not match:
        return False
    test_case.log.append(lines.pop(0))
    test_case.name = match.group(2)
    if test_case.status == TestStatus.TEST_CRASHED:
        return True
    if match.group(1) == 'ok':
        test_case.status = TestStatus.SUCCESS
    else:
        test_case.status = TestStatus.FAILURE
    return True


SUBTEST_DIAGNOSTIC = re.compile(r'^\t# .*?: (.*)$')
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'


def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
    """Consume one subtest diagnostic line into test_case.log.

    Returns True when a diagnostic line was consumed. A diagnostic whose
    message equals DIAGNOSTIC_CRASH_MESSAGE marks the case as TEST_CRASHED.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        return False
    match = SUBTEST_DIAGNOSTIC.match(lines[0])
    if not match:
        return False
    test_case.log.append(lines.pop(0))
    if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
        test_case.status = TestStatus.TEST_CRASHED
    return True


def parse_test_case(lines: List[str],
                    expecting_test_case: bool) -> Optional[TestCase]:
    """Parse one test case (diagnostics plus result line) from lines.

    Returns the TestCase, or None when no test case could be parsed.
    """
    test_case = TestCase()
    save_non_diagnositic(lines, test_case)
    while parse_diagnostic(lines, test_case):
        pass
    if parse_ok_not_ok_test_case(lines, test_case, expecting_test_case):
        return test_case
    return None


SUBTEST_HEADER = re.compile(r'^\t# Subtest: (.*)$')


def parse_subtest_header(lines: List[str]) -> Optional[str]:
    """Consume a '# Subtest: <name>' line and return the name, else None."""
    consume_non_diagnositic(lines)
    if not lines:
        return None
    match = SUBTEST_HEADER.match(lines[0])
    if not match:
        return None
    lines.pop(0)
    return match.group(1)


SUBTEST_PLAN = re.compile(r'\t[0-9]+\.\.([0-9]+)')


def parse_subtest_plan(lines: List[str]) -> Optional[int]:
    """Consume a subtest plan line and return its upper bound, else None."""
    consume_non_diagnositic(lines)
    # A truncated log can end right after the subtest header.
    if not lines:
        return None
    match = SUBTEST_PLAN.match(lines[0])
    if not match:
        return None
    lines.pop(0)
    return int(match.group(1))


def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
    """Return the more severe of two statuses.

    TEST_CRASHED outranks FAILURE, which outranks any other non-SUCCESS
    status (left is preferred over right), which outranks SUCCESS.
    """
    if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
        return TestStatus.TEST_CRASHED
    elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
        return TestStatus.FAILURE
    elif left != TestStatus.SUCCESS:
        return left
    elif right != TestStatus.SUCCESS:
        return right
    else:
        return TestStatus.SUCCESS


def parse_ok_not_ok_test_suite(lines: List[str], test_suite: TestSuite) -> bool:
    """Parse the top-level 'ok'/'not ok' line that closes a suite.

    Returns True when the line was consumed. If the input is exhausted the
    suite is marked TEST_CRASHED and False is returned.
    """
    consume_non_diagnositic(lines)
    if not lines:
        test_suite.status = TestStatus.TEST_CRASHED
        return False
    match = OK_NOT_OK_MODULE.match(lines[0])
    if not match:
        return False
    lines.pop(0)
    if match.group(1) == 'ok':
        test_suite.status = TestStatus.SUCCESS
    else:
        test_suite.status = TestStatus.FAILURE
    return True


def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of all containers into the most severe one."""
    status_list = map(to_status, status_container_list)
    return reduce(max_status, status_list, TestStatus.SUCCESS)


def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine a suite's own status with the statuses of its test cases."""
    max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
    return max_status(max_test_case_status, test_suite.status)


def parse_test_suite(lines: List[str]) -> Optional[TestSuite]:
    """Parse one suite (header, plan, test cases, closing result line).

    Returns None when no suite header or plan is found, or when the closing
    line cannot be parsed while input remains. A suite whose input runs out
    before the closing line is still returned, after printing an error.
    """
    if not lines:
        return None
    consume_non_diagnositic(lines)
    test_suite = TestSuite()
    test_suite.status = TestStatus.SUCCESS
    name = parse_subtest_header(lines)
    if not name:
        return None
    test_suite.name = name
    expected_test_case_num = parse_subtest_plan(lines)
    # Both a missing plan (None) and a plan announcing zero cases end here.
    if not expected_test_case_num:
        return None
    test_case = parse_test_case(lines, expected_test_case_num > 0)
    expected_test_case_num -= 1
    while test_case:
        test_suite.cases.append(test_case)
        test_case = parse_test_case(lines, expected_test_case_num > 0)
        expected_test_case_num -= 1
    if parse_ok_not_ok_test_suite(lines, test_suite):
        test_suite.status = bubble_up_test_case_errors(test_suite)
        return test_suite
    elif not lines:
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return test_suite
    else:
        print('failed to parse end of suite' + lines[0])
        return None


TAP_HEADER = re.compile(r'^TAP version 14$')


def parse_tap_header(lines: List[str]) -> bool:
    """Consume the 'TAP version 14' line; return whether it was present."""
    consume_non_diagnositic(lines)
    # Nothing left after dropping non-TAP noise means there is no header.
    if not lines:
        return False
    if TAP_HEADER.match(lines[0]):
        lines.pop(0)
        return True
    return False


def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the most severe status among the given suites."""
    return bubble_up_errors(lambda x: x.status, test_suite_list)


def parse_test_result(lines: List[str]) -> TestResult:
    """Parse isolated KUnit output into a TestResult.

    lines is consumed in place. When the input is empty, holds no TAP lines,
    or does not start with the supported TAP header, a result with status
    NO_TESTS and no suites is returned.
    """
    consume_non_diagnositic(lines)
    if not lines or not parse_tap_header(lines):
        return TestResult(TestStatus.NO_TESTS, [], lines)
    test_suites = []
    test_suite = parse_test_suite(lines)
    while test_suite:
        test_suites.append(test_suite)
        test_suite = parse_test_suite(lines)
    return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)


def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
    """Parse kernel output, print a per-suite/per-case report, return the result.

    Failed and crashed test cases have their captured log printed in yellow.
    """
    total_tests = 0
    failed_tests = 0
    crashed_tests = 0
    test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
    if test_result.status == TestStatus.NO_TESTS:
        print_with_timestamp(red('[ERROR] ') + 'no kunit output detected')
    for test_suite in test_result.suites:
        if test_suite.status == TestStatus.SUCCESS:
            print_suite_divider(green('[PASSED] ') + test_suite.name)
        elif test_suite.status == TestStatus.TEST_CRASHED:
            print_suite_divider(red('[CRASHED] ' + test_suite.name))
        else:
            print_suite_divider(red('[FAILED] ') + test_suite.name)
        for test_case in test_suite.cases:
            total_tests += 1
            if test_case.status == TestStatus.SUCCESS:
                print_with_timestamp(green('[PASSED] ') + test_case.name)
            elif test_case.status == TestStatus.TEST_CRASHED:
                crashed_tests += 1
                print_with_timestamp(red('[CRASHED] ' + test_case.name))
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
            else:
                failed_tests += 1
                print_with_timestamp(red('[FAILED] ') + test_case.name)
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
    print_with_timestamp(DIVIDER)
    fmt = green if test_result.status == TestStatus.SUCCESS else red
    print_with_timestamp(
        fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
            (total_tests, failed_tests, crashed_tests)))
    return test_result