1#!/usr/bin/env python3
2# SPDX-License-Identifier: Apache-2.0
3"""
4A tool for validating entity manager configurations.
5"""
6import argparse
7import json
8import jsonschema.validators
9import os
10import sys
11
12DEFAULT_SCHEMA_FILENAME = "global.json"
13
14
15def main():
16    parser = argparse.ArgumentParser(
17        description="Entity manager configuration validator",
18    )
19    parser.add_argument(
20        "-s", "--schema", help=(
21            "Use the specified schema file instead of the default "
22            "(__file__/../../schemas/global.json)"))
23    parser.add_argument(
24        "-c", "--config", action='append', help=(
25            "Validate the specified configuration files (can be "
26            "specified more than once) instead of the default "
27            "(__file__/../../configurations/**.json)"))
28    parser.add_argument(
29        "-e", "--expected-fails", help=(
30            "A file with a list of configurations to ignore should "
31            "they fail to validate"))
32    parser.add_argument(
33        "-k", "--continue", action='store_true', help=(
34            "keep validating after a failure"))
35    parser.add_argument(
36        "-v", "--verbose", action='store_true', help=(
37            "be noisy"))
38    args = parser.parse_args()
39
40    schema_file = args.schema
41    if schema_file is None:
42        try:
43            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
44            schema_file = os.sep + os.path.join(
45                *source_dir, 'schemas', DEFAULT_SCHEMA_FILENAME)
46        except Exception as e:
47            sys.stderr.write(
48                "Could not guess location of {}\n".format(
49                    DEFAULT_SCHEMA_FILENAME))
50            sys.exit(2)
51
52    schema = {}
53    try:
54        with open(schema_file) as fd:
55            schema = json.load(fd)
56    except FileNotFoundError as e:
57        sys.stderr.write(
58            "Could not read schema file '{}'\n".format(schema_file))
59        sys.exit(2)
60
61    config_files = args.config or []
62    if len(config_files) == 0:
63        try:
64            source_dir = os.path.realpath(__file__).split(os.sep)[:-2]
65            configs_dir = os.sep + os.path.join(*source_dir, 'configurations')
66            data = os.walk(configs_dir)
67            for root, _, files in data:
68                for f in files:
69                    if f.endswith('.json'):
70                        config_files.append(os.path.join(root, f))
71        except Exception as e:
72            sys.stderr.write(
73                "Could not guess location of configurations\n")
74            sys.exit(2)
75
76    configs = []
77    for config_file in config_files:
78        try:
79            with open(config_file) as fd:
80                configs.append(json.load(fd))
81        except FileNotFoundError as e:
82            sys.stderr.write(
83                    "Could not parse config file '{}'\n".format(config_file))
84            sys.exit(2)
85
86    expected_fails = []
87    if args.expected_fails:
88        try:
89            with open(args.expected_fails) as fd:
90                for line in fd:
91                    expected_fails.append(line.strip())
92        except Exception as e:
93            sys.stderr.write(
94                    "Could not read expected fails file '{}'\n".format(
95                        args.expected_fails))
96            sys.exit(2)
97
98    base_uri = "file://{}/".format(
99        os.path.split(os.path.realpath(schema_file))[0])
100    resolver = jsonschema.RefResolver(base_uri, schema)
101    validator = jsonschema.Draft7Validator(schema, resolver=resolver)
102
103    results = {
104        "invalid": [],
105        "unexpected_pass": [],
106    }
107    for config_file, config in zip(config_files, configs):
108        name = os.path.split(config_file)[1]
109        expect_fail = name in expected_fails
110        try:
111            validator.validate(config)
112            if expect_fail:
113                results["unexpected_pass"].append(name)
114                if not getattr(args, "continue"):
115                    break
116        except jsonschema.exceptions.ValidationError as e:
117            if not expect_fail:
118                results["invalid"].append(name)
119                if args.verbose:
120                    print(e)
121            if expect_fail or getattr(args, "continue"):
122                continue
123            break
124
125    exit_status = 0
126    if len(results["invalid"]) + len(results["unexpected_pass"]):
127        exit_status = 1
128        unexpected_pass_suffix = " **"
129        show_suffix_explanation = False
130        print("results:")
131        for f in config_files:
132            if any([x in f for x in results["unexpected_pass"]]):
133                show_suffix_explanation = True
134                print("  '{}' passed!{}".format(f, unexpected_pass_suffix))
135            if any([x in f for x in results["invalid"]]):
136                print("  '{}' failed!".format(f))
137
138        if show_suffix_explanation:
139            print("\n** configuration expected to fail")
140
141    sys.exit(exit_status)
142
143
144if __name__ == "__main__":
145    main()
146