1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List, Optional, Tuple
# Outcome of parsing a whole log: the aggregate status, the parsed suites,
# and the `lines` list that parse_test_result() was working on.
TestResult = namedtuple('TestResult', 'status suites log')
class TestSuite(object):
    """A parsed test suite: its status, its name and its test cases."""

    def __init__(self):
        self.status = None  # set to a TestStatus by the parser
        self.name = None    # taken from the '# Subtest:' header
        self.cases = []     # TestCase objects, in the order parsed

    def __str__(self):
        # %s formatting stringifies each field; concatenating the status
        # directly raises TypeError because it is not a str.
        return 'TestSuite(%s,%s,%s)' % (self.status, self.name, self.cases)

    def __repr__(self):
        return str(self)
class TestCase(object):
    """A parsed test case: its status, its name and the log lines kept for it."""

    def __init__(self):
        self.status = None  # set to a TestStatus by the parser
        self.name = ''      # taken from the 'ok'/'not ok' result line
        self.log = []       # lines collected while parsing this case

    def __str__(self):
        # %s formatting stringifies each field; concatenating the status
        # directly raises TypeError because it is not a str.
        return 'TestCase(%s,%s,%s)' % (self.status, self.name, self.log)

    def __repr__(self):
        return str(self)
class TestStatus(Enum):
    """Status of a test case, a test suite, or a whole parsed log."""

    SUCCESS = auto()
    FAILURE = auto()
    # Set when a crash diagnostic is seen or the log ends mid-test.
    TEST_CRASHED = auto()
    # Returned when there is no TAP header or no suite could be parsed.
    NO_TESTS = auto()
    # Returned when the top-level test plan could not be parsed.
    FAILURE_TO_PARSE_TESTS = auto()
# First line of the KUnit output inside a kernel log.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Either of these marks the end of the KUnit output.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:)')

def isolate_kunit_output(kernel_output):
    """Yield the KUnit portion of a kernel log, one stripped line at a time.

    Output starts at the line matching kunit_start_re and stops, without
    yielding it, at the first line matching kunit_end_re. Nothing before the
    start line is yielded.

    Whatever precedes 'TAP version' on the start line (presumably a console
    prefix such as a timestamp) fixes a prefix width, and that many leading
    characters are cut from every yielded line.
    """
    started = False
    prefix_len = 0
    for line in kernel_output:
        line = line.rstrip()  # line always has a trailing \n
        if kunit_start_re.search(line):
            prefix_len = len(line.split('TAP version')[0])
            started = True
            yield line[prefix_len:]
        elif kunit_end_re.search(line):
            break
        elif started:
            yield line[prefix_len:]
def raw_output(kernel_output):
    """Write the kernel output out line by line, without parsing it."""
    for line in kernel_output:
        # NOTE(review): the loop body is not visible in the reviewed text;
        # printing each line unchanged is assumed -- confirm.
        print(line)
def red(text):
    """Return `text` wrapped in the ANSI bold-red sequence and RESET."""
    return '\033[1;31m' + text + RESET
def yellow(text):
    """Return `text` wrapped in the ANSI bold-yellow sequence and RESET."""
    return '\033[1;33m' + text + RESET
def green(text):
    """Return `text` wrapped in the ANSI bold-green sequence and RESET."""
    return '\033[1;32m' + text + RESET
