#!/usr/bin/env python3
r"""
Validates the phosphor-regulators configuration file. Checks it against a JSON
schema as well as doing some extra checks that can't be encoded in the schema.
"""

import argparse
import json
import os
import sys

import jsonschema


def handle_validation_error():
    r"""
    Terminates the program by raising SystemExit with the message
    "Validation failed.".
    """
    sys.exit("Validation failed.")


def get_values(json_element, key, result=None):
    r"""
    Finds all occurrences of a key within the specified JSON element and its
    children. Returns the associated values.
    To search the entire configuration file, pass the root JSON element
    json_element: JSON element within the config file.
    key: key name.
    result: list of values found with the specified key. A new list is
            created when omitted; the same list is returned.
    """

    if result is None:
        result = []
    if isinstance(json_element, dict):
        for json_key, value in json_element.items():
            if json_key == key:
                # The value of a matching key is collected as-is; it is not
                # searched for further nested occurrences of the key.
                result.append(value)
            elif isinstance(value, (list, dict)):
                get_values(value, key, result)
    elif isinstance(json_element, list):
        for item in json_element:
            if isinstance(item, (list, dict)):
                get_values(item, key, result)
    return result


def get_rule_ids(config_json):
    r"""
    Get all rule IDs in the configuration file.
    config_json: Configuration file JSON
    Returns a list with the 'id' of every element of the top-level 'rules'
    array, in file order.
    """
    return [rule["id"] for rule in config_json.get("rules", [])]


def get_device_ids(config_json):
    r"""
    Get all device IDs in the configuration file.
    config_json: Configuration file JSON
    Returns a list with the 'id' of every device of every chassis, in file
    order.
    """
    return [
        device["id"]
        for chassis in config_json.get("chassis", [])
        for device in chassis.get("devices", [])
    ]


def check_number_of_elements_in_masks(config_json):
    r"""
    Check if the number of bit masks in the 'masks' property matches the number
    of byte values in the 'values' property.
    Applies to every i2c_write_bytes and i2c_compare_bytes action. Writes an
    error to stderr and exits on the first mismatch.
    config_json: Configuration file JSON
    """

    # i2c_write_bytes actions are checked before i2c_compare_bytes actions.
    for action_name in ("i2c_write_bytes", "i2c_compare_bytes"):
        for action in get_values(config_json, action_name):
            # 'masks' is optional; without it there is nothing to compare.
            if "masks" not in action:
                continue
            masks = action["masks"]
            values = action.get("values", [])
            if len(masks) != len(values):
                sys.stderr.write(
                    f"Error: Invalid {action_name} action.\n"
                    "The masks array must have the same size as the values"
                    f" array. masks: {masks}, values: {values}.\n"
                )
                handle_validation_error()


def check_rule_id_exists(config_json):
    r"""
    Check if a rule_id property specifies a rule ID that does not exist.
    Writes an error to stderr and exits on the first unknown ID.
    config_json: Configuration file JSON
    """

    valid_rule_ids = set(get_rule_ids(config_json))
    for rule_id in get_values(config_json, "rule_id"):
        if rule_id not in valid_rule_ids:
            sys.stderr.write(
                "Error: Rule ID does not exist.\n"
                "Found rule_id value that specifies invalid rule ID "
                f"{rule_id}\n"
            )
            handle_validation_error()


def check_device_id_exists(config_json):
    r"""
    Check if a device_id property specifies a device ID that does not exist.
    Writes an error to stderr and exits on the first unknown ID.
    config_json: Configuration file JSON
    """

    valid_device_ids = set(get_device_ids(config_json))
    for device_id in get_values(config_json, "device_id"):
        if device_id not in valid_device_ids:
            sys.stderr.write(
                "Error: Device ID does not exist.\n"
                "Found device_id value that specifies invalid device ID "
                f"{device_id}\n"
            )
            handle_validation_error()


def check_set_device_value_exists(config_json):
    r"""
    Check if a set_device action specifies a device ID that does not exist.
    Writes an error to stderr and exits on the first unknown ID.
    config_json: Configuration file JSON
    """

    valid_device_ids = set(get_device_ids(config_json))
    for device_id in get_values(config_json, "set_device"):
        if device_id not in valid_device_ids:
            sys.stderr.write(
                "Error: Device ID does not exist.\n"
                "Found set_device action that specifies invalid device ID "
                f"{device_id}\n"
            )
            handle_validation_error()


