xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision 1e34c2d0)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12import unittest
13
14from oeqa.core.loader import OETestLoader
15from oeqa.core.runner import OETestRunner
16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
17
18class OETestContext(object):
19    loaderClass = OETestLoader
20    runnerClass = OETestRunner
21
22    files_dir = os.path.abspath(os.path.join(os.path.dirname(
23        os.path.abspath(__file__)), "../files"))
24
25    def __init__(self, td=None, logger=None):
26        if not type(td) is dict:
27            raise TypeError("td isn't dictionary type")
28
29        self.td = td
30        self.logger = logger
31        self._registry = {}
32        self._registry['cases'] = collections.OrderedDict()
33
34    def _read_modules_from_manifest(self, manifest):
35        if not os.path.exists(manifest):
36            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)
37
38        modules = []
39        for line in open(manifest).readlines():
40            line = line.strip()
41            if line and not line.startswith("#"):
42                modules.append(line)
43
44        return modules
45
46    def skipTests(self, skips):
47        if not skips:
48            return
49        def skipfuncgen(skipmsg):
50            def func():
51                raise unittest.SkipTest(skipmsg)
52            return func
53        class_ids = {}
54        for test in self.suites:
55            if test.__class__ not in class_ids:
56                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
57            for skip in skips:
58                if (test.id()+'.').startswith(skip+'.'):
59                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
60        for tclass in class_ids:
61            cid = class_ids[tclass]
62            for skip in skips:
63                if (cid + '.').startswith(skip + '.'):
64                    setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))
65
66    def loadTests(self, module_paths, modules=[], tests=[],
67            modules_manifest="", modules_required=[], **kwargs):
68        if modules_manifest:
69            modules = self._read_modules_from_manifest(modules_manifest)
70
71        self.loader = self.loaderClass(self, module_paths, modules, tests,
72                modules_required, **kwargs)
73        self.suites = self.loader.discover()
74
75    def prepareSuite(self, suites, processes):
76        return suites
77
78    def runTests(self, processes=None, skips=[]):
79        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
80
81        # Dinamically skip those tests specified though arguments
82        self.skipTests(skips)
83
84        self._run_start_time = time.time()
85        if not processes:
86            self.runner.buffer = True
87        result = self.runner.run(self.prepareSuite(self.suites, processes))
88        self._run_end_time = time.time()
89
90        return result
91
92    def listTests(self, display_type):
93        self.runner = self.runnerClass(self, verbosity=2)
94        return self.runner.list_tests(self.suites, display_type)
95
96class OETestContextExecutor(object):
97    _context_class = OETestContext
98    _script_executor = 'oe-test'
99
100    name = 'core'
101    help = 'core test component example'
102    description = 'executes core test suite example'
103    datetime = time.strftime("%Y%m%d%H%M%S")
104
105    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
106            'cases/example')]
107    default_test_data = os.path.join(default_cases[0], 'data.json')
108    default_tests = None
109    default_json_result_dir = None
110
111    def register_commands(self, logger, subparsers):
112        self.parser = subparsers.add_parser(self.name, help=self.help,
113                description=self.description, group='components')
114
115        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
116        self.parser.add_argument('--output-log', action='store',
117                default=self.default_output_log,
118                help="results output log, default: %s" % self.default_output_log)
119
120        self.parser.add_argument('--json-result-dir', action='store',
121                default=self.default_json_result_dir,
122                help="json result output dir, default: %s" % self.default_json_result_dir)
123
124        group = self.parser.add_mutually_exclusive_group()
125        group.add_argument('--run-tests', action='store', nargs='+',
126                default=self.default_tests,
127                help="tests to run in <module>[.<class>[.<name>]]")
128        group.add_argument('--list-tests', action='store',
129                choices=('module', 'class', 'name'),
130                help="lists available tests")
131
132        if self.default_test_data:
133            self.parser.add_argument('--test-data-file', action='store',
134                    default=self.default_test_data,
135                    help="data file to load, default: %s" % self.default_test_data)
136        else:
137            self.parser.add_argument('--test-data-file', action='store',
138                    help="data file to load")
139
140        if self.default_cases:
141            self.parser.add_argument('CASES_PATHS', action='store',
142                    default=self.default_cases, nargs='*',
143                    help="paths to directories with test cases, default: %s"\
144                            % self.default_cases)
145        else:
146            self.parser.add_argument('CASES_PATHS', action='store',
147                    nargs='+', help="paths to directories with test cases")
148
149        self.parser.set_defaults(func=self.run)
150
151    def _setup_logger(self, logger, args):
152        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
153                ' - %(levelname)s - %(message)s')
154        sh = logger.handlers[0]
155        sh.setFormatter(formatter)
156        fh = logging.FileHandler(args.output_log)
157        fh.setFormatter(formatter)
158        logger.addHandler(fh)
159
160        return logger
161
162    def _process_args(self, logger, args):
163        self.tc_kwargs = {}
164        self.tc_kwargs['init'] = {}
165        self.tc_kwargs['load'] = {}
166        self.tc_kwargs['list'] = {}
167        self.tc_kwargs['run']  = {}
168
169        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
170        if args.test_data_file:
171            self.tc_kwargs['init']['td'] = json.load(
172                    open(args.test_data_file, "r"))
173        else:
174            self.tc_kwargs['init']['td'] = {}
175
176        if args.run_tests:
177            self.tc_kwargs['load']['modules'] = args.run_tests
178            self.tc_kwargs['load']['modules_required'] = args.run_tests
179        else:
180            self.tc_kwargs['load']['modules'] = []
181
182        self.tc_kwargs['run']['skips'] = []
183
184        self.module_paths = args.CASES_PATHS
185
186    def _get_json_result_dir(self, args):
187        return args.json_result_dir
188
189    def _get_configuration(self):
190        td = self.tc_kwargs['init']['td']
191        configuration = {'TEST_TYPE': self.name,
192                        'MACHINE': td.get("MACHINE"),
193                        'DISTRO': td.get("DISTRO"),
194                        'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
195                        'DATETIME': td.get("DATETIME")}
196        return configuration
197
198    def _get_result_id(self, configuration):
199        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
200                                configuration['MACHINE'], self.datetime)
201
202    def _pre_run(self):
203        pass
204
205    def run(self, logger, args):
206        self._process_args(logger, args)
207
208        self.tc = self._context_class(**self.tc_kwargs['init'])
209        try:
210            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
211        except OEQATestNotFound as ex:
212            logger.error(ex)
213            sys.exit(1)
214
215        if args.list_tests:
216            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
217        else:
218            self._pre_run()
219            rc = self.tc.runTests(**self.tc_kwargs['run'])
220
221            json_result_dir = self._get_json_result_dir(args)
222            if json_result_dir:
223                configuration = self._get_configuration()
224                rc.logDetails(json_result_dir,
225                              configuration,
226                              self._get_result_id(configuration))
227            else:
228                rc.logDetails()
229
230            rc.logSummary(self.name)
231
232        output_link = os.path.join(os.path.dirname(args.output_log),
233                "%s-results.log" % self.name)
234        if os.path.exists(output_link):
235            os.remove(output_link)
236        os.symlink(args.output_log, output_link)
237
238        return rc
239
240_executor_class = OETestContextExecutor
241