xref: /openbmc/openbmc/poky/meta/lib/oeqa/core/context.py (revision fc81e38a)
1# Copyright (C) 2016 Intel Corporation
2# Released under the MIT license (see COPYING.MIT)
3
4import os
5import sys
6import json
7import time
8import logging
9import collections
10
11from oeqa.core.loader import OETestLoader
12from oeqa.core.runner import OETestRunner
13from oeqa.core.exception import OEQAMissingManifest, OEQATestNotFound
14
15class OETestContext(object):
16    loaderClass = OETestLoader
17    runnerClass = OETestRunner
18
19    files_dir = os.path.abspath(os.path.join(os.path.dirname(
20        os.path.abspath(__file__)), "../files"))
21
22    def __init__(self, td=None, logger=None):
23        if not type(td) is dict:
24            raise TypeError("td isn't dictionary type")
25
26        self.td = td
27        self.logger = logger
28        self._registry = {}
29        self._registry['cases'] = collections.OrderedDict()
30
31    def _read_modules_from_manifest(self, manifest):
32        if not os.path.exists(manifest):
33            raise OEQAMissingManifest("Manifest does not exist on %s" % manifest)
34
35        modules = []
36        for line in open(manifest).readlines():
37            line = line.strip()
38            if line and not line.startswith("#"):
39                modules.append(line)
40
41        return modules
42
43    def skipTests(self, skips):
44        if not skips:
45            return
46        for test in self.suites:
47            for skip in skips:
48                if test.id().startswith(skip):
49                    setattr(test, 'setUp', lambda: test.skipTest('Skip by the command line argument "%s"' % skip))
50
51    def loadTests(self, module_paths, modules=[], tests=[],
52            modules_manifest="", modules_required=[], filters={}):
53        if modules_manifest:
54            modules = self._read_modules_from_manifest(modules_manifest)
55
56        self.loader = self.loaderClass(self, module_paths, modules, tests,
57                modules_required, filters)
58        self.suites = self.loader.discover()
59
60    def runTests(self, processes=None, skips=[]):
61        self.runner = self.runnerClass(self, descriptions=False, verbosity=2)
62
63        # Dinamically skip those tests specified though arguments
64        self.skipTests(skips)
65
66        self._run_start_time = time.time()
67        if processes:
68            from oeqa.core.utils.concurrencytest import ConcurrentTestSuite
69
70            concurrent_suite = ConcurrentTestSuite(self.suites, processes)
71            result = self.runner.run(concurrent_suite)
72        else:
73            self.runner.buffer = True
74            result = self.runner.run(self.suites)
75        self._run_end_time = time.time()
76
77        return result
78
79    def listTests(self, display_type):
80        self.runner = self.runnerClass(self, verbosity=2)
81        return self.runner.list_tests(self.suites, display_type)
82
83class OETestContextExecutor(object):
84    _context_class = OETestContext
85    _script_executor = 'oe-test'
86
87    name = 'core'
88    help = 'core test component example'
89    description = 'executes core test suite example'
90
91    default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)),
92            'cases/example')]
93    default_test_data = os.path.join(default_cases[0], 'data.json')
94    default_tests = None
95
96    def register_commands(self, logger, subparsers):
97        self.parser = subparsers.add_parser(self.name, help=self.help,
98                description=self.description, group='components')
99
100        self.default_output_log = '%s-results-%s.log' % (self.name,
101                time.strftime("%Y%m%d%H%M%S"))
102        self.parser.add_argument('--output-log', action='store',
103                default=self.default_output_log,
104                help="results output log, default: %s" % self.default_output_log)
105
106        group = self.parser.add_mutually_exclusive_group()
107        group.add_argument('--run-tests', action='store', nargs='+',
108                default=self.default_tests,
109                help="tests to run in <module>[.<class>[.<name>]]")
110        group.add_argument('--list-tests', action='store',
111                choices=('module', 'class', 'name'),
112                help="lists available tests")
113
114        if self.default_test_data:
115            self.parser.add_argument('--test-data-file', action='store',
116                    default=self.default_test_data,
117                    help="data file to load, default: %s" % self.default_test_data)
118        else:
119            self.parser.add_argument('--test-data-file', action='store',
120                    help="data file to load")
121
122        if self.default_cases:
123            self.parser.add_argument('CASES_PATHS', action='store',
124                    default=self.default_cases, nargs='*',
125                    help="paths to directories with test cases, default: %s"\
126                            % self.default_cases)
127        else:
128            self.parser.add_argument('CASES_PATHS', action='store',
129                    nargs='+', help="paths to directories with test cases")
130
131        self.parser.set_defaults(func=self.run)
132
133    def _setup_logger(self, logger, args):
134        formatter = logging.Formatter('%(asctime)s - ' + self.name + \
135                ' - %(levelname)s - %(message)s')
136        sh = logger.handlers[0]
137        sh.setFormatter(formatter)
138        fh = logging.FileHandler(args.output_log)
139        fh.setFormatter(formatter)
140        logger.addHandler(fh)
141
142        return logger
143
144    def _process_args(self, logger, args):
145        self.tc_kwargs = {}
146        self.tc_kwargs['init'] = {}
147        self.tc_kwargs['load'] = {}
148        self.tc_kwargs['list'] = {}
149        self.tc_kwargs['run']  = {}
150
151        self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args)
152        if args.test_data_file:
153            self.tc_kwargs['init']['td'] = json.load(
154                    open(args.test_data_file, "r"))
155        else:
156            self.tc_kwargs['init']['td'] = {}
157
158        if args.run_tests:
159            self.tc_kwargs['load']['modules'] = args.run_tests
160            self.tc_kwargs['load']['modules_required'] = args.run_tests
161        else:
162            self.tc_kwargs['load']['modules'] = []
163
164        self.tc_kwargs['run']['skips'] = []
165
166        self.module_paths = args.CASES_PATHS
167
168    def _pre_run(self):
169        pass
170
171    def run(self, logger, args):
172        self._process_args(logger, args)
173
174        self.tc = self._context_class(**self.tc_kwargs['init'])
175        try:
176            self.tc.loadTests(self.module_paths, **self.tc_kwargs['load'])
177        except OEQATestNotFound as ex:
178            logger.error(ex)
179            sys.exit(1)
180
181        if args.list_tests:
182            rc = self.tc.listTests(args.list_tests, **self.tc_kwargs['list'])
183        else:
184            self._pre_run()
185            rc = self.tc.runTests(**self.tc_kwargs['run'])
186            rc.logDetails()
187            rc.logSummary(self.name)
188
189        output_link = os.path.join(os.path.dirname(args.output_log),
190                "%s-results.log" % self.name)
191        if os.path.exists(output_link):
192            os.remove(output_link)
193        os.symlink(args.output_log, output_link)
194
195        return rc
196
197_executor_class = OETestContextExecutor
198