1#!/usr/bin/env python3 2# SPDX-License-Identifier: Apache-2.0 3""" 4A tool for validating entity manager configurations. 5""" 6import argparse 7import json 8import jsonschema.validators 9import os 10import sys 11 12DEFAULT_SCHEMA_FILENAME = "global.json" 13 14 15def main(): 16 parser = argparse.ArgumentParser( 17 description="Entity manager configuration validator", 18 ) 19 parser.add_argument( 20 "-s", "--schema", help=( 21 "Use the specified schema file instead of the default " 22 "(__file__/../../schemas/global.json)")) 23 parser.add_argument( 24 "-c", "--config", action='append', help=( 25 "Validate the specified configuration files (can be " 26 "specified more than once) instead of the default " 27 "(__file__/../../configurations/**.json)")) 28 parser.add_argument( 29 "-e", "--expected-fails", help=( 30 "A file with a list of configurations to ignore should " 31 "they fail to validate")) 32 parser.add_argument( 33 "-k", "--continue", action='store_true', help=( 34 "keep validating after a failure")) 35 parser.add_argument( 36 "-v", "--verbose", action='store_true', help=( 37 "be noisy")) 38 args = parser.parse_args() 39 40 schema_file = args.schema 41 if schema_file is None: 42 try: 43 source_dir = os.path.realpath(__file__).split(os.sep)[:-2] 44 schema_file = os.sep + os.path.join( 45 *source_dir, 'schemas', DEFAULT_SCHEMA_FILENAME) 46 except Exception as e: 47 sys.stderr.write( 48 "Could not guess location of {}\n".format( 49 DEFAULT_SCHEMA_FILENAME)) 50 sys.exit(2) 51 52 schema = {} 53 try: 54 with open(schema_file) as fd: 55 schema = json.load(fd) 56 except FileNotFoundError as e: 57 sys.stderr.write( 58 "Could not read schema file '{}'\n".format(schema_file)) 59 sys.exit(2) 60 61 config_files = args.config or [] 62 if len(config_files) == 0: 63 try: 64 source_dir = os.path.realpath(__file__).split(os.sep)[:-2] 65 configs_dir = os.sep + os.path.join(*source_dir, 'configurations') 66 data = os.walk(configs_dir) 67 for root, _, files in data: 68 for f in files: 69 if f.endswith('.json'): 70 config_files.append(os.path.join(root, f)) 71 except Exception as e: 72 sys.stderr.write( 73 "Could not guess location of configurations\n") 74 sys.exit(2) 75 76 configs = [] 77 for config_file in config_files: 78 try: 79 with open(config_file) as fd: 80 configs.append(json.load(fd)) 81 except FileNotFoundError as e: 82 sys.stderr.write( 83 "Could not parse config file '{}'\n".format(config_file)) 84 sys.exit(2) 85 86 expected_fails = [] 87 if args.expected_fails: 88 try: 89 with open(args.expected_fails) as fd: 90 for line in fd: 91 expected_fails.append(line.strip()) 92 except Exception as e: 93 sys.stderr.write( 94 "Could not read expected fails file '{}'\n".format( 95 args.expected_fails)) 96 sys.exit(2) 97 98 base_uri = "file://{}/".format( 99 os.path.split(os.path.realpath(schema_file))[0]) 100 resolver = jsonschema.RefResolver(base_uri, schema) 101 validator = jsonschema.Draft7Validator(schema, resolver=resolver) 102 103 results = { 104 "invalid": [], 105 "unexpected_pass": [], 106 } 107 for config_file, config in zip(config_files, configs): 108 name = os.path.split(config_file)[1] 109 expect_fail = name in expected_fails 110 try: 111 validator.validate(config) 112 if expect_fail: 113 results["unexpected_pass"].append(name) 114 if not getattr(args, "continue"): 115 break 116 except jsonschema.exceptions.ValidationError as e: 117 if not expect_fail: 118 results["invalid"].append(name) 119 if args.verbose: 120 print(e) 121 if expect_fail or getattr(args, "continue"): 122 continue 123 break 124 125 exit_status = 0 126 if len(results["invalid"]) + len(results["unexpected_pass"]): 127 exit_status = 1 128 unexpected_pass_suffix = " **" 129 show_suffix_explanation = False 130 print("results:") 131 for f in config_files: 132 if any([x in f for x in results["unexpected_pass"]]): 133 show_suffix_explanation = True 134 print(" '{}' passed!{}".format(f, unexpected_pass_suffix)) 135 if any([x in f for x in results["invalid"]]): 136 print(" '{}' failed!".format(f)) 137 138 if show_suffix_explanation: 139 print("\n** configuration expected to fail") 140 141 sys.exit(exit_status) 142 143 144if __name__ == "__main__": 145 main() 146