summaryrefslogtreecommitdiff
path: root/tools/testing/kunit/kunit_parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/testing/kunit/kunit_parser.py')
-rw-r--r--tools/testing/kunit/kunit_parser.py1015
1 files changed, 702 insertions, 313 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py
index 6310a641b151..3355196d0515 100644
--- a/tools/testing/kunit/kunit_parser.py
+++ b/tools/testing/kunit/kunit_parser.py
@@ -1,11 +1,15 @@
# SPDX-License-Identifier: GPL-2.0
#
-# Parses test results from a kernel dmesg log.
+# Parses KTAP test results from a kernel dmesg log and incrementally prints
+# results with reader-friendly format. Stores and returns test results in a
+# Test object.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
+# Author: Rae Moar <rmoar@google.com>
+from __future__ import annotations
import re
from collections import namedtuple
@@ -14,33 +18,52 @@ from enum import Enum, auto
from functools import reduce
from typing import Iterable, Iterator, List, Optional, Tuple
-TestResult = namedtuple('TestResult', ['status','suites','log'])
-
-class TestSuite(object):
+TestResult = namedtuple('TestResult', ['status','test','log'])
+
+class Test(object):
+ """
+ A class to represent a test parsed from KTAP results. All KTAP
+ results within a test log are stored in a main Test object as
+ subtests.
+
+ Attributes:
+ status : TestStatus - status of the test
+ name : str - name of the test
+ expected_count : int - expected number of subtests (0 if single
+ test case and None if unknown expected number of subtests)
+ subtests : List[Test] - list of subtests
+ log : List[str] - log of KTAP lines that correspond to the test
+ counts : TestCounts - counts of the test statuses and errors of
+ subtests or of the test itself if the test is a single
+ test case.
+ """
def __init__(self) -> None:
- self.status = TestStatus.SUCCESS
- self.name = ''
- self.cases = [] # type: List[TestCase]
-
- def __str__(self) -> str:
- return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
-
- def __repr__(self) -> str:
- return str(self)
-
-class TestCase(object):
- def __init__(self) -> None:
- self.status = TestStatus.SUCCESS
+ """Creates Test object with default attributes."""
+ self.status = TestStatus.TEST_CRASHED
self.name = ''
+ self.expected_count = 0 # type: Optional[int]
+ self.subtests = [] # type: List[Test]
self.log = [] # type: List[str]
+ self.counts = TestCounts()
def __str__(self) -> str:
- return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
+ """Returns string representation of a Test class object."""
+ return ('Test(' + str(self.status) + ', ' + self.name +
+ ', ' + str(self.expected_count) + ', ' +
+ str(self.subtests) + ', ' + str(self.log) + ', ' +
+ str(self.counts) + ')')
def __repr__(self) -> str:
+ """Returns string representation of a Test class object."""
return str(self)
+ def add_error(self, error_message: str) -> None:
+ """Records an error that occurred while parsing this test."""
+ self.counts.errors += 1
+ print_error('Test ' + self.name + ': ' + error_message)
+
class TestStatus(Enum):
+ """An enumeration class to represent the status of a test."""
SUCCESS = auto()
FAILURE = auto()
SKIPPED = auto()
@@ -48,381 +71,747 @@ class TestStatus(Enum):
NO_TESTS = auto()
FAILURE_TO_PARSE_TESTS = auto()
+class TestCounts:
+ """
+ Tracks the counts of statuses of all test cases and any errors within
+ a Test.
+
+ Attributes:
+ passed : int - the number of tests that have passed
+ failed : int - the number of tests that have failed
+ crashed : int - the number of tests that have crashed
+ skipped : int - the number of tests that have skipped
+ errors : int - the number of errors in the test and subtests
+ """
+ def __init__(self):
+ """Creates TestCounts object with counts of all test
+ statuses and test errors set to 0.
+ """
+ self.passed = 0
+ self.failed = 0
+ self.crashed = 0
+ self.skipped = 0
+ self.errors = 0
+
+ def __str__(self) -> str:
+ """Returns the string representation of a TestCounts object.
+ """
+ return ('Passed: ' + str(self.passed) +
+ ', Failed: ' + str(self.failed) +
+ ', Crashed: ' + str(self.crashed) +
+ ', Skipped: ' + str(self.skipped) +
+ ', Errors: ' + str(self.errors))
+
+ def total(self) -> int:
+ """Returns the total number of test cases within a test
+ object, where a test case is a test with no subtests.
+ """
+ return (self.passed + self.failed + self.crashed +
+ self.skipped)
+
+ def add_subtest_counts(self, counts: TestCounts) -> None:
+ """
+ Adds the counts of another TestCounts object to the current
+ TestCounts object. Used to add the counts of a subtest to the
+ parent test.
+
+ Parameters:
+ counts - a different TestCounts object whose counts
+ will be added to the counts of the TestCounts object
+ """
+ self.passed += counts.passed
+ self.failed += counts.failed
+ self.crashed += counts.crashed
+ self.skipped += counts.skipped
+ self.errors += counts.errors
+
+ def get_status(self) -> TestStatus:
+ """Returns the aggregated status of a Test using test
+ counts.
+ """
+ if self.total() == 0:
+ return TestStatus.NO_TESTS
+ elif self.crashed:
+ # If one of the subtests crash, the expected status
+ # of the Test is crashed.
+ return TestStatus.TEST_CRASHED
+ elif self.failed:
+ # Otherwise if one of the subtests fail, the
+ # expected status of the Test is failed.
+ return TestStatus.FAILURE
+ elif self.passed:
+ # Otherwise if one of the subtests pass, the
+ # expected status of the Test is passed.
+ return TestStatus.SUCCESS
+ else:
+ # Finally, if none of the subtests have failed,
+ # crashed, or passed, the expected status of the
+ # Test is skipped.
+ return TestStatus.SKIPPED
+
+ def add_status(self, status: TestStatus) -> None:
+ """
+ Increments count of inputted status.
+
+ Parameters:
+ status - status to be added to the TestCounts object
+ """
+ if status == TestStatus.SUCCESS:
+ self.passed += 1
+ elif status == TestStatus.FAILURE:
+ self.failed += 1
+ elif status == TestStatus.SKIPPED:
+ self.skipped += 1
+ elif status != TestStatus.NO_TESTS:
+ self.crashed += 1
+
class LineStream:
- """Provides a peek()/pop() interface over an iterator of (line#, text)."""
+ """
+ A class to represent the lines of kernel output.
+ Provides a peek()/pop() interface over an iterator of
+ (line#, text).
+ """
_lines: Iterator[Tuple[int, str]]
_next: Tuple[int, str]
_done: bool
def __init__(self, lines: Iterator[Tuple[int, str]]):
+ """Creates a new LineStream that wraps the given iterator."""
self._lines = lines
self._done = False
self._next = (0, '')
self._get_next()
def _get_next(self) -> None:
+ """Advances the LineSteam to the next line."""
try:
self._next = next(self._lines)
except StopIteration:
self._done = True
def peek(self) -> str:
+ """Returns the current line, without advancing the LineStream.
+ """
return self._next[1]
def pop(self) -> str:
+ """Returns the current line and advances the LineStream to
+ the next line.
+ """
n = self._next
self._get_next()
return n[1]
def __bool__(self) -> bool:
+ """Returns True if stream has more lines."""
return not self._done
# Only used by kunit_tool_test.py.
def __iter__(self) -> Iterator[str]:
+ """Empties all lines stored in LineStream object into
+ Iterator object and returns the Iterator object.
+ """
while bool(self):
yield self.pop()
def line_number(self) -> int:
+ """Returns the line number of the current line."""
return self._next[0]
-kunit_start_re = re.compile(r'TAP version [0-9]+$')
-kunit_end_re = re.compile('(List of all partitions:|'
- 'Kernel panic - not syncing: VFS:|reboot: System halted)')
+# Parsing helper methods:
+
+KTAP_START = re.compile(r'KTAP version ([0-9]+)$')
+TAP_START = re.compile(r'TAP version ([0-9]+)$')
+KTAP_END = re.compile('(List of all partitions:|'
+ 'Kernel panic - not syncing: VFS:|reboot: System halted)')
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
- def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
+ """Extracts KTAP lines from the kernel output."""
+ def isolate_ktap_output(kernel_output: Iterable[str]) \
+ -> Iterator[Tuple[int, str]]:
line_num = 0
started = False
for line in kernel_output:
line_num += 1
- line = line.rstrip() # line always has a trailing \n
- if kunit_start_re.search(line):
+ line = line.rstrip() # remove trailing \n
+ if not started and KTAP_START.search(line):
+ # start extracting KTAP lines and set prefix
+ # to number of characters before version line
+ prefix_len = len(
+ line.split('KTAP version')[0])
+ started = True
+ yield line_num, line[prefix_len:]
+ elif not started and TAP_START.search(line):
+ # start extracting KTAP lines and set prefix
+ # to number of characters before version line
prefix_len = len(line.split('TAP version')[0])
started = True
yield line_num, line[prefix_len:]
- elif kunit_end_re.search(line):
+ elif started and KTAP_END.search(line):
+ # stop extracting KTAP lines
break
elif started:
- yield line_num, line[prefix_len:]
- return LineStream(lines=isolate_kunit_output(kernel_output))
-
-DIVIDER = '=' * 60
-
-RESET = '\033[0;0m'
-
-def red(text) -> str:
- return '\033[1;31m' + text + RESET
-
-def yellow(text) -> str:
- return '\033[1;33m' + text + RESET
-
-def green(text) -> str:
- return '\033[1;32m' + text + RESET
-
-def print_with_timestamp(message) -> None:
- print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
-
-def format_suite_divider(message) -> str:
- return '======== ' + message + ' ========'
+ # remove prefix and any indention and yield
+ # line with line number
+ line = line[prefix_len:].lstrip()
+ yield line_num, line
+ return LineStream(lines=isolate_ktap_output(kernel_output))
+
+KTAP_VERSIONS = [1]
+TAP_VERSIONS = [13, 14]
+
+def check_version(version_num: int, accepted_versions: List[int],
+ version_type: str, test: Test) -> None:
+ """
+ Adds error to test object if version number is too high or too
+ low.
+
+ Parameters:
+ version_num - The inputted version number from the parsed KTAP or TAP
+ header line
+ accepted_version - List of accepted KTAP or TAP versions
+ version_type - 'KTAP' or 'TAP' depending on the type of
+ version line.
+ test - Test object for current test being parsed
+ """
+ if version_num < min(accepted_versions):
+ test.add_error(version_type +
+ ' version lower than expected!')
+ elif version_num > max(accepted_versions):
+ test.add_error(
+ version_type + ' version higher than expected!')
+
+def parse_ktap_header(lines: LineStream, test: Test) -> bool:
+ """
+ Parses KTAP/TAP header line and checks version number.
+ Returns False if fails to parse KTAP/TAP header line.
+
+ Accepted formats:
+ - 'KTAP version [version number]'
+ - 'TAP version [version number]'
+
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ test - Test object for current test being parsed
+
+ Return:
+ True if successfully parsed KTAP/TAP header line
+ """
+ ktap_match = KTAP_START.match(lines.peek())
+ tap_match = TAP_START.match(lines.peek())
+ if ktap_match:
+ version_num = int(ktap_match.group(1))
+ check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
+ elif tap_match:
+ version_num = int(tap_match.group(1))
+ check_version(version_num, TAP_VERSIONS, 'TAP', test)
+ else:
+ return False
+ test.log.append(lines.pop())
+ return True
-def print_suite_divider(message) -> None:
- print_with_timestamp(DIVIDER)
- print_with_timestamp(format_suite_divider(message))
+TEST_HEADER = re.compile(r'^# Subtest: (.*)$')
-def print_log(log) -> None:
- for m in log:
- print_with_timestamp(m)
+def parse_test_header(lines: LineStream, test: Test) -> bool:
+ """
+ Parses test header and stores test name in test object.
+ Returns False if fails to parse test header line.
-TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*# (Subtest:|.*: kunit test case crashed!)).*$')
+ Accepted format:
+ - '# Subtest: [test name]'
-def consume_non_diagnostic(lines: LineStream) -> None:
- while lines and not TAP_ENTRIES.match(lines.peek()):
- lines.pop()
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ test - Test object for current test being parsed
-def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
- while lines and not TAP_ENTRIES.match(lines.peek()):
- test_case.log.append(lines.peek())
- lines.pop()
+ Return:
+ True if successfully parsed test header line
+ """
+ match = TEST_HEADER.match(lines.peek())
+ if not match:
+ return False
+ test.log.append(lines.pop())
+ test.name = match.group(1)
+ return True
-OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
+TEST_PLAN = re.compile(r'1\.\.([0-9]+)')
-OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
+def parse_test_plan(lines: LineStream, test: Test) -> bool:
+ """
+ Parses test plan line and stores the expected number of subtests in
+ test object. Reports an error if expected count is 0.
+ Returns False and reports missing test plan error if fails to parse
+ test plan.
-OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
+ Accepted format:
+ - '1..[number of subtests]'
-OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ test - Test object for current test being parsed
-def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
- save_non_diagnostic(lines, test_case)
- if not lines:
- test_case.status = TestStatus.TEST_CRASHED
- return True
- line = lines.peek()
- match = OK_NOT_OK_SUBTEST.match(line)
- while not match and lines:
- line = lines.pop()
- match = OK_NOT_OK_SUBTEST.match(line)
- if match:
- test_case.log.append(lines.pop())
- test_case.name = match.group(2)
- skip_match = OK_NOT_OK_SKIP.match(line)
- if skip_match:
- test_case.status = TestStatus.SKIPPED
- return True
- if test_case.status == TestStatus.TEST_CRASHED:
- return True
- if match.group(1) == 'ok':
- test_case.status = TestStatus.SUCCESS
- else:
- test_case.status = TestStatus.FAILURE
- return True
- else:
+ Return:
+ True if successfully parsed test plan line
+ """
+ match = TEST_PLAN.match(lines.peek())
+ if not match:
+ test.expected_count = None
+ test.add_error('missing plan line!')
return False
-
-SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
-DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
-
-def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
- save_non_diagnostic(lines, test_case)
- if not lines:
+ test.log.append(lines.pop())
+ expected_count = int(match.group(1))
+ test.expected_count = expected_count
+ if expected_count == 0:
+ test.status = TestStatus.NO_TESTS
+ test.add_error('0 tests run!')
+ return True
+
+TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
+
+TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
+
+def peek_test_name_match(lines: LineStream, test: Test) -> bool:
+ """
+ Matches current line with the format of a test result line and checks
+ if the name matches the name of the current test.
+ Returns False if fails to match format or name.
+
+ Accepted format:
+ - '[ok|not ok] [test number] [-] [test name] [optional skip
+ directive]'
+
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ test - Test object for current test being parsed
+
+ Return:
+ True if matched a test result line and the name matching the
+ expected test name
+ """
+ line = lines.peek()
+ match = TEST_RESULT.match(line)
+ if not match:
return False
+ name = match.group(4)
+ return (name == test.name)
+
+def parse_test_result(lines: LineStream, test: Test,
+ expected_num: int) -> bool:
+ """
+ Parses test result line and stores the status and name in the test
+ object. Reports an error if the test number does not match expected
+ test number.
+ Returns False if fails to parse test result line.
+
+ Note that the SKIP directive is the only direction that causes a
+ change in status.
+
+ Accepted format:
+ - '[ok|not ok] [test number] [-] [test name] [optional skip
+ directive]'
+
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ test - Test object for current test being parsed
+ expected_num - expected test number for current test
+
+ Return:
+ True if successfully parsed a test result line.
+ """
line = lines.peek()
- match = SUBTEST_DIAGNOSTIC.match(line)
- if match:
- test_case.log.append(lines.pop())
- crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
- if crash_match:
- test_case.status = TestStatus.TEST_CRASHED
- return True
- else:
+ match = TEST_RESULT.match(line)
+ skip_match = TEST_RESULT_SKIP.match(line)
+
+ # Check if line matches test result line format
+ if not match:
return False
+ test.log.append(lines.pop())
-def parse_test_case(lines: LineStream) -> Optional[TestCase]:
- test_case = TestCase()
- save_non_diagnostic(lines, test_case)
- while parse_diagnostic(lines, test_case):
- pass
- if parse_ok_not_ok_test_case(lines, test_case):
- return test_case
+ # Set name of test object
+ if skip_match:
+ test.name = skip_match.group(4)
else:
- return None
-
-SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
-
-def parse_subtest_header(lines: LineStream) -> Optional[str]:
- consume_non_diagnostic(lines)
- if not lines:
- return None
- match = SUBTEST_HEADER.match(lines.peek())
- if match:
- lines.pop()
- return match.group(1)
+ test.name = match.group(4)
+
+ # Check test num
+ num = int(match.group(2))
+ if num != expected_num:
+ test.add_error('Expected test number ' +
+ str(expected_num) + ' but found ' + str(num))
+
+ # Set status of test object
+ status = match.group(1)
+ if skip_match:
+ test.status = TestStatus.SKIPPED
+ elif status == 'ok':
+ test.status = TestStatus.SUCCESS
else:
- return None
+ test.status = TestStatus.FAILURE
+ return True
+
+def parse_diagnostic(lines: LineStream) -> List[str]:
+ """
+ Parse lines that do not match the format of a test result line or
+ test header line and returns them in list.
+
+ Line formats that are not parsed:
+ - '# Subtest: [test name]'
+ - '[ok|not ok] [test number] [-] [test name] [optional skip
+ directive]'
+
+ Parameters:
+ lines - LineStream of KTAP output to parse
+
+ Return:
+ Log of diagnostic lines
+ """
+ log = [] # type: List[str]
+ while lines and not TEST_RESULT.match(lines.peek()) and not \
+ TEST_HEADER.match(lines.peek()):
+ log.append(lines.pop())
+ return log
+
+DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$')
+
+def parse_crash_in_log(test: Test) -> bool:
+ """
+ Iterate through the lines of the log to parse for crash message.
+ If crash message found, set status to crashed and return True.
+ Otherwise return False.
+
+ Parameters:
+ test - Test object for current test being parsed
+
+ Return:
+ True if crash message found in log
+ """
+ for line in test.log:
+ if DIAGNOSTIC_CRASH_MESSAGE.match(line):
+ test.status = TestStatus.TEST_CRASHED
+ return True
+ return False
-SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
-def parse_subtest_plan(lines: LineStream) -> Optional[int]:
- consume_non_diagnostic(lines)
- match = SUBTEST_PLAN.match(lines.peek())
- if match:
- lines.pop()
- return int(match.group(1))
- else:
- return None
-
-def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
- if left == right:
- return left
- elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
- return TestStatus.TEST_CRASHED
- elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
- return TestStatus.FAILURE
- elif left == TestStatus.SKIPPED:
- return right
- else:
- return left
+# Printing helper methods:
-def parse_ok_not_ok_test_suite(lines: LineStream,
- test_suite: TestSuite,
- expected_suite_index: int) -> bool:
- consume_non_diagnostic(lines)
- if not lines:
- test_suite.status = TestStatus.TEST_CRASHED
- return False
- line = lines.peek()
- match = OK_NOT_OK_MODULE.match(line)
- if match:
- lines.pop()
- if match.group(1) == 'ok':
- test_suite.status = TestStatus.SUCCESS
- else:
- test_suite.status = TestStatus.FAILURE
- skip_match = OK_NOT_OK_SKIP.match(line)
- if skip_match:
- test_suite.status = TestStatus.SKIPPED
- suite_index = int(match.group(2))
- if suite_index != expected_suite_index:
- print_with_timestamp(
- red('[ERROR] ') + 'expected_suite_index ' +
- str(expected_suite_index) + ', but got ' +
- str(suite_index))
- return True
- else:
- return False
+DIVIDER = '=' * 60
-def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
- return reduce(max_status, status_list, TestStatus.SKIPPED)
+RESET = '\033[0;0m'
-def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
- max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
- return max_status(max_test_case_status, test_suite.status)
+def red(text: str) -> str:
+ """Returns inputted string with red color code."""
+ return '\033[1;31m' + text + RESET
-def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
- if not lines:
- return None
- consume_non_diagnostic(lines)
- test_suite = TestSuite()
- test_suite.status = TestStatus.SUCCESS
- name = parse_subtest_header(lines)
- if not name:
- return None
- test_suite.name = name
- expected_test_case_num = parse_subtest_plan(lines)
- if expected_test_case_num is None:
- return None
- while expected_test_case_num > 0:
- test_case = parse_test_case(lines)
- if not test_case:
- break
- test_suite.cases.append(test_case)
- expected_test_case_num -= 1
- if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
- test_suite.status = bubble_up_test_case_errors(test_suite)
- return test_suite
- elif not lines:
- print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
- return test_suite
- else:
- print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
- return None
+def yellow(text: str) -> str:
+ """Returns inputted string with yellow color code."""
+ return '\033[1;33m' + text + RESET
-TAP_HEADER = re.compile(r'^TAP version 14$')
+def green(text: str) -> str:
+ """Returns inputted string with green color code."""
+ return '\033[1;32m' + text + RESET
-def parse_tap_header(lines: LineStream) -> bool:
- consume_non_diagnostic(lines)
- if TAP_HEADER.match(lines.peek()):
- lines.pop()
- return True
- else:
- return False
+ANSI_LEN = len(red(''))
-TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
+def print_with_timestamp(message: str) -> None:
+ """Prints message with timestamp at beginning."""
+ print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
-def parse_test_plan(lines: LineStream) -> Optional[int]:
- consume_non_diagnostic(lines)
- match = TEST_PLAN.match(lines.peek())
- if match:
- lines.pop()
- return int(match.group(1))
- else:
- return None
-
-def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
- return bubble_up_errors(x.status for x in test_suites)
-
-def parse_test_result(lines: LineStream) -> TestResult:
- consume_non_diagnostic(lines)
- if not lines or not parse_tap_header(lines):
- return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
- expected_test_suite_num = parse_test_plan(lines)
- if expected_test_suite_num == 0:
- return TestResult(TestStatus.NO_TESTS, [], lines)
- elif expected_test_suite_num is None:
- return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
- test_suites = []
- for i in range(1, expected_test_suite_num + 1):
- test_suite = parse_test_suite(lines, i)
- if test_suite:
- test_suites.append(test_suite)
+def format_test_divider(message: str, len_message: int) -> str:
+ """
+ Returns string with message centered in fixed width divider.
+
+ Example:
+ '===================== message example ====================='
+
+ Parameters:
+ message - message to be centered in divider line
+ len_message - length of the message to be printed such that
+ any characters of the color codes are not counted
+
+ Return:
+ String containing message centered in fixed width divider
+ """
+ default_count = 3 # default number of dashes
+ len_1 = default_count
+ len_2 = default_count
+ difference = len(DIVIDER) - len_message - 2 # 2 spaces added
+ if difference > 0:
+ # calculate number of dashes for each side of the divider
+ len_1 = int(difference / 2)
+ len_2 = difference - len_1
+ return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2)
+
+def print_test_header(test: Test) -> None:
+ """
+ Prints test header with test name and optionally the expected number
+ of subtests.
+
+ Example:
+ '=================== example (2 subtests) ==================='
+
+ Parameters:
+ test - Test object representing current test being printed
+ """
+ message = test.name
+ if test.expected_count:
+ if test.expected_count == 1:
+ message += (' (' + str(test.expected_count) +
+ ' subtest)')
else:
- print_with_timestamp(
- red('[ERROR] ') + ' expected ' +
- str(expected_test_suite_num) +
- ' test suites, but got ' + str(i - 2))
- break
- test_suite = parse_test_suite(lines, -1)
- if test_suite:
- print_with_timestamp(red('[ERROR] ') +
- 'got unexpected test suite: ' + test_suite.name)
- if test_suites:
- return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
- else:
- return TestResult(TestStatus.NO_TESTS, [], lines)
+ message += (' (' + str(test.expected_count) +
+ ' subtests)')
+ print_with_timestamp(format_test_divider(message, len(message)))
-class TestCounts:
- passed: int
- failed: int
- crashed: int
- skipped: int
+def print_log(log: Iterable[str]) -> None:
+ """
+ Prints all strings in saved log for test in yellow.
- def __init__(self):
- self.passed = 0
- self.failed = 0
- self.crashed = 0
- self.skipped = 0
-
- def total(self) -> int:
- return self.passed + self.failed + self.crashed + self.skipped
-
-def print_and_count_results(test_result: TestResult) -> TestCounts:
- counts = TestCounts()
- for test_suite in test_result.suites:
- if test_suite.status == TestStatus.SUCCESS:
- print_suite_divider(green('[PASSED] ') + test_suite.name)
- elif test_suite.status == TestStatus.SKIPPED:
- print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
- elif test_suite.status == TestStatus.TEST_CRASHED:
- print_suite_divider(red('[CRASHED] ' + test_suite.name))
- else:
- print_suite_divider(red('[FAILED] ') + test_suite.name)
- for test_case in test_suite.cases:
- if test_case.status == TestStatus.SUCCESS:
- counts.passed += 1
- print_with_timestamp(green('[PASSED] ') + test_case.name)
- elif test_case.status == TestStatus.SKIPPED:
- counts.skipped += 1
- print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
- elif test_case.status == TestStatus.TEST_CRASHED:
- counts.crashed += 1
- print_with_timestamp(red('[CRASHED] ' + test_case.name))
- print_log(map(yellow, test_case.log))
- print_with_timestamp('')
+ Parameters:
+ log - Iterable object with all strings saved in log for test
+ """
+ for m in log:
+ print_with_timestamp(yellow(m))
+
+def format_test_result(test: Test) -> str:
+ """
+ Returns string with formatted test result with colored status and test
+ name.
+
+ Example:
+ '[PASSED] example'
+
+ Parameters:
+ test - Test object representing current test being printed
+
+ Return:
+ String containing formatted test result
+ """
+ if test.status == TestStatus.SUCCESS:
+ return (green('[PASSED] ') + test.name)
+ elif test.status == TestStatus.SKIPPED:
+ return (yellow('[SKIPPED] ') + test.name)
+ elif test.status == TestStatus.TEST_CRASHED:
+ print_log(test.log)
+ return (red('[CRASHED] ') + test.name)
+ else:
+ print_log(test.log)
+ return (red('[FAILED] ') + test.name)
+
+def print_test_result(test: Test) -> None:
+ """
+ Prints result line with status of test.
+
+ Example:
+ '[PASSED] example'
+
+ Parameters:
+ test - Test object representing current test being printed
+ """
+ print_with_timestamp(format_test_result(test))
+
+def print_test_footer(test: Test) -> None:
+ """
+ Prints test footer with status of test.
+
+ Example:
+ '===================== [PASSED] example ====================='
+
+ Parameters:
+ test - Test object representing current test being printed
+ """
+ message = format_test_result(test)
+ print_with_timestamp(format_test_divider(message,
+ len(message) - ANSI_LEN))
+
+def print_summary_line(test: Test) -> None:
+ """
+ Prints summary line of test object. Color of line is dependent on
+ status of test. Color is green if test passes, yellow if test is
+ skipped, and red if the test fails or crashes. Summary line contains
+ counts of the statuses of the tests subtests or the test itself if it
+ has no subtests.
+
+ Example:
+ "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
+ Errors: 0"
+
+ test - Test object representing current test being printed
+ """
+ if test.status == TestStatus.SUCCESS:
+ color = green
+ elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS:
+ color = yellow
+ else:
+ color = red
+ counts = test.counts
+ print_with_timestamp(color('Testing complete. ' + str(counts)))
+
+def print_error(error_message: str) -> None:
+ """
+ Prints error message with error format.
+
+ Example:
+ "[ERROR] Test example: missing test plan!"
+
+ Parameters:
+ error_message - message describing error
+ """
+ print_with_timestamp(red('[ERROR] ') + error_message)
+
+# Other methods:
+
+def bubble_up_test_results(test: Test) -> None:
+ """
+ If the test has subtests, add the test counts of the subtests to the
+ test and check if any of the tests crashed and if so set the test
+ status to crashed. Otherwise if the test has no subtests add the
+ status of the test to the test counts.
+
+ Parameters:
+ test - Test object for current test being parsed
+ """
+ parse_crash_in_log(test)
+ subtests = test.subtests
+ counts = test.counts
+ status = test.status
+ for t in subtests:
+ counts.add_subtest_counts(t.counts)
+ if counts.total() == 0:
+ counts.add_status(status)
+ elif test.counts.get_status() == TestStatus.TEST_CRASHED:
+ test.status = TestStatus.TEST_CRASHED
+
+def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test:
+ """
+ Finds next test to parse in LineStream, creates new Test object,
+ parses any subtests of the test, populates Test object with all
+ information (status, name) about the test and the Test objects for
+ any subtests, and then returns the Test object. The method accepts
+ three formats of tests:
+
+ Accepted test formats:
+
+ - Main KTAP/TAP header
+
+ Example:
+
+ KTAP version 1
+ 1..4
+ [subtests]
+
+ - Subtest header line
+
+ Example:
+
+ # Subtest: name
+ 1..3
+ [subtests]
+ ok 1 name
+
+ - Test result line
+
+ Example:
+
+ ok 1 - test
+
+ Parameters:
+ lines - LineStream of KTAP output to parse
+ expected_num - expected test number for test to be parsed
+ log - list of strings containing any preceding diagnostic lines
+ corresponding to the current test
+
+ Return:
+ Test object populated with characteristics and any subtests
+ """
+ test = Test()
+ test.log.extend(log)
+ parent_test = False
+ main = parse_ktap_header(lines, test)
+ if main:
+ # If KTAP/TAP header is found, attempt to parse
+ # test plan
+ test.name = "main"
+ parse_test_plan(lines, test)
+ else:
+ # If KTAP/TAP header is not found, test must be subtest
+ # header or test result line so parse attempt to parser
+ # subtest header
+ parent_test = parse_test_header(lines, test)
+ if parent_test:
+ # If subtest header is found, attempt to parse
+ # test plan and print header
+ parse_test_plan(lines, test)
+ print_test_header(test)
+ expected_count = test.expected_count
+ subtests = []
+ test_num = 1
+ while expected_count is None or test_num <= expected_count:
+ # Loop to parse any subtests.
+ # Break after parsing expected number of tests or
+ # if expected number of tests is unknown break when test
+ # result line with matching name to subtest header is found
+ # or no more lines in stream.
+ sub_log = parse_diagnostic(lines)
+ sub_test = Test()
+ if not lines or (peek_test_name_match(lines, test) and
+ not main):
+ if expected_count and test_num <= expected_count:
+ # If parser reaches end of test before
+ # parsing expected number of subtests, print
+ # crashed subtest and record error
+ test.add_error('missing expected subtest!')
+ sub_test.log.extend(sub_log)
+ test.counts.add_status(
+ TestStatus.TEST_CRASHED)
+ print_test_result(sub_test)
else:
- counts.failed += 1
- print_with_timestamp(red('[FAILED] ') + test_case.name)
- print_log(map(yellow, test_case.log))
- print_with_timestamp('')
- return counts
+ test.log.extend(sub_log)
+ break
+ else:
+ sub_test = parse_test(lines, test_num, sub_log)
+ subtests.append(sub_test)
+ test_num += 1
+ test.subtests = subtests
+ if not main:
+ # If not main test, look for test result line
+ test.log.extend(parse_diagnostic(lines))
+ if (parent_test and peek_test_name_match(lines, test)) or \
+ not parent_test:
+ parse_test_result(lines, test, expected_num)
+ else:
+ test.add_error('missing subtest result line!')
+ # Add statuses to TestCounts attribute in Test object
+ bubble_up_test_results(test)
+ if parent_test:
+ # If test has subtests and is not the main test object, print
+ # footer.
+ print_test_footer(test)
+ elif not main:
+ print_test_result(test)
+ return test
def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
- counts = TestCounts()
+ """
+ Using kernel output, extract KTAP lines, parse the lines for test
+ results and print condensed test results and summary line .
+
+ Parameters:
+ kernel_output - Iterable object contains lines of kernel output
+
+ Return:
+ TestResult - Tuple containg status of main test object, main test
+ object with all subtests, and log of all KTAP lines.
+ """
+ print_with_timestamp(DIVIDER)
lines = extract_tap_lines(kernel_output)
- test_result = parse_test_result(lines)
- if test_result.status == TestStatus.NO_TESTS:
- print(red('[ERROR] ') + yellow('no tests run!'))
- elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
- print(red('[ERROR] ') + yellow('could not parse test results!'))
+ test = Test()
+ if not lines:
+ test.add_error('invalid KTAP input!')
+ test.status = TestStatus.FAILURE_TO_PARSE_TESTS
else:
- counts = print_and_count_results(test_result)
+ test = parse_test(lines, 0, [])
+ if test.status != TestStatus.NO_TESTS:
+ test.status = test.counts.get_status()
print_with_timestamp(DIVIDER)
- if test_result.status == TestStatus.SUCCESS:
- fmt = green
- elif test_result.status == TestStatus.SKIPPED:
- fmt = yellow
- else:
- fmt =red
- print_with_timestamp(
- fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
- (counts.total(), counts.failed, counts.crashed, counts.skipped)))
- return test_result
+ print_summary_line(test)
+ return TestResult(test.status, test, lines)