#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
"""
A tool for validating entity manager configurations.
"""

import argparse
import functools
import json
import os
import re
import sys
from concurrent.futures import ProcessPoolExecutor

import jsonschema.exceptions
import jsonschema.validators
import referencing
from referencing.jsonschema import DRAFT202012

DEFAULT_SCHEMA_FILENAME = "global.json"

# First group captures quoted strings (double or single); second group
# captures comments (// single-line or /* multi-line */). Compiled once at
# import time because it is applied to every schema and configuration file.
_C_COMMENT_REGEX = re.compile(
    r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)",
    re.MULTILINE | re.DOTALL,
)


def get_default_thread_count() -> int:
    """
    Returns the number of CPUs available to the current process.
    """
    try:
        # This will respect CPU affinity settings
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # Fallback for systems without sched_getaffinity
        return os.cpu_count() or 1


def remove_c_comments(string):
    """
    Returns `string` with C-style comments removed.

    Quoted strings are matched first and copied through unchanged, so
    comment markers that appear inside a quoted string are preserved.
    """

    def _replacer(match):
        # Group 2 is a comment: drop it. Otherwise group 1 matched a quoted
        # string, which must be kept verbatim.
        if match.group(2) is not None:
            return ""
        return match.group(1)

    return _C_COMMENT_REGEX.sub(_replacer, string)


def _source_root():
    """
    Returns the directory two levels above this script's resolved path.
    """
    return os.path.dirname(os.path.dirname(os.path.realpath(__file__)))


def main():
    """
    Parses the command line, validates every configuration against the
    schema in a process pool, prints a summary and exits.

    Exit status is 0 when everything behaved as expected, 1 when a
    configuration failed unexpectedly (or passed although it was listed as an
    expected failure), and 2 when the inputs could not be located or read.
    """
    parser = argparse.ArgumentParser(
        description="Entity manager configuration validator",
    )
    parser.add_argument(
        "-s",
        "--schema",
        help=(
            "Use the specified schema file instead of the default "
            "(__file__/../../schemas/global.json)"
        ),
    )
    parser.add_argument(
        "-c",
        "--config",
        action="append",
        help=(
            "Validate the specified configuration files (can be "
            "specified more than once) instead of the default "
            "(__file__/../../configurations/**.json)"
        ),
    )
    parser.add_argument(
        "-e",
        "--expected-fails",
        help=(
            "A file with a list of configurations to ignore should "
            "they fail to validate"
        ),
    )
    parser.add_argument(
        "-k",
        "--continue",
        action="store_true",
        help="keep validating after a failure",
    )
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="be noisy"
    )
    parser.add_argument(
        "-t",
        "--threads",
        type=int,
        default=get_default_thread_count(),
        help="Number of threads to use for parallel validation (default: number of CPUs)",
    )
    args = parser.parse_args()

    # ProcessPoolExecutor raises ValueError for max_workers <= 0; report it
    # as a usage error instead of a traceback.
    if args.threads < 1:
        parser.error("--threads must be a positive integer")

    schema_file = args.schema
    if schema_file is None:
        schema_file = os.path.join(
            _source_root(), "schemas", DEFAULT_SCHEMA_FILENAME
        )
        if not os.path.isfile(schema_file):
            print(
                f"Could not guess location of {DEFAULT_SCHEMA_FILENAME}",
                file=sys.stderr,
            )
            sys.exit(2)
    elif not os.path.isfile(schema_file):
        print(f"Could not find schema file: {schema_file}", file=sys.stderr)
        sys.exit(2)

    config_files = list(args.config or [])
    if not config_files:
        configs_dir = os.path.join(_source_root(), "configurations")
        # os.walk() silently yields nothing for a missing directory, which
        # would make the tool "succeed" without validating anything.
        if not os.path.isdir(configs_dir):
            print(
                "Could not guess location of configurations", file=sys.stderr
            )
            sys.exit(2)
        for root, _, files in os.walk(configs_dir):
            for f in files:
                if f.endswith(".json"):
                    config_files.append(os.path.join(root, f))

    configs = []
    for config_file in config_files:
        try:
            with open(config_file, encoding="utf-8") as fd:
                configs.append(json.loads(remove_c_comments(fd.read())))
        except (OSError, ValueError) as err:
            # OSError: missing/unreadable file. ValueError: malformed JSON
            # (json.JSONDecodeError) or undecodable bytes.
            print(
                f"Could not parse config file: {config_file} ({err})",
                file=sys.stderr,
            )
            sys.exit(2)

    expected_fails = []
    if args.expected_fails:
        try:
            with open(args.expected_fails, encoding="utf-8") as fd:
                # Blank lines carry no filename and are skipped.
                expected_fails = [
                    line.strip() for line in fd if line.strip()
                ]
        except (OSError, ValueError):
            print(
                f"Could not read expected fails file: {args.expected_fails}",
                file=sys.stderr,
            )
            sys.exit(2)

    # Results are keyed by the full path given in config_files so that two
    # configurations sharing a basename are reported independently.
    results = {
        "invalid": [],
        "unexpected_pass": [],
    }

    should_continue = getattr(args, "continue")

    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        # Submit all validation tasks. A list (not a dict keyed by path) so
        # that a configuration given twice is still validated twice.
        pending = [
            (
                config_file,
                executor.submit(
                    validate_single_config,
                    args,
                    os.path.basename(config_file),
                    config,
                    expected_fails,
                    schema_file,
                ),
            )
            for config_file, config in zip(config_files, configs)
        ]

        # Collect results in submission order.
        for config_file, future in pending:
            is_invalid, is_unexpected_pass = future.result()
            if is_invalid:
                results["invalid"].append(config_file)
            if is_unexpected_pass:
                results["unexpected_pass"].append(config_file)

            # Stop validation if validation failed unexpectedly and
            # --continue is not set
            validation_failed = is_invalid or is_unexpected_pass
            if validation_failed and not should_continue:
                executor.shutdown(wait=False, cancel_futures=True)
                break

    exit_status = 0
    if results["invalid"] or results["unexpected_pass"]:
        exit_status = 1
        unexpected_pass_suffix = " **"
        invalid = set(results["invalid"])
        unexpected_pass = set(results["unexpected_pass"])
        print("results:")
        for f in config_files:
            # Exact path comparison: substring matching would also flag
            # unrelated files whose path merely contains a failing name.
            if f in unexpected_pass:
                print(f" '{f}' passed!{unexpected_pass_suffix}")
            if f in invalid:
                print(f" '{f}' failed!")

        if unexpected_pass:
            print("\n** configuration expected to fail")

    sys.exit(exit_status)