def print_with_timestamp(message):
    """Print `message` preceded by the current local time as [HH:MM:SS]."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print('[' + stamp + '] ' + str(message))
def format_suite_divider(message):
    """Return `message` framed by eight '=' characters and a space on each side."""
    bar = '=' * 8
    return bar + ' ' + message + ' ' + bar
def print_suite_divider(message):
    """Print the DIVIDER line, then `message` formatted as a suite banner."""
    print_with_timestamp(DIVIDER)
    banner = format_suite_divider(message)
    print_with_timestamp(banner)
def print_log(log):
    """Print every entry of the iterable `log` with a timestamp."""
    for m in log:
        print_with_timestamp(m)
# A line that belongs to the TAP stream: the version header, an 'ok' or
# 'not ok' result, a plan ('N..M'), or a '#' diagnostic, optionally indented.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnositic(lines: List[str]) -> None:
    """Drop leading lines of `lines` that are not TAP entries.

    `lines` is modified in place. Afterwards it is either empty or starts
    with a line matching TAP_ENTRIES.
    """
    skip = 0
    for line in lines:
        if TAP_ENTRIES.match(line):
            break
        skip += 1
    # One slice deletion instead of a pop(0) per discarded line.
    del lines[:skip]
def save_non_diagnositic(lines: List[str], test_case: TestCase) -> None:
    """Move leading non-TAP lines from `lines` into `test_case.log`.

    `lines` is modified in place. Afterwards it is either empty or starts
    with a line matching TAP_ENTRIES.
    """
    count = 0
    for line in lines:
        if TAP_ENTRIES.match(line):
            break
        count += 1
    test_case.log.extend(lines[:count])
    # One slice deletion instead of a pop(0) per saved line.
    del lines[:count]
# Named tuple with the fields is_ok, description and text. Nothing in the
# visible code references it.
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

# Result line of a test case: indented 'ok' or 'not ok' (group 1), a number,
# ' - ', then the test case name (group 2).
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Result line of a test suite: unindented 'ok' or 'not ok' (group 1), the
# suite index (group 2), ' - ', then the remaining text (group 3).
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
    """Parse the 'ok' / 'not ok' line that ends a test case.

    Leading non-TAP lines are first saved to the case log. If no lines are
    left, the case is marked TEST_CRASHED. Otherwise lines are discarded
    until one matches OK_NOT_OK_SUBTEST; that line is logged and provides the
    case name and status. A status that is already TEST_CRASHED is kept.

    Returns:
        True if the case was resolved, either by a result line or by
        running out of input up front. False if the remaining lines were
        consumed without finding a result line.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        test_case.status = TestStatus.TEST_CRASHED
        return True
    match = None
    line = ''
    while lines and not match:
        line = lines.pop(0)
        match = OK_NOT_OK_SUBTEST.match(line)
    if not match:
        return False
    # The result line has already been popped above. Popping again here
    # would log and swallow the line that follows it.
    test_case.log.append(line)
    test_case.name = match.group(2)
    if test_case.status == TestStatus.TEST_CRASHED:
        return True
    if match.group(1) == 'ok':
        test_case.status = TestStatus.SUCCESS
    else:
        test_case.status = TestStatus.FAILURE
    return True
# An indented '# ...' diagnostic line.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
# The diagnostic line that reports a crashed test case.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')

def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
    """Consume one indented diagnostic line into the test case log.

    Leading non-TAP lines are saved to the log first. If the next line is a
    diagnostic it is moved to the log, and if it is the crash message the
    case is marked TEST_CRASHED.

    Returns:
        True if a diagnostic line was consumed, False otherwise.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        return False
    line = lines[0]
    if not SUBTEST_DIAGNOSTIC.match(line):
        return False
    test_case.log.append(lines.pop(0))
    if DIAGNOSTIC_CRASH_MESSAGE.match(line):
        test_case.status = TestStatus.TEST_CRASHED
    return True
def parse_test_case(lines: List[str]) -> Optional[TestCase]:
    """Parse one test case from the front of `lines`.

    Saves leading non-TAP lines, consumes all diagnostic lines, then parses
    the result line.

    Returns:
        The TestCase, or None if no result line could be parsed.
    """
    test_case = TestCase()
    save_non_diagnositic(lines, test_case)
    while parse_diagnostic(lines, test_case):
        pass
    if parse_ok_not_ok_test_case(lines, test_case):
        return test_case
    return None
# Indented '# Subtest: <name>' line that opens a test suite.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: List[str]) -> Optional[str]:
    """Consume a '# Subtest:' header and return the suite name.

    Leading non-TAP lines are discarded first.

    Returns:
        The name captured from the header, or None if `lines` is exhausted
        or the next line is not a header. The line is only consumed on a
        match.
    """
    consume_non_diagnositic(lines)
    if not lines:
        return None
    match = SUBTEST_HEADER.match(lines[0])
    if not match:
        return None
    lines.pop(0)
    return match.group(1)
# Indented 'N..M' plan line; group 1 is M.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: List[str]) -> Optional[int]:
    """Consume a suite's plan line and return its upper bound.

    Leading non-TAP lines are discarded first.

    Returns:
        The upper bound of the plan as an int, or None if `lines` is
        exhausted or the next line is not an indented plan. The line is only
        consumed on a match.
    """
    consume_non_diagnositic(lines)
    if not lines:
        # The log ended right after the header; indexing lines[0] would raise.
        return None
    match = SUBTEST_PLAN.match(lines[0])
    if not match:
        return None
    lines.pop(0)
    return int(match.group(1))
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
    """Return the more severe of two statuses.

    TEST_CRASHED outranks everything and FAILURE comes next. Any other
    non-SUCCESS status is returned unchanged, checking `left` before `right`.
    SUCCESS is returned only when both arguments are SUCCESS.
    """
    if left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
        return TestStatus.TEST_CRASHED
    if left == TestStatus.FAILURE or right == TestStatus.FAILURE:
        return TestStatus.FAILURE
    if left != TestStatus.SUCCESS:
        return left
    if right != TestStatus.SUCCESS:
        return right
    return TestStatus.SUCCESS
def parse_ok_not_ok_test_suite(lines: List[str],
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
    """Parse the unindented 'ok' / 'not ok' line that ends a test suite.

    Leading non-TAP lines are discarded first. If no lines are left, the
    suite is marked TEST_CRASHED. On a match the line is consumed and the
    suite status is set to SUCCESS or FAILURE. An index that differs from
    `expected_suite_index` is reported but does not fail the parse.

    Returns:
        True if a suite result line was consumed, False otherwise.
    """
    consume_non_diagnositic(lines)
    if not lines:
        test_suite.status = TestStatus.TEST_CRASHED
        return False
    match = OK_NOT_OK_MODULE.match(lines[0])
    if not match:
        return False
    lines.pop(0)
    if match.group(1) == 'ok':
        test_suite.status = TestStatus.SUCCESS
    else:
        test_suite.status = TestStatus.FAILURE
    suite_index = int(match.group(2))
    if suite_index != expected_suite_index:
        print_with_timestamp(
            red('[ERROR] ') + 'expected_suite_index ' +
            str(expected_suite_index) + ', but got ' +
            str(suite_index))
    return True
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of all containers into the most severe one.

    `to_status` extracts a status from each element of
    `status_container_list`. The fold starts from SUCCESS, so an empty list
    yields SUCCESS.
    """
    worst = TestStatus.SUCCESS
    for container in status_container_list:
        worst = max_status(worst, to_status(container))
    return worst
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine the suite's own status with the statuses of all its cases."""
    case_statuses = (case.status for case in test_suite.cases)
    worst_case = reduce(max_status, case_statuses, TestStatus.SUCCESS)
    return max_status(worst_case, test_suite.status)
