# SPDX-License-Identifier: BSD-3-Clause

import collections
import importlib
import os
import traceback
import yaml


# To be loaded dynamically as needed
jsonschema = None


class SpecElement:
    """Netlink spec element.

    Abstract element of the Netlink spec. Implements the dictionary interface
    for access to the raw spec. Supports iterative resolution of dependencies
    across elements and class inheritance levels. The elements of the spec
    may refer to each other, and although loops should be very rare, having
    to maintain correct ordering of instantiation is painful, so the resolve()
    method should be used to perform parts of init which require access to
    other parts of the spec.

    Attributes:
        yaml        raw spec as loaded from the spec file
        family      back reference to the full family

        name        name of the entity as listed in the spec (optional)
        ident_name  name which can be safely used as identifier in code (optional)
    """
    def __init__(self, family, yaml):
        self.yaml = yaml
        self.family = family

        # 'name' and 'ident_name' only exist when the raw spec provides a name.
        if 'name' in self.yaml:
            self.name = self.yaml['name']
            self.ident_name = self.name.replace('-', '_')

        self._super_resolved = False
        # Every element registers itself so the family can call resolve() later.
        family.add_unresolved(self)

    def __getitem__(self, key):
        return self.yaml[key]

    def __contains__(self, key):
        return key in self.yaml

    def get(self, key, default=None):
        """Return the raw spec value for key, or default if it is absent."""
        return self.yaml.get(key, default)

    def resolve_up(self, up):
        """Run the parent class's resolve() once.

        Args:
            up  object whose resolve() should be invoked, typically super()
        """
        if not self._super_resolved:
            up.resolve()
            self._super_resolved = True

    def resolve(self):
        """Perform init steps which need other spec elements; no-op here."""
        pass


class SpecAttr(SpecElement):
    """ Single Netlink attribute type

    Represents a single attribute type within an attr space.

    Attributes:
        value      numerical ID when serialized
        attr_set   Attribute Set containing this attr
        is_multi   value of the 'multi-attr' property in the spec (default False)
    """
    def __init__(self, family, attr_set, yaml, value):
        super().__init__(family, yaml)

        self.value = value
        self.attr_set = attr_set
        self.is_multi = yaml.get('multi-attr', False)


class SpecAttrSet(SpecElement):
    """ Netlink Attribute Set class.

    Represents a ID space of attributes within Netlink.

    Note that unlike other elements, which expose contents of the raw spec
    via the dictionary interface Attribute Set exposes attributes by name.

    Attributes:
        attrs         ordered dict of all attributes (indexed by name)
        attrs_by_val  ordered dict of all attributes (indexed by value)
        subset_of     parent set if this is a subset, otherwise None
    """
    def __init__(self, family, yaml):
        super().__init__(family, yaml)

        self.subset_of = self.yaml.get('subset-of', None)

        self.attrs = collections.OrderedDict()
        self.attrs_by_val = collections.OrderedDict()

        # IDs count up from 0; an explicit 'value' restarts the counter there.
        val = 0
        for elem in self.yaml['attributes']:
            if 'value' in elem:
                val = elem['value']

            attr = self.new_attr(elem, val)
            self.attrs[attr.name] = attr
            self.attrs_by_val[attr.value] = attr
            val += 1

    def new_attr(self, elem, value):
        """Create the attribute object for a raw spec entry (override point)."""
        return SpecAttr(self.family, self, elem, value)

    def __getitem__(self, key):
        return self.attrs[key]

    def __contains__(self, key):
        return key in self.attrs

    def __iter__(self):
        yield from self.attrs

    def items(self):
        """Return (name, attribute) pairs in spec order."""
        return self.attrs.items()


class SpecOperation(SpecElement):
    """Netlink Operation

    Information about a single Netlink operation.

    Attributes:
        value       numerical ID when serialized, None if req/rsp values differ

        req_value   numerical ID when serialized, user -> kernel
        rsp_value   numerical ID when serialized, user <- kernel
        is_call     bool, whether the operation is a call
        is_async    bool, whether the operation is a notification
        is_resv     bool, whether the operation does not exist (it's just a reserved ID)
        attr_set    attribute set object, only present once resolve() found one

        yaml        raw spec as loaded from the spec file
    """
    def __init__(self, family, yaml, req_value, rsp_value):
        super().__init__(family, yaml)

        self.value = req_value if req_value == rsp_value else None
        self.req_value = req_value
        self.rsp_value = rsp_value

        self.is_call = 'do' in yaml or 'dump' in yaml
        self.is_async = 'notify' in yaml or 'event' in yaml
        self.is_resv = not self.is_async and not self.is_call

        # Added by resolve:
        # The attribute is named here but removed again, so reading it before
        # resolve() raises AttributeError instead of yielding a stale None.
        self.attr_set = None
        delattr(self, "attr_set")

    def resolve(self):
        """Look up the attribute set this operation uses.

        Raises:
            Exception  if no attribute set can be determined for the op
            KeyError   if a referenced message or attribute set is not
                       (yet) known to the family
        """
        self.resolve_up(super())

        if 'attribute-set' in self.yaml:
            attr_set_name = self.yaml['attribute-set']
        elif 'notify' in self.yaml:
            # Notifications reuse the attribute set of the op they mirror.
            msg = self.family.msgs[self.yaml['notify']]
            attr_set_name = msg['attribute-set']
        elif self.is_resv:
            attr_set_name = ''
        else:
            raise Exception(f"Can't resolve attribute set for op '{self.name}'")
        if attr_set_name:
            self.attr_set = self.family.attr_sets[attr_set_name]


