## Copyright (C) 2016 Intel Corporation
#
# SPDX-License-Identifier: MIT
#

import os
import sys
import json
import time
import logging
import collections
import unittest

from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound

class OETestContext(object):
    """
    Holds the test data, logger and case registry for a test run and
    drives loading, listing and running of the test suites.

    The loader and runner are instantiated from the ``loaderClass`` and
    ``runnerClass`` class attributes.
    """
    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "../files" directory relative to this module.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """
        Store the test data and logger and create the case registry.

        td: test data, must be a dict (dict subclasses are accepted).
        logger: logger object stored as self.logger.

        Raises TypeError when td is not a dict; this includes the
        default value None.
        """
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """
        Return the list of module names found in the manifest file.

        Every line is stripped; empty lines and lines starting with "#"
        are ignored.

        Raises OEQAMissingManifest when the manifest path doesn't exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Use a context manager so the manifest file is always closed.
        with open(manifest) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """
        Make every loaded test whose id starts with one of the skips
        prefixes raise unittest.SkipTest from its setUp.

        Does nothing when skips is empty or None. Needs self.suites,
        which is set by loadTests().
        """
        if not skips:
            return
        def skipfuncgen(skipmsg):
            def func():
                raise unittest.SkipTest(skipmsg)
            return func
        for test in self.suites:
            for skip in skips:
                if test.id().startswith(skip):
                    # When several prefixes match, the last one assigned
                    # provides the skip message.
                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, filters=None):
        """
        Instantiate the loader and discover the test suites.

        When modules_manifest is given, the modules argument is replaced
        by the module names read from that manifest file. The discovered
        suites are stored in self.suites and the loader in self.loader.

        The list/dict arguments default to None, and a fresh empty
        container is created per call, to avoid sharing mutable default
        objects between calls.
        """
        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        if modules is None:
            modules = []
        if tests is None:
            tests = []
        if modules_required is None:
            modules_required = []
        if filters is None:
            filters = {}

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, filters)
        self.suites = self.loader.discover()

    def runTests(self, processes=None, skips=None):
        """
        Run the loaded suites and return the runner's result.

        processes: when truthy, the suites are wrapped in a
            ConcurrentTestSuite built with this value; otherwise the
            suites are run directly with the runner's buffer enabled.
        skips: test id prefixes handed to skipTests() before running.

        The wall clock time before and after the run is recorded in
        self._run_start_time and self._run_end_time.
        """
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if processes:
            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite

            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
            result = self.runner.run(concurrent_suite)
        else:
            self.runner.buffer = True
            result = self.runner.run(self.suites)
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """
        Instantiate the runner and return the value of its
        list_tests() for the loaded suites and the given display_type.
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)

class OETestContextExecutor(object):
    """
    Command line front end for a test context: registers the sub-command
    and its arguments, turns the parsed arguments into keyword arguments
    for the context and lists or runs the tests.
    """
    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'

    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None

    def register_commands(self, logger, subparsers):
        """
        Add the sub-command parser named self.name to subparsers and
        declare its arguments; self.run is set as the "func" default.

        The defaults (and whether CASES_PATHS is optional) depend on the
        default_test_data and default_cases class attributes.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        # The default log name carries the time at which the commands
        # were registered.
        self.default_output_log = '%s-results-%s.log' % (self.name,
                time.strftime("%Y%m%d%H%M%S"))
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """
        Apply the component formatter to the logger's first handler, add
        a file handler writing to args.output_log with the same
        formatter, and return the logger.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        sh = logger.handlers[0]
        sh.setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        return logger

    def _process_args(self, logger, args):
        """
        Build self.tc_kwargs (keys 'init', 'load', 'list' and 'run')
        from the parsed arguments and set self.module_paths.

        The test data is loaded as JSON from args.test_data_file when
        given, otherwise an empty dict is used.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run'] = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Use a context manager so the data file is always closed.
            with open(args.test_data_file, "r") as f:
                self.tc_kwargs['init']['td'] = json.load(f)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _pre_run(self):
        """
        Hook called by run() right before the tests are executed; does
        nothing here.
        """
        pass

    def run(self, logger, args):
        """
        Create the context, load the tests and either list or run them.

        Exits the process with status 1 when loading raises
        OEQATestNotFound. When the tests are run, the details and
        summary are logged and a "<name>-results.log" symlink pointing
        to args.output_log is (re)created next to the output log.

        Returns the value of listTests() or runTests().
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])
            rc.logDetails()
            rc.logSummary(self.name)

            output_link = os.path.join(os.path.dirname(args.output_log),
                    "%s-results.log" % self.name)
            # lexists() also reports a dangling symlink (e.g. the log it
            # pointed to was deleted); exists() follows the link and
            # would return False, making os.symlink() fail below.
            if os.path.lexists(output_link):
                os.remove(output_link)
            os.symlink(args.output_log, output_link)

        return rc

_executor_class = OETestContextExecutor