1#!/usr/bin/env python3 2 3import argparse 4import json 5import sys 6import jsonschema 7 8r""" 9Validates the phosphor-regulators configuration file. Checks it against a JSON 10schema as well as doing some extra checks that can't be encoded in the schema. 11""" 12 13def handle_validation_error(): 14 sys.exit("Validation failed.") 15 16def get_values(json_element, key, result = None): 17 r""" 18 Finds all occurrences of a key within the specified JSON element and its 19 children. Returns the associated values. 20 To search the entire configuration file, pass the root JSON element 21 json_element: JSON element within the config file. 22 key: key name. 23 result: list of values found with the specified key. 24 """ 25 26 if result is None: 27 result = [] 28 if type(json_element) is dict: 29 for json_key in json_element: 30 if json_key == key: 31 result.append(json_element[json_key]) 32 elif type(json_element[json_key]) in (list, dict): 33 get_values(json_element[json_key], key, result) 34 elif type(json_element) is list: 35 for item in json_element: 36 if type(item) in (list, dict): 37 get_values(item, key, result) 38 return result 39 40def get_rule_ids(config_json): 41 r""" 42 Get all rule IDs in the configuration file. 43 config_json: Configuration file JSON 44 """ 45 rule_ids = [] 46 for rule in config_json.get('rules', {}): 47 rule_ids.append(rule['id']) 48 return rule_ids 49 50def get_device_ids(config_json): 51 r""" 52 Get all device IDs in the configuration file. 53 config_json: Configuration file JSON 54 """ 55 device_ids = [] 56 for chassis in config_json.get('chassis', {}): 57 for device in chassis.get('devices', {}): 58 device_ids.append(device['id']) 59 return device_ids 60 61def check_number_of_elements_in_masks(config_json): 62 r""" 63 Check if the number of bit masks in the 'masks' property matches the number 64 of byte values in the 'values' property. 65 config_json: Configuration file JSON 66 """ 67 68 i2c_write_bytes = get_values(config_json, 'i2c_write_bytes') 69 i2c_compare_bytes = get_values(config_json, 'i2c_compare_bytes') 70 71 for object in i2c_write_bytes: 72 if 'masks' in object: 73 if len(object.get('masks', [])) != len(object.get('values', [])): 74 sys.stderr.write("Error: Invalid i2c_write_bytes action.\n"+\ 75 "The masks array must have the same size as the values array. "+\ 76 "masks: "+str(object.get('masks', []))+\ 77 ", values: "+str(object.get('values', []))+'.\n') 78 handle_validation_error() 79 80 for object in i2c_compare_bytes: 81 if 'masks' in object: 82 if len(object.get('masks', [])) != len(object.get('values', [])): 83 sys.stderr.write("Error: Invalid i2c_compare_bytes action.\n"+\ 84 "The masks array must have the same size as the values array. "+\ 85 "masks: "+str(object.get('masks', []))+\ 86 ", values: "+str(object.get('values', []))+'.\n') 87 handle_validation_error() 88 89def check_rule_id_exists(config_json): 90 r""" 91 Check if a rule_id property specifies a rule ID that does not exist. 92 config_json: Configuration file JSON 93 """ 94 95 rule_ids = get_values(config_json, 'rule_id') 96 valid_rule_ids = get_rule_ids(config_json) 97 for rule_id in rule_ids: 98 if rule_id not in valid_rule_ids: 99 sys.stderr.write("Error: Rule ID does not exist.\n"+\ 100 "Found rule_id value that specifies invalid rule ID "+\ 101 rule_id+'\n') 102 handle_validation_error() 103 104def check_set_device_value_exists(config_json): 105 r""" 106 Check if a set_device action specifies a device ID that does not exist. 107 config_json: Configuration file JSON 108 """ 109 110 device_ids = get_values(config_json, 'set_device') 111 valid_device_ids = get_device_ids(config_json) 112 for device_id in device_ids: 113 if device_id not in valid_device_ids: 114 sys.stderr.write("Error: Device ID does not exist.\n"+\ 115 "Found set_device action that specifies invalid device ID "+\ 116 device_id+'\n') 117 handle_validation_error() 118 119def check_run_rule_value_exists(config_json): 120 r""" 121 Check if any run_rule actions specify a rule ID that does not exist. 122 config_json: Configuration file JSON 123 """ 124 125 rule_ids = get_values(config_json, 'run_rule') 126 valid_rule_ids = get_rule_ids(config_json) 127 for rule_id in rule_ids: 128 if rule_id not in valid_rule_ids: 129 sys.stderr.write("Error: Rule ID does not exist.\n"+\ 130 "Found run_rule action that specifies invalid rule ID "+\ 131 rule_id+'\n') 132 handle_validation_error() 133 134def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]): 135 r""" 136 Check if a 'run_rule' action in the specified rule causes an 137 infinite loop. 138 config_json: Configuration file JSON. 139 rule_json: A rule in the JSON config file. 140 call_stack: Current call stack of rules. 141 """ 142 143 call_stack.append(rule_json['id']) 144 for action in rule_json.get('actions', {}): 145 if 'run_rule' in action: 146 run_rule_id = action['run_rule'] 147 if run_rule_id in call_stack: 148 call_stack.append(run_rule_id) 149 sys.stderr.write(\ 150 "Infinite loop caused by run_rule actions.\n"+\ 151 str(call_stack)+'\n') 152 handle_validation_error() 153 else: 154 for rule in config_json.get('rules', {}): 155 if rule['id'] == run_rule_id: 156 check_infinite_loops_in_rule(\ 157 config_json, rule, call_stack) 158 call_stack.pop() 159 160def check_infinite_loops(config_json): 161 r""" 162 Check if rule in config file is called recursively, causing an 163 infinite loop. 164 config_json: Configuration file JSON 165 """ 166 167 for rule in config_json.get('rules', {}): 168 check_infinite_loops_in_rule(config_json, rule) 169 170def check_duplicate_object_id(config_json): 171 r""" 172 Check that there aren't any JSON objects with the same 'id' property value. 173 config_json: Configuration file JSON 174 """ 175 176 json_ids = get_values(config_json, 'id') 177 unique_ids = set() 178 for id in json_ids: 179 if id in unique_ids: 180 sys.stderr.write("Error: Duplicate ID.\n"+\ 181 "Found multiple objects with the ID "+id+'\n') 182 handle_validation_error() 183 else: 184 unique_ids.add(id) 185 186def check_duplicate_rule_id(config_json): 187 r""" 188 Check that there aren't any "rule" elements with the same 'id' field. 189 config_json: Configuration file JSON 190 """ 191 rule_ids = [] 192 for rule in config_json.get('rules', {}): 193 rule_id = rule['id'] 194 if rule_id in rule_ids: 195 sys.stderr.write("Error: Duplicate rule ID.\n"+\ 196 "Found multiple rules with the ID "+rule_id+'\n') 197 handle_validation_error() 198 else: 199 rule_ids.append(rule_id) 200 201def check_duplicate_chassis_number(config_json): 202 r""" 203 Check that there aren't any "chassis" elements with the same 'number' field. 204 config_json: Configuration file JSON 205 """ 206 numbers = [] 207 for chassis in config_json.get('chassis', {}): 208 number = chassis['number'] 209 if number in numbers: 210 sys.stderr.write("Error: Duplicate chassis number.\n"+\ 211 "Found multiple chassis with the number "+str(number)+'\n') 212 handle_validation_error() 213 else: 214 numbers.append(number) 215 216def check_duplicate_device_id(config_json): 217 r""" 218 Check that there aren't any "devices" with the same 'id' field. 219 config_json: Configuration file JSON 220 """ 221 device_ids = [] 222 for chassis in config_json.get('chassis', {}): 223 for device in chassis.get('devices', {}): 224 device_id = device['id'] 225 if device_id in device_ids: 226 sys.stderr.write("Error: Duplicate device ID.\n"+\ 227 "Found multiple devices with the ID "+device_id+'\n') 228 handle_validation_error() 229 else: 230 device_ids.append(device_id) 231 232def check_duplicate_rail_id(config_json): 233 r""" 234 Check that there aren't any "rails" with the same 'id' field. 235 config_json: Configuration file JSON 236 """ 237 rail_ids = [] 238 for chassis in config_json.get('chassis', {}): 239 for device in chassis.get('devices', {}): 240 for rail in device.get('rails', {}): 241 rail_id = rail['id'] 242 if rail_id in rail_ids: 243 sys.stderr.write("Error: Duplicate rail ID.\n"+\ 244 "Found multiple rails with the ID "+rail_id+'\n') 245 handle_validation_error() 246 else: 247 rail_ids.append(rail_id) 248 249def check_for_duplicates(config_json): 250 r""" 251 Check for duplicate ID. 252 """ 253 check_duplicate_rule_id(config_json) 254 255 check_duplicate_chassis_number(config_json) 256 257 check_duplicate_device_id(config_json) 258 259 check_duplicate_rail_id(config_json) 260 261 check_duplicate_object_id(config_json) 262 263def validate_schema(config, schema): 264 r""" 265 Validates the specified config file using the specified 266 schema file. 267 268 config: Path of the file containing the config JSON 269 schema: Path of the file containing the schema JSON 270 """ 271 272 with open(config) as config_handle: 273 config_json = json.load(config_handle) 274 275 with open(schema) as schema_handle: 276 schema_json = json.load(schema_handle) 277 278 try: 279 jsonschema.validate(config_json, schema_json) 280 except jsonschema.ValidationError as e: 281 print(e) 282 handle_validation_error() 283 284 return config_json 285 286if __name__ == '__main__': 287 288 parser = argparse.ArgumentParser( 289 description='phosphor-regulators configuration file validator') 290 291 parser.add_argument('-s', '--schema-file', dest='schema_file', 292 help='The phosphor-regulators schema file') 293 294 parser.add_argument('-c', '--configuration-file', dest='configuration_file', 295 help='The phosphor-regulators configuration file') 296 297 args = parser.parse_args() 298 299 if not args.schema_file: 300 parser.print_help() 301 sys.exit("Error: Schema file is required.") 302 if not args.configuration_file: 303 parser.print_help() 304 sys.exit("Error: Configuration file is required.") 305 306 config_json = validate_schema(args.configuration_file, args.schema_file) 307 308 check_for_duplicates(config_json) 309 310 check_infinite_loops(config_json) 311 312 check_run_rule_value_exists(config_json) 313 314 check_set_device_value_exists(config_json) 315 316 check_rule_id_exists(config_json) 317 318 check_number_of_elements_in_masks(config_json) 319