xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision c342db35)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12
13from oeqa.core.loader import OETestLoader
14from oeqa.core.runner import OETestRunner
15from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
16
class OETestContext(object):
    """Test context: owns the test data and drives test discovery and runs.

    Subclasses may replace ``loaderClass`` / ``runnerClass`` to customise how
    tests are discovered and executed.
    """
    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "files" directory located one level above the
    # directory that contains this module.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Create a context.

        td     -- test data; must be a dict (dict subclasses are accepted).
        logger -- logger object stored on the context as ``self.logger``.

        Raises TypeError when td is not a dictionary (including the default
        None, so callers must always supply it).
        """
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {}
        self._registry['cases'] = collections.OrderedDict()

    def _read_modules_from_manifest(self, manifest):
        """Return the module names listed in the manifest file.

        Blank lines and lines starting with "#" are ignored; surrounding
        whitespace is stripped from every entry.

        Raises OEQAMissingManifest when the manifest path does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        modules = []
        # Use a context manager so the file handle is closed deterministically
        with open(manifest) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    modules.append(line)

        return modules

    def skipTests(self, skips):
        """Force every loaded test whose id starts with an entry of skips
        to be skipped, by replacing its setUp on the instance.

        Does nothing when skips is empty or None. Requires loadTests() to
        have been called first, since it iterates ``self.suites``.
        """
        if not skips:
            return
        for test in self.suites:
            for skip in skips:
                if test.id().startswith(skip):
                    # Bind the current test and pattern as default arguments:
                    # a plain closure is evaluated when setUp runs, by which
                    # time both loop variables hold their final values, so
                    # the wrong test object and message would be used.
                    setattr(test, 'setUp', lambda test=test, skip=skip:
                            test.skipTest('Skip by the command line argument "%s"' % skip))

    # NOTE: the list/dict defaults below are shared between calls; they are
    # only passed through to the loader and are not mutated in this method.
    def loadTests(self, module_paths, modules=[], tests=[],
            modules_manifest="", modules_required=[], filters={}):
        """Discover tests and store the result in ``self.suites``.

        When modules_manifest is non-empty, the module list read from that
        file replaces the modules argument. All remaining arguments are
        handed unchanged to ``loaderClass``; the loader instance is kept in
        ``self.loader``.
        """
        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, filters)
        self.suites = self.loader.discover()

    def runTests(self, processes=None, skips=[]):
        """Run the loaded suites and return the runner's result object.

        processes -- when truthy, the suites are wrapped in a
                     ConcurrentTestSuite built with this value; otherwise
                     they run directly with the runner's ``buffer`` flag set.
        skips     -- test id prefixes to skip, see skipTests().

        The times just before and after the run are recorded in
        ``self._run_start_time`` and ``self._run_end_time``.
        """
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip the tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if processes:
            # Imported lazily so the dependency is only needed for
            # concurrent runs.
            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite

            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
            result = self.runner.run(concurrent_suite)
        else:
            self.runner.buffer = True
            result = self.runner.run(self.suites)
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """Return the runner's listing of the loaded suites.

        display_type is forwarded unchanged to the runner's list_tests().
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)
84
class OETestContextExecutor(object):
    """Command-line component that builds a test context from parsed
    arguments and then lists or runs its tests.

    Subclasses customise behaviour through the class attributes below and by
    overriding _process_args() / _pre_run().
    """
    _context_class = OETestContext
    _script_executor = 'oe-test'

    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'

    # Defaults used when the corresponding command-line options are omitted
    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None

    def register_commands(self, logger, subparsers):
        """Register this component's sub-command and its options.

        The created parser is kept in ``self.parser`` and its ``func``
        default is set to self.run. The logger argument is not used here.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        # Default log name embeds the component name and a timestamp taken
        # at registration time.
        self.default_output_log = '%s-results-%s.log' % (self.name,
                time.strftime("%Y%m%d%H%M%S"))
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        # Running and listing tests are mutually exclusive
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        # CASES_PATHS is optional only when the component ships defaults
        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Apply the component log format and add a file handler.

        The formatter is set on the logger's first existing handler (if
        any) and on a new FileHandler writing to args.output_log. Returns
        the same logger object.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        # Guard against a logger that has no handler attached yet instead
        # of failing with IndexError.
        if logger.handlers:
            logger.handlers[0].setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into keyword arguments for the context.

        Fills ``self.tc_kwargs`` with one dict per phase ('init', 'load',
        'list', 'run') and stores the case directories in
        ``self.module_paths``.
        """
        self.tc_kwargs = {}
        self.tc_kwargs['init'] = {}
        self.tc_kwargs['load'] = {}
        self.tc_kwargs['list'] = {}
        self.tc_kwargs['run']  = {}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # Context manager ensures the data file is closed after parsing
            with open(args.test_data_file, "r") as f:
                self.tc_kwargs['init']['td'] = json.load(f)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            # Explicitly requested tests are also passed as required modules
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _pre_run(self):
        """Hook called right before the tests run; no-op by default."""
        pass

    def run(self, logger, args):
        """Entry point of the sub-command: load tests, then list or run them.

        Logs the error and exits the process with status 1 when loading
        raises OEQATestNotFound. Afterwards a "<name>-results.log" symlink
        pointing at the output log is (re)created next to it.

        Returns the value of listTests() when --list-tests was given,
        otherwise the result object returned by runTests().
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])
            rc.logDetails()
            rc.logSummary(self.name)

        output_link = os.path.join(os.path.dirname(args.output_log),
                "%s-results.log" % self.name)
        # lexists() is also true for a dangling symlink; exists() would
        # report False there and the os.symlink() below would then fail
        # with FileExistsError.
        if os.path.lexists(output_link):
            os.remove(output_link)
        # The link lives in the same directory as the log, and a relative
        # symlink target is resolved against the link's own directory, so
        # the target must be the bare file name to stay valid when
        # --output-log contains a relative directory component.
        os.symlink(os.path.basename(args.output_log), output_link)

        return rc
198
# Module-level name bound to the executor class defined in this file.
# NOTE(review): presumably looked up by the test script to discover this
# component's executor -- confirm against the caller.
_executor_class = OETestContextExecutor
200