## Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import time
import logging
import collections
import unittest

from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound


class OETestContext(object):
    """Hold the test data and drive loading, skipping, running and listing.

    Subclasses customise behaviour by overriding ``loaderClass`` and
    ``runnerClass`` or the individual methods.
    """

    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "files" directory located one level above the
    # directory that contains this module.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Create a context.

        Args:
            td: test data dictionary. Required in practice: the ``None``
                default is rejected like any other non-dict value.
            logger: logger object stored on the context as ``self.logger``.

        Raises:
            TypeError: if ``td`` is not a dict (dict subclasses are accepted).
        """
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """Return the module names listed in the ``manifest`` file.

        Each line is stripped; blank lines and lines starting with ``#`` are
        ignored.

        Raises:
            OEQAMissingManifest: if ``manifest`` does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Use a context manager so the manifest is closed deterministically.
        with open(manifest) as manifest_file:
            for line in manifest_file:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """Force the tests selected by ``skips`` to be reported as skipped.

        ``skips`` is an iterable of dotted prefixes. A test matches when its
        id equals a prefix or starts with that prefix followed by ``.``; the
        same rule is applied to the class id (the test id without its last
        component). Matching test instances get their ``setUp`` replaced and
        matching classes get their ``setUpHooker`` replaced by a function
        that raises ``unittest.SkipTest``.

        Iterates ``self.suites``, so ``loadTests`` must have been called.
        Does nothing when ``skips`` is empty or None.
        """
        if not skips:
            return

        def skipfuncgen(skipmsg):
            # Build a zero-argument callable raising SkipTest with skipmsg.
            def func():
                raise unittest.SkipTest(skipmsg)
            return func

        # Maps each test class seen to its dotted class id.
        class_ids = {}
        for test in self.suites:
            if test.__class__ not in class_ids:
                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
            for skip in skips:
                # The trailing '.' makes "a.b" match "a.b.c" but not "a.bc".
                if (test.id()+'.').startswith(skip+'.'):
                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
        for tclass in class_ids:
            cid = class_ids[tclass]
            for skip in skips:
                if (cid + '.').startswith(skip + '.'):
                    setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, **kwargs):
        """Instantiate the loader and discover the test suites.

        Args:
            module_paths: passed through to the loader.
            modules: module names to load; replaced by the content of
                ``modules_manifest`` when a manifest is given.
            tests: passed through to the loader.
            modules_manifest: optional path of a manifest file listing the
                modules to load.
            modules_required: passed through to the loader.
            **kwargs: extra keyword arguments for the loader.

        ``None`` for ``modules``, ``tests`` or ``modules_required`` means an
        empty list; a fresh list is created per call so that no list object
        is shared between calls.

        The loader is stored in ``self.loader`` and the discovered suites in
        ``self.suites``.
        """
        if modules is None:
            modules = []
        if tests is None:
            tests = []
        if modules_required is None:
            modules_required = []

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, **kwargs)
        self.suites = self.loader.discover()

    def prepareSuite(self, suites, processes):
        """Return the suites to run; this implementation returns them as is."""
        return suites

    def runTests(self, processes=None, skips=None):
        """Run the loaded suites and return the runner's result.

        Args:
            processes: forwarded to ``prepareSuite``; when falsy the runner's
                ``buffer`` attribute is set to True.
            skips: dotted test prefixes to skip (see ``skipTests``); None or
                empty skips nothing.

        The wall-clock start and end of the run are recorded in
        ``self._run_start_time`` and ``self._run_end_time``.
        """
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if not processes:
            self.runner.buffer = True
        result = self.runner.run(self.prepareSuite(self.suites, processes))
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """Return the runner's listing of the loaded suites.

        ``display_type`` is forwarded unchanged to the runner's
        ``list_tests``.
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)


class OETestContextExecutor(object):
    """Command line front end that builds and runs an ``OETestContext``.

    Subclasses override the class attributes below (context class, name,
    defaults) and the ``_``-prefixed hooks to adapt the component.
    """

    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'
    # Evaluated once, when the class body is executed.
    datetime = time.strftime("%Y%m%d%H%M%S")

    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None
    default_json_result_dir = None

    def register_commands(self, logger, subparsers):
        """Register this component's sub-command and its arguments.

        The created parser is stored in ``self.parser`` and ``self.run`` is
        installed as the parser's ``func`` default.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        self.parser.add_argument('--json-result-dir', action='store',
                default=self.default_json_result_dir,
                help="json result output dir, default: %s" % self.default_json_result_dir)

        # Running and listing tests cannot be requested together.
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        # With default case paths the positional is optional ('*'),
        # otherwise at least one path is required ('+').
        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Apply the component log format and add a file handler.

        The format is set on the logger's first existing handler (if any)
        and on a new ``logging.FileHandler`` writing to ``args.output_log``.
        Returns the same logger.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        # A logger without handlers used to raise IndexError here; only the
        # first pre-existing handler is reformatted.
        if logger.handlers:
            logger.handlers[0].setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into keyword arguments for the context.

        Fills ``self.tc_kwargs`` with the ``init``, ``load``, ``list`` and
        ``run`` dictionaries and stores ``args.CASES_PATHS`` in
        ``self.module_paths``. When ``args.test_data_file`` is set, the test
        data is read from it as JSON; otherwise it is an empty dict.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run'] = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Close the data file as soon as it has been parsed.
            with open(args.test_data_file, "r") as data_file:
                self.tc_kwargs['init']['td'] = json.load(data_file)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _get_json_result_dir(self, args):
        """Return the directory for JSON results (``args.json_result_dir``)."""
        return args.json_result_dir

    def _get_configuration(self):
        """Return the configuration dict logged alongside the results.

        Values come from the test data; keys missing there map to None.
        """
        td = self.tc_kwargs['init']['td']
        configuration = {'TEST_TYPE': self.name,
                        'MACHINE': td.get("MACHINE"),
                        'DISTRO': td.get("DISTRO"),
                        'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
                        'DATETIME': td.get("DATETIME")}
        return configuration

    def _get_result_id(self, configuration):
        """Return the result id built from ``configuration``.

        The timestamp component is the executor's ``datetime`` attribute,
        not ``configuration['DATETIME']``.
        """
        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
                                configuration['MACHINE'], self.datetime)

    def _pre_run(self):
        """Hook called right before the tests are run; no-op by default."""
        pass

    def run(self, logger, args):
        """Entry point of the sub-command: list or run the tests.

        Builds the context, loads the tests and then either lists them
        (``--list-tests``) or runs them and logs details and summary.
        Afterwards ``<name>-results.log`` next to the output log is
        (re)created as a symlink to ``args.output_log``.

        Exits the process with status 1 if loading raises
        ``OEQATestNotFound``. Returns the value returned by the context's
        ``listTests`` or ``runTests``.
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])

            json_result_dir = self._get_json_result_dir(args)
            if json_result_dir:
                configuration = self._get_configuration()
                rc.logDetails(json_result_dir,
                              configuration,
                              self._get_result_id(configuration))
            else:
                rc.logDetails()

            rc.logSummary(self.name)

        output_link = os.path.join(os.path.dirname(args.output_log),
                "%s-results.log" % self.name)
        # lexists() also detects a dangling symlink (e.g. the previous log
        # was deleted); exists() would miss it and symlink() would then fail
        # with FileExistsError.
        if os.path.lexists(output_link):
            os.remove(output_link)
        os.symlink(args.output_log, output_link)

        return rc

_executor_class = OETestContextExecutor