1# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2 3import collections 4import importlib 5import os 6import yaml 7 8 9# To be loaded dynamically as needed 10jsonschema = None 11 12 13class SpecElement: 14 """Netlink spec element. 15 16 Abstract element of the Netlink spec. Implements the dictionary interface 17 for access to the raw spec. Supports iterative resolution of dependencies 18 across elements and class inheritance levels. The elements of the spec 19 may refer to each other, and although loops should be very rare, having 20 to maintain correct ordering of instantiation is painful, so the resolve() 21 method should be used to perform parts of init which require access to 22 other parts of the spec. 23 24 Attributes: 25 yaml raw spec as loaded from the spec file 26 family back reference to the full family 27 28 name name of the entity as listed in the spec (optional) 29 ident_name name which can be safely used as identifier in code (optional) 30 """ 31 def __init__(self, family, yaml): 32 self.yaml = yaml 33 self.family = family 34 35 if 'name' in self.yaml: 36 self.name = self.yaml['name'] 37 self.ident_name = self.name.replace('-', '_') 38 39 self._super_resolved = False 40 family.add_unresolved(self) 41 42 def __getitem__(self, key): 43 return self.yaml[key] 44 45 def __contains__(self, key): 46 return key in self.yaml 47 48 def get(self, key, default=None): 49 return self.yaml.get(key, default) 50 51 def resolve_up(self, up): 52 if not self._super_resolved: 53 up.resolve() 54 self._super_resolved = True 55 56 def resolve(self): 57 pass 58 59 60class SpecEnumEntry(SpecElement): 61 """ Entry within an enum declared in the Netlink spec. 62 63 Attributes: 64 doc documentation string 65 enum_set back reference to the enum 66 value numerical value of this enum (use accessors in most situations!) 67 68 Methods: 69 raw_value raw value, i.e. the id in the enum, unlike user value which is a mask for flags 70 user_value user value, same as raw value for enums, for flags it's the mask 71 """ 72 def __init__(self, enum_set, yaml, prev, value_start): 73 if isinstance(yaml, str): 74 yaml = {'name': yaml} 75 super().__init__(enum_set.family, yaml) 76 77 self.doc = yaml.get('doc', '') 78 self.enum_set = enum_set 79 80 if 'value' in yaml: 81 self.value = yaml['value'] 82 elif prev: 83 self.value = prev.value + 1 84 else: 85 self.value = value_start 86 87 def has_doc(self): 88 return bool(self.doc) 89 90 def raw_value(self): 91 return self.value 92 93 def user_value(self): 94 if self.enum_set['type'] == 'flags': 95 return 1 << self.value 96 else: 97 return self.value 98 99 100class SpecEnumSet(SpecElement): 101 """ Enum type 102 103 Represents an enumeration (list of numerical constants) 104 as declared in the "definitions" section of the spec. 105 106 Attributes: 107 type enum or flags 108 entries entries by name 109 entries_by_val entries by value 110 Methods: 111 get_mask for flags compute the mask of all defined values 112 """ 113 def __init__(self, family, yaml): 114 super().__init__(family, yaml) 115 116 self.type = yaml['type'] 117 118 prev_entry = None 119 value_start = self.yaml.get('value-start', 0) 120 self.entries = dict() 121 self.entries_by_val = dict() 122 for entry in self.yaml['entries']: 123 e = self.new_entry(entry, prev_entry, value_start) 124 self.entries[e.name] = e 125 self.entries_by_val[e.raw_value()] = e 126 prev_entry = e 127 128 def new_entry(self, entry, prev_entry, value_start): 129 return SpecEnumEntry(self, entry, prev_entry, value_start) 130 131 def has_doc(self): 132 if 'doc' in self.yaml: 133 return True 134 for entry in self.entries.values(): 135 if entry.has_doc(): 136 return True 137 return False 138 139 def get_mask(self): 140 mask = 0 141 idx = self.yaml.get('value-start', 0) 142 for _ in self.entries.values(): 143 mask |= 1 << idx 144 idx += 1 145 return mask 146 147 148class SpecAttr(SpecElement): 149 """ Single Netlink atttribute type 150 151 Represents a single attribute type within an attr space. 152 153 Attributes: 154 value numerical ID when serialized 155 attr_set Attribute Set containing this attr 156 """ 157 def __init__(self, family, attr_set, yaml, value): 158 super().__init__(family, yaml) 159 160 self.value = value 161 self.attr_set = attr_set 162 self.is_multi = yaml.get('multi-attr', False) 163 164 165class SpecAttrSet(SpecElement): 166 """ Netlink Attribute Set class. 167 168 Represents a ID space of attributes within Netlink. 169 170 Note that unlike other elements, which expose contents of the raw spec 171 via the dictionary interface Attribute Set exposes attributes by name. 172 173 Attributes: 174 attrs ordered dict of all attributes (indexed by name) 175 attrs_by_val ordered dict of all attributes (indexed by value) 176 subset_of parent set if this is a subset, otherwise None 177 """ 178 def __init__(self, family, yaml): 179 super().__init__(family, yaml) 180 181 self.subset_of = self.yaml.get('subset-of', None) 182 183 self.attrs = collections.OrderedDict() 184 self.attrs_by_val = collections.OrderedDict() 185 186 if self.subset_of is None: 187 val = 1 188 for elem in self.yaml['attributes']: 189 if 'value' in elem: 190 val = elem['value'] 191 192 attr = self.new_attr(elem, val) 193 self.attrs[attr.name] = attr 194 self.attrs_by_val[attr.value] = attr 195 val += 1 196 else: 197 real_set = family.attr_sets[self.subset_of] 198 for elem in self.yaml['attributes']: 199 attr = real_set[elem['name']] 200 self.attrs[attr.name] = attr 201 self.attrs_by_val[attr.value] = attr 202 203 def new_attr(self, elem, value): 204 return SpecAttr(self.family, self, elem, value) 205 206 def __getitem__(self, key): 207 return self.attrs[key] 208 209 def __contains__(self, key): 210 return key in self.attrs 211 212 def __iter__(self): 213 yield from self.attrs 214 215 def items(self): 216 return self.attrs.items() 217 218 219class SpecOperation(SpecElement): 220 """Netlink Operation 221 222 Information about a single Netlink operation. 223 224 Attributes: 225 value numerical ID when serialized, None if req/rsp values differ 226 227 req_value numerical ID when serialized, user -> kernel 228 rsp_value numerical ID when serialized, user <- kernel 229 is_call bool, whether the operation is a call 230 is_async bool, whether the operation is a notification 231 is_resv bool, whether the operation does not exist (it's just a reserved ID) 232 attr_set attribute set name 233 234 yaml raw spec as loaded from the spec file 235 """ 236 def __init__(self, family, yaml, req_value, rsp_value): 237 super().__init__(family, yaml) 238 239 self.value = req_value if req_value == rsp_value else None 240 self.req_value = req_value 241 self.rsp_value = rsp_value 242 243 self.is_call = 'do' in yaml or 'dump' in yaml 244 self.is_async = 'notify' in yaml or 'event' in yaml 245 self.is_resv = not self.is_async and not self.is_call 246 247 # Added by resolve: 248 self.attr_set = None 249 delattr(self, "attr_set") 250 251 def resolve(self): 252 self.resolve_up(super()) 253 254 if 'attribute-set' in self.yaml: 255 attr_set_name = self.yaml['attribute-set'] 256 elif 'notify' in self.yaml: 257 msg = self.family.msgs[self.yaml['notify']] 258 attr_set_name = msg['attribute-set'] 259 elif self.is_resv: 260 attr_set_name = '' 261 else: 262 raise Exception(f"Can't resolve attribute set for op '{self.name}'") 263 if attr_set_name: 264 self.attr_set = self.family.attr_sets[attr_set_name] 265 266 267class SpecFamily(SpecElement): 268 """ Netlink Family Spec class. 269 270 Netlink family information loaded from a spec (e.g. in YAML). 271 Takes care of unfolding implicit information which can be skipped 272 in the spec itself for brevity. 273 274 The class can be used like a dictionary to access the raw spec 275 elements but that's usually a bad idea. 276 277 Attributes: 278 proto protocol type (e.g. genetlink) 279 280 attr_sets dict of attribute sets 281 msgs dict of all messages (index by name) 282 msgs_by_value dict of all messages (indexed by name) 283 ops dict of all valid requests / responses 284 consts dict of all constants/enums 285 """ 286 def __init__(self, spec_path, schema_path=None): 287 with open(spec_path, "r") as stream: 288 spec = yaml.safe_load(stream) 289 290 self._resolution_list = [] 291 292 super().__init__(self, spec) 293 294 self.proto = self.yaml.get('protocol', 'genetlink') 295 296 if schema_path is None: 297 schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml' 298 if schema_path: 299 global jsonschema 300 301 with open(schema_path, "r") as stream: 302 schema = yaml.safe_load(stream) 303 304 if jsonschema is None: 305 jsonschema = importlib.import_module("jsonschema") 306 307 jsonschema.validate(self.yaml, schema) 308 309 self.attr_sets = collections.OrderedDict() 310 self.msgs = collections.OrderedDict() 311 self.req_by_value = collections.OrderedDict() 312 self.rsp_by_value = collections.OrderedDict() 313 self.ops = collections.OrderedDict() 314 self.consts = collections.OrderedDict() 315 316 last_exception = None 317 while len(self._resolution_list) > 0: 318 resolved = [] 319 unresolved = self._resolution_list 320 self._resolution_list = [] 321 322 for elem in unresolved: 323 try: 324 elem.resolve() 325 except (KeyError, AttributeError) as e: 326 self._resolution_list.append(elem) 327 last_exception = e 328 continue 329 330 resolved.append(elem) 331 332 if len(resolved) == 0: 333 raise last_exception 334 335 def new_enum(self, elem): 336 return SpecEnumSet(self, elem) 337 338 def new_attr_set(self, elem): 339 return SpecAttrSet(self, elem) 340 341 def new_operation(self, elem, req_val, rsp_val): 342 return SpecOperation(self, elem, req_val, rsp_val) 343 344 def add_unresolved(self, elem): 345 self._resolution_list.append(elem) 346 347 def _dictify_ops_unified(self): 348 val = 1 349 for elem in self.yaml['operations']['list']: 350 if 'value' in elem: 351 val = elem['value'] 352 353 op = self.new_operation(elem, val, val) 354 val += 1 355 356 self.msgs[op.name] = op 357 358 def _dictify_ops_directional(self): 359 req_val = rsp_val = 1 360 for elem in self.yaml['operations']['list']: 361 if 'notify' in elem: 362 if 'value' in elem: 363 rsp_val = elem['value'] 364 req_val_next = req_val 365 rsp_val_next = rsp_val + 1 366 req_val = None 367 elif 'do' in elem or 'dump' in elem: 368 mode = elem['do'] if 'do' in elem else elem['dump'] 369 370 v = mode.get('request', {}).get('value', None) 371 if v: 372 req_val = v 373 v = mode.get('reply', {}).get('value', None) 374 if v: 375 rsp_val = v 376 377 rsp_inc = 1 if 'reply' in mode else 0 378 req_val_next = req_val + 1 379 rsp_val_next = rsp_val + rsp_inc 380 else: 381 raise Exception("Can't parse directional ops") 382 383 op = self.new_operation(elem, req_val, rsp_val) 384 req_val = req_val_next 385 rsp_val = rsp_val_next 386 387 self.msgs[op.name] = op 388 389 def resolve(self): 390 self.resolve_up(super()) 391 392 for elem in self.yaml['definitions']: 393 if elem['type'] == 'enum' or elem['type'] == 'flags': 394 self.consts[elem['name']] = self.new_enum(elem) 395 else: 396 self.consts[elem['name']] = elem 397 398 for elem in self.yaml['attribute-sets']: 399 attr_set = self.new_attr_set(elem) 400 self.attr_sets[elem['name']] = attr_set 401 402 msg_id_model = self.yaml['operations'].get('enum-model', 'unified') 403 if msg_id_model == 'unified': 404 self._dictify_ops_unified() 405 elif msg_id_model == 'directional': 406 self._dictify_ops_directional() 407 408 for op in self.msgs.values(): 409 if op.req_value is not None: 410 self.req_by_value[op.req_value] = op 411 if op.rsp_value is not None: 412 self.rsp_by_value[op.rsp_value] = op 413 if not op.is_async and 'attribute-set' in op: 414 self.ops[op.name] = op 415