#
# Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import time
import logging
import collections
import unittest

from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound


class OETestContext(object):
    """Carry the test data, logger and case registry shared by a test run.

    The context instantiates ``loaderClass`` to discover test suites and
    ``runnerClass`` to run or list them; subclasses may override either
    class attribute.
    """

    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # "files" directory located next to the package directory of this module.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Create a context.

        Args:
            td: test data dictionary; required despite the ``None`` default.
            logger: logger object stored on the context as ``self.logger``.

        Raises:
            TypeError: if ``td`` is not a dictionary.
        """
        # isinstance() also accepts dict subclasses (e.g. OrderedDict).
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

        self.results = unittest.TestResult()
        unittest.registerResult(self.results)

    def _read_modules_from_manifest(self, manifest):
        """Return the module names listed in the ``manifest`` file.

        Blank lines and lines starting with ``#`` are ignored; surrounding
        whitespace is stripped from every entry.

        Raises:
            OEQAMissingManifest: if ``manifest`` does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Use a context manager so the file handle is closed deterministically.
        with open(manifest) as manifest_file:
            for line in manifest_file:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """Arrange for the tests matching ``skips`` to be skipped.

        Each entry of ``skips`` is a dotted test id prefix.  A test whose id
        equals the prefix, or continues it after a ``.``, gets its ``setUp``
        replaced by a function raising ``unittest.SkipTest``.  A test class
        whose id (the test id without its last component) matches gets its
        ``setUpHooker`` attribute replaced the same way.

        Requires ``self.suites`` to be set (see ``loadTests``) unless
        ``skips`` is empty.
        """
        if not skips:
            return

        def skipfuncgen(skipmsg):
            def func():
                raise unittest.SkipTest(skipmsg)
            return func

        class_ids = {}
        for test in self.suites:
            if test.__class__ not in class_ids:
                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
            for skip in skips:
                # The trailing '.' prevents "mod.test_a" matching "mod.test_ab".
                if (test.id() + '.').startswith(skip + '.'):
                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
        for tclass in class_ids:
            cid = class_ids[tclass]
            for skip in skips:
                if (cid + '.').startswith(skip + '.'):
                    setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
                  modules_manifest="", modules_required=None, **kwargs):
        """Discover the test suites and store them in ``self.suites``.

        Args:
            module_paths: forwarded to the loader as its module paths.
            modules: module names forwarded to the loader (default: empty
                list); replaced by the manifest content when
                ``modules_manifest`` is given.
            tests: forwarded to the loader (default: empty list).
            modules_manifest: optional path of a manifest file listing the
                modules to load.
            modules_required: forwarded to the loader (default: empty list).
            **kwargs: extra keyword arguments forwarded to the loader.
        """
        # None sentinels avoid sharing one mutable default list across calls.
        modules = [] if modules is None else modules
        tests = [] if tests is None else tests
        modules_required = [] if modules_required is None else modules_required

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                                       modules_required, **kwargs)
        self.suites = self.loader.discover()

    def prepareSuite(self, suites, processes):
        """Return the suites to run; this base implementation returns them unchanged."""
        return suites

    def runTests(self, processes=None, skips=None):
        """Run the loaded suites and return the runner's result.

        Args:
            processes: passed to ``prepareSuite``; when falsy the runner's
                ``buffer`` attribute is set to True.
            skips: optional list of test id prefixes to skip
                (see ``skipTests``).
        """
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        # Keep the end time defined even if the run below raises.
        self._run_end_time = self._run_start_time
        if not processes:
            self.runner.buffer = True
        result = self.runner.run(self.prepareSuite(self.suites, processes))
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """Return the runner's listing of the loaded suites for ``display_type``."""
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)


class OETestContextExecutor(object):
    """Command line front end that builds a test context and runs or lists tests."""

    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'
    # Evaluated once, when the class body is executed.
    datetime = time.strftime("%Y%m%d%H%M%S")

    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                  'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None
    default_json_result_dir = None

    def register_commands(self, logger, subparsers):
        """Register this component's sub-command and arguments on ``subparsers``.

        The created parser is stored in ``self.parser`` and ``self.run`` is
        installed as its ``func`` default.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                                            description=self.description,
                                            group='components')

        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        self.parser.add_argument('--json-result-dir', action='store',
                default=self.default_json_result_dir,
                help="json result output dir, default: %s" % self.default_json_result_dir)

        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"
                         % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Attach the component formatter and an output-log file handler.

        The formatter is applied to the logger's first existing handler (if
        any) and to a new ``logging.FileHandler`` writing to
        ``args.output_log``.  The level is set to DEBUG when ``args.verbose``
        is present and true.  Returns ``logger``.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name +
                                      ' - %(levelname)s - %(message)s')
        # Tolerate a logger that has no handler configured yet.
        if logger.handlers:
            logger.handlers[0].setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        if getattr(args, 'verbose', False):
            logger.setLevel('DEBUG')

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into ``self.tc_kwargs`` and ``self.module_paths``.

        ``self.tc_kwargs`` holds one keyword-argument dict per context call:
        'init', 'load', 'list' and 'run'.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run'] = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Context manager closes the data file once it has been parsed.
            with open(args.test_data_file, "r") as data_file:
                self.tc_kwargs['init']['td'] = json.load(data_file)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _get_json_result_dir(self, args):
        """Return the directory for JSON results taken from ``args``."""
        return args.json_result_dir

    def _get_configuration(self):
        """Return the configuration dict built from the loaded test data.

        Keys missing from the test data map to ``None``.
        """
        td = self.tc_kwargs['init']['td']
        configuration = {'TEST_TYPE': self.name,
                         'MACHINE': td.get("MACHINE"),
                         'DISTRO': td.get("DISTRO"),
                         'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
                         'DATETIME': td.get("DATETIME")}
        return configuration

    def _get_result_id(self, configuration):
        """Return '<TEST_TYPE>_<IMAGE_BASENAME>_<MACHINE>_<datetime>' for ``configuration``."""
        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
                                configuration['MACHINE'], self.datetime)

    def _pre_run(self):
        """Hook called right before the tests are run; does nothing here."""
        pass

    def run(self, logger, args):
        """Entry point of the sub-command: load, then list or run the tests.

        Exits the process with status 1 when the loader raises
        ``OEQATestNotFound``.  After a run, the details and summary are
        logged through the result object.  In both modes a
        ``<name>-results.log`` symlink to the output log is (re)created next
        to it.

        Returns:
            The value returned by ``listTests`` or ``runTests``.
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])

            json_result_dir = self._get_json_result_dir(args)
            if json_result_dir:
                configuration = self._get_configuration()
                rc.logDetails(json_result_dir,
                              configuration,
                              self._get_result_id(configuration))
            else:
                rc.logDetails()

            rc.logSummary(self.name)

        output_link = os.path.join(os.path.dirname(args.output_log),
                                   "%s-results.log" % self.name)
        # lexists() also detects a dangling symlink (its previous target was
        # deleted); exists() would miss it and os.symlink() would then fail.
        if os.path.lexists(output_link):
            os.remove(output_link)
        # A relative symlink target is resolved against the link's own
        # directory, which is the log's directory, so link to the bare name.
        link_target = args.output_log
        if not os.path.isabs(link_target):
            link_target = os.path.basename(link_target)
        os.symlink(link_target, output_link)

        return rc


_executor_class = OETestContextExecutor