diff options
Diffstat (limited to 'tools/testing/kunit/kunit_parser.py')
| -rw-r--r-- | tools/testing/kunit/kunit_parser.py | 163 |
1 files changed, 106 insertions, 57 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py index a225799f6b1b..333cd3a4a56b 100644 --- a/tools/testing/kunit/kunit_parser.py +++ b/tools/testing/kunit/kunit_parser.py @@ -12,13 +12,12 @@ from __future__ import annotations from dataclasses import dataclass import re -import sys import textwrap from enum import Enum, auto from typing import Iterable, Iterator, List, Optional, Tuple -from kunit_printer import stdout +from kunit_printer import Printer, stdout class Test: """ @@ -55,10 +54,10 @@ class Test: """Returns string representation of a Test class object.""" return str(self) - def add_error(self, error_message: str) -> None: + def add_error(self, printer: Printer, error_message: str) -> None: """Records an error that occurred while parsing this test.""" self.counts.errors += 1 - stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') + printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') def ok_status(self) -> bool: """Returns true if the status was ok, i.e. passed or skipped.""" @@ -213,6 +212,7 @@ KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$') TAP_START = re.compile(r'\s*TAP version ([0-9]+)$') KTAP_END = re.compile(r'\s*(List of all partitions:|' 'Kernel panic - not syncing: VFS:|reboot: System halted)') +EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$') def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: """Extracts KTAP lines from the kernel output.""" @@ -243,13 +243,15 @@ def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: # remove the prefix, if any. line = line[prefix_len:] yield line_num, line + elif EXECUTOR_ERROR.search(line): + yield line_num, line return LineStream(lines=isolate_ktap_output(kernel_output)) KTAP_VERSIONS = [1] TAP_VERSIONS = [13, 14] def check_version(version_num: int, accepted_versions: List[int], - version_type: str, test: Test) -> None: + version_type: str, test: Test, printer: Printer) -> None: """ Adds error to test object if version number is too high or too low. @@ -261,13 +263,14 @@ def check_version(version_num: int, accepted_versions: List[int], version_type - 'KTAP' or 'TAP' depending on the type of version line. test - Test object for current test being parsed + printer - Printer object to output error """ if version_num < min(accepted_versions): - test.add_error(f'{version_type} version lower than expected!') + test.add_error(printer, f'{version_type} version lower than expected!') elif version_num > max(accepted_versions): - test.add_error(f'{version_type} version higer than expected!') + test.add_error(printer, f'{version_type} version higer than expected!') -def parse_ktap_header(lines: LineStream, test: Test) -> bool: +def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool: """ Parses KTAP/TAP header line and checks version number. Returns False if fails to parse KTAP/TAP header line. @@ -279,6 +282,7 @@ def parse_ktap_header(lines: LineStream, test: Test) -> bool: Parameters: lines - LineStream of KTAP output to parse test - Test object for current test being parsed + printer - Printer object to output results Return: True if successfully parsed KTAP/TAP header line @@ -287,10 +291,10 @@ def parse_ktap_header(lines: LineStream, test: Test) -> bool: tap_match = TAP_START.match(lines.peek()) if ktap_match: version_num = int(ktap_match.group(1)) - check_version(version_num, KTAP_VERSIONS, 'KTAP', test) + check_version(version_num, KTAP_VERSIONS, 'KTAP', test, printer) elif tap_match: version_num = int(tap_match.group(1)) - check_version(version_num, TAP_VERSIONS, 'TAP', test) + check_version(version_num, TAP_VERSIONS, 'TAP', test, printer) else: return False lines.pop() @@ -348,9 +352,9 @@ def parse_test_plan(lines: LineStream, test: Test) -> bool: lines.pop() return True -TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') +TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?([^#]*)( # .*)?$') -TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') +TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?(.*) # SKIP ?(.*)$') def peek_test_name_match(lines: LineStream, test: Test) -> bool: """ @@ -375,10 +379,12 @@ def peek_test_name_match(lines: LineStream, test: Test) -> bool: if not match: return False name = match.group(4) + if not name: + return False return name == test.name def parse_test_result(lines: LineStream, test: Test, - expected_num: int) -> bool: + expected_num: int, printer: Printer) -> bool: """ Parses test result line and stores the status and name in the test object. Reports an error if the test number does not match expected @@ -396,6 +402,7 @@ def parse_test_result(lines: LineStream, test: Test, lines - LineStream of KTAP output to parse test - Test object for current test being parsed expected_num - expected test number for current test + printer - Printer object to output results Return: True if successfully parsed a test result line. @@ -411,14 +418,14 @@ def parse_test_result(lines: LineStream, test: Test, # Set name of test object if skip_match: - test.name = skip_match.group(4) + test.name = skip_match.group(4) or skip_match.group(5) else: test.name = match.group(4) # Check test num num = int(match.group(2)) if num != expected_num: - test.add_error(f'Expected test number {expected_num} but found {num}') + test.add_error(printer, f'Expected test number {expected_num} but found {num}') # Set status of test object status = match.group(1) @@ -448,7 +455,7 @@ def parse_diagnostic(lines: LineStream) -> List[str]: Log of diagnostic lines """ log = [] # type: List[str] - non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START] + non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN] while lines and not any(re.match(lines.peek()) for re in non_diagnostic_lines): log.append(lines.pop()) @@ -484,7 +491,7 @@ def format_test_divider(message: str, len_message: int) -> str: len_2 = difference - len_1 return ('=' * len_1) + f' {message} ' + ('=' * len_2) -def print_test_header(test: Test) -> None: +def print_test_header(test: Test, printer: Printer) -> None: """ Prints test header with test name and optionally the expected number of subtests. @@ -494,6 +501,7 @@ def print_test_header(test: Test) -> None: Parameters: test - Test object representing current test being printed + printer - Printer object to output results """ message = test.name if message != "": @@ -505,15 +513,15 @@ def print_test_header(test: Test) -> None: message += '(1 subtest)' else: message += f'({test.expected_count} subtests)' - stdout.print_with_timestamp(format_test_divider(message, len(message))) + printer.print_with_timestamp(format_test_divider(message, len(message))) -def print_log(log: Iterable[str]) -> None: +def print_log(log: Iterable[str], printer: Printer) -> None: """Prints all strings in saved log for test in yellow.""" formatted = textwrap.dedent('\n'.join(log)) for line in formatted.splitlines(): - stdout.print_with_timestamp(stdout.yellow(line)) + printer.print_with_timestamp(printer.yellow(line)) -def format_test_result(test: Test) -> str: +def format_test_result(test: Test, printer: Printer) -> str: """ Returns string with formatted test result with colored status and test name. @@ -523,23 +531,24 @@ def format_test_result(test: Test) -> str: Parameters: test - Test object representing current test being printed + printer - Printer object to output results Return: String containing formatted test result """ if test.status == TestStatus.SUCCESS: - return stdout.green('[PASSED] ') + test.name + return printer.green('[PASSED] ') + test.name if test.status == TestStatus.SKIPPED: - return stdout.yellow('[SKIPPED] ') + test.name + return printer.yellow('[SKIPPED] ') + test.name if test.status == TestStatus.NO_TESTS: - return stdout.yellow('[NO TESTS RUN] ') + test.name + return printer.yellow('[NO TESTS RUN] ') + test.name if test.status == TestStatus.TEST_CRASHED: - print_log(test.log) + print_log(test.log, printer) return stdout.red('[CRASHED] ') + test.name - print_log(test.log) - return stdout.red('[FAILED] ') + test.name + print_log(test.log, printer) + return printer.red('[FAILED] ') + test.name -def print_test_result(test: Test) -> None: +def print_test_result(test: Test, printer: Printer) -> None: """ Prints result line with status of test. @@ -548,10 +557,11 @@ def print_test_result(test: Test) -> None: Parameters: test - Test object representing current test being printed + printer - Printer object """ - stdout.print_with_timestamp(format_test_result(test)) + printer.print_with_timestamp(format_test_result(test, printer)) -def print_test_footer(test: Test) -> None: +def print_test_footer(test: Test, printer: Printer) -> None: """ Prints test footer with status of test. @@ -560,12 +570,38 @@ def print_test_footer(test: Test) -> None: Parameters: test - Test object representing current test being printed + printer - Printer object to output results """ - message = format_test_result(test) - stdout.print_with_timestamp(format_test_divider(message, - len(message) - stdout.color_len())) + message = format_test_result(test, printer) + printer.print_with_timestamp(format_test_divider(message, + len(message) - printer.color_len())) +def print_test(test: Test, failed_only: bool, printer: Printer) -> None: + """ + Prints Test object to given printer. For a child test, the result line is + printed. For a parent test, the test header, all child test results, and + the test footer are all printed. If failed_only is true, only failed/crashed + tests will be printed. + Parameters: + test - Test object to print + failed_only - True if only failed/crashed tests should be printed. + printer - Printer object to output results + """ + if test.name == "main": + printer.print_with_timestamp(DIVIDER) + for subtest in test.subtests: + print_test(subtest, failed_only, printer) + printer.print_with_timestamp(DIVIDER) + elif test.subtests != []: + if not failed_only or not test.ok_status(): + print_test_header(test, printer) + for subtest in test.subtests: + print_test(subtest, failed_only, printer) + print_test_footer(test, printer) + else: + if not failed_only or not test.ok_status(): + print_test_result(test, printer) def _summarize_failed_tests(test: Test) -> str: """Tries to summarize all the failing subtests in `test`.""" @@ -599,7 +635,7 @@ def _summarize_failed_tests(test: Test) -> str: return 'Failures: ' + ', '.join(failures) -def print_summary_line(test: Test) -> None: +def print_summary_line(test: Test, printer: Printer) -> None: """ Prints summary line of test object. Color of line is dependent on status of test. Color is green if test passes, yellow if test is @@ -612,6 +648,7 @@ def print_summary_line(test: Test) -> None: Errors: 0" test - Test object representing current test being printed + printer - Printer object to output results """ if test.status == TestStatus.SUCCESS: color = stdout.green @@ -619,7 +656,7 @@ def print_summary_line(test: Test) -> None: color = stdout.yellow else: color = stdout.red - stdout.print_with_timestamp(color(f'Testing complete. {test.counts}')) + printer.print_with_timestamp(color(f'Testing complete. {test.counts}')) # Summarize failures that might have gone off-screen since we had a lot # of tests (arbitrarily defined as >=100 for now). @@ -628,7 +665,7 @@ def print_summary_line(test: Test) -> None: summarized = _summarize_failed_tests(test) if not summarized: return - stdout.print_with_timestamp(color(summarized)) + printer.print_with_timestamp(color(summarized)) # Other methods: @@ -652,7 +689,7 @@ def bubble_up_test_results(test: Test) -> None: elif test.counts.get_status() == TestStatus.TEST_CRASHED: test.status = TestStatus.TEST_CRASHED -def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test: +def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test: """ Finds next test to parse in LineStream, creates new Test object, parses any subtests of the test, populates Test object with all @@ -708,30 +745,37 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: log - list of strings containing any preceding diagnostic lines corresponding to the current test is_subtest - boolean indicating whether test is a subtest + printer - Printer object to output results Return: Test object populated with characteristics and any subtests """ test = Test() test.log.extend(log) + + # Parse any errors prior to parsing tests + err_log = parse_diagnostic(lines) + test.log.extend(err_log) + if not is_subtest: # If parsing the main/top-level test, parse KTAP version line and # test plan test.name = "main" - ktap_line = parse_ktap_header(lines, test) + parse_ktap_header(lines, test, printer) + test.log.extend(parse_diagnostic(lines)) parse_test_plan(lines, test) parent_test = True else: # If not the main test, attempt to parse a test header containing # the KTAP version line and/or subtest header line - ktap_line = parse_ktap_header(lines, test) + ktap_line = parse_ktap_header(lines, test, printer) subtest_line = parse_test_header(lines, test) + test.log.extend(parse_diagnostic(lines)) + parse_test_plan(lines, test) parent_test = (ktap_line or subtest_line) if parent_test: - # If KTAP version line and/or subtest header is found, attempt - # to parse test plan and print test header - parse_test_plan(lines, test) - print_test_header(test) + print_test_header(test, printer) + expected_count = test.expected_count subtests = [] test_num = 1 @@ -749,16 +793,16 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: # If parser reaches end of test before # parsing expected number of subtests, print # crashed subtest and record error - test.add_error('missing expected subtest!') + test.add_error(printer, 'missing expected subtest!') sub_test.log.extend(sub_log) test.counts.add_status( TestStatus.TEST_CRASHED) - print_test_result(sub_test) + print_test_result(sub_test, printer) else: test.log.extend(sub_log) break else: - sub_test = parse_test(lines, test_num, sub_log, True) + sub_test = parse_test(lines, test_num, sub_log, True, printer) subtests.append(sub_test) test_num += 1 test.subtests = subtests @@ -766,50 +810,55 @@ def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: # If not main test, look for test result line test.log.extend(parse_diagnostic(lines)) if test.name != "" and not peek_test_name_match(lines, test): - test.add_error('missing subtest result line!') + test.add_error(printer, 'missing subtest result line!') + elif not lines: + print_log(test.log, printer) + test.status = TestStatus.NO_TESTS + test.add_error(printer, 'No more test results!') else: - parse_test_result(lines, test, expected_num) + parse_test_result(lines, test, expected_num, printer) # Check for there being no subtests within parent test if parent_test and len(subtests) == 0: # Don't override a bad status if this test had one reported. # Assumption: no subtests means CRASHED is from Test.__init__() if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): + print_log(test.log, printer) test.status = TestStatus.NO_TESTS - test.add_error('0 tests run!') + test.add_error(printer, '0 tests run!') # Add statuses to TestCounts attribute in Test object bubble_up_test_results(test) if parent_test and is_subtest: # If test has subtests and is not the main test object, print # footer. - print_test_footer(test) + print_test_footer(test, printer) elif is_subtest: - print_test_result(test) + print_test_result(test, printer) return test -def parse_run_tests(kernel_output: Iterable[str]) -> Test: +def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test: """ Using kernel output, extract KTAP lines, parse the lines for test results and print condensed test results and summary line. Parameters: kernel_output - Iterable object contains lines of kernel output + printer - Printer object to output results Return: Test - the main test object with all subtests. """ - stdout.print_with_timestamp(DIVIDER) + printer.print_with_timestamp(DIVIDER) lines = extract_tap_lines(kernel_output) test = Test() if not lines: test.name = '<missing>' - test.add_error('Could not find any KTAP output. Did any KUnit tests run?') + test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?') test.status = TestStatus.FAILURE_TO_PARSE_TESTS else: - test = parse_test(lines, 0, [], False) + test = parse_test(lines, 0, [], False, printer) if test.status != TestStatus.NO_TESTS: test.status = test.counts.get_status() - stdout.print_with_timestamp(DIVIDER) - print_summary_line(test) + printer.print_with_timestamp(DIVIDER) return test |
