xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision bec4ebc22c43c1ff5c3fddb820d44a88bd3aebf0)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12import unittest
13
14from oeqa.core.loader import OETestLoader
15from oeqa.core.runner import OETestRunner
16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
17
18class OETestContext(object):
19    loaderClass = OETestLoader
20    runnerClass = OETestRunner
21
22    files_dir = os.path.abspath(os.path.join(os.path.dirname(
23        os.path.abspath(__file__)), "../files"))
24
25    def __init__(self, td=None, logger=None):
26        if not type(td) is dict:
27            raise TypeError("td isn't dictionary type")
28
29        self.td = td
30        self.logger = logger
31        self._registry = {}
32        self._registry['cases'] = collections.OrderedDict()
33
34        self.results = unittest.TestResult()
35        unittest.registerResult(self.results)
36
37    def _read_modules_from_manifest(self, manifest):
38        if not os.path.exists(manifest):
39            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)
40
41        modules = []
42        for line in open(manifest).readlines():
43            line = line.strip()
44            if line and not line.startswith("#"):
45                modules.append(line)
46
47        return modules
48
49    def skipTests(self, skips):
50        if not skips:
51            return
52        def skipfuncgen(skipmsg):
53            def func():
54                raise unittest.SkipTest(skipmsg)
55            return func
56        class_ids = {}
57        for test in self.suites:
58            if test.__class__ not in class_ids:
59                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
60            for skip in skips:
61                if (test.id()+'.').startswith(skip+'.'):
62                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
63        for tclass in class_ids:
64            cid = class_ids[tclass]
65            for skip in skips:
66                if (cid + '.').startswith(skip + '.'):
67                    setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))
68
69    def loadTests(self, module_paths, modules=[], tests=[],
70            modules_manifest="", modules_required=[], **kwargs):
71        if modules_manifest:
72            modules = self._read_modules_from_manifest(modules_manifest)
73
74        self.loader = self.loaderClass(self, module_paths, modules, tests,
75                modules_required, **kwargs)
76        self.suites = self.loader.discover()
77
78    def prepareSuite(self, suites, processes):
79        return suites
80
81    def runTests(self, processes=None, skips=[]):
82        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
83
84        # Dinamically skip those tests specified though arguments
85        self.skipTests(skips)
86
87        self._run_start_time = time.time()
88        self._run_end_time = self._run_start_time
89        if not processes:
90            self.runner.buffer = True
91        result = self.runner.run(self.prepareSuite(self.suites, processes))
92        self._run_end_time = time.time()
93
94        return result
95
96    def listTests(self, display_type):
97        self.runner = self.runnerClass(self, verbosity=2)
98        return self.runner.list_tests(self.suites, display_type)
99
100class OETestContextExecutor(object):
101    _context_class = OETestContext
102    _script_executor = 'oe-test'
103
104    name = 'core'
105    help = 'core test component example'
106    description = 'executes core test suite example'
107    datetime = time.strftime("%Y%m%d%H%M%S")
108
109    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
110            'cases/example')]
111    default_test_data = os.path.join(default_cases[0], 'data.json')
112    default_tests = None
113    default_json_result_dir = None
114
115    def register_commands(self, logger, subparsers):
116        self.parser = subparsers.add_parser(self.name, help=self.help,
117                description=self.description, group='components')
118
119        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
120        self.parser.add_argument('--output-log', action='store',
121                default=self.default_output_log,
122                help="results output log, default: %s" % self.default_output_log)
123
124        self.parser.add_argument('--json-result-dir', action='store',
125                default=self.default_json_result_dir,
126                help="json result output dir, default: %s" % self.default_json_result_dir)
127
128        group = self.parser.add_mutually_exclusive_group()
129        group.add_argument('--run-tests', action='store', nargs='+',
130                default=self.default_tests,
131                help="tests to run in <module>[.<class>[.<name>]]")
132        group.add_argument('--list-tests', action='store',
133                choices=('module', 'class', 'name'),
134                help="lists available tests")
135
136        if self.default_test_data:
137            self.parser.add_argument('--test-data-file', action='store',
138                    default=self.default_test_data,
139                    help="data file to load, default: %s" % self.default_test_data)
140        else:
141            self.parser.add_argument('--test-data-file', action='store',
142                    help="data file to load")
143
144        if self.default_cases:
145            self.parser.add_argument('CASES_PATHS', action='store',
146                    default=self.default_cases, nargs='*',
147                    help="paths to directories with test cases, default: %s"\
148                            % self.default_cases)
149        else:
150            self.parser.add_argument('CASES_PATHS', action='store',
151                    nargs='+', help="paths to directories with test cases")
152
153        self.parser.set_defaults(func=self.run)
154
155    def _setup_logger(self, logger, args):
156        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
157                ' - %(levelname)s - %(message)s')
158        sh = logger.handlers[0]
159        sh.setFormatter(formatter)
160        fh = logging.FileHandler(args.output_log)
161        fh.setFormatter(formatter)
162        logger.addHandler(fh)
163        if getattr(args, 'verbose', False):
164            logger.setLevel('DEBUG')
165
166        return logger
167
168    def _process_args(self, logger, args):
169        self.tc_kwargs = {}
170        self.tc_kwargs['init'] = {}
171        self.tc_kwargs['load'] = {}
172        self.tc_kwargs['list'] = {}
173        self.tc_kwargs['run']  = {}
174
175        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
176        if args.test_data_file:
177            self.tc_kwargs['init']['td'] = json.load(
178                    open(args.test_data_file, "r"))
179        else:
180            self.tc_kwargs['init']['td'] = {}
181
182        if args.run_tests:
183            self.tc_kwargs['load']['modules'] = args.run_tests
184            self.tc_kwargs['load']['modules_required'] = args.run_tests
185        else:
186            self.tc_kwargs['load']['modules'] = []
187
188        self.tc_kwargs['run']['skips'] = []
189
190        self.module_paths = args.CASES_PATHS
191
192    def _get_json_result_dir(self, args):
193        return args.json_result_dir
194
195    def _get_configuration(self):
196        td = self.tc_kwargs['init']['td']
197        configuration = {'TEST_TYPE': self.name,
198                        'MACHINE': td.get("MACHINE"),
199                        'DISTRO': td.get("DISTRO"),
200                        'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
201                        'DATETIME': td.get("DATETIME")}
202        return configuration
203
204    def _get_result_id(self, configuration):
205        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
206                                configuration['MACHINE'], self.datetime)
207
208    def _pre_run(self):
209        pass
210
211    def run(self, logger, args):
212        self._process_args(logger, args)
213
214        self.tc = self._context_class(**self.tc_kwargs['init'])
215        try:
216            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
217        except OEQATestNotFound as ex:
218            logger.error(ex)
219            sys.exit(1)
220
221        if args.list_tests:
222            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
223        else:
224            self._pre_run()
225            rc = self.tc.runTests(**self.tc_kwargs['run'])
226
227            json_result_dir = self._get_json_result_dir(args)
228            if json_result_dir:
229                configuration = self._get_configuration()
230                rc.logDetails(json_result_dir,
231                              configuration,
232                              self._get_result_id(configuration))
233            else:
234                rc.logDetails()
235
236            rc.logSummary(self.name)
237
238        output_link = os.path.join(os.path.dirname(args.output_log),
239                "%s-results.log" % self.name)
240        if os.path.exists(output_link):
241            os.remove(output_link)
242        os.symlink(args.output_log, output_link)
243
244        return rc
245
246_executor_class = OETestContextExecutor
247