def check_run_rule_value_exists(config_json):
    r"""
    Check if any run_rule actions specify a rule ID that does not exist.
    Writes an error to stderr and exits on the first unknown ID.
    config_json: Configuration file JSON
    """

    valid_rule_ids = set(get_rule_ids(config_json))
    for rule_id in get_values(config_json, "run_rule"):
        if rule_id not in valid_rule_ids:
            sys.stderr.write(
                "Error: Rule ID does not exist.\n"
                "Found run_rule action that specifies invalid rule ID "
                f"{rule_id}\n"
            )
            handle_validation_error()


def check_infinite_loops_in_rule(config_json, rule_json, call_stack=None):
    r"""
    Check if a 'run_rule' action in the specified rule causes an
    infinite loop.
    Only run_rule actions that appear directly in the rule's 'actions' array
    are followed. Writes the offending call stack to stderr and exits when a
    rule is reached that is already on the call stack.
    config_json: Configuration file JSON.
    rule_json: A rule in the JSON config file.
    call_stack: Current call stack of rules. A fresh list is used when
                omitted.
    """

    # A fresh list per top-level call: a shared default list would stay
    # polluted when handle_validation_error() exits before the pop below.
    if call_stack is None:
        call_stack = []

    call_stack.append(rule_json["id"])
    for action in rule_json.get("actions", []):
        if "run_rule" not in action:
            continue
        run_rule_id = action["run_rule"]
        if run_rule_id in call_stack:
            # Append the repeated ID so the printed stack shows the cycle.
            call_stack.append(run_rule_id)
            sys.stderr.write(
                "Infinite loop caused by run_rule actions.\n"
                f"{call_stack}\n"
            )
            handle_validation_error()
        else:
            for rule in config_json.get("rules", []):
                if rule["id"] == run_rule_id:
                    check_infinite_loops_in_rule(
                        config_json, rule, call_stack
                    )
    call_stack.pop()


def check_infinite_loops(config_json):
    r"""
    Check if rule in config file is called recursively, causing an
    infinite loop.
    config_json: Configuration file JSON
    """

    for rule in config_json.get("rules", []):
        check_infinite_loops_in_rule(config_json, rule)


def check_duplicate_object_id(config_json):
    r"""
    Check that there aren't any JSON objects with the same 'id' property value.
    Writes an error to stderr and exits on the first duplicate.
    config_json: Configuration file JSON
    """

    seen_ids = set()
    for object_id in get_values(config_json, "id"):
        if object_id in seen_ids:
            sys.stderr.write(
                "Error: Duplicate ID.\n"
                f"Found multiple objects with the ID {object_id}\n"
            )
            handle_validation_error()
        else:
            seen_ids.add(object_id)


def check_duplicate_rule_id(config_json):
    r"""
    Check that there aren't any "rule" elements with the same 'id' field.
    Writes an error to stderr and exits on the first duplicate.
    config_json: Configuration file JSON
    """
    seen_ids = set()
    for rule in config_json.get("rules", []):
        rule_id = rule["id"]
        if rule_id in seen_ids:
            sys.stderr.write(
                "Error: Duplicate rule ID.\n"
                f"Found multiple rules with the ID {rule_id}\n"
            )
            handle_validation_error()
        else:
            seen_ids.add(rule_id)


def check_duplicate_chassis_number(config_json):
    r"""
    Check that there aren't any "chassis" elements with the same 'number'
    field.
    Writes an error to stderr and exits on the first duplicate.
    config_json: Configuration file JSON
    """
    seen_numbers = set()
    for chassis in config_json.get("chassis", []):
        number = chassis["number"]
        if number in seen_numbers:
            sys.stderr.write(
                "Error: Duplicate chassis number.\n"
                f"Found multiple chassis with the number {number}\n"
            )
            handle_validation_error()
        else:
            seen_numbers.add(number)


