#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""
A tool for validating entity manager configurations.
"""
import argparse
import json
import os
import sys

import jsonschema.validators

DEFAULT_SCHEMA_FILENAME = "global.json"


def _die(message):
    """Write *message* to stderr and terminate with exit status 2."""
    sys.stderr.write(message)
    sys.exit(2)


def _source_root():
    """Return the directory two levels above this script's resolved path.

    This is the directory that the default ``schemas`` and
    ``configurations`` directories are looked up in.
    """
    return os.path.dirname(os.path.dirname(os.path.realpath(__file__)))


def _parse_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Entity manager configuration validator",
    )
    parser.add_argument(
        "-s",
        "--schema",
        help=(
            "Use the specified schema file instead of the default "
            "(__file__/../../schemas/global.json)"
        ),
    )
    parser.add_argument(
        "-c",
        "--config",
        action="append",
        help=(
            "Validate the specified configuration files (can be "
            "specified more than once) instead of the default "
            "(__file__/../../configurations/**.json)"
        ),
    )
    parser.add_argument(
        "-e",
        "--expected-fails",
        help=(
            "A file with a list of configurations to ignore should "
            "they fail to validate"
        ),
    )
    parser.add_argument(
        "-k",
        "--continue",
        # "continue" is a Python keyword, so store the flag under a name
        # that can be read as a normal attribute.
        dest="keep_going",
        action="store_true",
        help="keep validating after a failure",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="be noisy"
    )
    return parser.parse_args()


def _load_json(path, failure_template):
    """Return the parsed JSON document stored at *path*.

    On any I/O, decoding or JSON syntax error, *failure_template* is
    formatted with *path* and written to stderr, followed by the
    underlying error, and the process exits with status 2.
    """
    try:
        with open(path, encoding="utf-8") as fd:
            return json.load(fd)
    except (OSError, ValueError) as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; OSError covers missing and unreadable files.
        _die("{}{}\n".format(failure_template.format(path), err))


def _default_config_files():
    """Return every ``*.json`` path below the default configurations dir."""
    configs_dir = os.path.join(_source_root(), "configurations")
    found = []
    for root, _, files in os.walk(configs_dir):
        found.extend(
            os.path.join(root, f) for f in files if f.endswith(".json")
        )
    return found


def _load_expected_fails(path):
    """Return the set of non-blank, stripped lines read from *path*.

    Exits with status 2 if the file cannot be read.
    """
    try:
        with open(path, encoding="utf-8") as fd:
            stripped = (line.strip() for line in fd)
            return {name for name in stripped if name}
    except (OSError, ValueError):
        _die("Could not read expected fails file '{}'\n".format(path))


def _validate_configs(
    validator, config_files, configs, expected_fails, keep_going, verbose
):
    """Validate each config and return ``(invalid, unexpected_pass)``.

    Both results are sets of full config file paths. A config is looked up
    in *expected_fails* by its base name. Unless *keep_going* is set,
    validation stops at the first unexpected failure or unexpected pass.
    """
    invalid = set()
    unexpected_pass = set()
    for config_file, config in zip(config_files, configs):
        expect_fail = os.path.basename(config_file) in expected_fails
        try:
            validator.validate(config)
        except jsonschema.exceptions.ValidationError as err:
            if expect_fail:
                # A known-bad configuration failed as anticipated.
                continue
            invalid.add(config_file)
            if verbose:
                print(err)
        else:
            if not expect_fail:
                continue
            unexpected_pass.add(config_file)
        if not keep_going:
            break
    return invalid, unexpected_pass


def _report(config_files, invalid, unexpected_pass):
    """Print one result line per config that failed or unexpectedly passed."""
    unexpected_pass_suffix = " **"
    show_suffix_explanation = False
    print("results:")
    for f in config_files:
        # Exact path comparison: a substring test would also flag files
        # whose path merely contains another file's name.
        if f in unexpected_pass:
            show_suffix_explanation = True
            print(" '{}' passed!{}".format(f, unexpected_pass_suffix))
        if f in invalid:
            print(" '{}' failed!".format(f))

    if show_suffix_explanation:
        print("\n** configuration expected to fail")


def main():
    """Validate configuration files against the schema and exit.

    Exit status is 0 when every configuration behaves as expected, 1 when
    at least one configuration failed validation or passed although it was
    listed as an expected failure, and 2 when the schema, a configuration
    or the expected-fails file could not be loaded.
    """
    args = _parse_args()

    schema_file = args.schema
    if schema_file is None:
        schema_file = os.path.join(
            _source_root(), "schemas", DEFAULT_SCHEMA_FILENAME
        )
    schema = _load_json(schema_file, "Could not read schema file '{}'\n")

    config_files = args.config or _default_config_files()
    configs = [
        _load_json(config_file, "Could not parse config file '{}'\n")
        for config_file in config_files
    ]

    expected_fails = set()
    if args.expected_fails:
        expected_fails = _load_expected_fails(args.expected_fails)

    # Relative "$ref"s in the schema are resolved against the directory
    # that contains the schema file.
    base_uri = "file://{}/".format(
        os.path.split(os.path.realpath(schema_file))[0]
    )
    # NOTE(review): RefResolver is deprecated in recent jsonschema releases
    # in favour of the "referencing" library; kept to avoid a new
    # dependency.
    resolver = jsonschema.RefResolver(base_uri, schema)
    validator = jsonschema.Draft7Validator(schema, resolver=resolver)

    invalid, unexpected_pass = _validate_configs(
        validator,
        config_files,
        configs,
        expected_fails,
        args.keep_going,
        args.verbose,
    )

    exit_status = 0
    if invalid or unexpected_pass:
        exit_status = 1
        _report(config_files, invalid, unexpected_pass)

    sys.exit(exit_status)


if __name__ == "__main__":
    main()