1#!/usr/bin/env python 2 3import argparse 4import json 5import sys 6import jsonschema 7 8r""" 9Validates the phosphor-regulators configuration file. Checks it against a JSON 10schema as well as doing some extra checks that can't be encoded in the schema. 11""" 12 13def handle_validation_error(): 14 sys.exit("Validation failed.") 15 16def check_infinite_loops_in_rule(config_json, rule_json, call_stack=[]): 17 r""" 18 Check if a 'run_rule' action in the specified rule causes an 19 infinite loop. 20 config_json: Configuration file JSON. 21 rule_json: A rule in the JSON config file. 22 call_stack: Current call stack of rules. 23 """ 24 25 call_stack.append(rule_json['id']) 26 for action in rule_json.get('actions', {}): 27 if 'run_rule' in action: 28 run_rule_id = action['run_rule'] 29 if run_rule_id in call_stack: 30 call_stack.append(run_rule_id) 31 sys.stderr.write(\ 32 "Infinite loop caused by run_rule actions.\n"+\ 33 str(call_stack)+'\n') 34 handle_validation_error() 35 else: 36 for rule in config_json.get('rules', {}): 37 if rule['id'] == run_rule_id: 38 check_infinite_loops_in_rule(\ 39 config_json, rule, call_stack) 40 call_stack.pop() 41 42def check_infinite_loops(config_json): 43 r""" 44 Check if rule in config file is called recursively, causing an 45 infinite loop. 46 config_json: Configuration file JSON 47 """ 48 49 for rule in config_json.get('rules', {}): 50 check_infinite_loops_in_rule(config_json, rule) 51 52def check_duplicate_object_id(config_json): 53 r""" 54 Check that there aren't any JSON objects with the same 'id' property value. 55 config_json: Configuration file JSON 56 """ 57 58 json_ids = check_duplicate_rule_id(config_json)+\ 59 check_duplicate_device_id(config_json)+\ 60 check_duplicate_rail_id(config_json) 61 unique_ids = set() 62 for id in json_ids: 63 if id in unique_ids: 64 sys.stderr.write("Error: Duplicate ID.\n"+\ 65 "Found multiple objects with the ID "+id+'\n') 66 handle_validation_error() 67 else: 68 unique_ids.add(id) 69 70def check_duplicate_rule_id(config_json): 71 r""" 72 Check that there aren't any "rule" elements with the same 'id' field. 73 config_json: Configuration file JSON 74 """ 75 rule_ids = [] 76 for rule in config_json.get('rules', {}): 77 rule_id = rule['id'] 78 if rule_id in rule_ids: 79 sys.stderr.write("Error: Duplicate rule ID.\n"+\ 80 "Found multiple rules with the ID "+rule_id+'\n') 81 handle_validation_error() 82 else: 83 rule_ids.append(rule_id) 84 return rule_ids 85 86def check_duplicate_chassis_number(config_json): 87 r""" 88 Check that there aren't any "chassis" elements with the same 'number' field. 89 config_json: Configuration file JSON 90 """ 91 numbers = [] 92 for chassis in config_json.get('chassis', {}): 93 number = chassis['number'] 94 if number in numbers: 95 sys.stderr.write("Error: Duplicate chassis number.\n"+\ 96 "Found multiple chassis with the number "+str(number)+'\n') 97 handle_validation_error() 98 else: 99 numbers.append(number) 100 101def check_duplicate_device_id(config_json): 102 r""" 103 Check that there aren't any "devices" with the same 'id' field. 104 config_json: Configuration file JSON 105 """ 106 device_ids = [] 107 for chassis in config_json.get('chassis', {}): 108 for device in chassis.get('devices', {}): 109 device_id = device['id'] 110 if device_id in device_ids: 111 sys.stderr.write("Error: Duplicate device ID.\n"+\ 112 "Found multiple devices with the ID "+device_id+'\n') 113 handle_validation_error() 114 else: 115 device_ids.append(device_id) 116 return device_ids 117 118def check_duplicate_rail_id(config_json): 119 r""" 120 Check that there aren't any "rails" with the same 'id' field. 121 config_json: Configuration file JSON 122 """ 123 rail_ids = [] 124 for chassis in config_json.get('chassis', {}): 125 for device in chassis.get('devices', {}): 126 for rail in device.get('rails', {}): 127 rail_id = rail['id'] 128 if rail_id in rail_ids: 129 sys.stderr.write("Error: Duplicate rail ID.\n"+\ 130 "Found multiple rails with the ID "+rail_id+'\n') 131 handle_validation_error() 132 else: 133 rail_ids.append(rail_id) 134 return rail_ids 135 136def check_for_duplicates(config_json): 137 r""" 138 Check for duplicate ID. 139 """ 140 check_duplicate_rule_id(config_json) 141 142 check_duplicate_chassis_number(config_json) 143 144 check_duplicate_device_id(config_json) 145 146 check_duplicate_rail_id(config_json) 147 148 check_duplicate_object_id(config_json) 149 150def validate_schema(config, schema): 151 r""" 152 Validates the specified config file using the specified 153 schema file. 154 155 config: Path of the file containing the config JSON 156 schema: Path of the file containing the schema JSON 157 """ 158 159 with open(config) as config_handle: 160 config_json = json.load(config_handle) 161 162 with open(schema) as schema_handle: 163 schema_json = json.load(schema_handle) 164 165 try: 166 jsonschema.validate(config_json, schema_json) 167 except jsonschema.ValidationError as e: 168 print(e) 169 handle_validation_error() 170 171 return config_json 172 173if __name__ == '__main__': 174 175 parser = argparse.ArgumentParser( 176 description='phosphor-regulators configuration file validator') 177 178 parser.add_argument('-s', '--schema-file', dest='schema_file', 179 help='The phosphor-regulators schema file') 180 181 parser.add_argument('-c', '--configuration-file', dest='configuration_file', 182 help='The phosphor-regulators configuration file') 183 184 args = parser.parse_args() 185 186 if not args.schema_file: 187 parser.print_help() 188 sys.exit("Error: Schema file is required.") 189 if not args.configuration_file: 190 parser.print_help() 191 sys.exit("Error: Configuration file is required.") 192 193 config_json = validate_schema(args.configuration_file, args.schema_file) 194 195 check_for_duplicates(config_json) 196 197 check_infinite_loops(config_json) 198 199