class SpecFamily(SpecElement):
    """ Netlink Family Spec class.

    Netlink family information loaded from a spec (e.g. in YAML).
    Takes care of unfolding implicit information which can be skipped
    in the spec itself for brevity.

    The class can be used like a dictionary to access the raw spec
    elements but that's usually a bad idea.

    Attributes:
        proto     protocol type (e.g. genetlink)

        attr_sets      dict of attribute sets
        msgs           dict of all messages (index by name)
        req_by_value   dict of all messages with a request ID (indexed by request value)
        rsp_by_value   dict of all messages with a response ID (indexed by response value)
        ops            dict of all valid requests / responses
    """
    def __init__(self, spec_path, schema_path=None):
        """Load the spec, optionally validate it, and resolve all elements.

        Args:
            spec_path    path of the YAML spec file
            schema_path  path of the schema to validate against; None selects
                         '<proto>.yaml' two directory levels above the spec
                         file, an empty value skips validation

        Raises:
            Exception  if a resolution pass makes no progress at all
        """
        with open(spec_path, "r") as stream:
            spec = yaml.safe_load(stream)

        # Must exist before SpecElement.__init__ registers this object in it.
        self._resolution_list = []

        super().__init__(self, spec)

        self.proto = self.yaml.get('protocol', 'genetlink')

        if schema_path is None:
            # os.path.join keeps the result relative when the spec path has no
            # grandparent directory, instead of pointing at the filesystem root.
            schema_dir = os.path.dirname(os.path.dirname(spec_path))
            schema_path = os.path.join(schema_dir, f'{self.proto}.yaml')
        if schema_path:
            global jsonschema

            with open(schema_path, "r") as stream:
                schema = yaml.safe_load(stream)

            # jsonschema is imported on first use only.
            if jsonschema is None:
                jsonschema = importlib.import_module("jsonschema")

            jsonschema.validate(self.yaml, schema)

        self.attr_sets = collections.OrderedDict()
        self.msgs = collections.OrderedDict()
        self.req_by_value = collections.OrderedDict()
        self.rsp_by_value = collections.OrderedDict()
        self.ops = collections.OrderedDict()

        # Keep resolving in passes: elements created during a pass register
        # themselves and are handled in the next one; elements which failed
        # with KeyError / AttributeError are retried.
        last_exception = None
        while self._resolution_list:
            resolved = []
            unresolved = self._resolution_list
            self._resolution_list = []

            for elem in unresolved:
                try:
                    elem.resolve()
                except (KeyError, AttributeError) as e:
                    self._resolution_list.append(elem)
                    last_exception = e
                    continue

                resolved.append(elem)

            if not resolved:
                # Three-argument form: the single-argument call is only
                # accepted by Python 3.10 and newer.
                traceback.print_exception(type(last_exception), last_exception,
                                          last_exception.__traceback__)
                raise Exception("Could not resolve any spec element, infinite loop?")

    def new_attr_set(self, elem):
        """Create the attribute set object for a raw spec entry (override point)."""
        return SpecAttrSet(self, elem)

    def new_operation(self, elem, req_val, rsp_val):
        """Create the operation object for a raw spec entry (override point)."""
        return SpecOperation(self, elem, req_val, rsp_val)

    def add_unresolved(self, elem):
        """Queue an element for a later resolve() call."""
        self._resolution_list.append(elem)

    def _dictify_ops_unified(self):
        """Build self.msgs when requests and responses share one ID space."""
        val = 0
        for elem in self.yaml['operations']['list']:
            if 'value' in elem:
                val = elem['value']

            op = self.new_operation(elem, val, val)
            val += 1

            self.msgs[op.name] = op

    def _dictify_ops_directional(self):
        """Build self.msgs when request and response IDs are counted separately.

        Raises:
            Exception  if an entry is neither a notification / event nor a
                       do / dump operation
        """
        req_val = rsp_val = 0
        for elem in self.yaml['operations']['list']:
            if 'notify' in elem or 'event' in elem:
                # Kernel -> user only: consumes a response ID, no request ID.
                if 'value' in elem:
                    rsp_val = elem['value']
                req_val_next = req_val
                rsp_val_next = rsp_val + 1
                req_val = None
            elif 'do' in elem or 'dump' in elem:
                mode = elem['do'] if 'do' in elem else elem['dump']

                # Compare against None so an explicit value of 0 is honoured.
                v = mode.get('request', {}).get('value', None)
                if v is not None:
                    req_val = v
                v = mode.get('reply', {}).get('value', None)
                if v is not None:
                    rsp_val = v

                # A response ID is only consumed when the op has a reply.
                rsp_inc = 1 if 'reply' in mode else 0
                req_val_next = req_val + 1
                rsp_val_next = rsp_val + rsp_inc
            else:
                raise Exception("Can't parse directional ops")

            op = self.new_operation(elem, req_val, rsp_val)
            req_val = req_val_next
            rsp_val = rsp_val_next

            self.msgs[op.name] = op

    def resolve(self):
        """Instantiate attribute sets and operations and fill the lookup dicts.

        Raises:
            Exception  if the spec names an unknown 'enum-model'
        """
        self.resolve_up(super())

        for elem in self.yaml['attribute-sets']:
            attr_set = self.new_attr_set(elem)
            self.attr_sets[elem['name']] = attr_set

        msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
        if msg_id_model == 'unified':
            self._dictify_ops_unified()
        elif msg_id_model == 'directional':
            self._dictify_ops_directional()
        else:
            # Fail loudly rather than produce a family without operations.
            raise Exception(f"Unknown enum-model '{msg_id_model}'")

        for op in self.msgs.values():
            if op.req_value is not None:
                self.req_by_value[op.req_value] = op
            if op.rsp_value is not None:
                self.rsp_by_value[op.rsp_value] = op
            if not op.is_async and 'attribute-set' in op:
                self.ops[op.name] = op