def check_duplicate_device_id(config_json):
    r"""
    Check that there aren't any "devices" with the same 'id' field.
    Writes an error to stderr and exits on the first duplicate.
    config_json: Configuration file JSON
    """
    seen_ids = set()
    for chassis in config_json.get("chassis", []):
        for device in chassis.get("devices", []):
            device_id = device["id"]
            if device_id in seen_ids:
                sys.stderr.write(
                    "Error: Duplicate device ID.\n"
                    f"Found multiple devices with the ID {device_id}\n"
                )
                handle_validation_error()
            else:
                seen_ids.add(device_id)


def check_duplicate_rail_id(config_json):
    r"""
    Check that there aren't any "rails" with the same 'id' field.
    Writes an error to stderr and exits on the first duplicate.
    config_json: Configuration file JSON
    """
    seen_ids = set()
    for chassis in config_json.get("chassis", []):
        for device in chassis.get("devices", []):
            for rail in device.get("rails", []):
                rail_id = rail["id"]
                if rail_id in seen_ids:
                    sys.stderr.write(
                        "Error: Duplicate rail ID.\n"
                        f"Found multiple rails with the ID {rail_id}\n"
                    )
                    handle_validation_error()
                else:
                    seen_ids.add(rail_id)


def check_for_duplicates(config_json):
    r"""
    Check for duplicate ID.
    Runs the rule, chassis number, device, rail and generic object ID
    checks, in that order.
    config_json: Configuration file JSON
    """
    check_duplicate_rule_id(config_json)

    check_duplicate_chassis_number(config_json)

    check_duplicate_device_id(config_json)

    check_duplicate_rail_id(config_json)

    check_duplicate_object_id(config_json)


def validate_schema(config, schema):
    r"""
    Validates the specified config file using the specified
    schema file.
    Prints the validation error and exits if the config does not conform.
    Returns the parsed config JSON otherwise.

    config:   Path of the file containing the config JSON
    schema:   Path of the file containing the schema JSON
    """

    with open(config, encoding="utf-8") as config_handle:
        config_json = json.load(config_handle)

    with open(schema, encoding="utf-8") as schema_handle:
        schema_json = json.load(schema_handle)

    try:
        jsonschema.validate(config_json, schema_json)
    except jsonschema.ValidationError as e:
        print(e)
        handle_validation_error()

    return config_json


def validate_JSON_format(file):
    r"""
    Check whether the specified file contains well-formed JSON.
    Returns True if the file parses as JSON and False otherwise. Valid but
    falsy documents such as {} or [] count as well-formed.
    file: Path of the file to check
    """
    with open(file, encoding="utf-8") as json_data:
        try:
            json.load(json_data)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError, which are
            # both ValueError subclasses.
            return False
    return True


def _require_json_file(parser, path, label):
    r"""
    Exit with usage help and an error message unless path names an existing,
    readable, well-formed JSON file.
    parser: argparse parser used to print the usage help.
    path: file path given on the command line (may be None).
    label: Name used in the error message, e.g. "Schema".
    """
    if not path:
        problem = "is required"
    elif not os.path.exists(path):
        problem = "does not exist"
    elif not os.access(path, os.R_OK):
        problem = "is not readable"
    elif not validate_JSON_format(path):
        problem = "is not in the JSON format"
    else:
        return
    parser.print_help()
    sys.exit(f"Error: {label} file {problem}.")


def main():
    r"""
    Parse the command line, validate the configuration file against the
    schema and then run the extra checks that the schema can't express.
    """
    parser = argparse.ArgumentParser(
        description="phosphor-regulators configuration file validator"
    )

    parser.add_argument(
        "-s",
        "--schema-file",
        dest="schema_file",
        help="The phosphor-regulators schema file",
    )

    parser.add_argument(
        "-c",
        "--configuration-file",
        dest="configuration_file",
        help="The phosphor-regulators configuration file",
    )

    args = parser.parse_args()

    # The schema file is checked completely before the configuration file.
    _require_json_file(parser, args.schema_file, "Schema")
    _require_json_file(parser, args.configuration_file, "Configuration")

    config_json = validate_schema(args.configuration_file, args.schema_file)

    check_for_duplicates(config_json)

    check_infinite_loops(config_json)

    check_run_rule_value_exists(config_json)

    check_set_device_value_exists(config_json)

    check_rule_id_exists(config_json)

    check_device_id_exists(config_json)

    check_number_of_elements_in_masks(config_json)


if __name__ == "__main__":
    main()