#!/usr/bin/env python3
"""
Validates the phosphor-regulators configuration file. Checks it against a JSON
schema as well as doing some extra checks that can't be encoded in the schema.
"""

import argparse
import json
import os
import sys

# jsonschema is only used by validate_schema().  Tolerate its absence at import
# time so the missing dependency is reported as a normal error message rather
# than an import traceback.
try:
    import jsonschema
except ImportError:
    jsonschema = None


def handle_validation_error():
    """Terminate the program with the standard 'Validation failed.' message."""
    sys.exit("Validation failed.")


def _report_error(message):
    """Write message to stderr, then abort via handle_validation_error()."""
    sys.stderr.write(message)
    handle_validation_error()


def get_values(json_element, key, result=None):
    """
    Finds all occurrences of a key within the specified JSON element and its
    children.  Returns the associated values.
    To search the entire configuration file, pass the root JSON element.

    A value stored under a matching key is collected as-is; it is not searched
    for further occurrences of the key.

    json_element: JSON element within the config file.
    key: key name.
    result: list that found values are appended to.  A new list is created
            when omitted.
    Returns the list of values found with the specified key.
    """
    if result is None:
        result = []
    if isinstance(json_element, dict):
        for json_key, value in json_element.items():
            if json_key == key:
                result.append(value)
            elif isinstance(value, (list, dict)):
                get_values(value, key, result)
    elif isinstance(json_element, list):
        for item in json_element:
            if isinstance(item, (list, dict)):
                get_values(item, key, result)
    return result


def get_rule_ids(config_json):
    """
    Get all rule IDs in the configuration file.
    config_json: Configuration file JSON
    Returns the list of 'id' values of the top-level 'rules' elements.
    """
    return [rule['id'] for rule in config_json.get('rules', [])]


def get_device_ids(config_json):
    """
    Get all device IDs in the configuration file.
    config_json: Configuration file JSON
    Returns the list of 'id' values of every device in every chassis.
    """
    return [device['id']
            for chassis in config_json.get('chassis', [])
            for device in chassis.get('devices', [])]


def _check_masks_match_values(config_json, action_name):
    """
    Fail validation if an action of the given name has a 'masks' array whose
    length differs from that of its 'values' array.
    """
    for action in get_values(config_json, action_name):
        # 'masks' is optional; only actions that specify it are checked.
        if 'masks' not in action:
            continue
        masks = action.get('masks', [])
        values = action.get('values', [])
        if len(masks) != len(values):
            _report_error(
                "Error: Invalid {} action.\n"
                "The masks array must have the same size as the values array. "
                "masks: {}, values: {}.\n".format(action_name, masks, values))


def check_number_of_elements_in_masks(config_json):
    """
    Check if the number of bit masks in the 'masks' property matches the number
    of byte values in the 'values' property.
    config_json: Configuration file JSON
    """
    _check_masks_match_values(config_json, 'i2c_write_bytes')
    _check_masks_match_values(config_json, 'i2c_compare_bytes')


def _check_ids_exist(referenced_ids, valid_ids, heading, detail):
    """
    Fail validation if any referenced ID is not one of the valid IDs.  The
    error text is heading, a newline, then detail followed by the bad ID.
    """
    for referenced_id in referenced_ids:
        if referenced_id not in valid_ids:
            _report_error("{}\n{}{}\n".format(heading, detail, referenced_id))


def check_rule_id_exists(config_json):
    """
    Check if a rule_id property specifies a rule ID that does not exist.
    config_json: Configuration file JSON
    """
    _check_ids_exist(
        get_values(config_json, 'rule_id'),
        get_rule_ids(config_json),
        "Error: Rule ID does not exist.",
        "Found rule_id value that specifies invalid rule ID ")


def check_device_id_exists(config_json):
    """
    Check if a device_id property specifies a device ID that does not exist.
    config_json: Configuration file JSON
    """
    _check_ids_exist(
        get_values(config_json, 'device_id'),
        get_device_ids(config_json),
        "Error: Device ID does not exist.",
        "Found device_id value that specifies invalid device ID ")


def check_set_device_value_exists(config_json):
    """
    Check if a set_device action specifies a device ID that does not exist.
    config_json: Configuration file JSON
    """
    _check_ids_exist(
        get_values(config_json, 'set_device'),
        get_device_ids(config_json),
        "Error: Device ID does not exist.",
        "Found set_device action that specifies invalid device ID ")


def check_run_rule_value_exists(config_json):
    """
    Check if any run_rule actions specify a rule ID that does not exist.
    config_json: Configuration file JSON
    """
    _check_ids_exist(
        get_values(config_json, 'run_rule'),
        get_rule_ids(config_json),
        "Error: Rule ID does not exist.",
        "Found run_rule action that specifies invalid rule ID ")


def check_infinite_loops_in_rule(config_json, rule_json, call_stack=None):
    """
    Check if a 'run_rule' action in the specified rule causes an
    infinite loop.

    Only the entries of the rule's own 'actions' array are inspected for a
    'run_rule' key; each referenced rule is then followed recursively.

    config_json: Configuration file JSON.
    rule_json: A rule in the JSON config file.
    call_stack: Current call stack of rule IDs.  It is extended while this
                rule is examined and restored before a normal return.  A fresh
                list is used when omitted.
    """
    # A shared mutable default would keep stale IDs after a detected loop
    # (validation exits before the pop below), so create the list per call.
    if call_stack is None:
        call_stack = []

    call_stack.append(rule_json['id'])
    for action in rule_json.get('actions', []):
        if 'run_rule' not in action:
            continue
        run_rule_id = action['run_rule']
        if run_rule_id in call_stack:
            # Include the repeated ID so the printed stack shows the cycle.
            call_stack.append(run_rule_id)
            _report_error(
                "Infinite loop caused by run_rule actions.\n"
                + str(call_stack) + '\n')
        else:
            for rule in config_json.get('rules', []):
                if rule['id'] == run_rule_id:
                    check_infinite_loops_in_rule(config_json, rule, call_stack)
    call_stack.pop()


