# SPDX-License-Identifier: GPL-2.0
#
# Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
# Author: Rae Moar <rmoar@google.com>

from __future__ import annotations
import re
import sys

from enum import Enum, auto
from typing import Iterable, Iterator, List, Optional, Tuple

from kunit_printer import stdout
20
class Test:
	"""
	Represents one test parsed from KTAP results. A whole KTAP log is
	stored as a tree: a single main Test object whose subtests (and
	their subtests, recursively) hold every result in the log.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Initializes every attribute to its default."""
		# Start out as crashed; parsing a result line upgrades this.
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns a debug-friendly rendering of this Test."""
		fields = (self.status, self.name, self.expected_count,
			self.subtests, self.log, self.counts)
		return 'Test(' + ', '.join(str(field) for field in fields) + ')'

	def __repr__(self) -> str:
		"""Returns the same rendering as __str__."""
		return str(self)

	def add_error(self, error_message: str) -> None:
		"""Counts and immediately prints an error hit while parsing this test."""
		self.counts.errors += 1
		message = stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}'
		stdout.print_with_timestamp(message)
60
class TestStatus(Enum):
	"""Enumerates every outcome the parser can assign to a test."""
	SUCCESS = auto()  # result line reported 'ok'
	FAILURE = auto()  # result line reported 'not ok'
	SKIPPED = auto()  # result line carried a '# SKIP' directive
	TEST_CRASHED = auto()  # default until a valid result line is parsed
	NO_TESTS = auto()  # a parent test ran zero subtests
	FAILURE_TO_PARSE_TESTS = auto()  # no KTAP output could be found
69
class TestCounts:
	"""
	Aggregates how many test cases inside a Test finished in each
	status, plus how many parse errors were recorded along the way.

	Attributes:
	passed : int - the number of tests that have passed
	failed : int - the number of tests that have failed
	crashed : int - the number of tests that have crashed
	skipped : int - the number of tests that have skipped
	errors : int - the number of errors in the test and subtests
	"""
	def __init__(self):
		"""Starts every counter at zero."""
		self.passed = 0
		self.failed = 0
		self.crashed = 0
		self.skipped = 0
		self.errors = 0

	def __str__(self) -> str:
		"""Returns a human-readable summary, omitting zero counters."""
		counters = (('passed', self.passed), ('failed', self.failed),
			('crashed', self.crashed), ('skipped', self.skipped),
			('errors', self.errors))
		shown = [f'{label}: {count}' for label, count in counters if count > 0]
		return f'Ran {self.total()} tests: ' + ', '.join(shown)

	def total(self) -> int:
		"""Returns the number of test cases counted (errors are not
		test cases and are excluded).
		"""
		return self.passed + self.failed + self.crashed + self.skipped

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Folds another TestCounts object's counters into this one.
		Used to roll a subtest's counts up into its parent test.

		Parameters:
		counts - the child TestCounts whose counters are accumulated
		"""
		self.passed += counts.passed
		self.failed += counts.failed
		self.crashed += counts.crashed
		self.skipped += counts.skipped
		self.errors += counts.errors

	def get_status(self) -> TestStatus:
		"""Derives an aggregate TestStatus from the counters."""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# A single crash outweighs every other outcome.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# Nothing crashed, failed, or passed: everything skipped.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the counter corresponding to `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			# TEST_CRASHED and FAILURE_TO_PARSE_TESTS both count
			# as crashes; NO_TESTS is not counted at all.
			self.crashed += 1
150
class LineStream:
	"""
	Wraps an iterator of (line number, text) pairs produced from
	kernel output, exposing lazy one-line lookahead via peek() and
	consumption via pop().
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Wraps `lines`; nothing is consumed until first access."""
		self._lines = lines
		self._done = False
		self._need_next = True
		self._next = (0, '')

	def _get_next(self) -> None:
		"""Pulls the next pair from the iterator, at most once per pop."""
		if not self._need_next:
			return
		# Clear the flag up front so it is reset even if next() raises.
		self._need_next = False
		try:
			self._next = next(self._lines)
		except StopIteration:
			self._done = True

	def peek(self) -> str:
		"""Returns the current line without consuming it."""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Consumes and returns the current line; raises ValueError at EOF."""
		line = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {line}')
		self._need_next = True
		return line

	def __bool__(self) -> bool:
		"""True while at least one unconsumed line remains."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Yields and consumes all remaining lines."""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the kernel-output line number of the current line."""
		self._get_next()
		return self._next[0]
213
# Parsing helper methods:

# Start of KTAP output: 'KTAP version N', anchored to end of line.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Start of older-style TAP output: 'TAP version N'.
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages that mean the kernel has moved past the KTAP output
# (partition listing, panic, or halt), so extraction must stop.
KTAP_END = re.compile('(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
220
def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream:
	"""
	Extracts KTAP lines from the kernel output.

	Lines before the first KTAP/TAP version line are discarded. Any
	console prefix in front of the version line fixes the prefix width
	that is stripped from all later lines, and extraction stops at a
	known end-of-output kernel message (see KTAP_END).

	Parameters:
	kernel_output - iterable of raw kernel output lines
	lstrip - if True, also strip leading whitespace from KTAP lines

	Return:
	LineStream over (line number, KTAP text) pairs
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		prefix_len = 0
		started = False
		for line_num, line in enumerate(kernel_output, start=1):
			line = line.rstrip()  # remove trailing \n
			if not started:
				# Look for the opening version line; the text
				# before it (timestamps etc.) sets the prefix
				# width to strip from every following line.
				if KTAP_START.search(line):
					prefix_len = line.index('KTAP version')
				elif TAP_START.search(line):
					prefix_len = line.index('TAP version')
				else:
					continue
				started = True
				yield line_num, line[prefix_len:]
			elif KTAP_END.search(line):
				# Kernel moved on; stop extracting KTAP lines.
				break
			else:
				stripped = line[prefix_len:]
				if lstrip:
					# Our parsing logic relies on leading
					# whitespace being removed.
					stripped = stripped.lstrip()
				yield line_num, stripped
	return LineStream(lines=isolate_ktap_output(kernel_output))
