1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13import re
14import sys
15
16from enum import Enum, auto
17from typing import Iterable, Iterator, List, Optional, Tuple
18
19from kunit_printer import stdout
20
class Test:
	"""
	Represents one test parsed from KTAP results. The whole test log
	is stored as a tree: a main Test object holds the top-level tests
	as subtests, which may in turn hold subtests of their own.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates a Test with default attributes.

		The status starts out as TEST_CRASHED and is only replaced
		once a result has been determined for the test.
		"""
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count: Optional[int] = 0
		self.subtests: List[Test] = []
		self.log: List[str] = []
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns all attributes of the Test formatted as one string."""
		return 'Test({}, {}, {}, {}, {}, {})'.format(
			self.status, self.name, self.expected_count,
			self.subtests, self.log, self.counts)

	def __repr__(self) -> str:
		"""Returns the same text as __str__()."""
		return self.__str__()

	def add_error(self, error_message: str) -> None:
		"""Counts a parsing error for this test and prints it.

		Parameters:
		error_message - description of the error that occurred
		"""
		self.counts.errors += 1
		tag = stdout.red('[ERROR]')
		stdout.print_with_timestamp(tag + f' Test: {self.name}: {error_message}')
60
class TestStatus(Enum):
	"""Enumerates every status a Test can have."""
	# Test reported 'ok'.
	SUCCESS = 1
	# Test reported 'not ok'.
	FAILURE = 2
	# Test result line carried a SKIP directive.
	SKIPPED = 3
	# Default status of a new Test; kept if no result is ever parsed.
	TEST_CRASHED = 4
	# Test was expected to contain subtests but none were run.
	NO_TESTS = 5
	# No KTAP output could be found in the kernel output at all.
	FAILURE_TO_PARSE_TESTS = 6
69
class TestCounts:
	"""
	Keeps a tally of the statuses of all test cases within a Test, plus
	the number of errors encountered.

	Attributes:
	passed : int - the number of tests that have passed
	failed : int - the number of tests that have failed
	crashed : int - the number of tests that have crashed
	skipped : int - the number of tests that have skipped
	errors : int - the number of errors in the test and subtests
	"""
	def __init__(self):
		"""Creates a TestCounts object with every count set to 0."""
		self.passed = self.failed = self.crashed = 0
		self.skipped = self.errors = 0

	def __str__(self) -> str:
		"""Returns a summary such as 'Ran 3 tests: passed: 2, failed: 1'.

		Counts that are 0 are left out of the summary.
		"""
		fields = ('passed', 'failed', 'crashed', 'skipped', 'errors')
		nonzero = [f'{field}: {getattr(self, field)}'
			for field in fields if getattr(self, field) > 0]
		return f'Ran {self.total()} tests: ' + ', '.join(nonzero)

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		Errors are not included in the total.
		"""
		return sum((self.passed, self.failed, self.crashed,
			self.skipped))

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Adds every count of another TestCounts object onto this one.
		Used to fold the counts of a subtest into its parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		for field in ('passed', 'failed', 'crashed', 'skipped',
				'errors'):
			setattr(self, field,
				getattr(self, field) + getattr(counts, field))

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if not self.total():
			return TestStatus.NO_TESTS
		# Ordered by priority: crashes beat failures, which beat passes.
		by_priority = (
			(self.crashed, TestStatus.TEST_CRASHED),
			(self.failed, TestStatus.FAILURE),
			(self.passed, TestStatus.SUCCESS),
		)
		for count, status in by_priority:
			if count:
				return status
		# Only skipped tests remain.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`.

		NO_TESTS is not counted at all; any status other than
		SUCCESS, FAILURE or SKIPPED is counted as a crash.
		"""
		if status == TestStatus.NO_TESTS:
			return
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		else:
			self.crashed += 1
150
class LineStream:
	"""
	Wraps an iterator of (line#, text) pairs taken from kernel output
	and exposes it through a lazy peek()/pop() interface. A line is
	only fetched from the iterator when it is first needed.
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		# Placeholder until the first line is fetched.
		self._next = (0, '')
		self._need_next = True
		self._done = False

	def _get_next(self) -> None:
		"""Fetches the next line from the iterator, if one is needed.

		When the iterator is exhausted the stream is marked as done
		and the previously fetched line is left in place.
		"""
		if self._need_next:
			try:
				self._next = next(self._lines)
			except StopIteration:
				self._done = True
			finally:
				self._need_next = False

	def peek(self) -> str:
		"""Returns the text of the current line without advancing
		the LineStream.
		"""
		self._get_next()
		_, text = self._next
		return text

	def pop(self) -> str:
		"""Returns the text of the current line and advances the
		LineStream to the next line.

		Raises ValueError if there are no more lines.
		"""
		current = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {current}')
		# Defer the actual fetch until the next peek()/pop().
		self._need_next = True
		return current

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Yields the remaining lines of the LineStream one by one,
		consuming them from the stream.
		"""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		number, _ = self._next
		return number
213
214# Parsing helper methods:
215
# Matches a line ending in a KTAP version header, e.g. 'KTAP version 1'.
# Group 1 is the version number.
KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
# Matches a line ending in a TAP version header, e.g. 'TAP version 14'.
# Group 1 is the version number. Note that a 'KTAP version N' line also
# contains 'TAP version N', so search() with this pattern matches it too.
TAP_START = re.compile(r'TAP version ([0-9]+)$')
# Kernel messages that extract_tap_lines() treats as the end of the KTAP
# output.
KTAP_END = re.compile('(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
220
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""
	Extracts KTAP lines from the kernel output.

	Extraction begins at the first KTAP/TAP version line and ends at the
	first line matching KTAP_END or at the end of the output. Whatever
	precedes the version header on its line is treated as a fixed-width
	prefix and is cut from every extracted line.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output

	Return:
	LineStream of (line number, text) pairs, where line numbers are
	1-based positions in kernel_output
	"""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		prefix_len = 0
		started = False
		for line_num, raw_line in enumerate(kernel_output, start=1):
			line = raw_line.rstrip()  # remove trailing \n
			if started:
				if KTAP_END.search(line):
					# stop extracting KTAP lines
					break
				# remove prefix and any indentation
				yield line_num, line[prefix_len:].lstrip()
				continue
			# Still looking for the version line; KTAP takes
			# precedence over TAP.
			if KTAP_START.search(line):
				marker = 'KTAP version'
			elif TAP_START.search(line):
				marker = 'TAP version'
			else:
				continue
			# prefix is the number of characters before the
			# version line
			prefix_len = len(line.split(marker)[0])
			started = True
			yield line_num, line[prefix_len:]
	return LineStream(lines=isolate_ktap_output(kernel_output))
252
# Version numbers accepted in a 'KTAP version N' header; check_version()
# reports an error for versions below the minimum or above the maximum.
KTAP_VERSIONS = [1]
# Version numbers accepted in a 'TAP version N' header.
TAP_VERSIONS = [13, 14]
255
def check_version(version_num: int, accepted_versions: List[int],
			version_type: str, test: Test) -> None:
	"""
	Adds error to test object if version number is too high or too
	low. Only the smallest and largest accepted versions are used, so
	any version between them is accepted.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions; must not
		be empty
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	"""
	if version_num < min(accepted_versions):
		test.add_error(f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		test.add_error(f'{version_type} version higher than expected!')
274
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses the KTAP/TAP header line and checks its version number.
	On success the header line is consumed and appended to the test
	log; otherwise the stream is left untouched.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed KTAP/TAP header line, False otherwise
	"""
	header = lines.peek()
	known_headers = (
		(KTAP_START, KTAP_VERSIONS, 'KTAP'),
		(TAP_START, TAP_VERSIONS, 'TAP'),
	)
	for pattern, accepted, version_type in known_headers:
		found = pattern.match(header)
		if found:
			check_version(int(found.group(1)), accepted,
				version_type, test)
			test.log.append(lines.pop())
			return True
	return False
303
# Matches a subtest header line, e.g. '# Subtest: example'. Group 1 is the
# test name.
TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
305
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test header line and stores the test name in the test
	object. On success the header line is consumed and appended to the
	test log; otherwise the stream is left untouched.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line, False otherwise
	"""
	header = TEST_HEADER.match(lines.peek())
	if header is None:
		return False
	test.log.append(lines.pop())
	test.name = header.group(1)
	return True
327
# Matches a test plan line, e.g. '1..4'. Group 1 is the expected number of
# subtests. The pattern has no end anchor, so trailing text is tolerated.
TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
329
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test plan line and stores the expected number of subtests
	in the test object. On success the plan line is consumed and
	appended to the test log.
	Returns False and sets expected_count to None if there is no valid
	test plan; the stream is left untouched in that case.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line, False otherwise
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan is None:
		# Unknown number of subtests.
		test.expected_count = None
		return False
	test.log.append(lines.pop())
	test.expected_count = int(plan.group(1))
	return True
355
# Matches a test result line. Groups: 1 - 'ok' or 'not ok', 2 - test number,
# 3 - optional '- ' separator, 4 - test name (may not contain '#'),
# 5 - optional trailing ' # ...' directive.
TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Matches a test result line that carries a SKIP directive. Group 4 is the
# test name and group 5 is any text following '# SKIP'.
TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
359
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks, without consuming anything, whether the current line is a
	test result line whose name equals the name of the current test.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if matched a test result line and the name matching the
		expected test name, False if either the format or the name
		does not match
	"""
	result = TEST_RESULT.match(lines.peek())
	return bool(result) and result.group(4) == test.name
384
def parse_test_result(lines: LineStream, test: Test,
			expected_num: int) -> bool:
	"""
	Parses a test result line and stores the status and name in the
	test object. Reports an error if the test number does not match the
	expected test number. On success the line is consumed and appended
	to the test log; otherwise the stream is left untouched.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test

	Return:
	True if successfully parsed a test result line, False otherwise
	"""
	line = lines.peek()
	result = TEST_RESULT.match(line)
	if result is None:
		# Not a test result line
		return False
	skipped = TEST_RESULT_SKIP.match(line)
	test.log.append(lines.pop())

	# Prefer the name from the SKIP pattern when the directive is present
	test.name = (skipped or result).group(4)

	# Report test numbers that are out of sequence
	found_num = int(result.group(2))
	if found_num != expected_num:
		test.add_error(f'Expected test number {expected_num} but found {found_num}')

	# SKIP overrides the ok/not ok status
	if skipped:
		test.status = TestStatus.SKIPPED
	elif result.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
437
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Consumes lines up to, but not including, the next test result line
	or test header line and returns the consumed lines in a list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	diagnostics: List[str] = []
	while lines:
		current = lines.peek()
		if TEST_RESULT.match(current) or TEST_HEADER.match(current):
			break
		diagnostics.append(lines.pop())
	return diagnostics
459
460
461# Printing helper methods:
462
DIVIDER = '=' * 60

def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns the message centered in a divider line that is as wide as
	DIVIDER. If the message is too long to fit, it is padded with a
	minimum of three '=' characters on each side instead.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	# Minimum padding, used when the message does not fit.
	left = right = 3
	# Room left for padding after the message and its 2 spaces.
	spare = len(DIVIDER) - len_message - 2
	if spare > 0:
		# Any odd leftover character goes to the right-hand side.
		left = spare // 2
		right = spare - left
	return f"{'=' * left} {message} {'=' * right}"
489
def print_test_header(test: Test) -> None:
	"""
	Prints a divider line containing the test name and, if the expected
	number of subtests is known and non-zero, that number.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	"""
	count = test.expected_count
	if not count:
		# Unknown (None) or zero expected subtests: name only.
		suffix = ''
	elif count == 1:
		suffix = ' (1 subtest)'
	else:
		suffix = f' ({count} subtests)'
	message = test.name + suffix
	stdout.print_with_timestamp(format_test_divider(message, len(message)))
508
def print_log(log: Iterable[str]) -> None:
	"""Prints every line of the saved test log in yellow, one timestamped
	line per entry.
	"""
	for entry in log:
		colored = stdout.yellow(entry)
		stdout.print_with_timestamp(colored)
513
def format_test_result(test: Test) -> str:
	"""
	Returns the test name preceded by a colored status label.

	Note that for any status other than passed, skipped or no tests
	run, this also prints the saved log of the test as a side effect.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed

	Return:
	String containing formatted test result
	"""
	status = test.status
	if status == TestStatus.SUCCESS:
		label = stdout.green('[PASSED] ')
	elif status == TestStatus.SKIPPED:
		label = stdout.yellow('[SKIPPED] ')
	elif status == TestStatus.NO_TESTS:
		label = stdout.yellow('[NO TESTS RUN] ')
	else:
		# Crashed or failed: show the log to help with debugging.
		print_log(test.log)
		if status == TestStatus.TEST_CRASHED:
			label = stdout.red('[CRASHED] ')
		else:
			label = stdout.red('[FAILED] ')
	return label + test.name
539
def print_test_result(test: Test) -> None:
	"""
	Prints the result line of a test, made up of its status and name.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	"""
	result_line = format_test_result(test)
	stdout.print_with_timestamp(result_line)
551
def print_test_footer(test: Test) -> None:
	"""
	Prints a divider line containing the status and name of the test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	"""
	message = format_test_result(test)
	# Color codes take up no space on screen, so leave them out of the
	# length used for centering.
	visible_len = len(message) - stdout.color_len()
	footer = format_test_divider(message, visible_len)
	stdout.print_with_timestamp(footer)
565
def print_summary_line(test: Test) -> None:
	"""
	Prints the summary line of a test object. The line is green if the
	test passed, yellow if it was skipped or ran no tests, and red for
	every other status. The summary contains the counts of the statuses
	of the test's subtests, or of the test itself if it has no subtests,
	as formatted by the TestCounts of the test.

	Example:
	"Testing complete. Ran 2 tests: passed: 2"

	Parameters:
	test - Test object representing current test being printed
	"""
	status = test.status
	if status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		paint = stdout.yellow
	elif status == TestStatus.SUCCESS:
		paint = stdout.green
	else:
		paint = stdout.red
	summary = f'Testing complete. {test.counts}'
	stdout.print_with_timestamp(paint(summary))
587
588# Other methods:
589
def bubble_up_test_results(test: Test) -> None:
	"""
	Folds the counts of all subtests into the counts of the test. If
	the resulting total is still 0, the test's own status is added to
	its counts instead. Otherwise, if the counts show a crash, the
	status of the test is set to crashed.

	Parameters:
	test - Test object for current test being parsed
	"""
	counts = test.counts
	for child in test.subtests:
		counts.add_subtest_counts(child.counts)
	if counts.total() == 0:
		# Nothing counted so far: count the test itself.
		counts.add_status(test.status)
		return
	if counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED
609
def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. Subtests are parsed
	by calling this function recursively, and results are printed as
	they are parsed. The method accepts three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header line

	Example:

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)
	parent_test = False
	main = parse_ktap_header(lines, test)
	if main:
		# If KTAP/TAP header is found, attempt to parse
		# test plan
		test.name = "main"
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If KTAP/TAP header is not found, test must be a subtest
		# header or test result line, so attempt to parse a
		# subtest header
		parent_test = parse_test_header(lines, test)
		if parent_test:
			# If subtest header is found, attempt to parse
			# test plan and print header
			parse_test_plan(lines, test)
			print_test_header(test)
	# None if no test plan was found, i.e. the number of subtests is
	# unknown
	expected_count = test.expected_count
	subtests = []
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		# Placeholder with default attributes; only kept if the
		# subtest turns out to be missing
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				not main):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error('missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test)
			else:
				# Only reached when the expected count is
				# unknown: the diagnostic lines belong to this
				# test and there are no more subtests
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if not main:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		# A test with subtests only accepts a result line carrying its
		# own name; a single test case takes whatever result line
		# comes next
		if (parent_test and peek_test_name_match(lines, test)) or \
				not parent_test:
			parse_test_result(lines, test, expected_num)
		else:
			test.add_error('missing subtest result line!')

	# Check for there being no tests
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			test.status = TestStatus.NO_TESTS
			test.add_error('0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and not main:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test)
	elif not main:
		# Single test case: print its result line
		print_test_result(test)
	return test
728
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
	"""
	Extracts the KTAP lines from the kernel output, parses them for
	test results and prints the condensed test results between two
	divider lines, followed by a summary line.

	If no KTAP output is found at all, an error is recorded and the
	returned test has the status FAILURE_TO_PARSE_TESTS.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output

	Return:
	Test - the main test object with all subtests.
	"""
	stdout.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [])
		# Keep NO_TESTS as is; otherwise derive the overall status
		# from the counts of the parsed tests.
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	else:
		test = Test()
		test.name = '<missing>'
		test.add_error('could not find any KTAP output!')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	stdout.print_with_timestamp(DIVIDER)
	print_summary_line(test)
	return test
754