1# 2# Copyright (C) 2016 Intel Corporation 3# 4# SPDX-License-Identifier: MIT 5# 6 7from unittest import SkipTest 8 9from oeqa.core.exception import OEQADependency 10 11from . import OETestDiscover, registerDecorator 12 13def _add_depends(registry, case, depends): 14 module_name = case.__module__ 15 class_name = case.__class__.__name__ 16 17 case_id = case.id() 18 19 for depend in depends: 20 dparts = depend.split('.') 21 22 if len(dparts) == 1: 23 depend_id = ".".join((module_name, class_name, dparts[0])) 24 elif len(dparts) == 2: 25 depend_id = ".".join((module_name, dparts[0], dparts[1])) 26 else: 27 depend_id = depend 28 29 if not case_id in registry: 30 registry[case_id] = [] 31 if not depend_id in registry[case_id]: 32 registry[case_id].append(depend_id) 33 34def _validate_test_case_depends(cases, depends): 35 for case in depends: 36 if not case in cases: 37 continue 38 for dep in depends[case]: 39 if not dep in cases: 40 raise OEQADependency("TestCase %s depends on %s and isn't available"\ 41 ", cases available %s." % (case, dep, str(cases.keys()))) 42 43def _order_test_case_by_depends(cases, depends): 44 def _dep_resolve(graph, node, resolved, seen): 45 seen.append(node) 46 for edge in graph[node]: 47 if edge not in resolved: 48 if edge in seen: 49 raise OEQADependency("Test cases %s and %s have a circular" \ 50 " dependency." % (node, edge)) 51 _dep_resolve(graph, edge, resolved, seen) 52 resolved.append(node) 53 54 dep_graph = {} 55 dep_graph['__root__'] = cases.keys() 56 for case in cases: 57 if case in depends: 58 dep_graph[case] = depends[case] 59 else: 60 dep_graph[case] = [] 61 62 cases_ordered = [] 63 _dep_resolve(dep_graph, '__root__', cases_ordered, []) 64 cases_ordered.remove('__root__') 65 66 return [cases[case_id] for case_id in cases_ordered] 67 68def _skipTestDependency(case, depends): 69 for dep in depends: 70 found = False 71 for test, _ in case.tc.results.successes: 72 if test.id() == dep: 73 found = True 74 break 75 if not found: 76 raise SkipTest("Test case %s depends on %s but it didn't pass/run." \ 77 % (case.id(), dep)) 78 79@registerDecorator 80class OETestDepends(OETestDiscover): 81 attrs = ('depends',) 82 83 def bind(self, registry, case): 84 super(OETestDepends, self).bind(registry, case) 85 if not registry.get('depends'): 86 registry['depends'] = {} 87 _add_depends(registry['depends'], case, self.depends) 88 89 @staticmethod 90 def discover(registry): 91 if registry.get('depends'): 92 _validate_test_case_depends(registry['cases'], registry['depends']) 93 return _order_test_case_by_depends(registry['cases'], registry['depends']) 94 else: 95 return [registry['cases'][case_id] for case_id in registry['cases']] 96 97 def setUpDecorator(self): 98 _skipTestDependency(self.case, self.depends) 99