254
# KTAP version numbers this parser understands.
KTAP_VERSIONS = [1]
# TAP version numbers this parser understands.
TAP_VERSIONS = [13, 14]
257
def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test) -> None:
	"""
	Adds an error to the test object if the version number is lower
	or higher than the range of accepted versions.

	Parameters:
	version_num - the version number from the parsed KTAP or TAP
		header line
	accepted_versions - list of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(f'{version_type} version higher than expected!')
276
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses the KTAP/TAP header line, checks its version number, and
	consumes the line on success.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	line = lines.peek()
	ktap_match = KTAP_START.match(line)
	if ktap_match:
		check_version(int(ktap_match.group(1)), KTAP_VERSIONS, 'KTAP', test)
	else:
		tap_match = TAP_START.match(line)
		if not tap_match:
			return False
		check_version(int(tap_match.group(1)), TAP_VERSIONS, 'TAP', test)
	test.log.append(lines.pop())
	return True
305
# Subtest header line, e.g. '# Subtest: example'; captures the test name.
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
307
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses a subtest header line, storing the test name in the test
	object and consuming the line on success.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	header = TEST_HEADER.match(lines.peek())
	if header is None:
		return False
	test.name = header.group(1)
	test.log.append(lines.pop())
	return True
329
# Test plan line, e.g. '1..4'; captures the expected number of subtests.
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
331
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses the test plan line and stores the expected number of
	subtests in the test object. If no valid plan line is present,
	expected_count is set to None to mark the count as unknown.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan is None:
		test.expected_count = None
		return False
	test.log.append(lines.pop())
	test.expected_count = int(plan.group(1))
	return True
357
# Test result line, e.g. 'ok 1 - name' or 'not ok 2 name # comment'.
# Groups: 1='ok'/'not ok', 2=test number, 4=test name, 5=optional directive.
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Result line carrying a '# SKIP' directive; group 4 is the test name.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
361
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming the line, whether the current line is a
	test result line whose name matches the current test's name.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if the line is a result line naming this test
	"""
	result = TEST_RESULT.match(lines.peek())
	if result is None:
		return False
	return result.group(4) == test.name
386
def parse_test_result(lines: LineStream, test: Test,
			expected_num: int) -> bool:
	"""
	Parses a test result line, storing the status and name in the
	test object, and records an error if the test number does not
	match the expected number.

	Note that the SKIP directive is the only directive that changes
	the resulting status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		# Not a result line at all.
		return False
	skip_match = TEST_RESULT_SKIP.match(line)
	test.log.append(lines.pop())

	# The SKIP regex captures the name without the trailing directive,
	# so prefer it when the line is a skip.
	source = skip_match if skip_match else match
	test.name = source.group(4)

	# Verify the test number is the one we expected.
	found_num = int(match.group(2))
	if found_num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {found_num}')

	# Derive the status: SKIP beats ok/not ok.
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif match.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
439
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Consumes and returns all lines up to (but not including) the next
	test result line or subtest header line, or the end of the stream.

	Line formats that stop collection:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log: List[str] = []
	while lines:
		line = lines.peek()
		if TEST_RESULT.match(line) or TEST_HEADER.match(line):
			break
		log.append(lines.pop())
	return log
461
462
463# Printing helper methods:
464
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns `message` centered inside a fixed-width '=' divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - printable length of the message, so that any color
		escape characters are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	remaining = len(DIVIDER) - len_message - 2  # 2 spaces around message
	if remaining > 0:
		# Split the leftover width between the two sides.
		left = remaining // 2
		right = remaining - left
	else:
		# Message wider than the divider: use 3 '=' on each side.
		left = right = 3
	return f"{'=' * left} {message} {'=' * right}"
491
def print_test_header(test: Test) -> None:
	"""
	Prints a divider line with the test name and, when known, the
	expected number of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = test.name
	if test.expected_count == 1:
		message += ' (1 subtest)'
	elif test.expected_count:
		# Unknown counts (None) and 0 print no suffix.
		message += f' ({test.expected_count} subtests)'
	stdout.print_with_timestamp(format_test_divider(message, len(message)))
