xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision c9f7865a)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12import unittest
13
14from oeqa.core.loader import OETestLoader
15from oeqa.core.runner import OETestRunner
16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
17
class OETestContext(object):
    """Holds state shared by a run of OEQA test cases.

    Stores the test data dictionary (td), the logger and a registry of
    discovered cases, and drives loading, listing and running the tests
    through the configured loader and runner classes.
    """
    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path to the auxiliary files directory shipped alongside
    # the test cases (../files relative to this module).
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Initialize the context.

        td: test data dictionary; TypeError is raised for any non-dict
            value (including the default None).
        logger: logging.Logger used while loading/running tests.
        """
        # isinstance() is the idiomatic type check; behavior matches the
        # original "type(td) is dict" test for all callers (td must be
        # exactly a dict-like mapping; dict subclasses now also pass,
        # which is a safe widening).
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """Return the list of module names in the given manifest file.

        Blank lines and '#' comment lines are ignored.
        Raises OEQAMissingManifest when the file does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Context manager closes the file deterministically (the
        # original left the handle to the garbage collector).
        with open(manifest) as f:
            for rawline in f:
                line = rawline.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """Mark tests matching any dotted prefix in 'skips' as skipped.

        A test is skipped when its id, or its class id, starts with one
        of the given prefixes; the trailing '.' added to both sides
        ensures only whole dotted components match (so 'mod.Class' does
        not match 'mod.ClassOther').
        """
        if not skips:
            return
        def skipfuncgen(skipmsg):
            # Closure factory so each skipped test keeps its own message.
            def func():
                raise unittest.SkipTest(skipmsg)
            return func
        class_ids = {}
        for test in self.suites:
            if test.__class__ not in class_ids:
                # Class id = test id minus the trailing test-method name.
                class_ids[test.__class__] = '.'.join(test.id().split('.')[:-1])
            for skip in skips:
                if (test.id()+'.').startswith(skip+'.'):
                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
        for tclass in class_ids:
            cid = class_ids[tclass]
            for skip in skips:
                if (cid + '.').startswith(skip + '.'):
                    # Whole class matched: skip it via its setUpHooker.
                    setattr(tclass, 'setUpHooker', skipfuncgen('Skip by the command line argument "%s"' % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, **kwargs):
        """Discover the test suites found under module_paths.

        modules_manifest, when non-empty, replaces 'modules' with the
        module list read from that manifest file.
        """
        # Normalize defaults here instead of using the original's shared
        # mutable default arguments ([]); explicit caller-supplied lists
        # are used unchanged, so behavior is identical.
        modules = [] if modules is None else modules
        tests = [] if tests is None else tests
        modules_required = [] if modules_required is None else modules_required

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, **kwargs)
        self.suites = self.loader.discover()

    def prepareSuite(self, suites, processes):
        """Hook for subclasses to adapt the suites before running."""
        return suites

    def runTests(self, processes=None, skips=None):
        """Run the discovered suites; returns the runner's result object."""
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        # (avoid mutable default: normalize None -> []).
        self.skipTests([] if skips is None else skips)

        self._run_start_time = time.time()
        if not processes:
            # Buffer stdout/stderr only when running serially.
            self.runner.buffer = True
        result = self.runner.run(self.prepareSuite(self.suites, processes))
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """List available tests grouped by display_type (module/class/name)."""
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)
95
class OETestContextExecutor(object):
    """Command-line front-end for a test component.

    Registers an argparse sub-command, translates parsed arguments into
    an OETestContext, runs (or lists) the tests, and writes the log and
    optional json results.
    """
    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'
    # Timestamp captured once at class-creation time; reused for the
    # default log name and the json result id.
    datetime = time.strftime("%Y%m%d%H%M%S")

    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None
    default_json_result_dir = None

    def register_commands(self, logger, subparsers):
        """Add this component's sub-command and its options to 'subparsers'."""
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        self.default_output_log = '%s-results-%s.log' % (self.name, self.datetime)
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        self.parser.add_argument('--json-result-dir', action='store',
                default=self.default_json_result_dir,
                help="json result output dir, default: %s" % self.default_json_result_dir)

        # Running and listing tests are mutually exclusive modes.
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Attach a file handler for args.output_log and apply a uniform
        formatter to the logger's handlers; returns the same logger."""
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        # Reuse the first (stream) handler installed by the caller.
        sh = logger.handlers[0]
        sh.setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)
        if getattr(args, 'verbose', False):
            logger.setLevel('DEBUG')

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into the kwargs dicts consumed by
        the context's __init__ / loadTests / listTests / runTests."""
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run']  = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Use a context manager so the data file is closed promptly
            # (the original json.load(open(...)) leaked the handle to
            # the garbage collector).
            with open(args.test_data_file, "r") as f:
                self.tc_kwargs['init']['td'] = json.load(f)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _get_json_result_dir(self, args):
        """Directory for json result output; None disables json results."""
        return args.json_result_dir

    def _get_configuration(self):
        """Build the configuration dict stored with json results."""
        td = self.tc_kwargs['init']['td']
        configuration = {'TEST_TYPE': self.name,
                        'MACHINE': td.get("MACHINE"),
                        'DISTRO': td.get("DISTRO"),
                        'IMAGE_BASENAME': td.get("IMAGE_BASENAME"),
                        'DATETIME': td.get("DATETIME")}
        return configuration

    def _get_result_id(self, configuration):
        """Unique id identifying this run in the json results."""
        return '%s_%s_%s_%s' % (configuration['TEST_TYPE'], configuration['IMAGE_BASENAME'],
                                configuration['MACHINE'], self.datetime)

    def _pre_run(self):
        """Hook executed immediately before runTests; subclasses override."""
        pass

    def run(self, logger, args):
        """Sub-command entry point: load tests, then run or list them,
        write logs/results, and return the result object (or the value
        returned by listTests). Exits with status 1 when a requested
        test cannot be found."""
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])

            json_result_dir = self._get_json_result_dir(args)
            if json_result_dir:
                configuration = self._get_configuration()
                rc.logDetails(json_result_dir,
                              configuration,
                              self._get_result_id(configuration))
            else:
                rc.logDetails()

            rc.logSummary(self.name)

        # Maintain a stable "<name>-results.log" symlink pointing at the
        # latest timestamped output log.
        output_link = os.path.join(os.path.dirname(args.output_log),
                "%s-results.log" % self.name)
        if os.path.exists(output_link):
            os.remove(output_link)
        os.symlink(args.output_log, output_link)

        return rc
241
# Module-level hook: oe-test discovers each component's executor through
# this name when importing the component's context module.
_executor_class = OETestContextExecutor
243