xref: /openbmc/entity-manager/scripts/validate_configs.py (revision 809fbdc2d7052dc7eb3612562eb7b0d2820b924b)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: Apache-2.0
3"""
4A tool for validating entity manager configurations.
5"""
6
7import argparse
8import json
9import os
10import re
11import sys
12from concurrent.futures import ProcessPoolExecutor
13
14import jsonschema.exceptions
15import jsonschema.validators
16import referencing
17from referencing.jsonschema import DRAFT202012
18
19DEFAULT_SCHEMA_FILENAME = "global.json"
20
21
22def get_default_thread_count() -> int:
23    """
24    Returns the number of CPUs available to the current process.
25    """
26    try:
27        # This will respect CPU affinity settings
28        return len(os.sched_getaffinity(0))
29    except AttributeError:
30        # Fallback for systems without sched_getaffinity
31        return os.cpu_count() or 1
32
33
34def remove_c_comments(string):
35    # first group captures quoted strings (double or single)
36    # second group captures comments (//single-line or /* multi-line */)
37    pattern = r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)"
38    regex = re.compile(pattern, re.MULTILINE | re.DOTALL)
39
40    def _replacer(match):
41        if match.group(2) is not None:
42            return ""
43        else:
44            return match.group(1)
45
46    return regex.sub(_replacer, string)
47
48
49def main():
50    parser = argparse.ArgumentParser(
51        description="Entity manager configuration validator",
52    )
53    parser.add_argument(
54        "-s",
55        "--schema",
56        help=(
57            "Use the specified schema file instead of the default "
58            "(__file__/../../schemas/global.json)"
59        ),
60    )
61    parser.add_argument(
62        "-c",
63        "--config",
64        action="append",
65        help=(
66            "Validate the specified configuration files (can be "
67            "specified more than once) instead of the default "
68            "(__file__/../../configurations/**.json)"
69        ),
70    )
71    parser.add_argument(
72        "-e",
73        "--expected-fails",
74        help=(
75            "A file with a list of configurations to ignore should "
76            "they fail to validate"
77        ),
78    )
79    parser.add_argument(
80        "-k",
81        "--continue",
82        action="store_true",
83        help="keep validating after a failure",
84    )
85    parser.add_argument(
86        "-v", "--verbose", action="store_true", help="be noisy"
87    )
88    parser.add_argument(
89        "-t",
90        "--threads",
91        type=int,
92        default=get_default_thread_count(),
93        help="Number of threads to use for parallel validation (default: number of CPUs)",
94    )
95    args = parser.parse_args()
96
97    schema_file = args.schema
98    if schema_file is None:
99        try:
100            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
101            schema_file = os.sep + os.path.join(
102                *source_dir, "schemas", DEFAULT_SCHEMA_FILENAME
103            )
104        except Exception:
105            print(
106                f"Could not guess location of {DEFAULT_SCHEMA_FILENAME}",
107                file=sys.stderr,
108            )
109            sys.exit(2)
110
111    config_files = args.config or []
112    if len(config_files) == 0:
113        try:
114            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
115            configs_dir = os.sep + os.path.join(*source_dir, "configurations")
116            data = os.walk(configs_dir)
117            for root, _, files in data:
118                for f in files:
119                    if f.endswith(".json"):
120                        config_files.append(os.path.join(root, f))
121        except Exception:
122            print(
123                "Could not guess location of configurations", file=sys.stderr
124            )
125            sys.exit(2)
126
127    configs = []
128    for config_file in config_files:
129        try:
130            with open(config_file) as fd:
131                configs.append(json.loads(remove_c_comments(fd.read())))
132        except FileNotFoundError:
133            print(
134                f"Could not parse config file: {config_file}", file=sys.stderr
135            )
136            sys.exit(2)
137
138    expected_fails = []
139    if args.expected_fails:
140        try:
141            with open(args.expected_fails) as fd:
142                for line in fd:
143                    expected_fails.append(line.strip())
144        except Exception:
145            print(
146                f"Could not read expected fails file: {args.expected_fails}",
147                file=sys.stderr,
148            )
149            sys.exit(2)
150
151    results = {
152        "invalid": [],
153        "unexpected_pass": [],
154    }
155
156    should_continue = getattr(args, "continue")
157
158    with ProcessPoolExecutor(max_workers=args.threads) as executor:
159        # Submit all validation tasks
160        config_to_future = {}
161        for config_file, config in zip(config_files, configs):
162            filename = os.path.split(config_file)[1]
163            future = executor.submit(
164                validate_single_config,
165                args,
166                filename,
167                config,
168                expected_fails,
169                schema_file,
170            )
171            config_to_future[config_file] = future
172
173        # Process results as they complete
174        for config_file, future in config_to_future.items():
175            # Wait for the future to complete and get its result
176            is_invalid, is_unexpected_pass = future.result()
177            # Update the results with the validation result
178            filename = os.path.split(config_file)[1]
179            if is_invalid:
180                results["invalid"].append(filename)
181            if is_unexpected_pass:
182                results["unexpected_pass"].append(filename)
183
184            # Stop validation if validation failed unexpectedly and --continue is not set
185            validation_failed = is_invalid or is_unexpected_pass
186            if validation_failed and not should_continue:
187                executor.shutdown(wait=False, cancel_futures=True)
188                break
189
190    exit_status = 0
191    if len(results["invalid"]) + len(results["unexpected_pass"]):
192        exit_status = 1
193        unexpected_pass_suffix = " **"
194        show_suffix_explanation = False
195        print("results:")
196        for f in config_files:
197            if any([x in f for x in results["unexpected_pass"]]):
198                show_suffix_explanation = True
199                print(f"  '{f}' passed!{unexpected_pass_suffix}")
200            if any([x in f for x in results["invalid"]]):
201                print(f"  '{f}' failed!")
202
203        if show_suffix_explanation:
204            print("\n** configuration expected to fail")
205
206    sys.exit(exit_status)
207
208
209def validator_from_file(schema_file):
210    # Get root directory of schema file, so we can walk all the directories
211    # for referenced schemas.
212    schema_path = os.path.dirname(schema_file)
213
214    root_schema = None
215    registry = referencing.Registry()
216
217    # Pre-load all .json files from the schemas directory and its subdirectories
218    # into the registry. This allows $refs to resolve to any schema.
219    for dirpath, _, directory in os.walk(schema_path):
220        for filename in directory:
221            if filename.endswith(".json"):
222                full_file_path = os.path.join(dirpath, filename)
223
224                # The URI  is their path relative to schema_path.
225                relative_uri = os.path.relpath(full_file_path, schema_path)
226
227                with open(full_file_path, "r") as fd:
228                    schema_contents = json.loads(remove_c_comments(fd.read()))
229                    jsonschema.validators.Draft202012Validator.check_schema(
230                        schema_contents
231                    )
232
233                    # Add to the registry.
234                    registry = registry.with_resource(
235                        uri=relative_uri,
236                        resource=referencing.Resource.from_contents(
237                            schema_contents, default_specification=DRAFT202012
238                        ),
239                    )
240
241                    # If this was the schema_file we need to save the contents
242                    # as the root schema.
243                    if schema_file == full_file_path:
244                        root_schema = schema_contents
245
246    # Create the validator instance with the schema content and the configured registry.
247    validator = jsonschema.validators.Draft202012Validator(
248        root_schema, registry=registry
249    )
250
251    return validator
252
253
254def validate_single_config(
255    args, filename, config, expected_fails, schema_file
256):
257    expect_fail = filename in expected_fails
258
259    is_invalid = False
260    is_unexpected_pass = False
261
262    try:
263        validator = validator_from_file(schema_file)
264        validator.validate(config)
265        if expect_fail:
266            is_unexpected_pass = True
267    except jsonschema.exceptions.ValidationError as e:
268        if not expect_fail:
269            is_invalid = True
270            if args.verbose:
271                print(f"Validation Error for {filename}: {e}")
272
273    return (is_invalid, is_unexpected_pass)
274
275
276if __name__ == "__main__":
277    main()
278