510
def print_log(log: Iterable[str]) -> None:
	"""Prints every saved log line for a test, colored yellow."""
	for line in log:
		stdout.print_with_timestamp(stdout.yellow(line))
515
def format_test_result(test: Test) -> str:
	"""
	Returns the test name prefixed with a colored status tag. For
	crashed or failed tests, the saved log is printed first.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return stdout.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return stdout.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return stdout.yellow('[NO TESTS RUN] ') + test.name
	# Crashes and failures both dump the log before the tag.
	print_log(test.log)
	if test.status == TestStatus.TEST_CRASHED:
		return stdout.red('[CRASHED] ') + test.name
	return stdout.red('[FAILED] ') + test.name
541
def print_test_result(test: Test) -> None:
	"""
	Prints the one-line status of a test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	result_line = format_test_result(test)
	stdout.print_with_timestamp(result_line)
553
def print_test_footer(test: Test) -> None:
	"""
	Prints a divider line containing the formatted test result.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	result = format_test_result(test)
	# Exclude the invisible color-escape characters from the width.
	visible_len = len(result) - stdout.color_len()
	stdout.print_with_timestamp(format_test_divider(result, visible_len))
567
def print_summary_line(test: Test) -> None:
	"""
	Prints the final summary line with the test's counts. The line is
	green on success, yellow when skipped or no tests ran, and red
	otherwise (failures, crashes, parse failures).

	Example:
	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
	Errors: 0"

	Parameters:
	test - Test object representing current test being printed
	"""
	color = stdout.red
	if test.status == TestStatus.SUCCESS:
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = stdout.yellow
	stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
589
590# Other methods:
591
def bubble_up_test_results(test: Test) -> None:
	"""
	Rolls subtest counts up into the parent test's counts, marking the
	parent crashed if any subtest crashed. A test with no counted
	subtests instead counts its own status.

	Parameters:
	test - Test object for current test being parsed
	"""
	for subtest in test.subtests:
		test.counts.add_subtest_counts(subtest.counts)
	if test.counts.total() == 0:
		# Leaf test: its own status is the only thing to count.
		test.counts.add_status(test.status)
	elif test.counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED
611
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header line

	Example:

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	parent_test = False
	main = parse_ktap_header(lines, test)
	if main:
		# This is the main test: a KTAP/TAP header was found, so
		# attempt to parse a test plan next.
		test.name = "main"
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# No KTAP/TAP header, so this must be either a subtest
		# header or a bare test result line; attempt to parse a
		# subtest header first.
		parent_test = parse_test_header(lines, test)
		if parent_test:
			# Subtest header found: attempt to parse its test
			# plan and print the header divider.
			parse_test_plan(lines, test)
			print_test_header(test)
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing the expected number of tests, or, if
		# the expected number of tests is unknown, break when a test
		# result line matching the subtest header's name is found or
		# there are no more lines in the stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				not main):
			if expected_count and test_num <= expected_count:
				# The parser reached the end of this test
				# before seeing the expected number of
				# subtests: report the missing subtest as
				# crashed and record an error.
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				# Unknown plan and the test's own result line
				# (or EOF) was reached: stop collecting
				# subtests.
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if not main:
		# Not the main test, so look for this test's own result line.
		test.log.extend(parse_diagnostic(lines))
		if (parent_test and peek_test_name_match(lines, test)) or \
				not parent_test:
			parse_test_result(lines, test, expected_num)
		else:
			test.add_error('missing subtest result line!')

	# Check for there being no tests
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Roll subtest statuses up into this test's TestCounts.
	bubble_up_test_results(test)
	if parent_test and not main:
		# Tests with subtests (other than the main test) get a
		# footer divider; leaf tests get a one-line result.
		print_test_footer(test)
	elif not main:
		print_test_result(test)
	return test
730
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Extracts KTAP lines from the kernel output, parses them into a
	tree of Test objects, and prints condensed results plus a final
	summary line.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [])
		if test.status != TestStatus.NO_TESTS:
			# Recompute the overall status from the counts.
			test.status = test.counts.get_status()
	else:
		# No KTAP output at all: report a parse failure.
		test = Test()
		test.name = '<missing>'
		test.add_error('could not find any KTAP output!')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
756