1#!/usr/bin/env python3
2
3import argparse
4import json
5import os
6import sys
7
8import jsonschema
9
10r"""
11Validates the phosphor-regulators configuration file. Checks it against a JSON
12schema as well as doing some extra checks that can't be encoded in the schema.
13"""
14
15
def handle_validation_error():
    r"""
    Terminates the script after a validation check has failed.
    Raises SystemExit carrying the message "Validation failed.".
    """
    raise SystemExit("Validation failed.")
18
19
def get_values(json_element, key, result=None):
    r"""
    Finds all occurrences of a key within the specified JSON element and its
    children. Returns the associated values.
    To search the entire configuration file, pass the root JSON element.
    A value stored under the matching key is collected as-is; it is not
    searched for further nested occurrences of the key.
    json_element: JSON element within the config file.
    key: key name.
    result: list the found values are appended to. A new list is created
            when omitted. The same list is returned.
    """

    if result is None:
        result = []

    # isinstance() rather than an exact type() comparison so that dict/list
    # subclasses (for example collections.OrderedDict) are traversed too.
    if isinstance(json_element, dict):
        for json_key, value in json_element.items():
            if json_key == key:
                result.append(value)
            elif isinstance(value, (list, dict)):
                get_values(value, key, result)
    elif isinstance(json_element, list):
        for item in json_element:
            # Scalars inside an array cannot contain the key.
            if isinstance(item, (list, dict)):
                get_values(item, key, result)
    return result
43
44
def get_rule_ids(config_json):
    r"""
    Collects the 'id' of every entry in the top-level 'rules' property, in
    file order. Returns an empty list when there is no 'rules' property.
    config_json: Configuration file JSON
    """
    return [rule["id"] for rule in config_json.get("rules", {})]
54
55
def get_device_ids(config_json):
    r"""
    Collects the 'id' of every device listed under any chassis, in file
    order. Chassis without a 'devices' property contribute nothing.
    config_json: Configuration file JSON
    """
    return [
        device["id"]
        for chassis in config_json.get("chassis", {})
        for device in chassis.get("devices", {})
    ]
66
67
def check_number_of_elements_in_masks(config_json):
    r"""
    Check if the number of bit masks in the 'masks' property matches the number
    of byte values in the 'values' property. Applies to every i2c_write_bytes
    and i2c_compare_bytes action; actions without 'masks' are skipped.
    On a mismatch an error is written to stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    # Both action kinds are collected up front, then checked in this order.
    checks = (
        (
            get_values(config_json, "i2c_write_bytes"),
            "Error: Invalid i2c_write_bytes action.\n",
        ),
        (
            get_values(config_json, "i2c_compare_bytes"),
            "Error: Invalid i2c_compare_bytes action.\n",
        ),
    )

    for actions, heading in checks:
        for action in actions:
            if "masks" not in action:
                continue
            masks = action.get("masks", [])
            values = action.get("values", [])
            if len(masks) == len(values):
                continue
            sys.stderr.write(
                heading
                + "The masks array must have the same size as the values "
                + "array. masks: "
                + str(masks)
                + ", values: "
                + str(values)
                + ".\n"
            )
            handle_validation_error()
105
106
def check_rule_id_exists(config_json):
    r"""
    Check that every 'rule_id' property refers to a rule defined in the
    'rules' property. The first unknown ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    referenced_ids = get_values(config_json, "rule_id")
    defined_ids = get_rule_ids(config_json)
    for referenced_id in referenced_ids:
        if referenced_id in defined_ids:
            continue
        sys.stderr.write(
            "Error: Rule ID does not exist.\n"
            + "Found rule_id value that specifies invalid rule ID "
            + referenced_id
            + "\n"
        )
        handle_validation_error()
124
125
def check_device_id_exists(config_json):
    r"""
    Check that every 'device_id' property refers to a device defined under
    some chassis. An unknown ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    referenced = get_values(config_json, "device_id")
    known = get_device_ids(config_json)
    # Lazily walk only the references that do not resolve to a known device.
    for unknown_id in (ref for ref in referenced if ref not in known):
        message = (
            "Error: Device ID does not exist.\n"
            + "Found device_id value that specifies invalid device ID "
            + unknown_id
            + "\n"
        )
        sys.stderr.write(message)
        handle_validation_error()
143
144
def check_set_device_value_exists(config_json):
    r"""
    Check that the device ID named by every set_device action belongs to a
    device defined under some chassis. An unknown ID is reported on stderr
    and handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    targets = get_values(config_json, "set_device")
    defined_devices = get_device_ids(config_json)
    for target in targets:
        target_is_defined = target in defined_devices
        if not target_is_defined:
            sys.stderr.write(
                "Error: Device ID does not exist.\n"
                + "Found set_device action that specifies invalid device ID "
                + target
                + "\n"
            )
            handle_validation_error()
