1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List, Optional, Tuple
# Result of parsing a whole log: `status` is the aggregate outcome, `suites`
# the list of parsed suites, and `log` the list of lines given to the parser.
TestResult = namedtuple('TestResult', ['status', 'suites', 'log'])
# Container for one parsed test suite; other code in this file reads and
# writes its `status`, `name` and `cases` attributes.
# NOTE(review): only the class header and one return statement are visible in
# this excerpt; the enclosing method and the remaining members are elided.
19 class TestSuite(object):
# NOTE(review): `self.status` is assigned TestStatus enum members elsewhere in
# this file; concatenating a non-str to a str raises TypeError, so this
# expression probably needs str(self.status) -- confirm.
26 return 'TestSuite(' + self.status + ',' + self.name + ',' + str(self.cases) + ')'
# Container for one parsed test case; other code in this file reads and
# writes its `status`, `name` and `log` attributes (`log` is appended to).
# NOTE(review): only the class header and one return statement are visible in
# this excerpt; the enclosing method and the remaining members are elided.
31 class TestCase(object):
# NOTE(review): `self.status` is assigned TestStatus enum members elsewhere in
# this file; concatenating a non-str to a str raises TypeError, so this
# expression probably needs str(self.status) -- confirm.
38 return 'TestCase(' + self.status + ',' + self.name + ',' + str(self.log) + ')'
# Enumeration of outcomes for cases, suites and whole runs.
# NOTE(review): only the last member is visible in this excerpt; the rest of
# the file also references SUCCESS, FAILURE, TEST_CRASHED and NO_TESTS.
43 class TestStatus(Enum):
# Used by parse_test_result() when the top-level test plan cannot be used.
48 FAILURE_TO_PARSE_TESTS = auto()
# Start marker of KUnit output: a TAP version header at the end of a log line.
# It is applied with search(), so arbitrary text may precede the header.
kunit_start_re = re.compile(r'TAP version [0-9]+$')

# Kernel messages that isolate_kunit_output() tests each later line against.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:)')
# Generator over `kernel_output` that yields lines with the text preceding
# 'TAP version' (as measured on the start line) cut off the front.
# NOTE(review): several lines of this function are not visible in this excerpt
# (including what happens when kunit_end_re matches and the condition guarding
# the final yield); the comments below cover only the visible statements.
54 def isolate_kunit_output(kernel_output):
56 for line in kernel_output:
57 line = line.rstrip() # line always has a trailing \n
58 if kunit_start_re.search(line):
# Number of characters before 'TAP version' on the start line; the same
# count is sliced off every yielded line.
59 prefix_len = len(line.split('TAP version')[0])
61 yield line[prefix_len:] if prefix_len > 0 else line
62 elif kunit_end_re.search(line):
65 yield line[prefix_len:] if prefix_len > 0 else line
# Iterates over every line of `kernel_output`.
# NOTE(review): the loop body is not visible in this excerpt.
67 def raw_output(kernel_output):
68 for line in kernel_output:
# NOTE(review): these three return statements belong to three separate helpers
# whose `def` lines are not visible in this excerpt -- presumably red(),
# yellow() and green(), which are called elsewhere in this file. Each wraps
# `text` in an ANSI SGR escape sequence and appends RESET.
# SGR 1;31
76 return '\033[1;31m' + text + RESET
# SGR 1;33
79 return '\033[1;33m' + text + RESET
# SGR 1;32
82 return '\033[1;32m' + text + RESET
def print_with_timestamp(message: str) -> None:
    """Print `message` to stdout prefixed with the current local time.

    The prefix has the form ``[HH:MM:SS]`` followed by a single space.
    """
    timestamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (timestamp, message))
def format_suite_divider(message: str) -> str:
    """Return `message` framed by eight '=' characters and a space per side."""
    return '======== ' + message + ' ========'
def print_suite_divider(message: str) -> None:
    """Print the DIVIDER line, then `message` framed as a suite divider.

    Both lines are emitted through print_with_timestamp().
    """
    print_with_timestamp(DIVIDER)
    print_with_timestamp(format_suite_divider(message))
# NOTE(review): fragment -- the enclosing `def` and loop header are not visible
# in this excerpt (presumably print_log(), which is called elsewhere in this
# file). The visible statement prints one entry `m` with a timestamp.
96 print_with_timestamp(m)
# Matches any line that looks like TAP output: the 'TAP' header, an optionally
# indented 'ok' / 'not ok' result, an 'N..M' plan, or a '#' diagnostic.
# Lines that do not match are treated as non-TAP noise by the helpers below.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
# Loops while `lines` is non-empty and its first element does not match
# TAP_ENTRIES.
# NOTE(review): the loop body is not visible in this excerpt; presumably it
# removes lines[0] (otherwise the loop could not terminate) -- confirm.
100 def consume_non_diagnositic(lines: List[str]) -> None:
101 while lines and not TAP_ENTRIES.match(lines[0]):
# While the first element of `lines` does not match TAP_ENTRIES, appends it to
# `test_case.log`, so non-TAP output is kept with the test case.
# NOTE(review): the rest of the loop body is not visible in this excerpt;
# presumably it removes lines[0] after saving it -- confirm.
104 def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
105 while lines and not TAP_ENTRIES.match(lines[0]):
106 test_case.log.append(lines[0])
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok', 'description', 'text'])

