# SPDX-License-Identifier: BSD-3-Clause

import collections
import importlib
import os

import yaml


# To be loaded dynamically as needed
jsonschema = None


class SpecElement:
    """Netlink spec element.

    Abstract element of the Netlink spec. Implements the dictionary interface
    for access to the raw spec. Supports iterative resolution of dependencies
    across elements and class inheritance levels. The elements of the spec
    may refer to each other, and although loops should be very rare, having
    to maintain correct ordering of instantiation is painful, so the resolve()
    method should be used to perform parts of init which require access to
    other parts of the spec.

    Attributes:
        yaml        raw spec as loaded from the spec file
        family      back reference to the full family

        name        name of the entity as listed in the spec (optional)
        ident_name  name which can be safely used as identifier in code (optional)
    """
    def __init__(self, family, yaml):
        self.yaml = yaml
        self.family = family

        # 'name' and 'ident_name' are only set when the raw spec carries a name.
        if 'name' in self.yaml:
            self.name = self.yaml['name']
            self.ident_name = self.name.replace('-', '_')

        self._super_resolved = False
        # Every element registers itself so the family can call resolve() later.
        family.add_unresolved(self)

    def __getitem__(self, key):
        return self.yaml[key]

    def __contains__(self, key):
        return key in self.yaml

    def get(self, key, default=None):
        """Return the raw spec value for key, or default when it is absent."""
        return self.yaml.get(key, default)

    def resolve_up(self, up):
        """Call up.resolve() once; later calls on this element are no-ops.

        Subclasses pass super() as 'up' so the parent-level resolve() is not
        repeated when resolve() of this element gets retried.
        """
        if not self._super_resolved:
            up.resolve()
            self._super_resolved = True

    def resolve(self):
        """Hook for init work needing other spec elements; no-op by default."""
        pass


class SpecAttr(SpecElement):
    """ Single Netlink attribute type

    Represents a single attribute type within an attr space.

    Attributes:
        value      numerical ID when serialized
        attr_set   Attribute Set containing this attr
        is_multi   value of 'multi-attr' in the raw spec, False when absent
    """
    def __init__(self, family, attr_set, yaml, value):
        super().__init__(family, yaml)

        self.value = value
        self.attr_set = attr_set
        self.is_multi = yaml.get('multi-attr', False)


class SpecAttrSet(SpecElement):
    """ Netlink Attribute Set class.

    Represents a ID space of attributes within Netlink.

    Note that unlike other elements, which expose contents of the raw spec
    via the dictionary interface Attribute Set exposes attributes by name.

    Attributes:
        attrs         ordered dict of all attributes (indexed by name)
        attrs_by_val  ordered dict of all attributes (indexed by value)
        subset_of     parent set if this is a subset, otherwise None
    """
    def __init__(self, family, yaml):
        super().__init__(family, yaml)

        self.subset_of = self.yaml.get('subset-of', None)

        self.attrs = collections.OrderedDict()
        self.attrs_by_val = collections.OrderedDict()

        # IDs count up from 0; an explicit 'value' restarts the count there.
        val = 0
        for elem in self.yaml['attributes']:
            if 'value' in elem:
                val = elem['value']

            attr = self.new_attr(elem, val)
            self.attrs[attr.name] = attr
            self.attrs_by_val[attr.value] = attr
            val += 1

    def new_attr(self, elem, value):
        """Create the attribute object for one raw 'attributes' entry."""
        return SpecAttr(self.family, self, elem, value)

    def __getitem__(self, key):
        return self.attrs[key]

    def __contains__(self, key):
        return key in self.attrs

    def __iter__(self):
        yield from self.attrs

    def items(self):
        """Return the (name, attribute) pairs of this set."""
        return self.attrs.items()