def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
    """Parse one test suite from the front of `lines`.

    A suite consists of a '# Subtest:' header, a plan line, up to the planned
    number of test cases, and a closing 'ok' / 'not ok' line.

    Returns:
        The TestSuite when the closing line was parsed or the input ran out
        before it. None when there is no header, no plan, or the closing
        line cannot be parsed.
    """
    if not lines:
        return None
    consume_non_diagnositic(lines)
    test_suite = TestSuite()
    test_suite.status = TestStatus.SUCCESS
    name = parse_subtest_header(lines)
    if not name:
        return None
    test_suite.name = name
    expected_test_case_num = parse_subtest_plan(lines)
    if expected_test_case_num is None:
        return None
    while expected_test_case_num > 0:
        test_case = parse_test_case(lines)
        if not test_case:
            break
        test_suite.cases.append(test_case)
        expected_test_case_num -= 1
    if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
        test_suite.status = bubble_up_test_case_errors(test_suite)
        return test_suite
    if not lines:
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return test_suite
    # Reported like the other parse errors, with a separator so the
    # offending line does not run into the message.
    print_with_timestamp(
        red('[ERROR] ') + 'failed to parse end of suite: ' + lines[0])
    return None
# The only TAP header line accepted.
TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: List[str]) -> bool:
    """Consume the 'TAP version 14' header line if it comes next.

    Leading non-TAP lines are discarded first.

    Returns:
        True if the header was consumed. False if `lines` is exhausted or
        the next line is not the header.
    """
    consume_non_diagnositic(lines)
    if not lines:
        # Nothing left to inspect; indexing lines[0] would raise.
        return False
    if not TAP_HEADER.match(lines[0]):
        return False
    lines.pop(0)
    return True