# Indented result line of a single test case: group 1 is 'ok' or 'not ok',
# group 2 is the description after ' - '.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line of a whole suite: group 1 is 'ok' or 'not ok',
# group 2 the numeric index, group 3 the description after ' - '.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
# Looks for an indented 'ok' / 'not ok' result line (OK_NOT_OK_SUBTEST) and
# records the case name and status on `test_case`.
# NOTE(review): many lines of this function are not visible in this excerpt,
# including every return statement and the branch conditions around the
# visible assignments; the comments below cover only visible statements.
115 def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
# Non-TAP lines ahead of the result are kept in the test case's log.
116 save_non_diagnositic(lines, test_case)
118 test_case.status = TestStatus.TEST_CRASHED
121 match = OK_NOT_OK_SUBTEST.match(line)
122 while not match and lines:
124 match = OK_NOT_OK_SUBTEST.match(line)
# The consumed line is removed from the front of `lines` and logged.
126 test_case.log.append(lines.pop(0))
# Group 2 of OK_NOT_OK_SUBTEST is the description after ' - '.
127 test_case.name = match.group(2)
# parse_diagnostic() sets TEST_CRASHED when it sees the crash message; what
# this branch does with that status is not visible here.
128 if test_case.status == TestStatus.TEST_CRASHED:
130 if match.group(1) == 'ok':
131 test_case.status = TestStatus.SUCCESS
133 test_case.status = TestStatus.FAILURE
# Indented '# <prefix>: <message>' diagnostic line; group 1 is the message
# after the first ': ' (the prefix is matched lazily).
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# .*?: (.*)$')

