xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision 6dbb316a)
1## Copyright (C) 2016 Intel Corporation
2#
3# SPDX-License-Identifier: MIT
4#
5
6import os
7import sys
8import json
9import time
10import logging
11import collections
12import unittest
13
14from oeqa.core.loader import OETestLoader
15from oeqa.core.runner import OETestRunner
16from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
17
class OETestContext(object):
    """Test context: holds the test data and drives test loading and execution.

    A context is created with a test-data dictionary and an optional logger.
    Tests are discovered with loadTests() and then either executed with
    runTests() or enumerated with listTests().
    """

    # Classes instantiated by loadTests() and by runTests()/listTests().
    # They are looked up through self, so a subclass can replace them.
    loaderClass = OETestLoader
    runnerClass = OETestRunner

    # Absolute path of the "files" directory located one level above the
    # directory that contains this module.
    files_dir = os.path.abspath(os.path.join(os.path.dirname(
        os.path.abspath(__file__)), "../files"))

    def __init__(self, td=None, logger=None):
        """Store the test data and logger and create the case registry.

        Args:
            td: test data; must be a dict (dict subclasses are accepted).
            logger: logger object, stored as self.logger.

        Raises:
            TypeError: if td is not a dictionary (this includes the default
                value None).
        """
        # isinstance() instead of an exact type comparison so that dict
        # subclasses such as collections.OrderedDict are accepted too.
        if not isinstance(td, dict):
            raise TypeError("td isn't dictionary type")

        self.td = td
        self.logger = logger
        self._registry = {'cases': collections.OrderedDict()}

    def _read_modules_from_manifest(self, manifest):
        """Return the module names listed in a manifest file.

        Each line is stripped of surrounding whitespace; empty lines and
        lines starting with "#" are ignored.

        Args:
            manifest: path of the manifest file.

        Returns:
            List of the remaining lines, in file order.

        Raises:
            OEQAMissingManifest: if the manifest path does not exist.
        """
        if not os.path.exists(manifest):
            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)

        # The context manager guarantees the file is closed, even on error.
        with open(manifest) as manifest_file:
            stripped = (line.strip() for line in manifest_file)
            return [entry for entry in stripped
                    if entry and not entry.startswith("#")]

    def skipTests(self, skips):
        """Patch the loaded tests so that the ones selected by skips are skipped.

        Reads self.suites, so loadTests() must have been called before.

        Every test whose id equals a skip entry, or lies below it in the
        dotted hierarchy, gets its instance attribute setUp replaced by a
        function raising unittest.SkipTest. Every test class whose dotted id
        (the test id without its last component) matches in the same way
        gets its setUpHooker attribute replaced likewise.

        Args:
            skips: dotted test id prefixes; nothing is done when this is
                empty or None.
        """
        if not skips:
            return

        def skipfuncgen(skipmsg):
            def func():
                raise unittest.SkipTest(skipmsg)
            return func

        def selected_by(dotted_id, skip):
            # The trailing dots make the comparison component-wise: the skip
            # "a.b" selects "a.b" and "a.b.c" but not "a.bc".
            return (dotted_id + '.').startswith(skip + '.')

        skip_message = 'Skip by the command line argument "%s"'

        # Maps each test class seen to its dotted id.
        class_ids = {}
        for test in self.suites:
            test_id = test.id()
            if test.__class__ not in class_ids:
                class_ids[test.__class__] = test_id.rpartition('.')[0]
            for skip in skips:
                if selected_by(test_id, skip):
                    setattr(test, 'setUp', skipfuncgen(skip_message % skip))
        for tclass, cid in class_ids.items():
            for skip in skips:
                if selected_by(cid, skip):
                    setattr(tclass, 'setUpHooker',
                            skipfuncgen(skip_message % skip))

    def loadTests(self, module_paths, modules=None, tests=None,
            modules_manifest="", modules_required=None, **kwargs):
        """Create the loader and discover the test suites.

        The loader is stored as self.loader and the result of its discover()
        call as self.suites.

        Args:
            module_paths: passed to the loader unchanged.
            modules: modules to load (defaults to an empty list); replaced by
                the manifest content when modules_manifest is given.
            tests: passed to the loader (defaults to an empty list).
            modules_manifest: optional path of a manifest file listing the
                modules, see _read_modules_from_manifest().
            modules_required: passed to the loader (defaults to an empty
                list).
            **kwargs: extra keyword arguments forwarded to the loader class.

        Raises:
            OEQAMissingManifest: if modules_manifest is given but missing.
        """
        # None defaults instead of shared mutable lists: every call gets its
        # own fresh list, so nothing downstream can leak state between calls.
        modules = [] if modules is None else modules
        tests = [] if tests is None else tests
        modules_required = [] if modules_required is None else modules_required

        if modules_manifest:
            modules = self._read_modules_from_manifest(modules_manifest)

        self.loader = self.loaderClass(self, module_paths, modules, tests,
                modules_required, **kwargs)
        self.suites = self.loader.discover()

    def runTests(self, processes=None, skips=None):
        """Run the loaded suites and return the runner's result.

        The runner is stored as self.runner; the wall-clock times taken just
        before and after the run are stored as self._run_start_time and
        self._run_end_time.

        Args:
            processes: when truthy, the suites are wrapped in a
                ConcurrentTestSuite created with this value; otherwise the
                suites are run directly with runner.buffer set to True.
            skips: dotted test id prefixes to skip, see skipTests().

        Returns:
            Whatever the runner's run() method returns.
        """
        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)

        # Dynamically skip those tests specified through arguments
        self.skipTests(skips)

        self._run_start_time = time.time()
        if processes:
            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite

            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
            result = self.runner.run(concurrent_suite)
        else:
            self.runner.buffer = True
            result = self.runner.run(self.suites)
        self._run_end_time = time.time()

        return result

    def listTests(self, display_type):
        """List the loaded tests through the runner.

        Args:
            display_type: forwarded to the runner's list_tests().

        Returns:
            Whatever the runner's list_tests() returns.
        """
        self.runner = self.runnerClass(self, verbosity=2)
        return self.runner.list_tests(self.suites, display_type)