def check_infinite_loops(config_json):
    """
    Check if rule in config file is called recursively, causing an
    infinite loop.
    config_json: Configuration file JSON
    """
    for rule in config_json.get('rules', []):
        check_infinite_loops_in_rule(config_json, rule)


def _check_unique(values, heading, detail):
    """
    Fail validation on the first value that occurs more than once.  The error
    text is heading, a newline, then detail followed by the repeated value.
    """
    seen = set()
    for value in values:
        if value in seen:
            _report_error("{}\n{}{}\n".format(heading, detail, value))
        seen.add(value)


def check_duplicate_object_id(config_json):
    """
    Check that there aren't any JSON objects with the same 'id' property value.
    config_json: Configuration file JSON
    """
    _check_unique(
        get_values(config_json, 'id'),
        "Error: Duplicate ID.",
        "Found multiple objects with the ID ")


def check_duplicate_rule_id(config_json):
    """
    Check that there aren't any "rule" elements with the same 'id' field.
    config_json: Configuration file JSON
    """
    _check_unique(
        (rule['id'] for rule in config_json.get('rules', [])),
        "Error: Duplicate rule ID.",
        "Found multiple rules with the ID ")


def check_duplicate_chassis_number(config_json):
    """
    Check that there aren't any "chassis" elements with the same 'number' field.
    config_json: Configuration file JSON
    """
    _check_unique(
        (chassis['number'] for chassis in config_json.get('chassis', [])),
        "Error: Duplicate chassis number.",
        "Found multiple chassis with the number ")


def check_duplicate_device_id(config_json):
    """
    Check that there aren't any "devices" with the same 'id' field.
    config_json: Configuration file JSON
    """
    _check_unique(
        (device['id']
         for chassis in config_json.get('chassis', [])
         for device in chassis.get('devices', [])),
        "Error: Duplicate device ID.",
        "Found multiple devices with the ID ")


def check_duplicate_rail_id(config_json):
    """
    Check that there aren't any "rails" with the same 'id' field.
    config_json: Configuration file JSON
    """
    _check_unique(
        (rail['id']
         for chassis in config_json.get('chassis', [])
         for device in chassis.get('devices', [])
         for rail in device.get('rails', [])),
        "Error: Duplicate rail ID.",
        "Found multiple rails with the ID ")


def check_for_duplicates(config_json):
    """
    Check for duplicate IDs: rule IDs, chassis numbers, device IDs, rail IDs
    and finally any 'id' property anywhere in the file, in that order.
    config_json: Configuration file JSON
    """
    check_duplicate_rule_id(config_json)

    check_duplicate_chassis_number(config_json)

    check_duplicate_device_id(config_json)

    check_duplicate_rail_id(config_json)

    check_duplicate_object_id(config_json)


def validate_schema(config, schema):
    """
    Validates the specified config file using the specified
    schema file.

    config: Path of the file containing the config JSON
    schema: Path of the file containing the schema JSON
    Returns the parsed config JSON.  Exits the program if the config does not
    conform to the schema or if the jsonschema package is unavailable.
    """
    if jsonschema is None:
        sys.exit("Error: The jsonschema Python package is required.")

    with open(config) as config_handle:
        config_json = json.load(config_handle)

    with open(schema) as schema_handle:
        schema_json = json.load(schema_handle)

    try:
        jsonschema.validate(config_json, schema_json)
    except jsonschema.ValidationError as e:
        # The schema violation details go to stdout; the failure summary is
        # emitted by handle_validation_error().
        print(e)
        handle_validation_error()

    return config_json


def validate_JSON_format(file):
    """
    Check whether the specified file contains well-formed JSON.

    file: Path of the file to check.
    Returns True if the file parses as JSON, otherwise False.  Valid documents
    that are falsy once parsed (such as {} or []) are reported as True.
    """
    with open(file) as json_data:
        try:
            json.load(json_data)
        except ValueError:
            return False
    return True


def _require_json_file(parser, path, label):
    """
    Exit with usage help and an error message unless path names an existing,
    readable file containing JSON.  label ("Schema" or "Configuration") names
    the file in the message.
    """
    problem = None
    if not path:
        problem = "is required"
    elif not os.path.exists(path):
        problem = "does not exist"
    elif not os.access(path, os.R_OK):
        problem = "is not readable"
    elif not validate_JSON_format(path):
        problem = "is not in the JSON format"

    if problem is not None:
        parser.print_help()
        sys.exit("Error: {} file {}.".format(label, problem))


def main():
    """Parse the command line and run every validation step in order."""
    parser = argparse.ArgumentParser(
        description='phosphor-regulators configuration file validator')

    parser.add_argument('-s', '--schema-file', dest='schema_file',
                        help='The phosphor-regulators schema file')

    parser.add_argument('-c', '--configuration-file',
                        dest='configuration_file',
                        help='The phosphor-regulators configuration file')

    args = parser.parse_args()

    _require_json_file(parser, args.schema_file, "Schema")
    _require_json_file(parser, args.configuration_file, "Configuration")

    config_json = validate_schema(args.configuration_file, args.schema_file)

    check_for_duplicates(config_json)

    check_infinite_loops(config_json)

    check_run_rule_value_exists(config_json)

    check_set_device_value_exists(config_json)

    check_rule_id_exists(config_json)

    check_device_id_exists(config_json)

    check_number_of_elements_in_masks(config_json)


if __name__ == '__main__':
    main()