class SpecOperation(SpecElement):
    """Netlink Operation

    Information about a single Netlink operation.

    Attributes:
        value       numerical ID when serialized, None if req/rsp values differ

        req_value   numerical ID when serialized, user -> kernel
        rsp_value   numerical ID when serialized, user <- kernel
        is_call     bool, whether the operation is a call
        is_async    bool, whether the operation is a notification
        is_resv     bool, whether the operation does not exist (it's just a reserved ID)
        attr_set    attribute set object, assigned by resolve(); stays unset
                    for reserved operations

        yaml        raw spec as loaded from the spec file
    """
    def __init__(self, family, yaml, req_value, rsp_value):
        super().__init__(family, yaml)

        self.value = req_value if req_value == rsp_value else None
        self.req_value = req_value
        self.rsp_value = rsp_value

        self.is_call = 'do' in yaml or 'dump' in yaml
        self.is_async = 'notify' in yaml or 'event' in yaml
        self.is_resv = not self.is_async and not self.is_call

        # Added by resolve:
        # Declared and immediately removed, so the attribute only exists once
        # resolve() has assigned it; reading it earlier raises AttributeError.
        self.attr_set = None
        delattr(self, "attr_set")

    def resolve(self):
        """Look up the attribute set used by this operation.

        The set name comes from the operation's own 'attribute-set' key or,
        for a 'notify' operation, from the operation it refers to. Reserved
        operations have no attribute set.

        Raises:
            KeyError: a referenced operation, its 'attribute-set' key or the
                attribute set itself is not available.
            Exception: the attribute set can't be determined at all.
        """
        self.resolve_up(super())

        if 'attribute-set' in self.yaml:
            attr_set_name = self.yaml['attribute-set']
        elif 'notify' in self.yaml:
            msg = self.family.msgs[self.yaml['notify']]
            attr_set_name = msg['attribute-set']
        elif self.is_resv:
            attr_set_name = ''
        else:
            raise Exception(f"Can't resolve attribute set for op '{self.name}'")
        if attr_set_name:
            self.attr_set = self.family.attr_sets[attr_set_name]


