diff options
Diffstat (limited to 'tools/testing/selftests/hid/tests/base_device.py')
-rw-r--r-- | tools/testing/selftests/hid/tests/base_device.py | 49 |
1 files changed, 38 insertions, 11 deletions
diff --git a/tools/testing/selftests/hid/tests/base_device.py b/tools/testing/selftests/hid/tests/base_device.py index e0515be97f83..59465c58d94d 100644 --- a/tools/testing/selftests/hid/tests/base_device.py +++ b/tools/testing/selftests/hid/tests/base_device.py @@ -18,10 +18,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import dataclasses import fcntl import functools import libevdev import os +import threading try: import pyudev @@ -104,6 +106,12 @@ class PowerSupply(object): return self._type.str_value +@dataclasses.dataclass +class HidReadiness: + is_ready: bool = False + count: int = 0 + + class HIDIsReady(object): """ Companion class that binds to a kernel mechanism @@ -115,18 +123,18 @@ class HIDIsReady(object): def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None: self.uhid = uhid - def is_ready(self: "HIDIsReady") -> bool: + def is_ready(self: "HIDIsReady") -> HidReadiness: """ Overwrite in subclasses: should return True or False whether the attached uhid device is ready or not. """ - return False + return HidReadiness() class UdevHIDIsReady(HIDIsReady): _pyudev_context: ClassVar[Optional[pyudev.Context]] = None _pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None - _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {} + _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {} def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None: super().__init__(uhid) @@ -157,18 +165,19 @@ class UdevHIDIsReady(HIDIsReady): id = int(event.sys_path.strip().split(".")[-1], 16) - device_ready, count = cls._uhid_devices.get(id, (False, 0)) + readiness = cls._uhid_devices.setdefault(id, HidReadiness()) ready = event.action == "bind" - if not device_ready and ready: - count += 1 - cls._uhid_devices[id] = (ready, count) + if not readiness.is_ready and ready: + readiness.count += 1 + + readiness.is_ready = ready - def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]: + def is_ready(self: "UdevHIDIsReady") -> HidReadiness: try: return self._uhid_devices[self.uhid.hid_id] except KeyError: - return (False, 0) + return HidReadiness() class EvdevMatch(object): @@ -322,11 +331,11 @@ class BaseDevice(UHIDDevice): @property def kernel_is_ready(self: "BaseDevice") -> bool: - return self._kernel_is_ready.is_ready()[0] and self.started + return self._kernel_is_ready.is_ready().is_ready and self.started @property def kernel_ready_count(self: "BaseDevice") -> int: - return self._kernel_is_ready.is_ready()[1] + return self._kernel_is_ready.is_ready().count @property def input_nodes(self: "BaseDevice") -> List[EvdevDevice]: @@ -336,10 +345,28 @@ class BaseDevice(UHIDDevice): if not self.kernel_is_ready or not self.started: return [] + # Starting with kernel v6.16, an event is emitted when + # userspace opens a kernel device, and for some devices + # this translates into a SET_REPORT. + # Because EvdevDevice(path) opens every single evdev node + # we need to have a separate thread to process the incoming + # SET_REPORT or we end up having to wait for the kernel + # timeout of 5 seconds. + done = False + + def dispatch(): + while not done: + self.dispatch(1) + + t = threading.Thread(target=dispatch) + t.start() + self._input_nodes = [ EvdevDevice(path) for path in self.walk_sysfs("input", "input/input*/event*") ] + done = True + t.join() return self._input_nodes def match_evdev_rule(self, application, evdev): |