1 # SPDX-License-Identifier: GPL-2.0
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
# results with reader-friendly format. Stores and returns test results in a
# Test object.
7 # Copyright (C) 2019, Google LLC.
12 from __future__ import annotations
13 from dataclasses import dataclass
18 from enum import Enum, auto
19 from typing import Iterable, Iterator, List, Optional, Tuple
21 from kunit_printer import stdout
25 A class to represent a test parsed from KTAP results. All KTAP
26 results within a test log are stored in a main Test object as
30 status : TestStatus - status of the test
31 name : str - name of the test
32 expected_count : int - expected number of subtests (0 if single
33 test case and None if unknown expected number of subtests)
34 subtests : List[Test] - list of subtests
35 log : List[str] - log of KTAP lines that correspond to the test
36 counts : TestCounts - counts of the test statuses and errors of
37 subtests or of the test itself if the test is a single
40 def __init__(self) -> None:
41 """Creates Test object with default attributes."""
42 self.status = TestStatus.TEST_CRASHED
44 self.expected_count = 0 # type: Optional[int]
45 self.subtests = [] # type: List[Test]
46 self.log = [] # type: List[str]
47 self.counts = TestCounts()
49 def __str__(self) -> str:
50 """Returns string representation of a Test class object."""
51 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
52 f'{self.subtests}, {self.log}, {self.counts})')
54 def __repr__(self) -> str:
55 """Returns string representation of a Test class object."""
    def add_error(self, error_message: str) -> None:
        """Records an error that occurred while parsing this test.

        Increments this test's error count and immediately prints the
        message, tagged with a red '[ERROR]' marker and the test name.

        Parameters:
        error_message - description of the parsing error
        """
        self.counts.errors += 1
        stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
63 def ok_status(self) -> bool:
64 """Returns true if the status was ok, i.e. passed or skipped."""
65 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
class TestStatus(Enum):
    """An enumeration class to represent the status of a test."""
    # Restored members: each of these is referenced elsewhere in this
    # file (ok_status(), get_status(), add_status(), format_test_result(),
    # parse_test_result(), parse_run_tests()).
    SUCCESS = auto()
    FAILURE = auto()
    SKIPPED = auto()
    TEST_CRASHED = auto()
    NO_TESTS = auto()
    FAILURE_TO_PARSE_TESTS = auto()
