# Copyright (C) 2016 Intel Corporation
# Released under the MIT license (see COPYING.MIT)

import os
import sys
import json
import time
import logging
import collections

from oeqa.core.loader import OETestLoader
from oeqa.core.runner import OETestRunner
from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound

class OETestContext(object):
    """Holds the test data, loader and runner used to load and run tests.

    Subclasses can swap ``loaderClass`` / ``runnerClass`` to customise how
    tests are discovered and executed.
    """

    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "files" directory located next to this package.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Create a context.

        Args:
            td: test data dictionary; must be a dict (or dict subclass).
            logger: logger object stored on the context as ``self.logger``.

        Raises:
            TypeError: if ``td`` is not a dictionary (including the default
                ``None``).
        """
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """Return the module names listed in a manifest file.

        Blank lines and lines starting with ``#`` are ignored; surrounding
        whitespace is stripped from every entry.

        Raises:
            OEQAMissingManifest: if ``manifest`` does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Use a context manager so the file handle is closed deterministically.
        with open(manifest) as manifest_file:
            for line in manifest_file:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """Replace ``setUp`` with a skipping stub on tests matching ``skips``.

        Args:
            skips: iterable of test id prefixes. Every entry obtained by
                iterating ``self.suites`` whose ``id()`` starts with one of
                the prefixes gets its ``setUp`` replaced by a callable that
                calls ``skipTest``. Nothing is done when ``skips`` is empty.
        """
        if not skips:
            return

        def make_skipper(target, prefix):
            # Build the stub in its own scope: a lambda created directly in
            # the loop would late-bind the loop variables, so every stub
            # would act on the last test and report the last prefix.
            def skipper():
                return target.skipTest(
                    'Skip by the command line argument "%s"' % prefix)
            return skipper

        for test in self.suites:
            for skip in skips:
                if test.id().startswith(skip):
                    setattr(test, 'setUp', make_skipper(test, skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, filters=None):
        """Discover tests and store the result in ``self.suites``.

        Args:
            module_paths: forwarded to the loader as the paths to search.
            modules: module names to load; defaults to an empty list.
            tests: test names forwarded to the loader; defaults to an empty
                list.
            modules_manifest: optional manifest path; when given, the modules
                read from it replace ``modules``.
            modules_required: forwarded to the loader; defaults to an empty
                list.
            filters: forwarded to the loader; defaults to an empty dict.
        """
        # None sentinels avoid sharing one mutable default object across
        # calls (and with whatever the loader does with it).
        modules = [] if modules is None else modules
        tests = [] if tests is None else tests
        modules_required = [] if modules_required is None else modules_required
        filters = {} if filters is None else filters

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, filters)
        self.suites = self.loader.discover()

    def runTests(self, processes=None, skips=None):
        """Run the loaded suites and return the runner's result object.

        Args:
            processes: when truthy, the suites are wrapped in a
                ``ConcurrentTestSuite`` created with this value; otherwise
                the suites are run directly with ``runner.buffer`` enabled.
            skips: test id prefixes handed to ``skipTests``; defaults to an
                empty list.

        The wall-clock start and end times of the run are recorded in
        ``self._run_start_time`` and ``self._run_end_time``.
        """
        skips = [] if skips is None else skips

        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if processes:
            # Imported lazily: only needed when running concurrently.
            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite

            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
            result = self.runner.run(concurrent_suite)
        else:
            self.runner.buffer = True
            result = self.runner.run(self.suites)
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """Return the runner's listing of the loaded suites.

        Args:
            display_type: forwarded unchanged to ``runner.list_tests``.
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)

class OETestContextExecutor(object):
    """Command-line front end that builds and drives a test context.

    Subclasses customise behaviour through the class attributes below and
    by overriding ``_process_args`` / ``_pre_run``.
    """

    _context_class = OETestContext
    _script_executor = 'oe-test'

    # Sub-command name, help and description registered with the parser.
    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'

    # Defaults used for the CASES_PATHS, --test-data-file and --run-tests
    # arguments respectively.
    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None

    def register_commands(self, logger, subparsers):
        """Register this component's sub-command and its arguments.

        Args:
            logger: unused here.
            subparsers: object whose ``add_parser`` creates the sub-command
                parser (called with an extra ``group`` keyword).
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        self.default_output_log = '%s-results-%s.log' % (self.name,
                time.strftime("%Y%m%d%H%M%S"))
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        # Running and listing tests are mutually exclusive modes.
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Apply this component's log format and add a file handler.

        The formatter is set on the logger's first existing handler and on a
        new ``logging.FileHandler`` writing to ``args.output_log``.

        Returns:
            The same ``logger`` object.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        sh = logger.handlers[0]
        sh.setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into keyword arguments for the context.

        Populates ``self.tc_kwargs`` with the 'init', 'load', 'list' and
        'run' dictionaries and sets ``self.module_paths``.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run'] = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Close the data file as soon as it has been parsed.
            with open(args.test_data_file, "r") as data_file:
                self.tc_kwargs['init']['td'] = json.load(data_file)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _pre_run(self):
        """Hook called just before the tests are run; no-op by default."""
        pass

    def run(self, logger, args):
        """Load the tests, then either list them or run them.

        Exits the process with status 1 when loading raises
        ``OEQATestNotFound``. After a run, the result's details and summary
        are logged and a ``<name>-results.log`` symlink pointing at
        ``args.output_log`` is (re)created in the output log's directory.

        Returns:
            The value returned by ``listTests`` or ``runTests``.
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])
            rc.logDetails()
            rc.logSummary(self.name)

            output_link = os.path.join(os.path.dirname(args.output_log),
                    "%s-results.log" % self.name)
            # lexists() is also true for a dangling symlink, which exists()
            # would miss and which would make os.symlink() fail.
            if os.path.lexists(output_link):
                os.remove(output_link)
            os.symlink(args.output_log, output_link)

        return rc

_executor_class = OETestContextExecutor