97
class OETestContextExecutor(object):
    """Command-line front end that builds a test context and runs or lists tests.

    register_commands() adds a sub-command named after the name attribute to
    an argument parser; run() is registered as that sub-command's handler.
    """

    # Context class instantiated by run().
    _context_class = OETestContext
    _script_executor = 'oe-test'

    # Sub-command name, help text and description used by register_commands().
    name = 'core'
    help = 'core test component example'
    description = 'executes core test suite example'

    # Defaults for the CASES_PATHS, --test-data-file and --run-tests arguments.
    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
            'cases/example')]
    default_test_data = os.path.join(default_cases[0], 'data.json')
    default_tests = None

    def register_commands(self, logger, subparsers):
        """Add this component's sub-command and its arguments to subparsers.

        The created parser is stored as self.parser and self.run is set as
        its "func" default.

        Args:
            logger: not used by this implementation.
            subparsers: object whose add_parser() accepts a "group" keyword.
        """
        self.parser = subparsers.add_parser(self.name, help=self.help,
                description=self.description, group='components')

        # The default log name carries a timestamp taken at registration time.
        self.default_output_log = '%s-results-%s.log' % (self.name,
                time.strftime("%Y%m%d%H%M%S"))
        self.parser.add_argument('--output-log', action='store',
                default=self.default_output_log,
                help="results output log, default: %s" % self.default_output_log)

        # Running and listing tests exclude each other.
        group = self.parser.add_mutually_exclusive_group()
        group.add_argument('--run-tests', action='store', nargs='+',
                default=self.default_tests,
                help="tests to run in <module>[.<class>[.<name>]]")
        group.add_argument('--list-tests', action='store',
                choices=('module', 'class', 'name'),
                help="lists available tests")

        if self.default_test_data:
            self.parser.add_argument('--test-data-file', action='store',
                    default=self.default_test_data,
                    help="data file to load, default: %s" % self.default_test_data)
        else:
            self.parser.add_argument('--test-data-file', action='store',
                    help="data file to load")

        # The case paths are optional only when defaults exist.
        if self.default_cases:
            self.parser.add_argument('CASES_PATHS', action='store',
                    default=self.default_cases, nargs='*',
                    help="paths to directories with test cases, default: %s"\
                            % self.default_cases)
        else:
            self.parser.add_argument('CASES_PATHS', action='store',
                    nargs='+', help="paths to directories with test cases")

        self.parser.set_defaults(func=self.run)

    def _setup_logger(self, logger, args):
        """Apply the component log format and add a file handler to logger.

        Args:
            logger: logger to configure; its first handler is reformatted,
                so it must already have at least one handler.
            args: parsed arguments; args.output_log is the log file path.

        Returns:
            The same logger object.
        """
        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
                ' - %(levelname)s - %(message)s')
        sh = logger.handlers[0]
        sh.setFormatter(formatter)
        fh = logging.FileHandler(args.output_log)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

        return logger

    def _process_args(self, logger, args):
        """Translate parsed arguments into keyword arguments for the context.

        Fills self.tc_kwargs with one dictionary per phase ('init', 'load',
        'list', 'run') and stores the case paths as self.module_paths.

        Args:
            logger: logger handed to _setup_logger().
            args: parsed command-line arguments.
        """
        self.tc_kwargs = {'init': {}, 'load': {}, 'list': {}, 'run': {}}

        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
        if args.test_data_file:
            # The context manager closes the data file once it is parsed.
            with open(args.test_data_file, "r") as data_file:
                self.tc_kwargs['init']['td'] = json.load(data_file)
        else:
            self.tc_kwargs['init']['td'] = {}

        if args.run_tests:
            self.tc_kwargs['load']['modules'] = args.run_tests
            self.tc_kwargs['load']['modules_required'] = args.run_tests
        else:
            self.tc_kwargs['load']['modules'] = []

        self.tc_kwargs['run']['skips'] = []

        self.module_paths = args.CASES_PATHS

    def _pre_run(self):
        """Hook called by run() right before the tests are executed.

        Does nothing in this class.
        """
        pass

    def run(self, logger, args):
        """Load the tests, then list or run them, and refresh the results link.

        After listing or running, "<name>-results.log" in the directory of
        the output log is (re)created as a symlink to the output log.

        Args:
            logger: logger used for error reporting and handed on to
                _process_args().
            args: parsed command-line arguments.

        Returns:
            The value returned by the context's listTests() or runTests().

        Exits the process with status 1 if loading raises OEQATestNotFound.
        """
        self._process_args(logger, args)

        self.tc = self._context_class(**self.tc_kwargs['init'])
        try:
            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
        except OEQATestNotFound as ex:
            logger.error(ex)
            sys.exit(1)

        if args.list_tests:
            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
        else:
            self._pre_run()
            rc = self.tc.runTests(**self.tc_kwargs['run'])
            rc.logDetails()
            rc.logSummary(self.name)

        output_link = os.path.join(os.path.dirname(args.output_log),
                "%s-results.log" % self.name)
        # lexists() is also true for a dangling symlink (for example when an
        # earlier log file was deleted); exists() would miss it and the
        # symlink() call below would then fail with FileExistsError.
        if os.path.lexists(output_link):
            os.remove(output_link)
        # A relative symlink target is resolved against the directory that
        # holds the link. The link sits next to the log file, so a relative
        # log path is reduced to its base name to keep the link valid.
        if os.path.isabs(args.output_log):
            link_target = args.output_log
        else:
            link_target = os.path.basename(args.output_log)
        os.symlink(link_target, output_link)

        return rc
211
# Module-level alias for the executor class defined in this module.
# NOTE(review): presumably looked up by the script that discovers test
# components -- confirm against the caller.
_executor_class = OETestContextExecutor
213