79 Tracks the counts of statuses of all test cases and any errors within
88 def __str__(self) -> str:
89 """Returns the string representation of a TestCounts object."""
90 statuses = [('passed', self.passed), ('failed', self.failed),
91 ('crashed', self.crashed), ('skipped', self.skipped),
92 ('errors', self.errors)]
93 return f'Ran {self.total()} tests: ' + \
94 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
96 def total(self) -> int:
97 """Returns the total number of test cases within a test
98 object, where a test case is a test with no subtests.
100 return (self.passed + self.failed + self.crashed +
103 def add_subtest_counts(self, counts: TestCounts) -> None:
105 Adds the counts of another TestCounts object to the current
106 TestCounts object. Used to add the counts of a subtest to the
110 counts - a different TestCounts object whose counts
111 will be added to the counts of the TestCounts object
113 self.passed += counts.passed
114 self.failed += counts.failed
115 self.crashed += counts.crashed
116 self.skipped += counts.skipped
117 self.errors += counts.errors
119 def get_status(self) -> TestStatus:
120 """Returns the aggregated status of a Test using test
123 if self.total() == 0:
124 return TestStatus.NO_TESTS
126 # Crashes should take priority.
127 return TestStatus.TEST_CRASHED
129 return TestStatus.FAILURE
131 # No failures or crashes, looks good!
132 return TestStatus.SUCCESS
133 # We have only skipped tests.
134 return TestStatus.SKIPPED
136 def add_status(self, status: TestStatus) -> None:
137 """Increments the count for `status`."""
138 if status == TestStatus.SUCCESS:
140 elif status == TestStatus.FAILURE:
142 elif status == TestStatus.SKIPPED:
144 elif status != TestStatus.NO_TESTS:
149 A class to represent the lines of kernel output.
150 Provides a lazy peek()/pop() interface over an iterator of
153 _lines: Iterator[Tuple[int, str]]
154 _next: Tuple[int, str]
158 def __init__(self, lines: Iterator[Tuple[int, str]]):
159 """Creates a new LineStream that wraps the given iterator."""
162 self._need_next = True
165 def _get_next(self) -> None:
166 """Advances the LineSteam to the next line, if necessary."""
167 if not self._need_next:
170 self._next = next(self._lines)
171 except StopIteration:
174 self._need_next = False
176 def peek(self) -> str:
177 """Returns the current line, without advancing the LineStream.
182 def pop(self) -> str:
183 """Returns the current line and advances the LineStream to
188 raise ValueError(f'LineStream: going past EOF, last line was {s}')
189 self._need_next = True
192 def __bool__(self) -> bool:
193 """Returns True if stream has more lines."""
195 return not self._done
197 # Only used by kunit_tool_test.py.
198 def __iter__(self) -> Iterator[str]:
199 """Empties all lines stored in LineStream object into
200 Iterator object and returns the Iterator object.
205 def line_number(self) -> int:
206 """Returns the line number of the current line."""
210 # Parsing helper methods:
# Matches the opening KTAP/TAP version line, capturing the version number.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
# Kernel messages that signal the end of any KTAP output.
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
    """Extracts KTAP lines from the kernel output.

    Scans for a 'KTAP version'/'TAP version' header, strips any console
    prefix preceding that header from subsequent lines, and stops at
    known end-of-output kernel messages (KTAP_END).

    Parameters:
    kernel_output - iterable of raw kernel log lines

    Return:
    LineStream over (line number, KTAP text) pairs
    """
    def isolate_ktap_output(kernel_output: Iterable[str]) \
            -> Iterator[Tuple[int, str]]:
        # Restored loop state: all three names are read below but were
        # never initialized in this copy.
        line_num = 0
        started = False
        prefix_len = 0
        for line in kernel_output:
            line_num += 1
            line = line.rstrip()  # remove trailing \n
            if not started and KTAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(
                    line.split('KTAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif not started and TAP_START.search(line):
                # start extracting KTAP lines and set prefix
                # to number of characters before version line
                prefix_len = len(line.split('TAP version')[0])
                started = True
                yield line_num, line[prefix_len:]
            elif started and KTAP_END.search(line):
                # stop extracting KTAP lines
                break
            elif started:
                # remove the prefix, if any.
                line = line[prefix_len:]
                yield line_num, line
    return LineStream(lines=isolate_ktap_output(kernel_output))
# TAP protocol versions this parser accepts (checked in check_version()).
# NOTE(review): a companion KTAP_VERSIONS list is referenced by
# parse_ktap_header() — confirm it is defined alongside this constant.
TAP_VERSIONS = [13, 14]
def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test) -> None:
    """
    Adds error to test object if version number is too high or too
    low (outside the range of accepted versions).

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
    header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
    version line
    test - Test object for current test being parsed
    """
    if version_num < min(accepted_versions):
        test.add_error(f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        # Fixed typo in the user-visible message: 'higer' -> 'higher'.
        test.add_error(f'{version_type} version higher than expected!')
def parse_ktap_header(lines: LineStream, test: Test) -> bool:
    """
    Parses KTAP/TAP header line and checks version number.
    Returns False if fails to parse KTAP/TAP header line.

    Accepted formats:
    - 'KTAP version [version number]'
    - 'TAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed KTAP/TAP header line
    """
    ktap_match = KTAP_START.match(lines.peek())
    tap_match = TAP_START.match(lines.peek())
    # Restored branch structure: a non-header line must leave the stream
    # untouched and report failure.
    if ktap_match:
        version_num = int(ktap_match.group(1))
        check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
    elif tap_match:
        version_num = int(tap_match.group(1))
        check_version(version_num, TAP_VERSIONS, 'TAP', test)
    else:
        return False
    lines.pop()  # consume the header line
    return True
# Matches a subtest header line, capturing the subtest name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
def parse_test_header(lines: LineStream, test: Test) -> bool:
    """
    Parses test header and stores test name in test object.
    Returns False if fails to parse test header line.

    Accepted format:
    - '# Subtest: [test name]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test header line
    """
    match = TEST_HEADER.match(lines.peek())
    # Restored failure path: without it a non-matching line would raise
    # on match.group() below.
    if not match:
        return False
    test.name = match.group(1)
    lines.pop()  # consume the header line
    return True
# Matches a test plan line ('1..N'), capturing N, the expected subtest count.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
def parse_test_plan(lines: LineStream, test: Test) -> bool:
    """
    Parses test plan line and stores the expected number of subtests in
    test object.
    Returns False and sets expected_count to None if there is no valid test
    plan line.

    Accepted format:
    - '1..[number of subtests]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if successfully parsed test plan line
    """
    match = TEST_PLAN.match(lines.peek())
    if not match:
        # No plan line: the number of subtests is unknown.
        test.expected_count = None
        return False
    expected_count = int(match.group(1))
    test.expected_count = expected_count
    lines.pop()  # consume the plan line
    return True
# Matches a result line: '[ok|not ok] [num] [-] [name] [# directive]';
# group 4 (the name) stops at the first '#'.
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
# Same shape, but only matches when the directive is '# SKIP ...'.
TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
    """
    Matches current line with the format of a test result line and checks
    if the name matches the name of the current test.
    Returns False if fails to match format or name.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
    directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed

    Return:
    True if matched a test result line and the name matching the
    expected test name
    """
    line = lines.peek()  # restored: 'line' was matched below but never set
    match = TEST_RESULT.match(line)
    # Restored failure path for non-result lines.
    if not match:
        return False
    name = match.group(4)
    return name == test.name
def parse_test_result(lines: LineStream, test: Test,
                      expected_num: int) -> bool:
    """
    Parses test result line and stores the status and name in the test
    object. Reports an error if the test number does not match expected
    test number.
    Returns False if fails to parse test result line.

    Note that the SKIP directive is the only directive that causes a
    status change here.

    Accepted format:
    - '[ok|not ok] [test number] [-] [test name] [optional skip
    directive]'

    Parameters:
    lines - LineStream of KTAP output to parse
    test - Test object for current test being parsed
    expected_num - expected test number for current test

    Return:
    True if successfully parsed a test result line.
    """
    line = lines.peek()  # restored: 'line' is matched twice below
    match = TEST_RESULT.match(line)
    skip_match = TEST_RESULT_SKIP.match(line)

    # Check if line matches test result line format
    if not match:
        return False
    lines.pop()  # consume the result line

    # Set name of test object
    if skip_match:
        # Prefer the SKIP pattern's name group: TEST_RESULT's name group
        # stops at '#' and would keep a trailing space here.
        test.name = skip_match.group(4)
    else:
        test.name = match.group(4)

    # Check test number against the expected sequence position.
    num = int(match.group(2))
    if num != expected_num:
        test.add_error(f'Expected test number {expected_num} but found {num}')

    # Set status of test object
    status = match.group(1)
    if skip_match:
        test.status = TestStatus.SKIPPED
    elif status == 'ok':
        test.status = TestStatus.SUCCESS
    else:
        test.status = TestStatus.FAILURE
    return True
def parse_diagnostic(lines: LineStream) -> List[str]:
    """
    Parse lines that do not match the format of a test result line or
    test header line and returns them in list.

    Line formats that are not parsed:
    - '# Subtest: [test name]'
    - '[ok|not ok] [test number] [-] [test name] [optional skip
    directive]'
    - 'KTAP version [version number]'

    Parameters:
    lines - LineStream of KTAP output to parse

    Return:
    Log of diagnostic lines
    """
    log = []  # type: List[str]
    non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START]
    # Renamed the loop variable: the original shadowed the 're' module.
    while lines and not any(pattern.match(lines.peek())
                            for pattern in non_diagnostic_lines):
        log.append(lines.pop())
    return log  # restored: the collected log was never returned
458 # Printing helper methods:
def format_test_divider(message: str, len_message: int) -> str:
    """
    Returns string with message centered in fixed width divider.

    Example:
    '===================== message example ====================='

    Parameters:
    message - message to be centered in divider line
    len_message - length of the message to be printed such that
    any characters of the color codes are not counted

    Return:
    String containing message centered in fixed width divider
    """
    default_count = 3  # default number of dashes
    len_1 = default_count
    len_2 = default_count
    difference = len(DIVIDER) - len_message - 2  # 2 spaces added
    # Restored guard: only recompute when the message fits; otherwise the
    # 3-character defaults above would be dead assignments.
    if difference > 0:
        # calculate number of dashes for each side of the divider
        len_1 = int(difference / 2)
        len_2 = difference - len_1
    return ('=' * len_1) + f' {message} ' + ('=' * len_2)
def print_test_header(test: Test) -> None:
    """
    Prints test header with test name and optionally the expected number
    of subtests.

    Example:
    '=================== example (2 subtests) ==================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = test.name  # restored: 'message' was appended to but never set
    if message:
        # Add a leading space before the subtest counts only if a test name
        # is provided using a "# Subtest" header line.
        message += ' '
    if test.expected_count:
        if test.expected_count == 1:
            message += '(1 subtest)'
        else:
            message += f'({test.expected_count} subtests)'
    stdout.print_with_timestamp(format_test_divider(message, len(message)))