# Diagnostic message that parse_diagnostic() treats as a crashed test case.
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'
# Handles one indented '# ...: message' diagnostic line (SUBTEST_DIAGNOSTIC):
# the line is moved from `lines` into `test_case.log`, and the case is marked
# TEST_CRASHED when the message equals DIAGNOSTIC_CRASH_MESSAGE.
# NOTE(review): several lines are not visible in this excerpt, including how
# `line` is obtained, the no-match path and the return statements.
141 def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
142 save_non_diagnositic(lines, test_case)
146 match = SUBTEST_DIAGNOSTIC.match(line)
148 test_case.log.append(lines.pop(0))
# Group 1 is the text after the first ': ' on the diagnostic line.
149 if match.group(1) == DIAGNOSTIC_CRASH_MESSAGE:
150 test_case.status = TestStatus.TEST_CRASHED
# Builds a TestCase from the front of `lines`: leading non-TAP lines go into
# its log, then diagnostic lines are parsed for as long as parse_diagnostic()
# returns true, then the 'ok' / 'not ok' result line is parsed.
# NOTE(review): the loop body and the return statements are not visible in
# this excerpt; the annotation says the result is Optional[TestCase].
155 def parse_test_case(lines: List[str]) -> Optional[TestCase]:
156 test_case = TestCase()
157 save_non_diagnositic(lines, test_case)
158 while parse_diagnostic(lines, test_case):
160 if parse_ok_not_ok_test_case(lines, test_case):
# Indented '# Subtest: <name>' line that opens a suite; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
# Skips non-TAP lines, then matches the first remaining line against
# SUBTEST_HEADER and returns the suite name it captures.
# NOTE(review): the lines between these statements (e.g. handling of an empty
# list or a failed match, and whether the header line is removed from `lines`)
# are not visible in this excerpt.
167 def parse_subtest_header(lines: List[str]) -> Optional[str]:
168 consume_non_diagnositic(lines)
171 match = SUBTEST_HEADER.match(lines[0])
# Group 1 is the text after '# Subtest: '.
174 return match.group(1)
# Indented 'N..M' plan line of a suite; group 1 is M, which parse_subtest_plan()
# returns as the expected number of test cases. It is used with match(), so
# the pattern is anchored at the start of the line only.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
# Skips non-TAP lines, then matches the first remaining line against
# SUBTEST_PLAN and returns the plan's upper bound as an int.
# NOTE(review): the handling of a failed match and whether the plan line is
# removed from `lines` are not visible in this excerpt.
180 def parse_subtest_plan(lines: List[str]) -> Optional[int]:
181 consume_non_diagnositic(lines)
# NOTE(review): lines[0] is read directly after the consume call, which can
# leave `lines` empty; that would raise IndexError -- confirm callers
# guarantee a remaining line.
182 match = SUBTEST_PLAN.match(lines[0])
# Group 1 is M in 'N..M'.
185 return int(match.group(1))
# Combines two statuses into one: TEST_CRASHED on either side wins, otherwise
# FAILURE on either side wins; the last visible statement returns SUCCESS.
# NOTE(review): the bodies of the two `!= TestStatus.SUCCESS` branches are not
# visible in this excerpt (presumably they return that non-success operand).
189 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
190 if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
191 return TestStatus.TEST_CRASHED
192 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
193 return TestStatus.FAILURE
194 elif left != TestStatus.SUCCESS:
196 elif right != TestStatus.SUCCESS:
199 return TestStatus.SUCCESS
# Parses the unindented 'ok' / 'not ok' line that closes a suite
# (OK_NOT_OK_MODULE), sets `test_suite.status` from it, and reports an error
# when the index on that line differs from `expected_suite_index`.
# NOTE(review): several lines are not visible in this excerpt, including how
# `line` is obtained, the branch conditions around the status assignments, the
# end of the error message and the return statements.
201 def parse_ok_not_ok_test_suite(lines: List[str],
202 test_suite: TestSuite,
203 expected_suite_index: int) -> bool:
204 consume_non_diagnositic(lines)
206 test_suite.status = TestStatus.TEST_CRASHED
209 match = OK_NOT_OK_MODULE.match(line)
212 if match.group(1) == 'ok':
213 test_suite.status = TestStatus.SUCCESS
215 test_suite.status = TestStatus.FAILURE
# Group 2 of OK_NOT_OK_MODULE is the numeric index on the result line.
216 suite_index = int(match.group(2))
217 if suite_index != expected_suite_index:
218 print_with_timestamp(
219 red('[ERROR] ') + 'expected_suite_index ' +
220 str(expected_suite_index) + ', but got ' +
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of several containers into a single status.

    `to_status` is called on each element of `status_container_list` to get
    its TestStatus; the results are combined pairwise with max_status(),
    starting from TestStatus.SUCCESS (which is also the result for an empty
    list).
    """
    status_list = map(to_status, status_container_list)
    return reduce(max_status, status_list, TestStatus.SUCCESS)
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Return the suite's status combined with the statuses of its cases.

    The case statuses are folded with bubble_up_errors() and the result is
    combined with `test_suite.status` through max_status().
    """
    max_test_case_status = bubble_up_errors(lambda x: x.status, test_suite.cases)
    return max_status(max_test_case_status, test_suite.status)
# Parses one suite from the front of `lines`: the '# Subtest:' header, the
# plan with the expected number of cases, that many test cases, and finally
# the suite's own 'ok' / 'not ok' line, after which the suite status is
# combined with the statuses of its cases.
# NOTE(review): several lines are not visible in this excerpt, including the
# early-exit paths, the branch conditions around the two error messages and
# every return statement.
234 def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
237 consume_non_diagnositic(lines)
238 test_suite = TestSuite()
239 test_suite.status = TestStatus.SUCCESS
240 name = parse_subtest_header(lines)
243 test_suite.name = name
244 expected_test_case_num = parse_subtest_plan(lines)
245 if expected_test_case_num is None:
# Parse exactly as many cases as the plan announced.
247 while expected_test_case_num > 0:
248 test_case = parse_test_case(lines)
251 test_suite.cases.append(test_case)
252 expected_test_case_num -= 1
253 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
254 test_suite.status = bubble_up_test_case_errors(test_suite)
257 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
# NOTE(review): unlike the message above, this one uses plain print() and puts
# no separator between the text and the offending line -- confirm intent.
260 print('failed to parse end of suite' + lines[0])
# Exact TAP header line expected at the start of the isolated KUnit output.
# Only version 14 is accepted here, and the line must contain nothing else.
TAP_HEADER = re.compile(r'^TAP version 14$')
# Skips non-TAP lines, then checks whether the first remaining line is the
# TAP header (TAP_HEADER).
# NOTE(review): both branches of the check, including the return statements
# and whether the header line is removed, are not visible in this excerpt.
265 def parse_tap_header(lines: List[str]) -> bool:
266 consume_non_diagnositic(lines)
# Assumes `lines` is non-empty here; the visible caller, parse_test_result(),
# checks `not lines` before calling.
267 if TAP_HEADER.match(lines[0]):
# Unindented top-level 'N..M' plan line; group 1 is M, which parse_test_plan()
# returns as the expected number of suites. It is used with match(), so the
# pattern is anchored at the start of the line only.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
# Skips non-TAP lines, then matches the first remaining line against TEST_PLAN
# and returns the plan's upper bound as an int.
# NOTE(review): the handling of a failed match and whether the plan line is
# removed from `lines` are not visible in this excerpt.
275 def parse_test_plan(lines: List[str]) -> Optional[int]:
276 consume_non_diagnositic(lines)
# NOTE(review): lines[0] is read directly after the consume call, which can
# leave `lines` empty (e.g. a log that ends right after the TAP header); that
# would raise IndexError -- confirm.
277 match = TEST_PLAN.match(lines[0])
# Group 1 is M in 'N..M'.
280 return int(match.group(1))
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the statuses of all suites folded together by bubble_up_errors()."""
    return bubble_up_errors(lambda x: x.status, test_suite_list)
# Top-level parser: expects the TAP header, then the test plan, then that many
# suites, and returns a TestResult whose status is the combination of all
# suite statuses.
# NOTE(review): several lines are not visible in this excerpt, including the
# initialisation of `test_suites`, the branch conditions inside and after the
# loop, and the condition separating the two final return statements.
287 def parse_test_result(lines: List[str]) -> TestResult:
288 consume_non_diagnositic(lines)
# No TAP output at all, or no valid header: report that no tests ran.
289 if not lines or not parse_tap_header(lines):
290 return TestResult(TestStatus.NO_TESTS, [], lines)
291 expected_test_suite_num = parse_test_plan(lines)
# `not` is true for None and for 0, so a plan announcing zero suites is also
# reported as FAILURE_TO_PARSE_TESTS.
292 if not expected_test_suite_num:
293 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
# Suite indices passed to parse_test_suite() start at 1.
295 for i in range(1, expected_test_suite_num + 1):
296 test_suite = parse_test_suite(lines, i)
298 test_suites.append(test_suite)
# NOTE(review): if this message is printed when suite `i` failed to parse,
# then i - 1 suites were parsed, so `i - 2` looks off by one -- confirm.
300 print_with_timestamp(
301 red('[ERROR] ') + ' expected ' +
302 str(expected_test_suite_num) +
303 ' test suites, but got ' + str(i - 2))
# One more parse attempt with index -1, apparently to detect a suite beyond
# the announced plan.
305 test_suite = parse_test_suite(lines, -1)
307 print_with_timestamp(red('[ERROR] ') +
308 'got unexpected test suite: ' + test_suite.name)
310 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
312 return TestResult(TestStatus.NO_TESTS, [], lines)
# Prints a coloured divider per suite and a coloured line per test case, and
# returns the tuple (total_tests, failed_tests, crashed_tests).
# NOTE(review): the initialisation and updating of the three counters and some
# `else:` lines are not visible in this excerpt.
314 def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
318 for test_suite in test_result.suites:
319 if test_suite.status == TestStatus.SUCCESS:
320 print_suite_divider(green('[PASSED] ') + test_suite.name)
321 elif test_suite.status == TestStatus.TEST_CRASHED:
# Unlike the PASSED and FAILED branches, the suite name is inside the red()
# call here, so the name is coloured too.
322 print_suite_divider(red('[CRASHED] ' + test_suite.name))
324 print_suite_divider(red('[FAILED] ') + test_suite.name)
325 for test_case in test_suite.cases:
327 if test_case.status == TestStatus.SUCCESS:
328 print_with_timestamp(green('[PASSED] ') + test_case.name)
329 elif test_case.status == TestStatus.TEST_CRASHED:
# For crashed and failed cases the saved log is printed in yellow, followed by
# an empty timestamped line.
331 print_with_timestamp(red('[CRASHED] ' + test_case.name))
332 print_log(map(yellow, test_case.log))
333 print_with_timestamp('')
336 print_with_timestamp(red('[FAILED] ') + test_case.name)
337 print_log(map(yellow, test_case.log))
338 print_with_timestamp('')
339 return total_tests, failed_tests, crashed_tests
# Entry point: isolates the KUnit section of `kernel_output`, parses it,
# prints per-suite and per-case results plus a summary line, and (per the
# annotation) returns the TestResult.
# NOTE(review): several lines are not visible in this excerpt, including the
# initialisation of the counters, the `else:` branch header, the start of the
# tuple assignment and the return statement.
341 def parse_run_tests(kernel_output) -> TestResult:
# The generator is materialised into a list because the parse_* helpers index
# lines[0] and pop from the front of it.
345 test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
346 if test_result.status == TestStatus.NO_TESTS:
347 print(red('[ERROR] ') + yellow('no tests run!'))
348 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
349 print(red('[ERROR] ') + yellow('could not parse test results!'))
353 crashed_tests) = print_and_count_results(test_result)
354 print_with_timestamp(DIVIDER)
# The summary line is green only when the overall status is SUCCESS.
355 fmt = green if test_result.status == TestStatus.SUCCESS else red
356 print_with_timestamp(
357 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
358 (total_tests, failed_tests, crashed_tests)))