162
163
def check_run_rule_value_exists(config_json):
    r"""
    Check that the rule ID named by every run_rule action belongs to a rule
    defined in the 'rules' property. An unknown ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    invoked_ids = get_values(config_json, "run_rule")
    defined_rules = get_rule_ids(config_json)
    for invoked_id in invoked_ids:
        if invoked_id in defined_rules:
            continue
        error_text = (
            "Error: Rule ID does not exist.\n"
            + "Found run_rule action that specifies invalid rule ID "
            + invoked_id
            + "\n"
        )
        sys.stderr.write(error_text)
        handle_validation_error()
181
182
def check_infinite_loops_in_rule(config_json, rule_json, call_stack=None):
    r"""
    Check if a 'run_rule' action in the specified rule causes an
    infinite loop. Follows run_rule actions found directly in the rule's
    'actions' array, recursing into each rule they name. If a rule that is
    already on the call stack is invoked again, the stack is written to
    stderr and handle_validation_error() is called.
    config_json: Configuration file JSON.
    rule_json: A rule in the JSON config file.
    call_stack: Current call stack of rules. A new, empty stack is used when
                omitted. A supplied list is modified in place while checking.
    """

    # A mutable default ([]) is created once and shared by every call. If a
    # previous call ended early (exception, or a caught SystemExit), leftover
    # rule IDs would cause false loop reports in later calls.
    if call_stack is None:
        call_stack = []

    call_stack.append(rule_json["id"])
    for action in rule_json.get("actions", {}):
        if "run_rule" not in action:
            continue
        run_rule_id = action["run_rule"]
        if run_rule_id in call_stack:
            # Append the repeated rule so the printed stack shows the cycle.
            call_stack.append(run_rule_id)
            sys.stderr.write(
                "Infinite loop caused by run_rule actions.\n"
                + str(call_stack)
                + "\n"
            )
            handle_validation_error()
        else:
            for rule in config_json.get("rules", {}):
                if rule["id"] == run_rule_id:
                    check_infinite_loops_in_rule(
                        config_json, rule, call_stack
                    )
    call_stack.pop()
211
212
def check_infinite_loops(config_json):
    r"""
    Check every rule in the config file for run_rule actions that call a
    rule recursively, causing an infinite loop. Each rule in the 'rules'
    property is used in turn as the starting point of the check.
    config_json: Configuration file JSON
    """

    all_rules = config_json.get("rules", {})
    for starting_rule in all_rules:
        check_infinite_loops_in_rule(config_json, starting_rule)
222
223
def check_duplicate_object_id(config_json):
    r"""
    Check that no two JSON objects anywhere in the config file share the
    same 'id' property value. A repeated ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """

    seen_ids = set()
    for object_id in get_values(config_json, "id"):
        if object_id not in seen_ids:
            seen_ids.add(object_id)
            continue
        sys.stderr.write(
            "Error: Duplicate ID.\n"
            + "Found multiple objects with the ID "
            + object_id
            + "\n"
        )
        handle_validation_error()
243
244
def check_duplicate_rule_id(config_json):
    r"""
    Check that no two entries of the 'rules' property share the same 'id'
    field. A repeated ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """
    seen = []
    for rule in config_json.get("rules", {}):
        current_id = rule["id"]
        if current_id not in seen:
            seen.append(current_id)
            continue
        sys.stderr.write(
            "Error: Duplicate rule ID.\n"
            + "Found multiple rules with the ID "
            + current_id
            + "\n"
        )
        handle_validation_error()
263
264
def check_duplicate_chassis_number(config_json):
    r"""
    Check that no two entries of the 'chassis' property share the same
    'number' field. A repeated number is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """
    seen_numbers = []
    for chassis in config_json.get("chassis", {}):
        chassis_number = chassis["number"]
        already_used = chassis_number in seen_numbers
        if already_used:
            sys.stderr.write(
                "Error: Duplicate chassis number.\n"
                + "Found multiple chassis with the number "
                + str(chassis_number)
                + "\n"
            )
            handle_validation_error()
            continue
        seen_numbers.append(chassis_number)
284
285
def check_duplicate_device_id(config_json):
    r"""
    Check that no two devices, in the same or in different chassis, share
    the same 'id' field. A repeated ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """
    seen_device_ids = []
    # Flatten chassis -> devices into a single lazy stream of devices.
    all_devices = (
        device
        for chassis in config_json.get("chassis", {})
        for device in chassis.get("devices", {})
    )
    for device in all_devices:
        device_id = device["id"]
        if device_id not in seen_device_ids:
            seen_device_ids.append(device_id)
            continue
        sys.stderr.write(
            "Error: Duplicate device ID.\n"
            + "Found multiple devices with the ID "
            + device_id
            + "\n"
        )
        handle_validation_error()
305
306
def check_duplicate_rail_id(config_json):
    r"""
    Check that no two rails, across all devices of all chassis, share the
    same 'id' field. A repeated ID is reported on stderr and
    handle_validation_error() is called.
    config_json: Configuration file JSON
    """
    seen_rail_ids = []
    # Flatten chassis -> devices -> rails into a single lazy stream of rails.
    all_rails = (
        rail
        for chassis in config_json.get("chassis", {})
        for device in chassis.get("devices", {})
        for rail in device.get("rails", {})
    )
    for rail in all_rails:
        rail_id = rail["id"]
        if rail_id not in seen_rail_ids:
            seen_rail_ids.append(rail_id)
            continue
        sys.stderr.write(
            "Error: Duplicate rail ID.\n"
            + "Found multiple rails with the ID "
            + rail_id
            + "\n"
        )
        handle_validation_error()
327
328
def check_for_duplicates(config_json):
    r"""
    Run all duplicate checks on the configuration file: rule IDs, chassis
    numbers, device IDs, rail IDs and finally the IDs of all objects.
    config_json: Configuration file JSON
    """
    duplicate_checks = (
        check_duplicate_rule_id,
        check_duplicate_chassis_number,
        check_duplicate_device_id,
        check_duplicate_rail_id,
        check_duplicate_object_id,
    )
    for run_check in duplicate_checks:
        run_check(config_json)
342
343
def validate_schema(config, schema):
    r"""
    Validates the specified config file using the specified
    schema file. Returns the parsed config JSON.

    A jsonschema.ValidationError is printed and followed by a call to
    handle_validation_error(). Other exceptions are not caught here.

    config:   Path of the file containing the config JSON
    schema:   Path of the file containing the schema JSON
    """

    with open(config) as config_file:
        config_json = json.load(config_file)

    with open(schema) as schema_file:
        schema_json = json.load(schema_file)

    try:
        jsonschema.validate(config_json, schema_json)
    except jsonschema.ValidationError as error:
        print(error)
        handle_validation_error()

    return config_json
366
367
def validate_JSON_format(file):
    r"""
    Checks whether the specified file contains well-formed JSON.
    Returns True if the file parses as JSON, False otherwise.
    file: Path of the file to check.
    """
    with open(file) as json_data:
        try:
            # Only parse success matters. Returning the parsed value would
            # make valid but falsy documents such as {} or [] look invalid
            # to callers that test the result for truthiness.
            json.load(json_data)
        except ValueError:
            return False
    return True
375
376
if __name__ == "__main__":
    # Command line interface. Neither option is marked as required in
    # argparse; missing or unusable files are reported by the checks below.
    parser = argparse.ArgumentParser(
        description="phosphor-regulators configuration file validator"
    )
    parser.add_argument(
        "-s",
        "--schema-file",
        dest="schema_file",
        help="The phosphor-regulators schema file",
    )
    parser.add_argument(
        "-c",
        "--configuration-file",
        dest="configuration_file",
        help="The phosphor-regulators configuration file",
    )
    args = parser.parse_args()

    schema_path = args.schema_file
    config_path = args.configuration_file

    # Ordered (test, message) pairs. Each test runs only if all earlier ones
    # passed, so later tests may assume the path is set and exists.
    preconditions = (
        (lambda: schema_path, "Error: Schema file is required."),
        (
            lambda: os.path.exists(schema_path),
            "Error: Schema file does not exist.",
        ),
        (
            lambda: os.access(schema_path, os.R_OK),
            "Error: Schema file is not readable.",
        ),
        (
            lambda: validate_JSON_format(schema_path),
            "Error: Schema file is not in the JSON format.",
        ),
        (lambda: config_path, "Error: Configuration file is required."),
        (
            lambda: os.path.exists(config_path),
            "Error: Configuration file does not exist.",
        ),
        (
            lambda: os.access(config_path, os.R_OK),
            "Error: Configuration file is not readable.",
        ),
        (
            lambda: validate_JSON_format(config_path),
            "Error: Configuration file is not in the JSON format.",
        ),
    )
    for is_met, failure_message in preconditions:
        if not is_met():
            parser.print_help()
            sys.exit(failure_message)

    config_json = validate_schema(config_path, schema_path)

    # Extra checks that are not covered by the JSON schema, in this order.
    extra_checks = (
        check_for_duplicates,
        check_infinite_loops,
        check_run_rule_value_exists,
        check_set_device_value_exists,
        check_rule_id_exists,
        check_device_id_exists,
        check_number_of_elements_in_masks,
    )
    for run_check in extra_checks:
        run_check(config_json)
438