def print_log(log: Iterable[str]) -> None:
    """Prints all strings in saved log for test in yellow."""
    dedented = textwrap.dedent('\n'.join(log))
    for entry in dedented.splitlines():
        stdout.print_with_timestamp(stdout.yellow(entry))
def format_test_result(test: Test) -> str:
    """
    Returns string with formatted test result with colored status and test
    name.

    Parameters:
    test - Test object representing current test being printed

    Return:
    String containing formatted test result
    """
    if test.status == TestStatus.SUCCESS:
        return stdout.green('[PASSED] ') + test.name
    if test.status == TestStatus.SKIPPED:
        return stdout.yellow('[SKIPPED] ') + test.name
    if test.status == TestStatus.NO_TESTS:
        return stdout.yellow('[NO TESTS RUN] ') + test.name
    if test.status == TestStatus.TEST_CRASHED:
        # NOTE(review): the upstream version also dumps the test log
        # (print_log(test.log)) for crashed and failed tests before these
        # returns — confirm whether those calls were dropped in this copy.
        return stdout.red('[CRASHED] ') + test.name
    # Any remaining status (FAILURE, FAILURE_TO_PARSE_TESTS) is a failure.
    return stdout.red('[FAILED] ') + test.name
def print_test_result(test: Test) -> None:
    """
    Prints result line with status of test.

    Parameters:
    test - Test object representing current test being printed
    """
    result_line = format_test_result(test)
    stdout.print_with_timestamp(result_line)
