summaryrefslogtreecommitdiff
path: root/tools/perf/python/ilist.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/perf/python/ilist.py')
-rwxr-xr-xtools/perf/python/ilist.py495
1 files changed, 495 insertions, 0 deletions
diff --git a/tools/perf/python/ilist.py b/tools/perf/python/ilist.py
new file mode 100755
index 000000000000..9d6465c60df3
--- /dev/null
+++ b/tools/perf/python/ilist.py
@@ -0,0 +1,495 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: (LGPL-2.1 OR BSD-2-Clause)
+"""Interactive perf list."""
+
+from abc import ABC, abstractmethod
+import argparse
+from dataclasses import dataclass
+import math
+from typing import Any, Dict, Optional, Tuple
+import perf
+from textual import on
+from textual.app import App, ComposeResult
+from textual.binding import Binding
+from textual.containers import Horizontal, HorizontalGroup, Vertical, VerticalScroll
+from textual.css.query import NoMatches
+from textual.command import SearchIcon
+from textual.screen import ModalScreen
+from textual.widgets import Button, Footer, Header, Input, Label, Sparkline, Static, Tree
+from textual.widgets.tree import TreeNode
+
+
+def get_info(info: Dict[str, str], key: str):
+ return (info[key] + "\n") if key in info else ""
+
+
+class TreeValue(ABC):
+ """Abstraction for the data of value in the tree."""
+
+ @abstractmethod
+ def name(self) -> str:
+ pass
+
+ @abstractmethod
+ def description(self) -> str:
+ pass
+
+ @abstractmethod
+ def matches(self, query: str) -> bool:
+ pass
+
+ @abstractmethod
+ def parse(self) -> perf.evlist:
+ pass
+
+ @abstractmethod
+ def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
+ pass
+
+
+@dataclass
+class Metric(TreeValue):
+ """A metric in the tree."""
+ metric_name: str
+
+ def name(self) -> str:
+ return self.metric_name
+
+ def description(self) -> str:
+ """Find and format metric description."""
+ for metric in perf.metrics():
+ if metric["MetricName"] != self.metric_name:
+ continue
+ desc = get_info(metric, "BriefDescription")
+ desc += get_info(metric, "PublicDescription")
+ desc += get_info(metric, "MetricExpr")
+ desc += get_info(metric, "MetricThreshold")
+ return desc
+ return "description"
+
+ def matches(self, query: str) -> bool:
+ return query in self.metric_name
+
+ def parse(self) -> perf.evlist:
+ return perf.parse_metrics(self.metric_name)
+
+ def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
+ val = evlist.compute_metric(self.metric_name, cpu, thread)
+ return 0 if math.isnan(val) else val
+
+
+@dataclass
+class PmuEvent(TreeValue):
+ """A PMU and event within the tree."""
+ pmu: str
+ event: str
+
+ def name(self) -> str:
+ if self.event.startswith(self.pmu) or ':' in self.event:
+ return self.event
+ else:
+ return f"{self.pmu}/{self.event}/"
+
+ def description(self) -> str:
+ """Find and format event description for {pmu}/{event}/."""
+ for p in perf.pmus():
+ if p.name() != self.pmu:
+ continue
+ for info in p.events():
+ if "name" not in info or info["name"] != self.event:
+ continue
+
+ desc = get_info(info, "topic")
+ desc += get_info(info, "event_type_desc")
+ desc += get_info(info, "desc")
+ desc += get_info(info, "long_desc")
+ desc += get_info(info, "encoding_desc")
+ return desc
+ return "description"
+
+ def matches(self, query: str) -> bool:
+ return query in self.pmu or query in self.event
+
+ def parse(self) -> perf.evlist:
+ return perf.parse_events(self.name())
+
+ def value(self, evlist: perf.evlist, evsel: perf.evsel, cpu: int, thread: int) -> float:
+ return evsel.read(cpu, thread).val
+
+
+class ErrorScreen(ModalScreen[bool]):
+ """Pop up dialog for errors."""
+
+ CSS = """
+ ErrorScreen {
+ align: center middle;
+ }
+ """
+
+ def __init__(self, error: str):
+ self.error = error
+ super().__init__()
+
+ def compose(self) -> ComposeResult:
+ yield Button(f"Error: {self.error}", variant="primary", id="error")
+
+ def on_button_pressed(self, event: Button.Pressed) -> None:
+ self.dismiss(True)
+
+
+class SearchScreen(ModalScreen[str]):
+ """Pop up dialog for search."""
+
+ CSS = """
+ SearchScreen Horizontal {
+ align: center middle;
+ margin-top: 1;
+ }
+ SearchScreen Input {
+ width: 1fr;
+ }
+ """
+
+ def compose(self) -> ComposeResult:
+ yield Horizontal(SearchIcon(), Input(placeholder="Event name"))
+
+ def on_input_submitted(self, event: Input.Submitted) -> None:
+ """Handle the user pressing Enter in the input field."""
+ self.dismiss(event.value)
+
+
+class Counter(HorizontalGroup):
+ """Two labels for a CPU and its counter value."""
+
+ CSS = """
+ Label {
+ gutter: 1;
+ }
+ """
+
+ def __init__(self, cpu: int) -> None:
+ self.cpu = cpu
+ super().__init__()
+
+ def compose(self) -> ComposeResult:
+ label = f"cpu{self.cpu}" if self.cpu >= 0 else "total"
+ yield Label(label + " ")
+ yield Label("0", id=f"counter_{label}")
+
+
+class CounterSparkline(HorizontalGroup):
+ """A Sparkline for a performance counter."""
+
+ def __init__(self, cpu: int) -> None:
+ self.cpu = cpu
+ super().__init__()
+
+ def compose(self) -> ComposeResult:
+ label = f"cpu{self.cpu}" if self.cpu >= 0 else "total"
+ yield Label(label)
+ yield Sparkline([], summary_function=max, id=f"sparkline_{label}")
+
+
+class IListApp(App):
+ TITLE = "Interactive Perf List"
+
+ BINDINGS = [
+ Binding(key="s", action="search", description="Search",
+ tooltip="Search events and PMUs"),
+ Binding(key="n", action="next", description="Next",
+ tooltip="Next search result or item"),
+ Binding(key="p", action="prev", description="Previous",
+ tooltip="Previous search result or item"),
+ Binding(key="c", action="collapse", description="Collapse",
+ tooltip="Collapse the current PMU"),
+ Binding(key="^q", action="quit", description="Quit",
+ tooltip="Quit the app"),
+ ]
+
+ CSS = """
+ /* Make the 'total' sparkline a different color. */
+ #sparkline_total > .sparkline--min-color {
+ color: $accent;
+ }
+ #sparkline_total > .sparkline--max-color {
+ color: $accent 30%;
+ }
+ /*
+ * Make the active_search initially not displayed with the text in
+ * the middle of the line.
+ */
+ #active_search {
+ display: none;
+ width: 100%;
+ text-align: center;
+ }
+ """
+
+ def __init__(self, interval: float) -> None:
+ self.interval = interval
+ self.evlist = None
+ self.selected: Optional[TreeValue] = None
+ self.search_results: list[TreeNode[TreeValue]] = []
+ self.cur_search_result: TreeNode[TreeValue] | None = None
+ super().__init__()
+
+ def expand_and_select(self, node: TreeNode[Any]) -> None:
+ """Expand select a node in the tree."""
+ if node.parent:
+ node.parent.expand()
+ if node.parent.parent:
+ node.parent.parent.expand()
+ node.expand()
+ node.tree.select_node(node)
+ node.tree.scroll_to_node(node)
+
+ def set_searched_tree_node(self, previous: bool) -> None:
+ """Set the cur_search_result node to either the next or previous."""
+ l = len(self.search_results)
+
+ if l < 1:
+ tree: Tree[TreeValue] = self.query_one("#root", Tree)
+ if previous:
+ tree.action_cursor_up()
+ else:
+ tree.action_cursor_down()
+ return
+
+ if self.cur_search_result:
+ idx = self.search_results.index(self.cur_search_result)
+ if previous:
+ idx = idx - 1 if idx > 0 else l - 1
+ else:
+ idx = idx + 1 if idx < l - 1 else 0
+ else:
+ idx = l - 1 if previous else 0
+
+ node = self.search_results[idx]
+ if node == self.cur_search_result:
+ return
+
+ self.cur_search_result = node
+ self.expand_and_select(node)
+
+ def action_search(self) -> None:
+ """Search was chosen."""
+ def set_initial_focus(event: str | None) -> None:
+ """Sets the focus after the SearchScreen is dismissed."""
+
+ search_label = self.query_one("#active_search", Label)
+ search_label.display = True if event else False
+ if not event:
+ return
+ event = event.lower()
+ search_label.update(f'Searching for events matching "{event}"')
+
+ tree: Tree[str] = self.query_one("#root", Tree)
+
+ def find_search_results(event: str, node: TreeNode[str],
+ cursor_seen: bool = False,
+ match_after_cursor: Optional[TreeNode[str]] = None
+ ) -> Tuple[bool, Optional[TreeNode[str]]]:
+ """Find nodes that match the search remembering the one after the cursor."""
+ if not cursor_seen and node == tree.cursor_node:
+ cursor_seen = True
+ if node.data and node.data.matches(event):
+ if cursor_seen and not match_after_cursor:
+ match_after_cursor = node
+ self.search_results.append(node)
+
+ if node.children:
+ for child in node.children:
+ (cursor_seen, match_after_cursor) = \
+ find_search_results(event, child, cursor_seen, match_after_cursor)
+ return (cursor_seen, match_after_cursor)
+
+ self.search_results.clear()
+ (_, self.cur_search_result) = find_search_results(event, tree.root)
+ if len(self.search_results) < 1:
+ self.push_screen(ErrorScreen(f"Failed to find pmu/event or metric {event}"))
+ search_label.display = False
+ elif self.cur_search_result:
+ self.expand_and_select(self.cur_search_result)
+ else:
+ self.set_searched_tree_node(previous=False)
+
+ self.push_screen(SearchScreen(), set_initial_focus)
+
+ def action_next(self) -> None:
+ """Next was chosen."""
+ self.set_searched_tree_node(previous=False)
+
+ def action_prev(self) -> None:
+ """Previous was chosen."""
+ self.set_searched_tree_node(previous=True)
+
+ def action_collapse(self) -> None:
+ """Collapse the part of the tree currently on."""
+ tree: Tree[str] = self.query_one("#root", Tree)
+ node = tree.cursor_node
+ if node and node.parent:
+ node.parent.collapse_all()
+ node.tree.scroll_to_node(node.parent)
+
+ def update_counts(self) -> None:
+ """Called every interval to update counts."""
+ if not self.selected or not self.evlist:
+ return
+
+ def update_count(cpu: int, count: int):
+ # Update the raw count display.
+ counter: Label = self.query(f"#counter_cpu{cpu}" if cpu >= 0 else "#counter_total")
+ if not counter:
+ return
+ counter = counter.first(Label)
+ counter.update(str(count))
+
+ # Update the sparkline.
+ line: Sparkline = self.query(f"#sparkline_cpu{cpu}" if cpu >= 0 else "#sparkline_total")
+ if not line:
+ return
+ line = line.first(Sparkline)
+ # If there are more events than the width, remove the front event.
+ if len(line.data) > line.size.width:
+ line.data.pop(0)
+ line.data.append(count)
+ line.mutate_reactive(Sparkline.data)
+
+ # Update the total and each CPU counts, assume there's just 1 evsel.
+ total = 0
+ self.evlist.disable()
+ for evsel in self.evlist:
+ for cpu in evsel.cpus():
+ aggr = 0
+ for thread in evsel.threads():
+ aggr += self.selected.value(self.evlist, evsel, cpu, thread)
+ update_count(cpu, aggr)
+ total += aggr
+ update_count(-1, total)
+ self.evlist.enable()
+
+ def on_mount(self) -> None:
+ """When App starts set up periodic event updating."""
+ self.update_counts()
+ self.set_interval(self.interval, self.update_counts)
+
+ def set_selected(self, value: TreeValue) -> None:
+ """Updates the event/description and starts the counters."""
+ try:
+ label_name = self.query_one("#event_name", Label)
+ event_description = self.query_one("#event_description", Static)
+ lines = self.query_one("#lines")
+ except NoMatches:
+ # A race with rendering, ignore the update as we can't
+ # mount the assumed output widgets.
+ return
+
+ self.selected = value
+
+ # Remove previous event information.
+ if self.evlist:
+ self.evlist.disable()
+ self.evlist.close()
+ old_lines = self.query(CounterSparkline)
+ for line in old_lines:
+ line.remove()
+ old_counters = self.query(Counter)
+ for counter in old_counters:
+ counter.remove()
+
+ # Update event/metric text and description.
+ label_name.update(value.name())
+ event_description.update(value.description())
+
+ # Open the event.
+ try:
+ self.evlist = value.parse()
+ if self.evlist:
+ self.evlist.open()
+ self.evlist.enable()
+ except:
+ self.evlist = None
+
+ if not self.evlist:
+ self.push_screen(ErrorScreen(f"Failed to open {value.name()}"))
+ return
+
+ # Add spark lines for all the CPUs. Note, must be done after
+ # open so that the evlist CPUs have been computed by propagate
+ # maps.
+ line = CounterSparkline(cpu=-1)
+ lines.mount(line)
+ for cpu in self.evlist.all_cpus():
+ line = CounterSparkline(cpu)
+ lines.mount(line)
+ line = Counter(cpu=-1)
+ lines.mount(line)
+ for cpu in self.evlist.all_cpus():
+ line = Counter(cpu)
+ lines.mount(line)
+
+ def compose(self) -> ComposeResult:
+ """Draws the app."""
+ def metric_event_tree() -> Tree:
+ """Create tree of PMUs and metricgroups with events or metrics under."""
+ tree: Tree[TreeValue] = Tree("Root", id="root")
+ pmus = tree.root.add("PMUs")
+ for pmu in perf.pmus():
+ pmu_name = pmu.name().lower()
+ pmu_node = pmus.add(pmu_name)
+ try:
+ for event in sorted(pmu.events(), key=lambda x: x["name"]):
+ if "name" in event:
+ e = event["name"].lower()
+ if "alias" in event:
+ pmu_node.add_leaf(f'{e} ({event["alias"]})',
+ data=PmuEvent(pmu_name, e))
+ else:
+ pmu_node.add_leaf(e, data=PmuEvent(pmu_name, e))
+ except:
+ # Reading events may fail with EPERM, ignore.
+ pass
+ metrics = tree.root.add("Metrics")
+ groups = set()
+ for metric in perf.metrics():
+ groups.update(metric["MetricGroup"])
+
+ def add_metrics_to_tree(node: TreeNode[TreeValue], parent: str):
+ for metric in sorted(perf.metrics(), key=lambda x: x["MetricName"]):
+ if parent in metric["MetricGroup"]:
+ name = metric["MetricName"]
+ node.add_leaf(name, data=Metric(name))
+ child_group_name = f'{name}_group'
+ if child_group_name in groups:
+ add_metrics_to_tree(node.add(child_group_name), child_group_name)
+
+ for group in sorted(groups):
+ if group.endswith('_group'):
+ continue
+ add_metrics_to_tree(metrics.add(group), group)
+
+ tree.root.expand()
+ return tree
+
+ yield Header(id="header")
+ yield Horizontal(Vertical(metric_event_tree(), id="events"),
+ Vertical(Label("event name", id="event_name"),
+ Static("description", markup=False, id="event_description"),
+ ))
+ yield Label(id="active_search")
+ yield VerticalScroll(id="lines")
+ yield Footer(id="footer")
+
+ @on(Tree.NodeSelected)
+ def on_tree_node_selected(self, event: Tree.NodeSelected[TreeValue]) -> None:
+ """Called when a tree node is selected, selecting the event."""
+ if event.node.data:
+ self.set_selected(event.node.data)
+
+
+if __name__ == "__main__":
+ ap = argparse.ArgumentParser()
+ ap.add_argument('-I', '--interval', help="Counter update interval in seconds", default=0.1)
+ args = ap.parse_args()
+ app = IListApp(float(args.interval))
+ app.run()