xref: /openbmc/linux/tools/testing/kunit/kunit_parser.py (revision 61c1f340bc809a1ca1e3c8794207a91cde1a7c78)
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13import re
14import sys
15
16import datetime
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20class Test:
21	"""
22	A class to represent a test parsed from KTAP results. All KTAP
23	results within a test log are stored in a main Test object as
24	subtests.
25
26	Attributes:
27	status : TestStatus - status of the test
28	name : str - name of the test
29	expected_count : int - expected number of subtests (0 if single
30		test case and None if unknown expected number of subtests)
31	subtests : List[Test] - list of subtests
32	log : List[str] - log of KTAP lines that correspond to the test
33	counts : TestCounts - counts of the test statuses and errors of
34		subtests or of the test itself if the test is a single
35		test case.
36	"""
37	def __init__(self) -> None:
38		"""Creates Test object with default attributes."""
39		self.status = TestStatus.TEST_CRASHED
40		self.name = ''
41		self.expected_count = 0  # type: Optional[int]
42		self.subtests = []  # type: List[Test]
43		self.log = []  # type: List[str]
44		self.counts = TestCounts()
45
46	def __str__(self) -> str:
47		"""Returns string representation of a Test class object."""
48		return (f'Test({self.status}, {self.name}, {self.expected_count}, '
49			f'{self.subtests}, {self.log}, {self.counts})')
50
51	def __repr__(self) -> str:
52		"""Returns string representation of a Test class object."""
53		return str(self)
54
55	def add_error(self, error_message: str) -> None:
56		"""Records an error that occurred while parsing this test."""
57		self.counts.errors += 1
58		print_with_timestamp(red('[ERROR]') + f' Test: {self.name}: {error_message}')
59
60class TestStatus(Enum):
61	"""An enumeration class to represent the status of a test."""
62	SUCCESS = auto()
63	FAILURE = auto()
64	SKIPPED = auto()
65	TEST_CRASHED = auto()
66	NO_TESTS = auto()
67	FAILURE_TO_PARSE_TESTS = auto()
68
69class TestCounts:
70	"""
71	Tracks the counts of statuses of all test cases and any errors within
72	a Test.
73
74	Attributes:
75	passed : int - the number of tests that have passed
76	failed : int - the number of tests that have failed
77	crashed : int - the number of tests that have crashed
78	skipped : int - the number of tests that have skipped
79	errors : int - the number of errors in the test and subtests
80	"""
81	def __init__(self):
82		"""Creates TestCounts object with counts of all test
83		statuses and test errors set to 0.
84		"""
85		self.passed = 0
86		self.failed = 0
87		self.crashed = 0
88		self.skipped = 0
89		self.errors = 0
90
91	def __str__(self) -> str:
92		"""Returns the string representation of a TestCounts object."""
93		statuses = [('passed', self.passed), ('failed', self.failed),
94			('crashed', self.crashed), ('skipped', self.skipped),
95			('errors', self.errors)]
96		return f'Ran {self.total()} tests: ' + \
97			', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
98
99	def total(self) -> int:
100		"""Returns the total number of test cases within a test
101		object, where a test case is a test with no subtests.
102		"""
103		return (self.passed + self.failed + self.crashed +
104			self.skipped)
105
106	def add_subtest_counts(self, counts: TestCounts) -> None:
107		"""
108		Adds the counts of another TestCounts object to the current
109		TestCounts object. Used to add the counts of a subtest to the
110		parent test.
111
112		Parameters:
113		counts - a different TestCounts object whose counts
114			will be added to the counts of the TestCounts object
115		"""
116		self.passed += counts.passed
117		self.failed += counts.failed
118		self.crashed += counts.crashed
119		self.skipped += counts.skipped
120		self.errors += counts.errors
121
122	def get_status(self) -> TestStatus:
123		"""Returns the aggregated status of a Test using test
124		counts.
125		"""
126		if self.total() == 0:
127			return TestStatus.NO_TESTS
128		if self.crashed:
129			# Crashes should take priority.
130			return TestStatus.TEST_CRASHED
131		if self.failed:
132			return TestStatus.FAILURE
133		if self.passed:
134			# No failures or crashes, looks good!
135			return TestStatus.SUCCESS
136		# We have only skipped tests.
137		return TestStatus.SKIPPED
138
139	def add_status(self, status: TestStatus) -> None:
140		"""Increments the count for `status`."""
141		if status == TestStatus.SUCCESS:
142			self.passed += 1
143		elif status == TestStatus.FAILURE:
144			self.failed += 1
145		elif status == TestStatus.SKIPPED:
146			self.skipped += 1
147		elif status != TestStatus.NO_TESTS:
148			self.crashed += 1
149
150class LineStream:
151	"""
152	A class to represent the lines of kernel output.
153	Provides a lazy peek()/pop() interface over an iterator of
154	(line#, text).
155	"""
156	_lines: Iterator[Tuple[int, str]]
157	_next: Tuple[int, str]
158	_need_next: bool
159	_done: bool
160
161	def __init__(self, lines: Iterator[Tuple[int, str]]):
162		"""Creates a new LineStream that wraps the given iterator."""
163		self._lines = lines
164		self._done = False
165		self._need_next = True
166		self._next = (0, '')
167
168	def _get_next(self) -> None:
169		"""Advances the LineSteam to the next line, if necessary."""
170		if not self._need_next:
171			return
172		try:
173			self._next = next(self._lines)
174		except StopIteration:
175			self._done = True
176		finally:
177			self._need_next = False
178
179	def peek(self) -> str:
180		"""Returns the current line, without advancing the LineStream.
181		"""
182		self._get_next()
183		return self._next[1]
184
185	def pop(self) -> str:
186		"""Returns the current line and advances the LineStream to
187		the next line.
188		"""
189		s = self.peek()
190		if self._done:
191			raise ValueError(f'LineStream: going past EOF, last line was {s}')
192		self._need_next = True
193		return s
194
195	def __bool__(self) -> bool:
196		"""Returns True if stream has more lines."""
197		self._get_next()
198		return not self._done
199
200	# Only used by kunit_tool_test.py.
201	def __iter__(self) -> Iterator[str]:
202		"""Empties all lines stored in LineStream object into
203		Iterator object and returns the Iterator object.
204		"""
205		while bool(self):
206			yield self.pop()
207
208	def line_number(self) -> int:
209		"""Returns the line number of the current line."""
210		self._get_next()
211		return self._next[0]
212
213# Parsing helper methods:
214
215KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
216TAP_START = re.compile(r'TAP version ([0-9]+)$')
217KTAP_END = re.compile('(List of all partitions:|'
218	'Kernel panic - not syncing: VFS:|reboot: System halted)')
219
220def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
221	"""Extracts KTAP lines from the kernel output."""
222	def isolate_ktap_output(kernel_output: Iterable[str]) \
223			-> Iterator[Tuple[int, str]]:
224		line_num = 0
225		started = False
226		for line in kernel_output:
227			line_num += 1
228			line = line.rstrip()  # remove trailing \n
229			if not started and KTAP_START.search(line):
230				# start extracting KTAP lines and set prefix
231				# to number of characters before version line
232				prefix_len = len(
233					line.split('KTAP version')[0])
234				started = True
235				yield line_num, line[prefix_len:]
236			elif not started and TAP_START.search(line):
237				# start extracting KTAP lines and set prefix
238				# to number of characters before version line
239				prefix_len = len(line.split('TAP version')[0])
240				started = True
241				yield line_num, line[prefix_len:]
242			elif started and KTAP_END.search(line):
243				# stop extracting KTAP lines
244				break
245			elif started:
246				# remove prefix and any indention and yield
247				# line with line number
248				line = line[prefix_len:].lstrip()
249				yield line_num, line
250	return LineStream(lines=isolate_ktap_output(kernel_output))
251
252KTAP_VERSIONS = [1]
253TAP_VERSIONS = [13, 14]
254
255def check_version(version_num: int, accepted_versions: List[int],
256			version_type: str, test: Test) -> None:
257	"""
258	Adds error to test object if version number is too high or too
259	low.
260
261	Parameters:
262	version_num - The inputted version number from the parsed KTAP or TAP
263		header line
264	accepted_version - List of accepted KTAP or TAP versions
265	version_type - 'KTAP' or 'TAP' depending on the type of
266		version line.
267	test - Test object for current test being parsed
268	"""
269	if version_num < min(accepted_versions):
270		test.add_error(f'{version_type} version lower than expected!')
271	elif version_num > max(accepted_versions):
272		test.add_error(f'{version_type} version higer than expected!')
273
274def parse_ktap_header(lines: LineStream, test: Test) -> bool:
275	"""
276	Parses KTAP/TAP header line and checks version number.
277	Returns False if fails to parse KTAP/TAP header line.
278
279	Accepted formats:
280	- 'KTAP version [version number]'
281	- 'TAP version [version number]'
282
283	Parameters:
284	lines - LineStream of KTAP output to parse
285	test - Test object for current test being parsed
286
287	Return:
288	True if successfully parsed KTAP/TAP header line
289	"""
290	ktap_match = KTAP_START.match(lines.peek())
291	tap_match = TAP_START.match(lines.peek())
292	if ktap_match:
293		version_num = int(ktap_match.group(1))
294		check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
295	elif tap_match:
296		version_num = int(tap_match.group(1))
297		check_version(version_num, TAP_VERSIONS, 'TAP', test)
298	else:
299		return False
300	test.log.append(lines.pop())
301	return True
302
303TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
304
305def parse_test_header(lines: LineStream, test: Test) -> bool:
306	"""
307	Parses test header and stores test name in test object.
308	Returns False if fails to parse test header line.
309
310	Accepted format:
311	- '# Subtest: [test name]'
312
313	Parameters:
314	lines - LineStream of KTAP output to parse
315	test - Test object for current test being parsed
316
317	Return:
318	True if successfully parsed test header line
319	"""
320	match = TEST_HEADER.match(lines.peek())
321	if not match:
322		return False
323	test.log.append(lines.pop())
324	test.name = match.group(1)
325	return True
326
327TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
328
329def parse_test_plan(lines: LineStream, test: Test) -> bool:
330	"""
331	Parses test plan line and stores the expected number of subtests in
332	test object. Reports an error if expected count is 0.
333	Returns False and sets expected_count to None if there is no valid test
334	plan.
335
336	Accepted format:
337	- '1..[number of subtests]'
338
339	Parameters:
340	lines - LineStream of KTAP output to parse
341	test - Test object for current test being parsed
342
343	Return:
344	True if successfully parsed test plan line
345	"""
346	match = TEST_PLAN.match(lines.peek())
347	if not match:
348		test.expected_count = None
349		return False
350	test.log.append(lines.pop())
351	expected_count = int(match.group(1))
352	test.expected_count = expected_count
353	return True
354
355TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
356
357TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
358
359def peek_test_name_match(lines: LineStream, test: Test) -> bool:
360	"""
361	Matches current line with the format of a test result line and checks
362	if the name matches the name of the current test.
363	Returns False if fails to match format or name.
364
365	Accepted format:
366	- '[ok|not ok] [test number] [-] [test name] [optional skip
367		directive]'
368
369	Parameters:
370	lines - LineStream of KTAP output to parse
371	test - Test object for current test being parsed
372
373	Return:
374	True if matched a test result line and the name matching the
375		expected test name
376	"""
377	line = lines.peek()
378	match = TEST_RESULT.match(line)
379	if not match:
380		return False
381	name = match.group(4)
382	return name == test.name
383
384def parse_test_result(lines: LineStream, test: Test,
385			expected_num: int) -> bool:
386	"""
387	Parses test result line and stores the status and name in the test
388	object. Reports an error if the test number does not match expected
389	test number.
390	Returns False if fails to parse test result line.
391
392	Note that the SKIP directive is the only direction that causes a
393	change in status.
394
395	Accepted format:
396	- '[ok|not ok] [test number] [-] [test name] [optional skip
397		directive]'
398
399	Parameters:
400	lines - LineStream of KTAP output to parse
401	test - Test object for current test being parsed
402	expected_num - expected test number for current test
403
404	Return:
405	True if successfully parsed a test result line.
406	"""
407	line = lines.peek()
408	match = TEST_RESULT.match(line)
409	skip_match = TEST_RESULT_SKIP.match(line)
410
411	# Check if line matches test result line format
412	if not match:
413		return False
414	test.log.append(lines.pop())
415
416	# Set name of test object
417	if skip_match:
418		test.name = skip_match.group(4)
419	else:
420		test.name = match.group(4)
421
422	# Check test num
423	num = int(match.group(2))
424	if num != expected_num:
425		test.add_error(f'Expected test number {expected_num} but found {num}')
426
427	# Set status of test object
428	status = match.group(1)
429	if skip_match:
430		test.status = TestStatus.SKIPPED
431	elif status == 'ok':
432		test.status = TestStatus.SUCCESS
433	else:
434		test.status = TestStatus.FAILURE
435	return True
436
437def parse_diagnostic(lines: LineStream) -> List[str]:
438	"""
439	Parse lines that do not match the format of a test result line or
440	test header line and returns them in list.
441
442	Line formats that are not parsed:
443	- '# Subtest: [test name]'
444	- '[ok|not ok] [test number] [-] [test name] [optional skip
445		directive]'
446
447	Parameters:
448	lines - LineStream of KTAP output to parse
449
450	Return:
451	Log of diagnostic lines
452	"""
453	log = []  # type: List[str]
454	while lines and not TEST_RESULT.match(lines.peek()) and not \
455			TEST_HEADER.match(lines.peek()):
456		log.append(lines.pop())
457	return log
458
459
460# Printing helper methods:
461
462DIVIDER = '=' * 60
463
464RESET = '\033[0;0m'
465
466def red(text: str) -> str:
467	"""Returns inputted string with red color code."""
468	if not sys.stdout.isatty():
469		return text
470	return '\033[1;31m' + text + RESET
471
472def yellow(text: str) -> str:
473	"""Returns inputted string with yellow color code."""
474	if not sys.stdout.isatty():
475		return text
476	return '\033[1;33m' + text + RESET
477
478def green(text: str) -> str:
479	"""Returns inputted string with green color code."""
480	if not sys.stdout.isatty():
481		return text
482	return '\033[1;32m' + text + RESET
483
484ANSI_LEN = len(red(''))
485
486def print_with_timestamp(message: str) -> None:
487	"""Prints message with timestamp at beginning."""
488	print('[%s] %s' % (datetime.datetime.now().strftime('%H:%M:%S'), message))
489
490def format_test_divider(message: str, len_message: int) -> str:
491	"""
492	Returns string with message centered in fixed width divider.
493
494	Example:
495	'===================== message example ====================='
496
497	Parameters:
498	message - message to be centered in divider line
499	len_message - length of the message to be printed such that
500		any characters of the color codes are not counted
501
502	Return:
503	String containing message centered in fixed width divider
504	"""
505	default_count = 3  # default number of dashes
506	len_1 = default_count
507	len_2 = default_count
508	difference = len(DIVIDER) - len_message - 2  # 2 spaces added
509	if difference > 0:
510		# calculate number of dashes for each side of the divider
511		len_1 = int(difference / 2)
512		len_2 = difference - len_1
513	return ('=' * len_1) + f' {message} ' + ('=' * len_2)
514
515def print_test_header(test: Test) -> None:
516	"""
517	Prints test header with test name and optionally the expected number
518	of subtests.
519
520	Example:
521	'=================== example (2 subtests) ==================='
522
523	Parameters:
524	test - Test object representing current test being printed
525	"""
526	message = test.name
527	if test.expected_count:
528		if test.expected_count == 1:
529			message += ' (1 subtest)'
530		else:
531			message += f' ({test.expected_count} subtests)'
532	print_with_timestamp(format_test_divider(message, len(message)))
533
534def print_log(log: Iterable[str]) -> None:
535	"""Prints all strings in saved log for test in yellow."""
536	for m in log:
537		print_with_timestamp(yellow(m))
538
539def format_test_result(test: Test) -> str:
540	"""
541	Returns string with formatted test result with colored status and test
542	name.
543
544	Example:
545	'[PASSED] example'
546
547	Parameters:
548	test - Test object representing current test being printed
549
550	Return:
551	String containing formatted test result
552	"""
553	if test.status == TestStatus.SUCCESS:
554		return green('[PASSED] ') + test.name
555	if test.status == TestStatus.SKIPPED:
556		return yellow('[SKIPPED] ') + test.name
557	if test.status == TestStatus.NO_TESTS:
558		return yellow('[NO TESTS RUN] ') + test.name
559	if test.status == TestStatus.TEST_CRASHED:
560		print_log(test.log)
561		return red('[CRASHED] ') + test.name
562	print_log(test.log)
563	return red('[FAILED] ') + test.name
564
565def print_test_result(test: Test) -> None:
566	"""
567	Prints result line with status of test.
568
569	Example:
570	'[PASSED] example'
571
572	Parameters:
573	test - Test object representing current test being printed
574	"""
575	print_with_timestamp(format_test_result(test))
576
577def print_test_footer(test: Test) -> None:
578	"""
579	Prints test footer with status of test.
580
581	Example:
582	'===================== [PASSED] example ====================='
583
584	Parameters:
585	test - Test object representing current test being printed
586	"""
587	message = format_test_result(test)
588	print_with_timestamp(format_test_divider(message,
589		len(message) - ANSI_LEN))
590
591def print_summary_line(test: Test) -> None:
592	"""
593	Prints summary line of test object. Color of line is dependent on
594	status of test. Color is green if test passes, yellow if test is
595	skipped, and red if the test fails or crashes. Summary line contains
596	counts of the statuses of the tests subtests or the test itself if it
597	has no subtests.
598
599	Example:
600	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
601	Errors: 0"
602
603	test - Test object representing current test being printed
604	"""
605	if test.status == TestStatus.SUCCESS:
606		color = green
607	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
608		color = yellow
609	else:
610		color = red
611	print_with_timestamp(color(f'Testing complete. {test.counts}'))
612
613# Other methods:
614
615def bubble_up_test_results(test: Test) -> None:
616	"""
617	If the test has subtests, add the test counts of the subtests to the
618	test and check if any of the tests crashed and if so set the test
619	status to crashed. Otherwise if the test has no subtests add the
620	status of the test to the test counts.
621
622	Parameters:
623	test - Test object for current test being parsed
624	"""
625	subtests = test.subtests
626	counts = test.counts
627	status = test.status
628	for t in subtests:
629		counts.add_subtest_counts(t.counts)
630	if counts.total() == 0:
631		counts.add_status(status)
632	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
633		test.status = TestStatus.TEST_CRASHED
634
635def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
636	"""
637	Finds next test to parse in LineStream, creates new Test object,
638	parses any subtests of the test, populates Test object with all
639	information (status, name) about the test and the Test objects for
640	any subtests, and then returns the Test object. The method accepts
641	three formats of tests:
642
643	Accepted test formats:
644
645	- Main KTAP/TAP header
646
647	Example:
648
649	KTAP version 1
650	1..4
651	[subtests]
652
653	- Subtest header line
654
655	Example:
656
657	# Subtest: name
658	1..3
659	[subtests]
660	ok 1 name
661
662	- Test result line
663
664	Example:
665
666	ok 1 - test
667
668	Parameters:
669	lines - LineStream of KTAP output to parse
670	expected_num - expected test number for test to be parsed
671	log - list of strings containing any preceding diagnostic lines
672		corresponding to the current test
673
674	Return:
675	Test object populated with characteristics and any subtests
676	"""
677	test = Test()
678	test.log.extend(log)
679	parent_test = False
680	main = parse_ktap_header(lines, test)
681	if main:
682		# If KTAP/TAP header is found, attempt to parse
683		# test plan
684		test.name = "main"
685		parse_test_plan(lines, test)
686		parent_test = True
687	else:
688		# If KTAP/TAP header is not found, test must be subtest
689		# header or test result line so parse attempt to parser
690		# subtest header
691		parent_test = parse_test_header(lines, test)
692		if parent_test:
693			# If subtest header is found, attempt to parse
694			# test plan and print header
695			parse_test_plan(lines, test)
696			print_test_header(test)
697	expected_count = test.expected_count
698	subtests = []
699	test_num = 1
700	while parent_test and (expected_count is None or test_num <= expected_count):
701		# Loop to parse any subtests.
702		# Break after parsing expected number of tests or
703		# if expected number of tests is unknown break when test
704		# result line with matching name to subtest header is found
705		# or no more lines in stream.
706		sub_log = parse_diagnostic(lines)
707		sub_test = Test()
708		if not lines or (peek_test_name_match(lines, test) and
709				not main):
710			if expected_count and test_num <= expected_count:
711				# If parser reaches end of test before
712				# parsing expected number of subtests, print
713				# crashed subtest and record error
714				test.add_error('missing expected subtest!')
715				sub_test.log.extend(sub_log)
716				test.counts.add_status(
717					TestStatus.TEST_CRASHED)
718				print_test_result(sub_test)
719			else:
720				test.log.extend(sub_log)
721				break
722		else:
723			sub_test = parse_test(lines, test_num, sub_log)
724		subtests.append(sub_test)
725		test_num += 1
726	test.subtests = subtests
727	if not main:
728		# If not main test, look for test result line
729		test.log.extend(parse_diagnostic(lines))
730		if (parent_test and peek_test_name_match(lines, test)) or \
731				not parent_test:
732			parse_test_result(lines, test, expected_num)
733		else:
734			test.add_error('missing subtest result line!')
735
736	# Check for there being no tests
737	if parent_test and len(subtests) == 0:
738		# Don't override a bad status if this test had one reported.
739		# Assumption: no subtests means CRASHED is from Test.__init__()
740		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
741			test.status = TestStatus.NO_TESTS
742			test.add_error('0 tests run!')
743
744	# Add statuses to TestCounts attribute in Test object
745	bubble_up_test_results(test)
746	if parent_test and not main:
747		# If test has subtests and is not the main test object, print
748		# footer.
749		print_test_footer(test)
750	elif not main:
751		print_test_result(test)
752	return test
753
754def parse_run_tests(kernel_output: Iterable[str]) -> Test:
755	"""
756	Using kernel output, extract KTAP lines, parse the lines for test
757	results and print condensed test results and summary line.
758
759	Parameters:
760	kernel_output - Iterable object contains lines of kernel output
761
762	Return:
763	Test - the main test object with all subtests.
764	"""
765	print_with_timestamp(DIVIDER)
766	lines = extract_tap_lines(kernel_output)
767	test = Test()
768	if not lines:
769		test.name = '<missing>'
770		test.add_error('could not find any KTAP output!')
771		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
772	else:
773		test = parse_test(lines, 0, [])
774		if test.status != TestStatus.NO_TESTS:
775			test.status = test.counts.get_status()
776	print_with_timestamp(DIVIDER)
777	print_summary_line(test)
778	return test
779