xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision 15ae2509)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12import unittest
13
14from oeqa.core.loader import OETestLoader
15from oeqa.core.runner import OETestRunner
16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
17
18class OETestContext(object):
19    loaderClass = OETestLoader
20    runnerClass = OETestRunner
21
22    files_dir = os.path.abspath(os.path.join(os.path.dirname(
23        os.path.abspath(__file__)), "../files"))
24
25    def __init__(self, td=None, logger=None):
26        if not type(td) is dict:
27            raise TypeError("td isn't dictionary type")
28
29        self.td = td
30        self.logger = logger
31        self._registry = {}
32        self._registry['cases'] = collections.OrderedDict()
33
34    def _read_modules_from_manifest(self, manifest):
35        if not os.path.exists(manifest):
36            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)
37
38        modules = []
39        for line in open(manifest).readlines():
40            line = line.strip()
41            if line and not line.startswith("#"):
42                modules.append(line)
43
44        return modules
45
46    def skipTests(self, skips):
47        if not skips:
48            return
49        def skipfuncgen(skipmsg):
50            def func():
51                raise unittest.SkipTest(skipmsg)
52            return func
53        for test in self.suites:
54            for skip in skips:
55                if test.id().startswith(skip):
56                    setattr(test, 'setUp', skipfuncgen('Skip by the command line argument "%s"' % skip))
57
58    def loadTests(self, module_paths, modules=[], tests=[],
59            modules_manifest="", modules_required=[], filters={}):
60        if modules_manifest:
61            modules = self._read_modules_from_manifest(modules_manifest)
62
63        self.loader = self.loaderClass(self, module_paths, modules, tests,
64                modules_required, filters)
65        self.suites = self.loader.discover()
66
67    def runTests(self, processes=None, skips=[]):
68        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
69
70        # Dinamically skip those tests specified though arguments
71        self.skipTests(skips)
72
73        self._run_start_time = time.time()
74        if processes:
75            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
76
77            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
78            result = self.runner.run(concurrent_suite)
79        else:
80            self.runner.buffer = True
81            result = self.runner.run(self.suites)
82        self._run_end_time = time.time()
83
84        return result
85
86    def listTests(self, display_type):
87        self.runner = self.runnerClass(self, verbosity=2)
88        return self.runner.list_tests(self.suites, display_type)
89
90class OETestContextExecutor(object):
91    _context_class = OETestContext
92    _script_executor = 'oe-test'
93
94    name = 'core'
95    help = 'core test component example'
96    description = 'executes core test suite example'
97
98    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
99            'cases/example')]
100    default_test_data = os.path.join(default_cases[0], 'data.json')
101    default_tests = None
102
103    def register_commands(self, logger, subparsers):
104        self.parser = subparsers.add_parser(self.name, help=self.help,
105                description=self.description, group='components')
106
107        self.default_output_log = '%s-results-%s.log' % (self.name,
108                time.strftime("%Y%m%d%H%M%S"))
109        self.parser.add_argument('--output-log', action='store',
110                default=self.default_output_log,
111                help="results output log, default: %s" % self.default_output_log)
112
113        group = self.parser.add_mutually_exclusive_group()
114        group.add_argument('--run-tests', action='store', nargs='+',
115                default=self.default_tests,
116                help="tests to run in <module>[.<class>[.<name>]]")
117        group.add_argument('--list-tests', action='store',
118                choices=('module', 'class', 'name'),
119                help="lists available tests")
120
121        if self.default_test_data:
122            self.parser.add_argument('--test-data-file', action='store',
123                    default=self.default_test_data,
124                    help="data file to load, default: %s" % self.default_test_data)
125        else:
126            self.parser.add_argument('--test-data-file', action='store',
127                    help="data file to load")
128
129        if self.default_cases:
130            self.parser.add_argument('CASES_PATHS', action='store',
131                    default=self.default_cases, nargs='*',
132                    help="paths to directories with test cases, default: %s"\
133                            % self.default_cases)
134        else:
135            self.parser.add_argument('CASES_PATHS', action='store',
136                    nargs='+', help="paths to directories with test cases")
137
138        self.parser.set_defaults(func=self.run)
139
140    def _setup_logger(self, logger, args):
141        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
142                ' - %(levelname)s - %(message)s')
143        sh = logger.handlers[0]
144        sh.setFormatter(formatter)
145        fh = logging.FileHandler(args.output_log)
146        fh.setFormatter(formatter)
147        logger.addHandler(fh)
148
149        return logger
150
151    def _process_args(self, logger, args):
152        self.tc_kwargs = {}
153        self.tc_kwargs['init'] = {}
154        self.tc_kwargs['load'] = {}
155        self.tc_kwargs['list'] = {}
156        self.tc_kwargs['run']  = {}
157
158        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
159        if args.test_data_file:
160            self.tc_kwargs['init']['td'] = json.load(
161                    open(args.test_data_file, "r"))
162        else:
163            self.tc_kwargs['init']['td'] = {}
164
165        if args.run_tests:
166            self.tc_kwargs['load']['modules'] = args.run_tests
167            self.tc_kwargs['load']['modules_required'] = args.run_tests
168        else:
169            self.tc_kwargs['load']['modules'] = []
170
171        self.tc_kwargs['run']['skips'] = []
172
173        self.module_paths = args.CASES_PATHS
174
175    def _pre_run(self):
176        pass
177
178    def run(self, logger, args):
179        self._process_args(logger, args)
180
181        self.tc = self._context_class(**self.tc_kwargs['init'])
182        try:
183            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
184        except OEQATestNotFound as ex:
185            logger.error(ex)
186            sys.exit(1)
187
188        if args.list_tests:
189            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
190        else:
191            self._pre_run()
192            rc = self.tc.runTests(**self.tc_kwargs['run'])
193            rc.logDetails()
194            rc.logSummary(self.name)
195
196        output_link = os.path.join(os.path.dirname(args.output_log),
197                "%s-results.log" % self.name)
198        if os.path.exists(output_link):
199            os.remove(output_link)
200        os.symlink(args.output_log, output_link)
201
202        return rc
203
204_executor_class = OETestContextExecutor
205