1#
2# Copyright (C) 2016 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7from unittest import SkipTest
8
9from oeqa.core.exception import OEQADependency
10
11from . import OETestDiscover, registerDecorator
12
13def _add_depends(registry, case, depends):
14    module_name = case.__module__
15    class_name = case.__class__.__name__
16
17    case_id = case.id()
18
19    for depend in depends:
20        dparts = depend.split('.')
21
22        if len(dparts) == 1:
23            depend_id = ".".join((module_name, class_name, dparts[0]))
24        elif len(dparts) == 2:
25            depend_id = ".".join((module_name, dparts[0], dparts[1]))
26        else:
27            depend_id = depend
28
29        if not case_id in registry:
30            registry[case_id] = []
31        if not depend_id in registry[case_id]:
32            registry[case_id].append(depend_id)
33
34def _validate_test_case_depends(cases, depends):
35    for case in depends:
36        if not case in cases:
37            continue
38        for dep in depends[case]:
39            if not dep in cases:
40                raise OEQADependency("TestCase %s depends on %s and isn't available"\
41                       ", cases available %s." % (case, dep, str(cases.keys())))
42
43def _order_test_case_by_depends(cases, depends):
44    def _dep_resolve(graph, node, resolved, seen):
45        seen.append(node)
46        for edge in graph[node]:
47            if edge not in resolved:
48                if edge in seen:
49                    raise OEQADependency("Test cases %s and %s have a circular" \
50                                       " dependency." % (node, edge))
51                _dep_resolve(graph, edge, resolved, seen)
52        resolved.append(node)
53
54    dep_graph = {}
55    dep_graph['__root__'] = cases.keys()
56    for case in cases:
57        if case in depends:
58            dep_graph[case] = depends[case]
59        else:
60            dep_graph[case] = []
61
62    cases_ordered = []
63    _dep_resolve(dep_graph, '__root__', cases_ordered, [])
64    cases_ordered.remove('__root__')
65
66    return [cases[case_id] for case_id in cases_ordered]
67
68def _skipTestDependency(case, depends):
69    for dep in depends:
70        found = False
71        for test, _ in case.tc.results.successes:
72            if test.id() == dep:
73                found = True
74                break
75        if not found:
76            raise SkipTest("Test case %s depends on %s but it didn't pass/run." \
77                        % (case.id(), dep))
78
79@registerDecorator
80class OETestDepends(OETestDiscover):
81    attrs = ('depends',)
82
83    def bind(self, registry, case):
84        super(OETestDepends, self).bind(registry, case)
85        if not registry.get('depends'):
86            registry['depends'] = {}
87        _add_depends(registry['depends'], case, self.depends)
88
89    @staticmethod
90    def discover(registry):
91        if registry.get('depends'):
92            _validate_test_case_depends(registry['cases'], registry['depends'])
93            return _order_test_case_by_depends(registry['cases'], registry['depends'])
94        else:
95            return [registry['cases'][case_id] for case_id in registry['cases']]
96
97    def setUpDecorator(self):
98        _skipTestDependency(self.case, self.depends)
99