def print_test_footer(test: Test) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    """
    message = format_test_result(test)
    # The message carries ANSI color codes; subtract their width so the
    # divider keeps its fixed visible length.
    visible_len = len(message) - stdout.color_len()
    stdout.print_with_timestamp(format_test_divider(message, visible_len))
570 def _summarize_failed_tests(test: Test) -> str:
571 """Tries to summarize all the failing subtests in `test`."""
573 def failed_names(test: Test, parent_name: str) -> List[str]:
574 # Note: we use 'main' internally for the top-level test.
575 if not parent_name or parent_name == 'main':
576 full_name = test.name
578 full_name = parent_name + '.' + test.name
580 if not test.subtests: # this is a leaf node
583 # If all the children failed, just say this subtest failed.
584 # Don't summarize it down "the top-level test failed", though.
585 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
586 if parent_name and len(failed_subtests) == len(test.subtests):
589 all_failures = [] # type: List[str]
590 for t in failed_subtests:
591 all_failures.extend(failed_names(t, full_name))
594 failures = failed_names(test, '')
595 # If there are too many failures, printing them out will just be noisy.
596 if len(failures) > 10: # this is an arbitrary limit
599 return 'Failures: ' + ', '.join(failures)
def print_summary_line(test: Test) -> None:
    """
    Prints summary line of test object. Color of line is dependent on
    status of test. Color is green if test passes, yellow if test is
    skipped, and red if the test fails or crashes. Summary line contains
    counts of the statuses of the tests subtests or the test itself if it
    has no subtests.

    Example:
    "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
    Errors: 0"

    Parameters:
    test - Test object representing current test being printed
    """
    # Restored color selection: 'color' was read below but only the
    # yellow branch survived in this copy.
    if test.status == TestStatus.SUCCESS:
        color = stdout.green
    elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
        color = stdout.yellow
    else:
        color = stdout.red
    stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))

    # Summarize failures that might have gone off-screen since we had a lot
    # of tests (arbitrarily defined as >=100 for now).
    if test.ok_status() or test.counts.total() < 100:
        return
    summarized = _summarize_failed_tests(test)
    if not summarized:
        return
    stdout.print_with_timestamp(color(summarized))
def bubble_up_test_results(test: Test) -> None:
    """
    If the test has subtests, add the test counts of the subtests to the
    test and check if any of the tests crashed and if so set the test
    status to crashed. Otherwise if the test has no subtests add the
    status of the test to the test counts.

    Parameters:
    test - Test object for current test being parsed
    """
    subtests = test.subtests
    # Restored locals and loop header: 'counts' and 'status' were read
    # below but never bound in this copy.
    counts = test.counts
    status = test.status
    for t in subtests:
        counts.add_subtest_counts(t.counts)
    if counts.total() == 0:
        # Leaf test: its own status is the only thing to count.
        counts.add_status(status)
    elif test.counts.get_status() == TestStatus.TEST_CRASHED:
        test.status = TestStatus.TEST_CRASHED
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
    """
    Finds next test to parse in LineStream, creates new Test object,
    parses any subtests of the test, populates Test object with all
    information (status, name) about the test and the Test objects for
    any subtests, and then returns the Test object. The method accepts
    three formats of tests:

    Accepted test formats:
    - Main KTAP/TAP header
    - Subtest header (must include either the KTAP version line or
      "# Subtest" header line)
    - Test result line only (only KTAP version line, compliant with
      KTAP v1 spec)

    Parameters:
    lines - LineStream of KTAP output to parse
    expected_num - expected test number for test to be parsed
    log - list of strings containing any preceding diagnostic lines
    corresponding to the current test
    is_subtest - boolean indicating whether test is a subtest

    Return:
    Test object populated with characteristics and any subtests

    NOTE(review): several structural lines of this function (object
    construction, branch keywords, loop-counter updates) appear to have
    been lost in this copy; the hedged comments below mark each spot
    that must be re-checked against the original source.
    """
    # NOTE(review): 'test = Test()' (and extending test.log with `log`)
    # presumably happens here — 'test' is used below but never created
    # in this copy, and an 'if not is_subtest:'/'else:' split around the
    # two header-parsing sections appears to be missing.
    # If parsing the main/top-level test, parse KTAP version line and
    # test plan.
    ktap_line = parse_ktap_header(lines, test)
    parse_test_plan(lines, test)
    # If not the main test, attempt to parse a test header containing
    # the KTAP version line and/or subtest header line
    ktap_line = parse_ktap_header(lines, test)
    subtest_line = parse_test_header(lines, test)
    parent_test = (ktap_line or subtest_line)
    # If KTAP version line and/or subtest header is found, attempt
    # to parse test plan and print test header
    parse_test_plan(lines, test)
    print_test_header(test)
    expected_count = test.expected_count
    # NOTE(review): 'subtests = []' and 'test_num = 1' must precede this
    # loop; both names are read below but never initialized here, and a
    # 'test_num += 1' per iteration is also expected.
    while parent_test and (expected_count is None or test_num <= expected_count):
        # Loop to parse any subtests.
        # Break after parsing expected number of tests or
        # if expected number of tests is unknown break when test
        # result line with matching name to subtest header is found
        # or no more lines in stream.
        sub_log = parse_diagnostic(lines)
        # NOTE(review): this condition is truncated in this copy — the
        # right-hand operand of 'and' (likely 'is_subtest):') is missing,
        # as is the creation of 'sub_test' before this branch.
        if not lines or (peek_test_name_match(lines, test) and
            if expected_count and test_num <= expected_count:
                # If parser reaches end of test before
                # parsing expected number of subtests, print
                # crashed subtest and record error
                test.add_error('missing expected subtest!')
                sub_test.log.extend(sub_log)
                test.counts.add_status(
                    TestStatus.TEST_CRASHED)
                print_test_result(sub_test)
            # NOTE(review): an 'else:' branch followed by 'break' likely
            # wraps the next line.
            test.log.extend(sub_log)
        # NOTE(review): this recursive call likely sits in the 'else:'
        # of the 'if not lines or ...' branch above.
        sub_test = parse_test(lines, test_num, sub_log, True)
        subtests.append(sub_test)
    test.subtests = subtests
    # If not main test, look for test result line
    # NOTE(review): an 'if is_subtest:' guard likely wraps this section,
    # with parse_test_result() in the 'else:' of the name check.
    test.log.extend(parse_diagnostic(lines))
    if test.name != "" and not peek_test_name_match(lines, test):
        test.add_error('missing subtest result line!')
    parse_test_result(lines, test, expected_num)
    # Check for there being no subtests within parent test
    if parent_test and len(subtests) == 0:
        # Don't override a bad status if this test had one reported.
        # Assumption: no subtests means CRASHED is from Test.__init__()
        if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
            test.status = TestStatus.NO_TESTS
            test.add_error('0 tests run!')
    # Add statuses to TestCounts attribute in Test object
    bubble_up_test_results(test)
    if parent_test and is_subtest:
        # If test has subtests and is not the main test object, print
        # footer.
        print_test_footer(test)
    # NOTE(review): an 'elif is_subtest:' likely guards this call, and a
    # final 'return test' is expected per the docstring.
    print_test_result(test)
def parse_run_tests(kernel_output: Iterable[str]) -> Test:
    """
    Using kernel output, extract KTAP lines, parse the lines for test
    results and print condensed test results and summary line.

    Parameters:
    kernel_output - Iterable object contains lines of kernel output

    Return:
    Test - the main test object with all subtests.
    """
    stdout.print_with_timestamp(DIVIDER)
    lines = extract_tap_lines(kernel_output)
    # Restored construction and branch structure: 'test' was assigned
    # attributes below without ever being created, and the no-output /
    # parsed-output paths were fused together.
    test = Test()
    if not lines:
        # No KTAP found at all: synthesize a failed placeholder test.
        test.name = '<missing>'
        test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
        test.status = TestStatus.FAILURE_TO_PARSE_TESTS
    else:
        test = parse_test(lines, 0, [], False)
        if test.status != TestStatus.NO_TESTS:
            test.status = test.counts.get_status()
    stdout.print_with_timestamp(DIVIDER)
    print_summary_line(test)
    return test  # restored: the docstring promises the main Test object