summaryrefslogtreecommitdiff
path: root/tools/testing/selftests/hid/tests/base_device.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/testing/selftests/hid/tests/base_device.py')
-rw-r--r--tools/testing/selftests/hid/tests/base_device.py49
1 files changed, 38 insertions, 11 deletions
diff --git a/tools/testing/selftests/hid/tests/base_device.py b/tools/testing/selftests/hid/tests/base_device.py
index e0515be97f83..59465c58d94d 100644
--- a/tools/testing/selftests/hid/tests/base_device.py
+++ b/tools/testing/selftests/hid/tests/base_device.py
@@ -18,10 +18,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import dataclasses
import fcntl
import functools
import libevdev
import os
+import threading
try:
import pyudev
@@ -104,6 +106,12 @@ class PowerSupply(object):
return self._type.str_value
+@dataclasses.dataclass
+class HidReadiness:
+ is_ready: bool = False
+ count: int = 0
+
+
class HIDIsReady(object):
"""
Companion class that binds to a kernel mechanism
@@ -115,18 +123,18 @@ class HIDIsReady(object):
def __init__(self: "HIDIsReady", uhid: UHIDDevice) -> None:
self.uhid = uhid
- def is_ready(self: "HIDIsReady") -> bool:
+ def is_ready(self: "HIDIsReady") -> HidReadiness:
"""
Overwrite in subclasses: should return True or False whether
the attached uhid device is ready or not.
"""
- return False
+ return HidReadiness()
class UdevHIDIsReady(HIDIsReady):
_pyudev_context: ClassVar[Optional[pyudev.Context]] = None
_pyudev_monitor: ClassVar[Optional[pyudev.Monitor]] = None
- _uhid_devices: ClassVar[Dict[int, Tuple[bool, int]]] = {}
+ _uhid_devices: ClassVar[Dict[int, HidReadiness]] = {}
def __init__(self: "UdevHIDIsReady", uhid: UHIDDevice) -> None:
super().__init__(uhid)
@@ -157,18 +165,19 @@ class UdevHIDIsReady(HIDIsReady):
id = int(event.sys_path.strip().split(".")[-1], 16)
- device_ready, count = cls._uhid_devices.get(id, (False, 0))
+ readiness = cls._uhid_devices.setdefault(id, HidReadiness())
ready = event.action == "bind"
- if not device_ready and ready:
- count += 1
- cls._uhid_devices[id] = (ready, count)
+ if not readiness.is_ready and ready:
+ readiness.count += 1
+
+ readiness.is_ready = ready
- def is_ready(self: "UdevHIDIsReady") -> Tuple[bool, int]:
+ def is_ready(self: "UdevHIDIsReady") -> HidReadiness:
try:
return self._uhid_devices[self.uhid.hid_id]
except KeyError:
- return (False, 0)
+ return HidReadiness()
class EvdevMatch(object):
@@ -322,11 +331,11 @@ class BaseDevice(UHIDDevice):
@property
def kernel_is_ready(self: "BaseDevice") -> bool:
- return self._kernel_is_ready.is_ready()[0] and self.started
+ return self._kernel_is_ready.is_ready().is_ready and self.started
@property
def kernel_ready_count(self: "BaseDevice") -> int:
- return self._kernel_is_ready.is_ready()[1]
+ return self._kernel_is_ready.is_ready().count
@property
def input_nodes(self: "BaseDevice") -> List[EvdevDevice]:
@@ -336,10 +345,28 @@ class BaseDevice(UHIDDevice):
if not self.kernel_is_ready or not self.started:
return []
+ # Starting with kernel v6.16, an event is emitted when
+ # userspace opens a kernel device, and for some devices
+ # this translates into a SET_REPORT.
+ # Because EvdevDevice(path) opens every single evdev node
+ # we need to have a separate thread to process the incoming
+ # SET_REPORT or we end up having to wait for the kernel
+ # timeout of 5 seconds.
+ done = False
+
+ def dispatch():
+ while not done:
+ self.dispatch(1)
+
+ t = threading.Thread(target=dispatch)
+ t.start()
+
self._input_nodes = [
EvdevDevice(path)
for path in self.walk_sysfs("input", "input/input*/event*")
]
+ done = True
+ t.join()
return self._input_nodes
def match_evdev_rule(self, application, evdev):