class SpecFamily(SpecElement):
    """ Netlink Family Spec class.

    Netlink family information loaded from a spec (e.g. in YAML).
    Takes care of unfolding implicit information which can be skipped
    in the spec itself for brevity.

    The class can be used like a dictionary to access the raw spec
    elements but that's usually a bad idea.

    Attributes:
        proto         protocol type (e.g. genetlink)

        attr_sets     dict of attribute sets (indexed by name)
        msgs          dict of all messages (indexed by name)
        req_by_value  dict of messages with a request ID (indexed by req_value)
        rsp_by_value  dict of messages with a response ID (indexed by rsp_value)
        ops           dict of all valid requests / responses: the non-async
                      messages which list an 'attribute-set' (indexed by name)
    """
    def __init__(self, spec_path, schema_path=None):
        """Load the spec, optionally validate it, and resolve all elements.

        Args:
            spec_path: path of the YAML spec file.
            schema_path: path of the YAML schema to validate against. None
                selects '<protocol>.yaml' in the parent of the directory
                holding the spec; any other false value (e.g. '') skips
                validation.

        Raises:
            KeyError, AttributeError: some elements could not be resolved;
                the last error seen is re-raised.
        """
        global jsonschema

        with open(spec_path, "r", encoding="utf-8") as stream:
            spec = yaml.safe_load(stream)

        # Has to exist before SpecElement.__init__() runs, because the base
        # constructor registers this object through add_unresolved().
        self._resolution_list = []

        super().__init__(self, spec)

        self.proto = self.yaml.get('protocol', 'genetlink')

        if schema_path is None:
            # Work from the absolute path so that a relative spec path
            # (e.g. "specs/foo.yaml") does not collapse to the root directory.
            spec_dir = os.path.dirname(os.path.abspath(spec_path))
            schema_path = os.path.join(os.path.dirname(spec_dir),
                                       f'{self.proto}.yaml')
        if schema_path:
            with open(schema_path, "r", encoding="utf-8") as stream:
                schema = yaml.safe_load(stream)

            # Imported on first use only, so the module is not needed when
            # validation is skipped.
            if jsonschema is None:
                jsonschema = importlib.import_module("jsonschema")

            jsonschema.validate(self.yaml, schema)

        self.attr_sets = collections.OrderedDict()
        self.msgs = collections.OrderedDict()
        self.req_by_value = collections.OrderedDict()
        self.rsp_by_value = collections.OrderedDict()
        self.ops = collections.OrderedDict()

        # Resolve in passes. An element whose resolve() hits a missing key or
        # attribute is queued again; elements created during a pass queue
        # themselves via add_unresolved(). A pass in which nothing resolves
        # means no further progress is possible.
        last_exception = None
        while self._resolution_list:
            pending = self._resolution_list
            self._resolution_list = []
            progressed = False

            for elem in pending:
                try:
                    elem.resolve()
                except (KeyError, AttributeError) as e:
                    self._resolution_list.append(elem)
                    last_exception = e
                else:
                    progressed = True

            if not progressed:
                raise last_exception

    def new_attr_set(self, elem):
        """Create the attribute set object for one raw 'attribute-sets' entry."""
        return SpecAttrSet(self, elem)

    def new_operation(self, elem, req_val, rsp_val):
        """Create the operation object for one raw 'operations' list entry."""
        return SpecOperation(self, elem, req_val, rsp_val)

    def add_unresolved(self, elem):
        """Queue elem so that its resolve() gets called by the constructor."""
        self._resolution_list.append(elem)

    def _dictify_ops_unified(self):
        """Fill msgs for the 'unified' model: one ID shared by both directions."""
        val = 0
        for elem in self.yaml['operations']['list']:
            if 'value' in elem:
                val = elem['value']

            op = self.new_operation(elem, val, val)
            val += 1

            self.msgs[op.name] = op

    def _dictify_ops_directional(self):
        """Fill msgs for the 'directional' model: separate req and rsp IDs.

        Raises:
            Exception: an entry is none of notify / event / do / dump.
        """
        req_val = rsp_val = 0
        for elem in self.yaml['operations']['list']:
            if 'notify' in elem or 'event' in elem:
                # Async messages ('event' as much as 'notify', matching
                # SpecOperation.is_async) only use up a response ID.
                if 'value' in elem:
                    rsp_val = elem['value']
                req_val_next = req_val
                rsp_val_next = rsp_val + 1
                req_val = None
            elif 'do' in elem or 'dump' in elem:
                mode = elem['do'] if 'do' in elem else elem['dump']

                # Compare against None so that an explicit value of 0 counts.
                v = mode.get('request', {}).get('value')
                if v is not None:
                    req_val = v
                v = mode.get('reply', {}).get('value')
                if v is not None:
                    rsp_val = v

                # The response counter only advances for ops with a reply.
                rsp_inc = 1 if 'reply' in mode else 0
                req_val_next = req_val + 1
                rsp_val_next = rsp_val + rsp_inc
            else:
                raise Exception("Can't parse directional ops")

            op = self.new_operation(elem, req_val, rsp_val)
            req_val = req_val_next
            rsp_val = rsp_val_next

            self.msgs[op.name] = op

    def resolve(self):
        """Create attribute sets and operations and index the operations."""
        self.resolve_up(super())

        for elem in self.yaml['attribute-sets']:
            attr_set = self.new_attr_set(elem)
            self.attr_sets[elem['name']] = attr_set

        msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
        if msg_id_model == 'unified':
            self._dictify_ops_unified()
        elif msg_id_model == 'directional':
            self._dictify_ops_directional()

        for op in self.msgs.values():
            if op.req_value is not None:
                self.req_by_value[op.req_value] = op
            if op.rsp_value is not None:
                self.rsp_by_value[op.rsp_value] = op
            if not op.is_async and 'attribute-set' in op:
                self.ops[op.name] = op