1## Copyright (C) 2016 Intel Corporation 2# 3# SPDX-License-Identifier: MIT 4# 5 6import os 7import sys 8import json 9import time 10import logging 11import collections 12import unittest 13 14from oeqa.core.loader import OETestLoader 15from oeqa.core.runner import OETestRunner 16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound 17 18class OETestContext(object): 19 loaderClass = OETestLoader 20 runnerClass = OETestRunner 21 22 files_dir = os.path.abspath(os.path.join(os.path.dirname( 23 os.path.abspath(__file__)), "../files")) 24 25 def __init__(self, td=None, logger=None): 26 if not type(td) is dict: 27 raise TypeError("td isn't dictionary type") 28 29 self.td = td 30 self.logger = logger 31 self._registry = {} 32 self._registry['cases'] = collections.OrderedDict() 33 34 def _read_modules_from_manifest(self, manifest): 35 if not os.path.exists(manifest): 36 raise OEQAMissingManifest("Manifest does not exist on %s" % manifest) 37 38 modules = [] 39 for line in open(manifest).readlines(): 40 line = line.strip() 41 if line and not line.startswith("#"): 42 modules.append(line) 43 44 return modules 45 46 def skipTests(self, skips): 47 if not skips: 48 return 49 def skipfuncgen(skipmsg): 50 def func(): 51 raise unittest.SkipTest(skipmsg) 52 return func 53 class_ids = {} 54 for test in self.suites: 55 if test.__class__ not in class_ids: 56 class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1]) 57 for skip in skips: 58 if (test.id()+'.').startswith(skip+'.'): 59 setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip)) 60 for tclass in class_ids: 61 cid = class_ids[tclass] 62 for skip in skips: 63 if (cid + '.').startswith(skip + '.'): 64 setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip)) 65 66 def loadTests(self, module_paths, modules=[], tests=[], 67 modules_manifest="", modules_required=[], filters={}): 68 if modules_manifest: 69 modules = self._read_modules_from_manifest(modules_manifest) 70 71 self.loader = self.loaderClass(self, module_paths, modules, tests, 72 modules_required, filters) 73 self.suites = self.loader.discover() 74 75 def runTests(self, processes=None, skips=[]): 76 self.runner = self.runnerClass(self, descriptions=False, verbosity=2) 77 78 # Dinamically skip those tests specified though arguments 79 self.skipTests(skips) 80 81 self._run_start_time = time.time() 82 if processes: 83 from oeqa.core.utils.concurrencytest import ConcurrentTestSuite 84 85 concurrent_suite = ConcurrentTestSuite(self.suites, processes) 86 result = self.runner.run(concurrent_suite) 87 else: 88 self.runner.buffer = True 89 result = self.runner.run(self.suites) 90 self._run_end_time = time.time() 91 92 return result 93 94 def listTests(self, display_type): 95 self.runner = self.runnerClass(self, verbosity=2) 96 return self.runner.list_tests(self.suites, display_type) 97 98class OETestContextExecutor(object): 99 _context_class = OETestContext 100 _script_executor = 'oe-test' 101 102 name = 'core' 103 help = 'core test component example' 104 description = 'executes core test suite example' 105 106 default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)), 107 'cases/example')] 108 default_test_data = os.path.join(default_cases[0], 'data.json') 109 default_tests = None 110 111 def register_commands(self, logger, subparsers): 112 self.parser = subparsers.add_parser(self.name, help=self.help, 113 description=self.description, group='components') 114 115 self.default_output_log = '%s-results-%s.log' % (self.name, 116 time.strftime("%Y%m%d%H%M%S")) 117 self.parser.add_argument('--output-log', action='store', 118 default=self.default_output_log, 119 help="results output log, default: %s" % self.default_output_log) 120 121 group = self.parser.add_mutually_exclusive_group() 122 group.add_argument('--run-tests', action='store', nargs='+', 123 default=self.default_tests, 124 help="tests to run in <module>[.<class>[.<name>]]") 125 group.add_argument('--list-tests', action='store', 126 choices=('module', 'class', 'name'), 127 help="lists available tests") 128 129 if self.default_test_data: 130 self.parser.add_argument('--test-data-file', action='store', 131 default=self.default_test_data, 132 help="data file to load, default: %s" % self.default_test_data) 133 else: 134 self.parser.add_argument('--test-data-file', action='store', 135 help="data file to load") 136 137 if self.default_cases: 138 self.parser.add_argument('CASES_PATHS', action='store', 139 default=self.default_cases, nargs='*', 140 help="paths to directories with test cases, default: %s"\ 141 % self.default_cases) 142 else: 143 self.parser.add_argument('CASES_PATHS', action='store', 144 nargs='+', help="paths to directories with test cases") 145 146 self.parser.set_defaults(func=self.run) 147 148 def _setup_logger(self, logger, args): 149 formatter = logging.Formatter('%(asctime)s - ' + self.name + \ 150 ' - %(levelname)s - %(message)s') 151 sh = logger.handlers[0] 152 sh.setFormatter(formatter) 153 fh = logging.FileHandler(args.output_log) 154 fh.setFormatter(formatter) 155 logger.addHandler(fh) 156 157 return logger 158 159 def _process_args(self, logger, args): 160 self.tc_kwargs = {} 161 self.tc_kwargs['init'] = {} 162 self.tc_kwargs['load'] = {} 163 self.tc_kwargs['list'] = {} 164 self.tc_kwargs['run'] = {} 165 166 self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args) 167 if args.test_data_file: 168 self.tc_kwargs['init']['td'] = json.load( 169 open(args.test_data_file, "r")) 170 else: 171 self.tc_kwargs['init']['td'] = {} 172 173 if args.run_tests: 174 self.tc_kwargs['load']['modules'] = args.run_tests 175 self.tc_kwargs['load']['modules_required'] = args.run_tests 176 else: 177 self.tc_kwargs['load']['modules'] = [] 178 179 self.tc_kwargs['run']['skips'] = [] 180 181 self.module_paths = args.CASES_PATHS 182 183 def _pre_run(self): 184 pass 185 186 def run(self, logger, args): 187 self._process_args(logger, args) 188 189 self.tc = self._context_class(**self.tc_kwargs['init']) 190 try: 191 self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) 192 except OEQATestNotFound as ex: 193 logger.error(ex) 194 sys.exit(1) 195 196 if args.list_tests: 197 rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list']) 198 else: 199 self._pre_run() 200 rc = self.tc.runTests(**self.tc_kwargs['run']) 201 rc.logDetails() 202 rc.logSummary(self.name) 203 204 output_link = os.path.join(os.path.dirname(args.output_log), 205 "%s-results.log" % self.name) 206 if os.path.exists(output_link): 207 os.remove(output_link) 208 os.symlink(args.output_log, output_link) 209 210 return rc 211 212_executor_class = OETestContextExecutor 213