@functools.lru_cache(maxsize=None)
def validator_from_file(schema_file):
    """
    Builds a Draft 2020-12 validator for `schema_file`.

    Every .json file found under the directory containing `schema_file` is
    checked as a schema and added to the reference registry under its path
    relative to that directory, so $refs can resolve to any of them.

    The result is cached per process and per `schema_file` value, so a
    worker that validates many configurations loads the schemas only once.

    Raises FileNotFoundError if `schema_file` is not among the loaded
    schemas (for example, it does not exist or does not end in ".json").
    """
    # Normalize so the comparison against walked paths below does not depend
    # on how the caller spelled the path (relative, "./", doubled slashes),
    # and so a bare filename still yields a directory to walk.
    schema_file = os.path.abspath(schema_file)

    # Get root directory of schema file, so we can walk all the directories
    # for referenced schemas.
    schema_path = os.path.dirname(schema_file)

    root_schema = None
    registry = referencing.Registry()

    # Pre-load all .json files from the schemas directory and its
    # subdirectories into the registry. This allows $refs to resolve to any
    # schema.
    for dirpath, _, filenames in os.walk(schema_path):
        for filename in filenames:
            if not filename.endswith(".json"):
                continue
            full_file_path = os.path.join(dirpath, filename)

            # The URI is their path relative to schema_path.
            relative_uri = os.path.relpath(full_file_path, schema_path)

            with open(full_file_path, "r", encoding="utf-8") as fd:
                schema_contents = json.loads(remove_c_comments(fd.read()))
            jsonschema.validators.Draft202012Validator.check_schema(
                schema_contents
            )

            # Add to the registry.
            registry = registry.with_resource(
                uri=relative_uri,
                resource=referencing.Resource.from_contents(
                    schema_contents, default_specification=DRAFT202012
                ),
            )

            # If this was the schema_file we need to save the contents
            # as the root schema.
            if full_file_path == schema_file:
                root_schema = schema_contents

    if root_schema is None:
        raise FileNotFoundError(f"Root schema not found: {schema_file}")

    # Create the validator instance with the schema content and the
    # configured registry.
    return jsonschema.validators.Draft202012Validator(
        root_schema, registry=registry
    )


def validate_single_config(
    args, filename, config, expected_fails, schema_file
):
    """
    Validates one parsed configuration against the schema.

    `args` is the parsed command line (only `args.verbose` is read),
    `filename` is the configuration's base name as it would appear in
    `expected_fails`, `config` is the parsed JSON document, and
    `schema_file` is the path of the root schema.

    Returns a tuple (is_invalid, is_unexpected_pass): the first is True when
    validation failed and the file was not listed in `expected_fails`; the
    second is True when validation succeeded although it was listed.
    """
    expect_fail = filename in expected_fails

    is_invalid = False
    is_unexpected_pass = False

    try:
        validator = validator_from_file(schema_file)
        validator.validate(config)
        if expect_fail:
            is_unexpected_pass = True
    except jsonschema.exceptions.ValidationError as e:
        if not expect_fail:
            is_invalid = True
        if args.verbose:
            print(f"Validation Error for {filename}: {e}")

    return (is_invalid, is_unexpected_pass)


if __name__ == "__main__":
    main()