1#!/usr/bin/env python 2 3import argparse 4import json 5import sys 6import jsonschema 7 8r""" 9Validates the phosphor-regulators configuration file. Checks it against a JSON 10schema as well as doing some extra checks that can't be encoded in the schema. 11""" 12 13def handle_validation_error(): 14 sys.exit("Validation failed.") 15 16def get_values(json_element, key, result = None): 17 r""" 18 Finds all occurrences of a key within the specified JSON element and its 19 children. Returns the associated values. 20 To search the entire configuration file, pass the root JSON element 21 json_element: JSON element within the config file. 22 key: key name. 23 result: list of values found with the specified key. 24 """ 25 26 if result is None: 27 result = [] 28 if type(json_element) is dict: 29 for json_key in json_element: 30 if json_key == key: 31 result.append(json_element[json_key]) 32 elif type(json_element[json_key]) in (list, dict): 33 get_values(json_element[json_key], key, result) 34 elif type(json_element) is list: 35 for item in json_element: 36 if type(item) in (list, dict): 37 get_values(item, key, result) 38 return result 39 40def get_rule_ids(config_json): 41 r""" 42 Get all rule IDs in the configuration file. 43 config_json: Configuration file JSON 44 """ 45 rule_ids = [] 46 for rule in config_json.get('rules', {}): 47 rule_ids.append(rule['id']) 48 return rule_ids 49 50def get_device_ids(config_json): 51 r""" 52 Get all device IDs in the configuration file. 53 config_json: Configuration file JSON 54 """ 55 device_ids = [] 56 for chassis in config_json.get('chassis', {}): 57 for device in chassis.get('devices', {}): 58 device_ids.append(device['id']) 59 return device_ids 60 61def check_rule_id_exists(config_json): 62 r""" 63 Check if a rule_id property specifies a rule ID that does not exist. 64 config_json: Configuration file JSON 65 """ 66 67 rule_ids = get_values(config_json, 'rule_id') 68 valid_rule_ids = get_rule_ids(config_json) 69 for rule_id in rule_ids: 70 if rule_id not in valid_rule_ids: 71 sys.stderr.write("Error: Rule ID does not exist.\n"+\ 72 "Found rule_id value that specifies invalid rule ID "+\ 73 rule_id+'\n') 74 handle_validation_error() 75 76def check_set_device_value_exists(config_json): 77 r""" 78 Check if a set_device action specifies a device ID that does not exist. 79 config_json: Configuration file JSON 80 """ 81 82 device_ids = get_values(config_json, 'set_device') 83 valid_device_ids = get_device_ids(config_json) 84 for device_id in device_ids: 85 if device_id not in valid_device_ids: 86 sys.stderr.write("Error: Device ID does not exist.\n"+\ 87 "Found set_device action that specifies invalid device ID "+\ 88 device_id+'\n') 89 handle_validation_error() 90 91def check_run_rule_value_exists(config_json): 92 r""" 93 Check if any run_rule actions specify a rule ID that does not exist. 94 config_json: Configuration file JSON 95 """ 96 97 rule_ids = get_values(config_json, 'run_rule') 98 valid_rule_ids = get_rule_ids(config_json) 99 for rule_id in rule_ids: 100 if rule_id not in valid_rule_ids: 101 sys.stderr.write("Error: Rule ID does not exist.\n"+\ 102 "Found run_rule action that specifies invalid rule ID "+\ 103 rule_id+'\n') 104 handle_validation_error() 105 106def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]): 107 r""" 108 Check if a 'run_rule' action in the specified rule causes an 109 infinite loop. 110 config_json: Configuration file JSON. 111 rule_json: A rule in the JSON config file. 112 call_stack: Current call stack of rules. 113 """ 114 115 call_stack.append(rule_json['id']) 116 for action in rule_json.get('actions', {}): 117 if 'run_rule' in action: 118 run_rule_id = action['run_rule'] 119 if run_rule_id in call_stack: 120 call_stack.append(run_rule_id) 121 sys.stderr.write(\ 122 "Infinite loop caused by run_rule actions.\n"+\ 123 str(call_stack)+'\n') 124 handle_validation_error() 125 else: 126 for rule in config_json.get('rules', {}): 127 if rule['id'] == run_rule_id: 128 check_infinite_loops_in_rule(\ 129 config_json, rule, call_stack) 130 call_stack.pop() 131 132def check_infinite_loops(config_json): 133 r""" 134 Check if rule in config file is called recursively, causing an 135 infinite loop. 136 config_json: Configuration file JSON 137 """ 138 139 for rule in config_json.get('rules', {}): 140 check_infinite_loops_in_rule(config_json, rule) 141 142def check_duplicate_object_id(config_json): 143 r""" 144 Check that there aren't any JSON objects with the same 'id' property value. 145 config_json: Configuration file JSON 146 """ 147 148 json_ids = get_values(config_json, 'id') 149 unique_ids = set() 150 for id in json_ids: 151 if id in unique_ids: 152 sys.stderr.write("Error: Duplicate ID.\n"+\ 153 "Found multiple objects with the ID "+id+'\n') 154 handle_validation_error() 155 else: 156 unique_ids.add(id) 157 158def check_duplicate_rule_id(config_json): 159 r""" 160 Check that there aren't any "rule" elements with the same 'id' field. 161 config_json: Configuration file JSON 162 """ 163 rule_ids = [] 164 for rule in config_json.get('rules', {}): 165 rule_id = rule['id'] 166 if rule_id in rule_ids: 167 sys.stderr.write("Error: Duplicate rule ID.\n"+\ 168 "Found multiple rules with the ID "+rule_id+'\n') 169 handle_validation_error() 170 else: 171 rule_ids.append(rule_id) 172 173def check_duplicate_chassis_number(config_json): 174 r""" 175 Check that there aren't any "chassis" elements with the same 'number' field. 176 config_json: Configuration file JSON 177 """ 178 numbers = [] 179 for chassis in config_json.get('chassis', {}): 180 number = chassis['number'] 181 if number in numbers: 182 sys.stderr.write("Error: Duplicate chassis number.\n"+\ 183 "Found multiple chassis with the number "+str(number)+'\n') 184 handle_validation_error() 185 else: 186 numbers.append(number) 187 188def check_duplicate_device_id(config_json): 189 r""" 190 Check that there aren't any "devices" with the same 'id' field. 191 config_json: Configuration file JSON 192 """ 193 device_ids = [] 194 for chassis in config_json.get('chassis', {}): 195 for device in chassis.get('devices', {}): 196 device_id = device['id'] 197 if device_id in device_ids: 198 sys.stderr.write("Error: Duplicate device ID.\n"+\ 199 "Found multiple devices with the ID "+device_id+'\n') 200 handle_validation_error() 201 else: 202 device_ids.append(device_id) 203 204def check_duplicate_rail_id(config_json): 205 r""" 206 Check that there aren't any "rails" with the same 'id' field. 207 config_json: Configuration file JSON 208 """ 209 rail_ids = [] 210 for chassis in config_json.get('chassis', {}): 211 for device in chassis.get('devices', {}): 212 for rail in device.get('rails', {}): 213 rail_id = rail['id'] 214 if rail_id in rail_ids: 215 sys.stderr.write("Error: Duplicate rail ID.\n"+\ 216 "Found multiple rails with the ID "+rail_id+'\n') 217 handle_validation_error() 218 else: 219 rail_ids.append(rail_id) 220 221def check_for_duplicates(config_json): 222 r""" 223 Check for duplicate ID. 224 """ 225 check_duplicate_rule_id(config_json) 226 227 check_duplicate_chassis_number(config_json) 228 229 check_duplicate_device_id(config_json) 230 231 check_duplicate_rail_id(config_json) 232 233 check_duplicate_object_id(config_json) 234 235def validate_schema(config, schema): 236 r""" 237 Validates the specified config file using the specified 238 schema file. 239 240 config: Path of the file containing the config JSON 241 schema: Path of the file containing the schema JSON 242 """ 243 244 with open(config) as config_handle: 245 config_json = json.load(config_handle) 246 247 with open(schema) as schema_handle: 248 schema_json = json.load(schema_handle) 249 250 try: 251 jsonschema.validate(config_json, schema_json) 252 except jsonschema.ValidationError as e: 253 print(e) 254 handle_validation_error() 255 256 return config_json 257 258if __name__ == '__main__': 259 260 parser = argparse.ArgumentParser( 261 description='phosphor-regulators configuration file validator') 262 263 parser.add_argument('-s', '--schema-file', dest='schema_file', 264 help='The phosphor-regulators schema file') 265 266 parser.add_argument('-c', '--configuration-file', dest='configuration_file', 267 help='The phosphor-regulators configuration file') 268 269 args = parser.parse_args() 270 271 if not args.schema_file: 272 parser.print_help() 273 sys.exit("Error: Schema file is required.") 274 if not args.configuration_file: 275 parser.print_help() 276 sys.exit("Error: Configuration file is required.") 277 278 config_json = validate_schema(args.configuration_file, args.schema_file) 279 280 check_for_duplicates(config_json) 281 282 check_infinite_loops(config_json) 283 284 check_run_rule_value_exists(config_json) 285 286 check_set_device_value_exists(config_json) 287 288 check_rule_id_exists(config_json) 289