diff options
Diffstat (limited to 'tools/net/ynl/pyynl/lib')
-rw-r--r-- | tools/net/ynl/pyynl/lib/__init__.py | 4 | ||||
-rw-r--r-- | tools/net/ynl/pyynl/lib/doc_generator.py | 402 | ||||
-rw-r--r-- | tools/net/ynl/pyynl/lib/nlspec.py | 2 | ||||
-rw-r--r-- | tools/net/ynl/pyynl/lib/ynl.py | 45 |
4 files changed, 441 insertions, 12 deletions
diff --git a/tools/net/ynl/pyynl/lib/__init__.py b/tools/net/ynl/pyynl/lib/__init__.py index 71518b9842ee..ec9ea00071be 100644 --- a/tools/net/ynl/pyynl/lib/__init__.py +++ b/tools/net/ynl/pyynl/lib/__init__.py @@ -4,6 +4,8 @@ from .nlspec import SpecAttr, SpecAttrSet, SpecEnumEntry, SpecEnumSet, \ SpecFamily, SpecOperation, SpecSubMessage, SpecSubMessageFormat from .ynl import YnlFamily, Netlink, NlError +from .doc_generator import YnlDocGenerator + __all__ = ["SpecAttr", "SpecAttrSet", "SpecEnumEntry", "SpecEnumSet", "SpecFamily", "SpecOperation", "SpecSubMessage", "SpecSubMessageFormat", - "YnlFamily", "Netlink", "NlError"] + "YnlFamily", "Netlink", "NlError", "YnlDocGenerator"] diff --git a/tools/net/ynl/pyynl/lib/doc_generator.py b/tools/net/ynl/pyynl/lib/doc_generator.py new file mode 100644 index 000000000000..3a16b8eb01ca --- /dev/null +++ b/tools/net/ynl/pyynl/lib/doc_generator.py @@ -0,0 +1,402 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-2.0 +# -*- coding: utf-8; mode: python -*- + +""" + Class to auto generate the documentation for Netlink specifications. + + :copyright: Copyright (C) 2023 Breno Leitao <leitao@debian.org> + :license: GPL Version 2, June 1991 see linux/COPYING for details. + + This class performs extensive parsing to the Linux kernel's netlink YAML + spec files, in an effort to avoid needing to heavily mark up the original + YAML file. + + This code is split in two classes: + 1) RST formatters: Use to convert a string to a RST output + 2) YAML Netlink (YNL) doc generator: Generate docs from YAML data +""" + +from typing import Any, Dict, List +import yaml + +LINE_STR = '__lineno__' + +class NumberedSafeLoader(yaml.SafeLoader): # pylint: disable=R0901 + """Override the SafeLoader class to add line number to parsed data""" + + def construct_mapping(self, node, *args, **kwargs): + mapping = super().construct_mapping(node, *args, **kwargs) + mapping[LINE_STR] = node.start_mark.line + + return mapping + +class RstFormatters: + """RST Formatters""" + + SPACE_PER_LEVEL = 4 + + @staticmethod + def headroom(level: int) -> str: + """Return space to format""" + return " " * (level * RstFormatters.SPACE_PER_LEVEL) + + @staticmethod + def bold(text: str) -> str: + """Format bold text""" + return f"**{text}**" + + @staticmethod + def inline(text: str) -> str: + """Format inline text""" + return f"``{text}``" + + @staticmethod + def sanitize(text: str) -> str: + """Remove newlines and multiple spaces""" + # This is useful for some fields that are spread across multiple lines + return str(text).replace("\n", " ").strip() + + def rst_fields(self, key: str, value: str, level: int = 0) -> str: + """Return a RST formatted field""" + return self.headroom(level) + f":{key}: {value}" + + def rst_definition(self, key: str, value: Any, level: int = 0) -> str: + """Format a single rst definition""" + return self.headroom(level) + key + "\n" + self.headroom(level + 1) + str(value) + + def rst_paragraph(self, paragraph: str, level: int = 0) -> str: + """Return a formatted paragraph""" + return self.headroom(level) + paragraph + + def rst_bullet(self, item: str, level: int = 0) -> str: + """Return a formatted a bullet""" + return self.headroom(level) + f"- {item}" + + @staticmethod + def rst_subsection(title: str) -> str: + """Add a sub-section to the document""" + return f"{title}\n" + "-" * len(title) + + @staticmethod + def rst_subsubsection(title: str) -> str: + """Add a sub-sub-section to the document""" + return f"{title}\n" + "~" * len(title) + + @staticmethod + def rst_section(namespace: str, prefix: str, title: str) -> str: + """Add a section to the document""" + return f".. _{namespace}-{prefix}-{title}:\n\n{title}\n" + "=" * len(title) + + @staticmethod + def rst_subtitle(title: str) -> str: + """Add a subtitle to the document""" + return "\n" + "-" * len(title) + f"\n{title}\n" + "-" * len(title) + "\n\n" + + @staticmethod + def rst_title(title: str) -> str: + """Add a title to the document""" + return "=" * len(title) + f"\n{title}\n" + "=" * len(title) + "\n\n" + + def rst_list_inline(self, list_: List[str], level: int = 0) -> str: + """Format a list using inlines""" + return self.headroom(level) + "[" + ", ".join(self.inline(i) for i in list_) + "]" + + @staticmethod + def rst_ref(namespace: str, prefix: str, name: str) -> str: + """Add a hyperlink to the document""" + mappings = {'enum': 'definition', + 'fixed-header': 'definition', + 'nested-attributes': 'attribute-set', + 'struct': 'definition'} + if prefix in mappings: + prefix = mappings[prefix] + return f":ref:`{namespace}-{prefix}-{name}`" + + def rst_header(self) -> str: + """The headers for all the auto generated RST files""" + lines = [] + + lines.append(self.rst_paragraph(".. SPDX-License-Identifier: GPL-2.0")) + lines.append(self.rst_paragraph(".. NOTE: This document was auto-generated.\n\n")) + + return "\n".join(lines) + + @staticmethod + def rst_toctree(maxdepth: int = 2) -> str: + """Generate a toctree RST primitive""" + lines = [] + + lines.append(".. toctree::") + lines.append(f" :maxdepth: {maxdepth}\n\n") + + return "\n".join(lines) + + @staticmethod + def rst_label(title: str) -> str: + """Return a formatted label""" + return f".. _{title}:\n\n" + + @staticmethod + def rst_lineno(lineno: int) -> str: + """Return a lineno comment""" + return f".. LINENO {lineno}\n" + +class YnlDocGenerator: + """YAML Netlink specs Parser""" + + fmt = RstFormatters() + + def parse_mcast_group(self, mcast_group: List[Dict[str, Any]]) -> str: + """Parse 'multicast' group list and return a formatted string""" + lines = [] + for group in mcast_group: + lines.append(self.fmt.rst_bullet(group["name"])) + + return "\n".join(lines) + + def parse_do(self, do_dict: Dict[str, Any], level: int = 0) -> str: + """Parse 'do' section and return a formatted string""" + lines = [] + if LINE_STR in do_dict: + lines.append(self.fmt.rst_lineno(do_dict[LINE_STR])) + + for key in do_dict.keys(): + if key == LINE_STR: + continue + lines.append(self.fmt.rst_paragraph(self.fmt.bold(key), level + 1)) + if key in ['request', 'reply']: + lines.append(self.parse_do_attributes(do_dict[key], level + 1) + "\n") + else: + lines.append(self.fmt.headroom(level + 2) + do_dict[key] + "\n") + + return "\n".join(lines) + + def parse_do_attributes(self, attrs: Dict[str, Any], level: int = 0) -> str: + """Parse 'attributes' section""" + if "attributes" not in attrs: + return "" + lines = [self.fmt.rst_fields("attributes", + self.fmt.rst_list_inline(attrs["attributes"]), + level + 1)] + + return "\n".join(lines) + + def parse_operations(self, operations: List[Dict[str, Any]], namespace: str) -> str: + """Parse operations block""" + preprocessed = ["name", "doc", "title", "do", "dump", "flags"] + linkable = ["fixed-header", "attribute-set"] + lines = [] + + for operation in operations: + if LINE_STR in operation: + lines.append(self.fmt.rst_lineno(operation[LINE_STR])) + + lines.append(self.fmt.rst_section(namespace, 'operation', + operation["name"])) + lines.append(self.fmt.rst_paragraph(operation["doc"]) + "\n") + + for key in operation.keys(): + if key == LINE_STR: + continue + + if key in preprocessed: + # Skip the special fields + continue + value = operation[key] + if key in linkable: + value = self.fmt.rst_ref(namespace, key, value) + lines.append(self.fmt.rst_fields(key, value, 0)) + if 'flags' in operation: + lines.append(self.fmt.rst_fields('flags', + self.fmt.rst_list_inline(operation['flags']))) + + if "do" in operation: + lines.append(self.fmt.rst_paragraph(":do:", 0)) + lines.append(self.parse_do(operation["do"], 0)) + if "dump" in operation: + lines.append(self.fmt.rst_paragraph(":dump:", 0)) + lines.append(self.parse_do(operation["dump"], 0)) + + # New line after fields + lines.append("\n") + + return "\n".join(lines) + + def parse_entries(self, entries: List[Dict[str, Any]], level: int) -> str: + """Parse a list of entries""" + ignored = ["pad"] + lines = [] + for entry in entries: + if isinstance(entry, dict): + # entries could be a list or a dictionary + field_name = entry.get("name", "") + if field_name in ignored: + continue + type_ = entry.get("type") + if type_: + field_name += f" ({self.fmt.inline(type_)})" + lines.append( + self.fmt.rst_fields(field_name, + self.fmt.sanitize(entry.get("doc", "")), + level) + ) + elif isinstance(entry, list): + lines.append(self.fmt.rst_list_inline(entry, level)) + else: + lines.append(self.fmt.rst_bullet(self.fmt.inline(self.fmt.sanitize(entry)), + level)) + + lines.append("\n") + return "\n".join(lines) + + def parse_definitions(self, defs: Dict[str, Any], namespace: str) -> str: + """Parse definitions section""" + preprocessed = ["name", "entries", "members"] + ignored = ["render-max"] # This is not printed + lines = [] + + for definition in defs: + if LINE_STR in definition: + lines.append(self.fmt.rst_lineno(definition[LINE_STR])) + + lines.append(self.fmt.rst_section(namespace, 'definition', definition["name"])) + for k in definition.keys(): + if k == LINE_STR: + continue + if k in preprocessed + ignored: + continue + lines.append(self.fmt.rst_fields(k, self.fmt.sanitize(definition[k]), 0)) + + # Field list needs to finish with a new line + lines.append("\n") + if "entries" in definition: + lines.append(self.fmt.rst_paragraph(":entries:", 0)) + lines.append(self.parse_entries(definition["entries"], 1)) + if "members" in definition: + lines.append(self.fmt.rst_paragraph(":members:", 0)) + lines.append(self.parse_entries(definition["members"], 1)) + + return "\n".join(lines) + + def parse_attr_sets(self, entries: List[Dict[str, Any]], namespace: str) -> str: + """Parse attribute from attribute-set""" + preprocessed = ["name", "type"] + linkable = ["enum", "nested-attributes", "struct", "sub-message"] + ignored = ["checks"] + lines = [] + + for entry in entries: + lines.append(self.fmt.rst_section(namespace, 'attribute-set', + entry["name"])) + + if "doc" in entry: + lines.append(self.fmt.rst_paragraph(entry["doc"], 0) + "\n") + + for attr in entry["attributes"]: + if LINE_STR in attr: + lines.append(self.fmt.rst_lineno(attr[LINE_STR])) + + type_ = attr.get("type") + attr_line = attr["name"] + if type_: + # Add the attribute type in the same line + attr_line += f" ({self.fmt.inline(type_)})" + + lines.append(self.fmt.rst_subsubsection(attr_line)) + + for k in attr.keys(): + if k == LINE_STR: + continue + if k in preprocessed + ignored: + continue + if k in linkable: + value = self.fmt.rst_ref(namespace, k, attr[k]) + else: + value = self.fmt.sanitize(attr[k]) + lines.append(self.fmt.rst_fields(k, value, 0)) + lines.append("\n") + + return "\n".join(lines) + + def parse_sub_messages(self, entries: List[Dict[str, Any]], namespace: str) -> str: + """Parse sub-message definitions""" + lines = [] + + for entry in entries: + lines.append(self.fmt.rst_section(namespace, 'sub-message', + entry["name"])) + for fmt in entry["formats"]: + value = fmt["value"] + + lines.append(self.fmt.rst_bullet(self.fmt.bold(value))) + for attr in ['fixed-header', 'attribute-set']: + if attr in fmt: + lines.append(self.fmt.rst_fields(attr, + self.fmt.rst_ref(namespace, + attr, + fmt[attr]), + 1)) + lines.append("\n") + + return "\n".join(lines) + + def parse_yaml(self, obj: Dict[str, Any]) -> str: + """Format the whole YAML into a RST string""" + lines = [] + + # Main header + lineno = obj.get('__lineno__', 0) + lines.append(self.fmt.rst_lineno(lineno)) + + family = obj['name'] + + lines.append(self.fmt.rst_header()) + lines.append(self.fmt.rst_label("netlink-" + family)) + + title = f"Family ``{family}`` netlink specification" + lines.append(self.fmt.rst_title(title)) + lines.append(self.fmt.rst_paragraph(".. contents:: :depth: 3\n")) + + if "doc" in obj: + lines.append(self.fmt.rst_subtitle("Summary")) + lines.append(self.fmt.rst_paragraph(obj["doc"], 0)) + + # Operations + if "operations" in obj: + lines.append(self.fmt.rst_subtitle("Operations")) + lines.append(self.parse_operations(obj["operations"]["list"], + family)) + + # Multicast groups + if "mcast-groups" in obj: + lines.append(self.fmt.rst_subtitle("Multicast groups")) + lines.append(self.parse_mcast_group(obj["mcast-groups"]["list"])) + + # Definitions + if "definitions" in obj: + lines.append(self.fmt.rst_subtitle("Definitions")) + lines.append(self.parse_definitions(obj["definitions"], family)) + + # Attributes set + if "attribute-sets" in obj: + lines.append(self.fmt.rst_subtitle("Attribute sets")) + lines.append(self.parse_attr_sets(obj["attribute-sets"], family)) + + # Sub-messages + if "sub-messages" in obj: + lines.append(self.fmt.rst_subtitle("Sub-messages")) + lines.append(self.parse_sub_messages(obj["sub-messages"], family)) + + return "\n".join(lines) + + # Main functions + # ============== + + def parse_yaml_file(self, filename: str) -> str: + """Transform the YAML specified by filename into an RST-formatted string""" + with open(filename, "r", encoding="utf-8") as spec_file: + numbered_yaml = yaml.load(spec_file, Loader=NumberedSafeLoader) + content = self.parse_yaml(numbered_yaml) + + return content diff --git a/tools/net/ynl/pyynl/lib/nlspec.py b/tools/net/ynl/pyynl/lib/nlspec.py index 314ec8007496..85c17fe01e35 100644 --- a/tools/net/ynl/pyynl/lib/nlspec.py +++ b/tools/net/ynl/pyynl/lib/nlspec.py @@ -501,7 +501,7 @@ class SpecFamily(SpecElement): return SpecStruct(self, elem) def new_sub_message(self, elem): - return SpecSubMessage(self, elem); + return SpecSubMessage(self, elem) def new_operation(self, elem, req_val, rsp_val): return SpecOperation(self, elem, req_val, rsp_val) diff --git a/tools/net/ynl/pyynl/lib/ynl.py b/tools/net/ynl/pyynl/lib/ynl.py index 8244a5f440b2..62383c70ebb9 100644 --- a/tools/net/ynl/pyynl/lib/ynl.py +++ b/tools/net/ynl/pyynl/lib/ynl.py @@ -9,7 +9,6 @@ import socket import struct from struct import Struct import sys -import yaml import ipaddress import uuid import queue @@ -562,11 +561,13 @@ class YnlFamily(SpecFamily): if attr["type"] == 'nest': nl_type |= Netlink.NLA_F_NESTED - attr_payload = b'' sub_space = attr['nested-attributes'] - sub_attrs = SpaceAttrs(self.attr_sets[sub_space], value, search_attrs) - for subname, subvalue in value.items(): - attr_payload += self._add_attr(sub_space, subname, subvalue, sub_attrs) + attr_payload = self._add_nest_attrs(value, sub_space, search_attrs) + elif attr['type'] == 'indexed-array' and attr['sub-type'] == 'nest': + nl_type |= Netlink.NLA_F_NESTED + sub_space = attr['nested-attributes'] + attr_payload = self._encode_indexed_array(value, sub_space, + search_attrs) elif attr["type"] == 'flag': if not value: # If value is absent or false then skip attribute creation. @@ -620,9 +621,28 @@ class YnlFamily(SpecFamily): else: raise Exception(f'Unknown type at {space} {name} {value} {attr["type"]}') + return self._add_attr_raw(nl_type, attr_payload) + + def _add_attr_raw(self, nl_type, attr_payload): pad = b'\x00' * ((4 - len(attr_payload) % 4) % 4) return struct.pack('HH', len(attr_payload) + 4, nl_type) + attr_payload + pad + def _add_nest_attrs(self, value, sub_space, search_attrs): + sub_attrs = SpaceAttrs(self.attr_sets[sub_space], value, search_attrs) + attr_payload = b'' + for subname, subvalue in value.items(): + attr_payload += self._add_attr(sub_space, subname, subvalue, + sub_attrs) + return attr_payload + + def _encode_indexed_array(self, vals, sub_space, search_attrs): + attr_payload = b'' + for i, val in enumerate(vals): + idx = i | Netlink.NLA_F_NESTED + val_payload = self._add_nest_attrs(val, sub_space, search_attrs) + attr_payload += self._add_attr_raw(idx, val_payload) + return attr_payload + def _get_enum_or_unknown(self, enum, raw): try: name = enum.entries_by_val[raw].name @@ -706,7 +726,7 @@ class YnlFamily(SpecFamily): return attr.as_bin() def _rsp_add(self, rsp, name, is_multi, decoded): - if is_multi == None: + if is_multi is None: if name in rsp and type(rsp[name]) is not list: rsp[name] = [rsp[name]] is_multi = True @@ -739,14 +759,14 @@ class YnlFamily(SpecFamily): decoded = {} offset = 0 if msg_format.fixed_header: - decoded.update(self._decode_struct(attr.raw, msg_format.fixed_header)); + decoded.update(self._decode_struct(attr.raw, msg_format.fixed_header)) offset = self._struct_size(msg_format.fixed_header) if msg_format.attr_set: if msg_format.attr_set in self.attr_sets: subdict = self._decode(NlAttrs(attr.raw, offset), msg_format.attr_set) decoded.update(subdict) else: - raise Exception(f"Unknown attribute-set '{attr_space}' when decoding '{attr_spec.name}'") + raise Exception(f"Unknown attribute-set '{msg_format.attr_set}' when decoding '{attr_spec.name}'") return decoded def _decode(self, attrs, space, outer_attrs = None): @@ -936,7 +956,7 @@ class YnlFamily(SpecFamily): formatted = hex(raw) else: formatted = bytes.hex(raw, ' ') - elif display_hint in [ 'ipv4', 'ipv6' ]: + elif display_hint in [ 'ipv4', 'ipv6', 'ipv4-or-v6' ]: formatted = format(ipaddress.ip_address(raw)) elif display_hint == 'uuid': formatted = str(uuid.UUID(bytes=raw)) @@ -945,12 +965,17 @@ class YnlFamily(SpecFamily): return formatted def _from_string(self, string, attr_spec): - if attr_spec.display_hint in ['ipv4', 'ipv6']: + if attr_spec.display_hint in ['ipv4', 'ipv6', 'ipv4-or-v6']: ip = ipaddress.ip_address(string) if attr_spec['type'] == 'binary': raw = ip.packed else: raw = int(ip) + elif attr_spec.display_hint == 'hex': + if attr_spec['type'] == 'binary': + raw = bytes.fromhex(string) + else: + raw = int(string, 16) else: raise Exception(f"Display hint '{attr_spec.display_hint}' not implemented" f" when parsing '{attr_spec['name']}'") |