## Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import time
import logging
import collections
import unittest

from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound

class OETestContext(object):
    """
    Holds the test data, logger and case registry shared by a test run and
    drives loading, listing and running of the discovered test suites.
    """
    # Classes instantiated by loadTests() and runTests()/listTests().
    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "files" directory that sits next to this
    # module's directory.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """
        td: test data dictionary (mandatory despite the None default).
        logger: logger object stored on the context.

        Raises TypeError when td is not a dictionary.
        """
        # isinstance() keeps rejecting non-dicts (including the None default)
        # while also accepting dict subclasses.
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """
        Return the list of module names found in the manifest file.

        Blank lines and lines starting with '#' are ignored.
        Raises OEQAMissingManifest when the file does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # The context manager guarantees the manifest is closed, even when
        # reading fails half way through.
        with open(manifest) as manifest_file:
            for line in manifest_file:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """
        Make the loaded tests selected by `skips` raise unittest.SkipTest.

        skips: iterable of dotted prefixes (<module>[.<class>[.<name>]]).
               A test is selected when its id equals a prefix or continues
               it after a '.' separator. Nothing is done when it is empty.

        Selected test instances get their setUp replaced; test classes whose
        dotted id is selected get a setUpHooker attribute set on the class.
        Requires loadTests() to have populated self.suites.
        """
        if not skips:
            return

        def skipfuncgen(skipmsg):
            # Build a fresh callable per message so each replaced hook
            # reports the argument that triggered the skip.
            def func():
                raise unittest.SkipTest(skipmsg)
            return func

        def selected(test_id, skip):
            # Appending '.' to both sides restricts matches to whole dotted
            # components ("mod.a" must not select "mod.ab").
            return (test_id + '.').startswith(skip + '.')

        skipmsg = 'Skip by the command line argument "%s"'

        class_ids = {}
        for test in self.suites:
            # Remember each class once, keyed to its id without the
            # trailing test name.
            if test.__class__ not in class_ids:
                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
            for skip in skips:
                if selected(test.id(), skip):
                    setattr(test, 'setUp', skipfuncgen(skipmsg % skip))

        for tclass, cid in class_ids.items():
            for skip in skips:
                if selected(cid, skip):
                    setattr(tclass, 'setUpHooker', skipfuncgen(skipmsg % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, **kwargs):
        """
        Create the loader and discover the test suites into self.suites.

        module_paths: paths handed to the loader.
        modules: module names to load (default: empty list).
        tests: test names handed to the loader (default: empty list).
        modules_manifest: optional manifest file; when given, the module
                          names read from it replace `modules`.
        modules_required: module names handed to the loader as required
                          (default: empty list).
        kwargs: forwarded unchanged to the loader class.
        """
        # None defaults avoid sharing one mutable list between calls.
        modules = [] if modules is None else modules
        tests = [] if tests is None else tests
        modules_required = [] if modules_required is None else modules_required

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, **kwargs)
        self.suites = self.loader.discover()

    def prepareSuite(self, suites, processes):
        """
        Hook returning the suites to hand to the runner; this base
        implementation returns them unchanged.
        """
        return suites

    def runTests(self, processes=None, skips=None):
        """
        Run the loaded suites and return the runner's result object.

        processes: passed to prepareSuite(); when falsy the runner's
                   `buffer` attribute is set to True.
        skips: dotted test prefixes to skip, see skipTests().

        The wall-clock start and end of the run are stored in
        self._run_start_time and self._run_end_time.
        """
        # None default avoids sharing one mutable list between calls.
        skips = [] if skips is None else skips

        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if not processes:
            self.runner.buffer = True
        result = self.runner.run(self.prepareSuite(self.suites, processes))
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """
        Return the runner's listing of the loaded suites for display_type.
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)

class OETestContextExecutor(object):
    """
    Command line front end for a test context: registers the sub-command
    arguments, builds the context from them and runs or lists the tests.
    """
    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'
    # Evaluated once, when the class body is executed.
    datetime = time.strftime("%Y%m%d%H%M%S")

    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None
    default_json_result_dir = None

    def register_commands(self, logger, subparsers):
        """
        Add this component's sub-command and its arguments to subparsers
        and register run() as the command's handler.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        self.parser.add_argument('--json-result-dir', action='store',
                default=self.default_json_result_dir,
                help="json result output dir, default: %s" % self.default_json_result_dir)

        # Running and listing tests cannot be requested together.
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        # A default is only passed to argparse when the class defines one.
        if self.default_test_data:
            test_data_kwargs = {
                'default': self.default_test_data,
                'help': "data file to load, default: %s" % self.default_test_data,
            }
        else:
            test_data_kwargs = {'help': "data file to load"}
        self.parser.add_argument('--test-data-file', action='store',
                **test_data_kwargs)

        # With default cases the positional is optional ('*'), otherwise at
        # least one path must be given ('+').
        if self.default_cases:
            cases_kwargs = {
                'default': self.default_cases,
                'nargs': '*',
                'help': "paths to directories with test cases, default: %s"
                        % self.default_cases,
            }
        else:
            cases_kwargs = {
                'nargs': '+',
                'help': "paths to directories with test cases",
            }
        self.parser.add_argument('CASES_PATHS', action='store', **cases_kwargs)

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """
        Apply the component log format to the logger's first handler, add a
        file handler writing to args.output_log and return the logger.

        The level is raised to DEBUG when args has a true `verbose`
        attribute.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name +
                ' - %(levelname)s - %(message)s')
        # A logger without handlers is tolerated instead of failing with
        # IndexError; the file handler below is added either way.
        if logger.handlers:
            logger.handlers[0].setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        if getattr(args, 'verbose', False):
            logger.setLevel('DEBUG')

        return logger

    def _process_args(self, logger, args):
        """
        Translate the parsed arguments into self.tc_kwargs (keyword
        arguments for the context's init/load/list/run steps) and
        self.module_paths.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run'] = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Close the data file as soon as it has been parsed.
            with open(args.test_data_file, "r") as data_file:
                self.tc_kwargs['init']['td'] = json.load(data_file)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            # Explicitly requested tests are also passed as required.
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _get_json_result_dir(self, args):
        """Return the directory given with --json-result-dir (may be None)."""
        return args.json_result_dir

    def _get_configuration(self):
        """
        Return the configuration dictionary stored with the json results,
        built from the component name and the loaded test data.
        """
        td = self.tc_kwargs['init']['td']
        configuration = {'TEST_TYPE': self.name,
                        'MACHINE': td.get("MACHINE"),
                        'DISTRO': td.get("DISTRO"),
                        'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
                        'DATETIME': td.get("DATETIME")}
        return configuration

    def _get_result_id(self, configuration):
        """
        Return the result identifier:
        <TEST_TYPE>_<IMAGE_BASENAME>_<MACHINE>_<executor datetime>.
        """
        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
                configuration['MACHINE'], self.datetime)

    def _pre_run(self):
        """Hook called right before the tests are run; no-op here."""
        pass

    def run(self, logger, args):
        """
        Entry point of the sub-command: build the context, load the tests
        and either list or run them.

        Exits the process with status 1 when loading raises
        OEQATestNotFound. Returns the object produced by listTests() or
        runTests(). A "<name>-results.log" symlink to the output log is
        (re)created next to it.
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])

            json_result_dir = self._get_json_result_dir(args)
            if json_result_dir:
                configuration = self._get_configuration()
                rc.logDetails(json_result_dir,
                              configuration,
                              self._get_result_id(configuration))
            else:
                rc.logDetails()

            rc.logSummary(self.name)

        output_link = os.path.join(os.path.dirname(args.output_log),
                "%s-results.log" % self.name)
        # lexists() is also true for a dangling link (e.g. the previous log
        # was deleted); exists() would miss it and symlink() would then fail
        # with FileExistsError.
        if os.path.lexists(output_link):
            os.remove(output_link)
        # A relative symlink target is resolved from the link's directory,
        # which is the log's own directory, so only the file name is needed.
        # Absolute paths are kept as given.
        if os.path.isabs(args.output_log):
            link_target = args.output_log
        else:
            link_target = os.path.basename(args.output_log)
        os.symlink(link_target, output_link)

        return rc

_executor_class = OETestContextExecutor