1# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause 2 3import collections 4import importlib 5import os 6import yaml 7 8 9# To be loaded dynamically as needed 10jsonschema = None 11 12 13class SpecElement: 14 """Netlink spec element. 15 16 Abstract element of the Netlink spec. Implements the dictionary interface 17 for access to the raw spec. Supports iterative resolution of dependencies 18 across elements and class inheritance levels. The elements of the spec 19 may refer to each other, and although loops should be very rare, having 20 to maintain correct ordering of instantiation is painful, so the resolve() 21 method should be used to perform parts of init which require access to 22 other parts of the spec. 23 24 Attributes: 25 yaml raw spec as loaded from the spec file 26 family back reference to the full family 27 28 name name of the entity as listed in the spec (optional) 29 ident_name name which can be safely used as identifier in code (optional) 30 """ 31 def __init__(self, family, yaml): 32 self.yaml = yaml 33 self.family = family 34 35 if 'name' in self.yaml: 36 self.name = self.yaml['name'] 37 self.ident_name = self.name.replace('-', '_') 38 39 self._super_resolved = False 40 family.add_unresolved(self) 41 42 def __getitem__(self, key): 43 return self.yaml[key] 44 45 def __contains__(self, key): 46 return key in self.yaml 47 48 def get(self, key, default=None): 49 return self.yaml.get(key, default) 50 51 def resolve_up(self, up): 52 if not self._super_resolved: 53 up.resolve() 54 self._super_resolved = True 55 56 def resolve(self): 57 pass 58 59 60class SpecEnumEntry(SpecElement): 61 """ Entry within an enum declared in the Netlink spec. 62 63 Attributes: 64 doc documentation string 65 enum_set back reference to the enum 66 value numerical value of this enum (use accessors in most situations!) 67 68 Methods: 69 raw_value raw value, i.e. the id in the enum, unlike user value which is a mask for flags 70 user_value user value, same as raw value for enums, for flags it's the mask 71 """ 72 def __init__(self, enum_set, yaml, prev, value_start): 73 if isinstance(yaml, str): 74 yaml = {'name': yaml} 75 super().__init__(enum_set.family, yaml) 76 77 self.doc = yaml.get('doc', '') 78 self.enum_set = enum_set 79 80 if 'value' in yaml: 81 self.value = yaml['value'] 82 elif prev: 83 self.value = prev.value + 1 84 else: 85 self.value = value_start 86 87 def has_doc(self): 88 return bool(self.doc) 89 90 def raw_value(self): 91 return self.value 92 93 def user_value(self): 94 if self.enum_set['type'] == 'flags': 95 return 1 << self.value 96 else: 97 return self.value 98 99 100class SpecEnumSet(SpecElement): 101 """ Enum type 102 103 Represents an enumeration (list of numerical constants) 104 as declared in the "definitions" section of the spec. 105 106 Attributes: 107 type enum or flags 108 entries entries by name 109 entries_by_val entries by value 110 Methods: 111 get_mask for flags compute the mask of all defined values 112 """ 113 def __init__(self, family, yaml): 114 super().__init__(family, yaml) 115 116 self.type = yaml['type'] 117 118 prev_entry = None 119 value_start = self.yaml.get('value-start', 0) 120 self.entries = dict() 121 self.entries_by_val = dict() 122 for entry in self.yaml['entries']: 123 e = self.new_entry(entry, prev_entry, value_start) 124 self.entries[e.name] = e 125 self.entries_by_val[e.raw_value()] = e 126 prev_entry = e 127 128 def new_entry(self, entry, prev_entry, value_start): 129 return SpecEnumEntry(self, entry, prev_entry, value_start) 130 131 def has_doc(self): 132 if 'doc' in self.yaml: 133 return True 134 for entry in self.entries.values(): 135 if entry.has_doc(): 136 return True 137 return False 138 139 def get_mask(self): 140 mask = 0 141 for e in self.entries.values(): 142 mask += e.user_value() 143 return mask 144 145 146class SpecAttr(SpecElement): 147 """ Single Netlink atttribute type 148 149 Represents a single attribute type within an attr space. 150 151 Attributes: 152 value numerical ID when serialized 153 attr_set Attribute Set containing this attr 154 """ 155 def __init__(self, family, attr_set, yaml, value): 156 super().__init__(family, yaml) 157 158 self.value = value 159 self.attr_set = attr_set 160 self.is_multi = yaml.get('multi-attr', False) 161 162 163class SpecAttrSet(SpecElement): 164 """ Netlink Attribute Set class. 165 166 Represents a ID space of attributes within Netlink. 167 168 Note that unlike other elements, which expose contents of the raw spec 169 via the dictionary interface Attribute Set exposes attributes by name. 170 171 Attributes: 172 attrs ordered dict of all attributes (indexed by name) 173 attrs_by_val ordered dict of all attributes (indexed by value) 174 subset_of parent set if this is a subset, otherwise None 175 """ 176 def __init__(self, family, yaml): 177 super().__init__(family, yaml) 178 179 self.subset_of = self.yaml.get('subset-of', None) 180 181 self.attrs = collections.OrderedDict() 182 self.attrs_by_val = collections.OrderedDict() 183 184 if self.subset_of is None: 185 val = 1 186 for elem in self.yaml['attributes']: 187 if 'value' in elem: 188 val = elem['value'] 189 190 attr = self.new_attr(elem, val) 191 self.attrs[attr.name] = attr 192 self.attrs_by_val[attr.value] = attr 193 val += 1 194 else: 195 real_set = family.attr_sets[self.subset_of] 196 for elem in self.yaml['attributes']: 197 attr = real_set[elem['name']] 198 self.attrs[attr.name] = attr 199 self.attrs_by_val[attr.value] = attr 200 201 def new_attr(self, elem, value): 202 return SpecAttr(self.family, self, elem, value) 203 204 def __getitem__(self, key): 205 return self.attrs[key] 206 207 def __contains__(self, key): 208 return key in self.attrs 209 210 def __iter__(self): 211 yield from self.attrs 212 213 def items(self): 214 return self.attrs.items() 215 216 217class SpecOperation(SpecElement): 218 """Netlink Operation 219 220 Information about a single Netlink operation. 221 222 Attributes: 223 value numerical ID when serialized, None if req/rsp values differ 224 225 req_value numerical ID when serialized, user -> kernel 226 rsp_value numerical ID when serialized, user <- kernel 227 is_call bool, whether the operation is a call 228 is_async bool, whether the operation is a notification 229 is_resv bool, whether the operation does not exist (it's just a reserved ID) 230 attr_set attribute set name 231 232 yaml raw spec as loaded from the spec file 233 """ 234 def __init__(self, family, yaml, req_value, rsp_value): 235 super().__init__(family, yaml) 236 237 self.value = req_value if req_value == rsp_value else None 238 self.req_value = req_value 239 self.rsp_value = rsp_value 240 241 self.is_call = 'do' in yaml or 'dump' in yaml 242 self.is_async = 'notify' in yaml or 'event' in yaml 243 self.is_resv = not self.is_async and not self.is_call 244 245 # Added by resolve: 246 self.attr_set = None 247 delattr(self, "attr_set") 248 249 def resolve(self): 250 self.resolve_up(super()) 251 252 if 'attribute-set' in self.yaml: 253 attr_set_name = self.yaml['attribute-set'] 254 elif 'notify' in self.yaml: 255 msg = self.family.msgs[self.yaml['notify']] 256 attr_set_name = msg['attribute-set'] 257 elif self.is_resv: 258 attr_set_name = '' 259 else: 260 raise Exception(f"Can't resolve attribute set for op '{self.name}'") 261 if attr_set_name: 262 self.attr_set = self.family.attr_sets[attr_set_name] 263 264 265class SpecFamily(SpecElement): 266 """ Netlink Family Spec class. 267 268 Netlink family information loaded from a spec (e.g. in YAML). 269 Takes care of unfolding implicit information which can be skipped 270 in the spec itself for brevity. 271 272 The class can be used like a dictionary to access the raw spec 273 elements but that's usually a bad idea. 274 275 Attributes: 276 proto protocol type (e.g. genetlink) 277 license spec license (loaded from an SPDX tag on the spec) 278 279 attr_sets dict of attribute sets 280 msgs dict of all messages (index by name) 281 msgs_by_value dict of all messages (indexed by name) 282 ops dict of all valid requests / responses 283 consts dict of all constants/enums 284 """ 285 def __init__(self, spec_path, schema_path=None): 286 with open(spec_path, "r") as stream: 287 prefix = '# SPDX-License-Identifier: ' 288 first = stream.readline().strip() 289 if not first.startswith(prefix): 290 raise Exception('SPDX license tag required in the spec') 291 self.license = first[len(prefix):] 292 293 stream.seek(0) 294 spec = yaml.safe_load(stream) 295 296 self._resolution_list = [] 297 298 super().__init__(self, spec) 299 300 self.proto = self.yaml.get('protocol', 'genetlink') 301 302 if schema_path is None: 303 schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml' 304 if schema_path: 305 global jsonschema 306 307 with open(schema_path, "r") as stream: 308 schema = yaml.safe_load(stream) 309 310 if jsonschema is None: 311 jsonschema = importlib.import_module("jsonschema") 312 313 jsonschema.validate(self.yaml, schema) 314 315 self.attr_sets = collections.OrderedDict() 316 self.msgs = collections.OrderedDict() 317 self.req_by_value = collections.OrderedDict() 318 self.rsp_by_value = collections.OrderedDict() 319 self.ops = collections.OrderedDict() 320 self.consts = collections.OrderedDict() 321 322 last_exception = None 323 while len(self._resolution_list) > 0: 324 resolved = [] 325 unresolved = self._resolution_list 326 self._resolution_list = [] 327 328 for elem in unresolved: 329 try: 330 elem.resolve() 331 except (KeyError, AttributeError) as e: 332 self._resolution_list.append(elem) 333 last_exception = e 334 continue 335 336 resolved.append(elem) 337 338 if len(resolved) == 0: 339 raise last_exception 340 341 def new_enum(self, elem): 342 return SpecEnumSet(self, elem) 343 344 def new_attr_set(self, elem): 345 return SpecAttrSet(self, elem) 346 347 def new_operation(self, elem, req_val, rsp_val): 348 return SpecOperation(self, elem, req_val, rsp_val) 349 350 def add_unresolved(self, elem): 351 self._resolution_list.append(elem) 352 353 def _dictify_ops_unified(self): 354 val = 1 355 for elem in self.yaml['operations']['list']: 356 if 'value' in elem: 357 val = elem['value'] 358 359 op = self.new_operation(elem, val, val) 360 val += 1 361 362 self.msgs[op.name] = op 363 364 def _dictify_ops_directional(self): 365 req_val = rsp_val = 1 366 for elem in self.yaml['operations']['list']: 367 if 'notify' in elem: 368 if 'value' in elem: 369 rsp_val = elem['value'] 370 req_val_next = req_val 371 rsp_val_next = rsp_val + 1 372 req_val = None 373 elif 'do' in elem or 'dump' in elem: 374 mode = elem['do'] if 'do' in elem else elem['dump'] 375 376 v = mode.get('request', {}).get('value', None) 377 if v: 378 req_val = v 379 v = mode.get('reply', {}).get('value', None) 380 if v: 381 rsp_val = v 382 383 rsp_inc = 1 if 'reply' in mode else 0 384 req_val_next = req_val + 1 385 rsp_val_next = rsp_val + rsp_inc 386 else: 387 raise Exception("Can't parse directional ops") 388 389 op = self.new_operation(elem, req_val, rsp_val) 390 req_val = req_val_next 391 rsp_val = rsp_val_next 392 393 self.msgs[op.name] = op 394 395 def resolve(self): 396 self.resolve_up(super()) 397 398 definitions = self.yaml.get('definitions', []) 399 for elem in definitions: 400 if elem['type'] == 'enum' or elem['type'] == 'flags': 401 self.consts[elem['name']] = self.new_enum(elem) 402 else: 403 self.consts[elem['name']] = elem 404 405 for elem in self.yaml['attribute-sets']: 406 attr_set = self.new_attr_set(elem) 407 self.attr_sets[elem['name']] = attr_set 408 409 msg_id_model = self.yaml['operations'].get('enum-model', 'unified') 410 if msg_id_model == 'unified': 411 self._dictify_ops_unified() 412 elif msg_id_model == 'directional': 413 self._dictify_ops_directional() 414 415 for op in self.msgs.values(): 416 if op.req_value is not None: 417 self.req_by_value[op.req_value] = op 418 if op.rsp_value is not None: 419 self.rsp_by_value[op.rsp_value] = op 420 if not op.is_async and 'attribute-set' in op: 421 self.ops[op.name] = op 422