1#!/usr/bin/env python3
2# SPDX-License-Identifier: Apache-2.0
3"""
4A tool for validating entity manager configurations.
5"""
6import argparse
7import json
8import os
9import re
10import sys
11
12import jsonschema.validators
13
14DEFAULT_SCHEMA_FILENAME = "global.json"
15
16
17def remove_c_comments(string):
18    # first group captures quoted strings (double or single)
19    # second group captures comments (//single-line or /* multi-line */)
20    pattern = r"(\".*?(?<!\\)\"|\'.*?(?<!\\)\')|(/\*.*?\*/|//[^\r\n]*$)"
21    regex = re.compile(pattern, re.MULTILINE | re.DOTALL)
22
23    def _replacer(match):
24        if match.group(2) is not None:
25            return ""
26        else:
27            return match.group(1)
28
29    return regex.sub(_replacer, string)
30
31
32def main():
33    parser = argparse.ArgumentParser(
34        description="Entity manager configuration validator",
35    )
36    parser.add_argument(
37        "-s",
38        "--schema",
39        help=(
40            "Use the specified schema file instead of the default "
41            "(__file__/../../schemas/global.json)"
42        ),
43    )
44    parser.add_argument(
45        "-c",
46        "--config",
47        action="append",
48        help=(
49            "Validate the specified configuration files (can be "
50            "specified more than once) instead of the default "
51            "(__file__/../../configurations/**.json)"
52        ),
53    )
54    parser.add_argument(
55        "-e",
56        "--expected-fails",
57        help=(
58            "A file with a list of configurations to ignore should "
59            "they fail to validate"
60        ),
61    )
62    parser.add_argument(
63        "-k",
64        "--continue",
65        action="store_true",
66        help="keep validating after a failure",
67    )
68    parser.add_argument(
69        "-v", "--verbose", action="store_true", help="be noisy"
70    )
71    args = parser.parse_args()
72
73    schema_file = args.schema
74    if schema_file is None:
75        try:
76            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
77            schema_file = os.sep + os.path.join(
78                *source_dir, "schemas", DEFAULT_SCHEMA_FILENAME
79            )
80        except Exception:
81            sys.stderr.write(
82                "Could not guess location of {}\n".format(
83                    DEFAULT_SCHEMA_FILENAME
84                )
85            )
86            sys.exit(2)
87
88    schema = {}
89    try:
90        with open(schema_file) as fd:
91            schema = json.load(fd)
92    except FileNotFoundError:
93        sys.stderr.write(
94            "Could not read schema file '{}'\n".format(schema_file)
95        )
96        sys.exit(2)
97
98    config_files = args.config or []
99    if len(config_files) == 0:
100        try:
101            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
102            configs_dir = os.sep + os.path.join(*source_dir, "configurations")
103            data = os.walk(configs_dir)
104            for root, _, files in data:
105                for f in files:
106                    if f.endswith(".json"):
107                        config_files.append(os.path.join(root, f))
108        except Exception:
109            sys.stderr.write("Could not guess location of configurations\n")
110            sys.exit(2)
111
112    configs = []
113    for config_file in config_files:
114        try:
115            with open(config_file) as fd:
116                configs.append(json.loads(remove_c_comments(fd.read())))
117        except FileNotFoundError:
118            sys.stderr.write(
119                "Could not parse config file '{}'\n".format(config_file)
120            )
121            sys.exit(2)
122
123    expected_fails = []
124    if args.expected_fails:
125        try:
126            with open(args.expected_fails) as fd:
127                for line in fd:
128                    expected_fails.append(line.strip())
129        except Exception:
130            sys.stderr.write(
131                "Could not read expected fails file '{}'\n".format(
132                    args.expected_fails
133                )
134            )
135            sys.exit(2)
136
137    spec = jsonschema.Draft202012Validator
138    spec.check_schema(schema)
139    base_uri = "file://{}/".format(
140        os.path.split(os.path.realpath(schema_file))[0]
141    )
142    resolver = jsonschema.RefResolver(base_uri, schema)
143    validator = spec(schema, resolver=resolver)
144
145    results = {
146        "invalid": [],
147        "unexpected_pass": [],
148    }
149    for config_file, config in zip(config_files, configs):
150        name = os.path.split(config_file)[1]
151        expect_fail = name in expected_fails
152        try:
153            validator.validate(config)
154            if expect_fail:
155                results["unexpected_pass"].append(name)
156                if not getattr(args, "continue"):
157                    break
158        except jsonschema.exceptions.ValidationError as e:
159            if not expect_fail:
160                results["invalid"].append(name)
161                if args.verbose:
162                    print(e)
163            if expect_fail or getattr(args, "continue"):
164                continue
165            break
166
167    exit_status = 0
168    if len(results["invalid"]) + len(results["unexpected_pass"]):
169        exit_status = 1
170        unexpected_pass_suffix = " **"
171        show_suffix_explanation = False
172        print("results:")
173        for f in config_files:
174            if any([x in f for x in results["unexpected_pass"]]):
175                show_suffix_explanation = True
176                print("  '{}' passed!{}".format(f, unexpected_pass_suffix))
177            if any([x in f for x in results["invalid"]]):
178                print("  '{}' failed!".format(f))
179
180        if show_suffix_explanation:
181            print("\n** configuration expected to fail")
182
183    sys.exit(exit_status)
184
185
186if __name__ == "__main__":
187    main()
188