diff options
Diffstat (limited to 'tools/testing/selftests/net/lib/py/ksft.py')
| -rw-r--r-- | tools/testing/selftests/net/lib/py/ksft.py | 210 |
1 files changed, 194 insertions, 16 deletions
diff --git a/tools/testing/selftests/net/lib/py/ksft.py b/tools/testing/selftests/net/lib/py/ksft.py index f26c20df9db4..531e7fa1b3ea 100644 --- a/tools/testing/selftests/net/lib/py/ksft.py +++ b/tools/testing/selftests/net/lib/py/ksft.py @@ -1,15 +1,18 @@ # SPDX-License-Identifier: GPL-2.0 -import builtins +import functools import inspect +import signal import sys import time import traceback +from collections import namedtuple from .consts import KSFT_MAIN_NAME from .utils import global_defer_queue KSFT_RESULT = None KSFT_RESULT_ALL = True +KSFT_DISRUPTIVE = True class KsftFailEx(Exception): @@ -24,7 +27,12 @@ class KsftXfailEx(Exception): pass +class KsftTerminate(KeyboardInterrupt): + pass + + def ksft_pr(*objs, **kwargs): + kwargs["flush"] = True print("#", *objs, **kwargs) @@ -32,8 +40,18 @@ def _fail(*args): global KSFT_RESULT KSFT_RESULT = False - frame = inspect.stack()[2] - ksft_pr("At " + frame.filename + " line " + str(frame.lineno) + ":") + stack = inspect.stack() + started = False + for frame in reversed(stack[2:]): + # Start printing from the test case function + if not started: + if frame.function == 'ksft_run': + started = True + continue + + ksft_pr("Check| At " + frame.filename + ", line " + str(frame.lineno) + + ", in " + frame.function + ":") + ksft_pr("Check| " + frame.code_context[0].strip()) ksft_pr(*args) @@ -43,21 +61,47 @@ def ksft_eq(a, b, comment=""): _fail("Check failed", a, "!=", b, comment) +def ksft_ne(a, b, comment=""): + global KSFT_RESULT + if a == b: + _fail("Check failed", a, "==", b, comment) + + def ksft_true(a, comment=""): if not a: _fail("Check failed", a, "does not eval to True", comment) +def ksft_not_none(a, comment=""): + if a is None: + _fail("Check failed", a, "is None", comment) + + def ksft_in(a, b, comment=""): if a not in b: _fail("Check failed", a, "not in", b, comment) +def ksft_not_in(a, b, comment=""): + if a in b: + _fail("Check failed", a, "in", b, comment) + + +def ksft_is(a, b, comment=""): + if a is not b: + _fail("Check failed", a, "is not", b, comment) + + def ksft_ge(a, b, comment=""): if a < b: _fail("Check failed", a, "<", b, comment) +def ksft_gt(a, b, comment=""): + if a <= b: + _fail("Check failed", a, "<=", b, comment) + + def ksft_lt(a, b, comment=""): if a >= b: _fail("Check failed", a, ">=", b, comment) @@ -92,7 +136,7 @@ def ksft_busy_wait(cond, sleep=0.005, deadline=1, comment=""): time.sleep(sleep) -def ktap_result(ok, cnt=1, case="", comment=""): +def ktap_result(ok, cnt=1, case_name="", comment=""): global KSFT_RESULT_ALL KSFT_RESULT_ALL = KSFT_RESULT_ALL and ok @@ -102,11 +146,11 @@ def ktap_result(ok, cnt=1, case="", comment=""): res += "ok " res += str(cnt) + " " res += KSFT_MAIN_NAME - if case: - res += "." + str(case.__name__) + if case_name: + res += "." + case_name if comment: res += " # " + comment - print(res) + print(res, flush=True) def ksft_flush_defer(): @@ -119,7 +163,7 @@ def ksft_flush_defer(): entry = global_defer_queue.pop() try: entry.exec_only() - except: + except Exception: ksft_pr(f"Exception while handling defer / cleanup (callback {i} of {qlen_start})!") tb = traceback.format_exc() for line in tb.strip().split('\n'): @@ -127,9 +171,102 @@ def ksft_flush_defer(): KSFT_RESULT = False -def ksft_run(cases=None, globs=None, case_pfx=None, args=()): +KsftCaseFunction = namedtuple("KsftCaseFunction", + ['name', 'original_func', 'variants']) + + +def ksft_disruptive(func): + """ + Decorator that marks the test as disruptive (e.g. the test + that can down the interface). Disruptive tests can be skipped + by passing DISRUPTIVE=False environment variable. + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + if not KSFT_DISRUPTIVE: + raise KsftSkipEx("marked as disruptive") + return func(*args, **kwargs) + return wrapper + + +class KsftNamedVariant: + """ Named string name + argument list tuple for @ksft_variants """ + + def __init__(self, name, *params): + self.params = params + self.name = name or "_".join([str(x) for x in self.params]) + + +def ksft_variants(params): + """ + Decorator defining the sets of inputs for a test. + The parameters will be included in the name of the resulting sub-case. + Parameters can be either single object, tuple or a KsftNamedVariant. + The argument can be a list or a generator. + + Example: + + @ksft_variants([ + (1, "a"), + (2, "b"), + KsftNamedVariant("three", 3, "c"), + ]) + def my_case(cfg, a, b): + pass # ... + + ksft_run(cases=[my_case], args=(cfg, )) + + Will generate cases: + my_case.1_a + my_case.2_b + my_case.three + """ + + return lambda func: KsftCaseFunction(func.__name__, func, params) + + +def ksft_setup(env): + """ + Setup test framework global state from the environment. + """ + + def get_bool(env, name): + value = env.get(name, "").lower() + if value in ["yes", "true"]: + return True + if value in ["no", "false"]: + return False + try: + return bool(int(value)) + except Exception: + raise Exception(f"failed to parse {name}") + + if "DISRUPTIVE" in env: + global KSFT_DISRUPTIVE + KSFT_DISRUPTIVE = get_bool(env, "DISRUPTIVE") + + return env + + +def _ksft_intr(signum, frame): + # ksft runner.sh sends 2 SIGTERMs in a row on a timeout + # if we don't ignore the second one it will stop us from handling cleanup + global term_cnt + term_cnt += 1 + if term_cnt == 1: + raise KsftTerminate() + else: + ksft_pr(f"Ignoring SIGTERM (cnt: {term_cnt}), already exiting...") + + +def _ksft_generate_test_cases(cases, globs, case_pfx, args): + """Generate a flat list of (func, args, name) tuples""" + cases = cases or [] + test_cases = [] + # If using the globs method find all relevant functions if globs and case_pfx: for key, value in globs.items(): if not callable(value): @@ -139,22 +276,47 @@ def ksft_run(cases=None, globs=None, case_pfx=None, args=()): cases.append(value) break + for func in cases: + if isinstance(func, KsftCaseFunction): + # Parametrized test - create case for each param + for param in func.variants: + if not isinstance(param, KsftNamedVariant): + if not isinstance(param, tuple): + param = (param, ) + param = KsftNamedVariant(None, *param) + + test_cases.append((func.original_func, + (*args, *param.params), + func.name + "." + param.name)) + else: + test_cases.append((func, args, func.__name__)) + + return test_cases + + +def ksft_run(cases=None, globs=None, case_pfx=None, args=()): + test_cases = _ksft_generate_test_cases(cases, globs, case_pfx, args) + + global term_cnt + term_cnt = 0 + prev_sigterm = signal.signal(signal.SIGTERM, _ksft_intr) + totals = {"pass": 0, "fail": 0, "skip": 0, "xfail": 0} - print("KTAP version 1") - print("1.." + str(len(cases))) + print("TAP version 13", flush=True) + print("1.." + str(len(test_cases)), flush=True) global KSFT_RESULT cnt = 0 stop = False - for case in cases: + for func, args, name in test_cases: KSFT_RESULT = True cnt += 1 comment = "" cnt_key = "" try: - case(*args) + func(*args) except KsftSkipEx as e: comment = "SKIP " + str(e) cnt_key = 'skip' @@ -167,21 +329,37 @@ def ksft_run(cases=None, globs=None, case_pfx=None, args=()): for line in tb.strip().split('\n'): ksft_pr("Exception|", line) if stop: - ksft_pr("Stopping tests due to KeyboardInterrupt.") + ksft_pr(f"Stopping tests due to {type(e).__name__}.") KSFT_RESULT = False cnt_key = 'fail' - ksft_flush_defer() + try: + ksft_flush_defer() + except BaseException as e: + tb = traceback.format_exc() + for line in tb.strip().split('\n'): + ksft_pr("Exception|", line) + if isinstance(e, KeyboardInterrupt): + ksft_pr() + ksft_pr("WARN: defer() interrupted, cleanup may be incomplete.") + ksft_pr(" Attempting to finish cleanup before exiting.") + ksft_pr(" Interrupt again to exit immediately.") + ksft_pr() + stop = True + # Flush was interrupted, try to finish the job best we can + ksft_flush_defer() if not cnt_key: cnt_key = 'pass' if KSFT_RESULT else 'fail' - ktap_result(KSFT_RESULT, cnt, case, comment=comment) + ktap_result(KSFT_RESULT, cnt, name, comment=comment) totals[cnt_key] += 1 if stop: break + signal.signal(signal.SIGTERM, prev_sigterm) + print( f"# Totals: pass:{totals['pass']} fail:{totals['fail']} xfail:{totals['xfail']} xpass:0 skip:{totals['skip']} error:0" ) |