# Unindented 'N..M' plan line; group 1 is M.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: List[str]) -> Optional[int]:
    """Consume the top-level plan line and return its upper bound.

    Leading non-TAP lines are discarded first.

    Returns:
        The upper bound of the plan as an int, or None if `lines` is
        exhausted or the next line is not a plan. The line is only consumed
        on a match.
    """
    consume_non_diagnositic(lines)
    if not lines:
        # The log ended right after the TAP header; lines[0] would raise.
        return None
    match = TEST_PLAN.match(lines[0])
    if not match:
        return None
    lines.pop(0)
    return int(match.group(1))
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the most severe status among the given suites."""
    def suite_status(suite):
        return suite.status

    return bubble_up_errors(suite_status, test_suite_list)
def parse_test_result(lines: List[str]) -> TestResult:
    """Parse a complete TAP stream into a TestResult.

    Expects the TAP header, a top-level plan, then that many test suites.
    `lines` is consumed in place and is also stored in the result.

    Returns:
        A TestResult whose status is NO_TESTS when there is no header, the
        plan is zero, or no suite was parsed. It is FAILURE_TO_PARSE_TESTS
        when the plan cannot be parsed. Otherwise it is the most severe
        status among the parsed suites.
    """
    consume_non_diagnositic(lines)
    if not lines or not parse_tap_header(lines):
        return TestResult(TestStatus.NO_TESTS, [], lines)
    expected_test_suite_num = parse_test_plan(lines)
    if expected_test_suite_num is None:
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    if expected_test_suite_num == 0:
        # A plan of zero parsed fine; there is simply nothing to run.
        return TestResult(TestStatus.NO_TESTS, [], lines)
    test_suites = []
    for i in range(1, expected_test_suite_num + 1):
        test_suite = parse_test_suite(lines, i)
        if test_suite:
            test_suites.append(test_suite)
        else:
            # Report the number of suites actually parsed so far.
            print_with_timestamp(
                red('[ERROR] ') + ' expected ' +
                str(expected_test_suite_num) +
                ' test suites, but got ' + str(len(test_suites)))
            break
    # Anything that still parses as a suite was not announced by the plan.
    test_suite = parse_test_suite(lines, -1)
    if test_suite:
        print_with_timestamp(red('[ERROR] ') +
                             'got unexpected test suite: ' + test_suite.name)
    if test_suites:
        return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
    return TestResult(TestStatus.NO_TESTS, [], lines)
def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
    """Print a per-suite and per-case report and count the outcomes.

    For crashed and failed cases the case log is printed in yellow, followed
    by an empty timestamped line.

    Returns:
        (total_tests, failed_tests, crashed_tests). A crashed case counts
        towards the total and the crashed count, not the failed count.
    """
    total_tests = 0
    failed_tests = 0
    crashed_tests = 0
    for test_suite in test_result.suites:
        if test_suite.status == TestStatus.SUCCESS:
            print_suite_divider(green('[PASSED] ') + test_suite.name)
        elif test_suite.status == TestStatus.TEST_CRASHED:
            print_suite_divider(red('[CRASHED] ' + test_suite.name))
        else:
            print_suite_divider(red('[FAILED] ') + test_suite.name)
        for test_case in test_suite.cases:
            total_tests += 1
            if test_case.status == TestStatus.SUCCESS:
                print_with_timestamp(green('[PASSED] ') + test_case.name)
            elif test_case.status == TestStatus.TEST_CRASHED:
                crashed_tests += 1
                print_with_timestamp(red('[CRASHED] ' + test_case.name))
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
            else:
                failed_tests += 1
                print_with_timestamp(red('[FAILED] ') + test_case.name)
                print_log(map(yellow, test_case.log))
                print_with_timestamp('')
    return total_tests, failed_tests, crashed_tests
def parse_run_tests(kernel_output) -> TestResult:
    """Parse a kernel log, print a report and return the result.

    The KUnit portion of `kernel_output` is isolated and parsed. When no
    tests ran or the results could not be parsed, an error is printed and
    the counts stay at zero. Otherwise the per-suite report is printed. A
    summary line is always printed, green on SUCCESS and red otherwise.
    """
    total_tests = 0
    failed_tests = 0
    crashed_tests = 0
    test_result = parse_test_result(list(isolate_kunit_output(kernel_output)))
    if test_result.status == TestStatus.NO_TESTS:
        print(red('[ERROR] ') + yellow('no tests run!'))
    elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
        print(red('[ERROR] ') + yellow('could not parse test results!'))
    else:
        (total_tests,
         failed_tests,
         crashed_tests) = print_and_count_results(test_result)
    print_with_timestamp(DIVIDER)
    fmt = green if test_result.status == TestStatus.SUCCESS else red
    print_with_timestamp(
        fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
            (total_tests, failed_tests, crashed_tests)))
    # NOTE(review): the end of this function is not visible in the reviewed
    # text; the return follows from the declared return type -- confirm.
    return test_result