# test result tool - report text-based test results
#
# Copyright (c) 2019, Intel Corporation.
# Copyright (c) 2019, Linux Foundation
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import glob
import json
import resulttool.resultutils as resultutils
from oeqa.utils.git import GitRepo
import oeqa.utils.gitarchive as gitarchive


class ResultsTextReport(object):
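    """Generate a plain-text summary report from stored test results.

    Per-machine ptest, LTP and LTP POSIX results are aggregated per suite
    and rendered through the 'test_report_full_text.txt' Jinja2 template.
    """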
    def __init__(self):
        self.ptests = {}
        self.ltptests = {}
        self.ltpposixtests = {}
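        # Map each aggregate outcome column to the raw status strings that count towards it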
        self.result_types = {'passed': ['PASSED', 'passed', 'PASS', 'XFAIL'],
                             'failed': ['FAILED', 'failed', 'FAIL', 'ERROR', 'error', 'UNKNOWN', 'XPASS'],
                             'skipped': ['SKIPPED', 'skipped', 'UNSUPPORTED', 'UNTESTED', 'UNRESOLVED']}


    def handle_ptest_result(self, k, status, result, machine):
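        """Accumulate a single ptest result into self.ptests[machine].

        Returns False for duplicate results (so the caller can skip them),
        True otherwise.
        """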
        if machine not in self.ptests:
            self.ptests[machine] = {}

        if k == 'ptestresult.sections':
            # Ensure tests without any test results still show up on the report
            for suite in result['ptestresult.sections']:
                if suite not in self.ptests[machine]:
                    self.ptests[machine][suite] = {
                            'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-',
                            'failed_testcases': [], "testcases": set(),
                            }
                if 'duration' in result['ptestresult.sections'][suite]:
                    self.ptests[machine][suite]['duration'] = result['ptestresult.sections'][suite]['duration']
                if 'timeout' in result['ptestresult.sections'][suite]:
                    self.ptests[machine][suite]['duration'] += " T"
            return True

        # process test result
        try:
            _, suite, test = k.split(".", 2)
        except ValueError:
            return True

        # Handle suite names that contain a '.' (e.g. 'glib-2.0')
        if 'ptestresult.sections' in result and suite not in result['ptestresult.sections']:
            try:
                _, suite, suite1, test = k.split(".", 3)
                if suite + "." + suite1 in result['ptestresult.sections']:
                    suite = suite + "." + suite1
            except ValueError:
                pass

        if suite not in self.ptests[machine]:
            self.ptests[machine][suite] = {
                    'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-',
                    'failed_testcases': [], "testcases": set(),
                    }

        # do not process duplicate results
        if test in self.ptests[machine][suite]["testcases"]:
            print("Warning: duplicate ptest result '{}.{}' for {}".format(suite, test, machine))
            return False

        for tk in self.result_types:
            if status in self.result_types[tk]:
                self.ptests[machine][suite][tk] += 1
        self.ptests[machine][suite]["testcases"].add(test)
        return True

    def handle_ltptest_result(self, k, status, result, machine):
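        """Accumulate a single LTP result into self.ltptests[machine]."""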
        if machine not in self.ltptests:
            self.ltptests[machine] = {}

        if k == 'ltpresult.sections':
            # Ensure tests without any test results still show up on the report
            for suite in result['ltpresult.sections']:
                if suite not in self.ltptests[machine]:
                    self.ltptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
                if 'duration' in result['ltpresult.sections'][suite]:
                    self.ltptests[machine][suite]['duration'] = result['ltpresult.sections'][suite]['duration']
                if 'timeout' in result['ltpresult.sections'][suite]:
                    self.ltptests[machine][suite]['duration'] += " T"
            return
        try:
            _, suite, test = k.split(".", 2)
        except ValueError:
            return
        # Handle suite names that contain a '.' (e.g. 'glib-2.0')
        if 'ltpresult.sections' in result and suite not in result['ltpresult.sections']:
            try:
                _, suite, suite1, test = k.split(".", 3)
                if suite + "." + suite1 in result['ltpresult.sections']:
                    suite = suite + "." + suite1
            except ValueError:
                pass
        if suite not in self.ltptests[machine]:
            self.ltptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
        for tk in self.result_types:
            if status in self.result_types[tk]:
                self.ltptests[machine][suite][tk] += 1

    def handle_ltpposixtest_result(self, k, status, result, machine):
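        """Accumulate a single LTP POSIX result into self.ltpposixtests[machine]."""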
        if machine not in self.ltpposixtests:
            self.ltpposixtests[machine] = {}

        if k == 'ltpposixresult.sections':
            # Ensure tests without any test results still show up on the report
            for suite in result['ltpposixresult.sections']:
                if suite not in self.ltpposixtests[machine]:
                    self.ltpposixtests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
                if 'duration' in result['ltpposixresult.sections'][suite]:
                    self.ltpposixtests[machine][suite]['duration'] = result['ltpposixresult.sections'][suite]['duration']
            return
        try:
            _, suite, test = k.split(".", 2)
        except ValueError:
            return
        # Handle suite names that contain a '.' (e.g. 'glib-2.0')
        if 'ltpposixresult.sections' in result and suite not in result['ltpposixresult.sections']:
            try:
                _, suite, suite1, test = k.split(".", 3)
                if suite + "." + suite1 in result['ltpposixresult.sections']:
                    suite = suite + "." + suite1
            except ValueError:
                pass
        if suite not in self.ltpposixtests[machine]:
            self.ltpposixtests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
        for tk in self.result_types:
            if status in self.result_types[tk]:
                self.ltpposixtests[machine][suite][tk] += 1

    def get_aggregated_test_result(self, logger, testresult, machine):
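        """Count passed/failed/skipped totals for one result set.

        ptest, LTP and LTP POSIX entries are also routed to their per-suite
        handlers; names of failing test cases are collected in 'failed_testcases'.
        """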
        test_count_report = {'passed': 0, 'failed': 0, 'skipped': 0, 'failed_testcases': []}
        result = testresult.get('result', [])
        for k in result:
            test_status = result[k].get('status', [])
            if k.startswith("ptestresult."):
                if not self.handle_ptest_result(k, test_status, result, machine):
                    continue
            elif k.startswith("ltpresult."):
                self.handle_ltptest_result(k, test_status, result, machine)
            elif k.startswith("ltpposixresult."):
                self.handle_ltpposixtest_result(k, test_status, result, machine)

            # process result if it was not skipped by a handler
            for tk in self.result_types:
                if test_status in self.result_types[tk]:
                    test_count_report[tk] += 1
            if test_status in self.result_types['failed']:
                test_count_report['failed_testcases'].append(k)
        return test_count_report

    def print_test_report(self, template_file_name, test_count_reports):
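        """Render the aggregated counts through the given Jinja2 template and print the output."""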
        from jinja2 import Environment, FileSystemLoader
        script_path = os.path.dirname(os.path.realpath(__file__))
        file_loader = FileSystemLoader(script_path + '/template')
        env = Environment(loader=file_loader, trim_blocks=True)
        template = env.get_template(template_file_name)
        havefailed = False
        reportvalues = []
        machines = []
        cols = ['passed', 'failed', 'skipped']
        maxlen = {'passed': 0, 'failed': 0, 'skipped': 0, 'result_id': 0, 'testseries': 0, 'ptest': 0, 'ltptest': 0, 'ltpposixtest': 0}
        for line in test_count_reports:
            total_tested = line['passed'] + line['failed'] + line['skipped']
            vals = {}
            vals['result_id'] = line['result_id']
            vals['testseries'] = line['testseries']
            vals['sort'] = line['testseries'] + "_" + line['result_id']
            vals['failed_testcases'] = line['failed_testcases']
            for k in cols:
                if total_tested:
                    vals[k] = "%d (%s%%)" % (line[k], format(line[k] / total_tested * 100, '.0f'))
                else:
                    vals[k] = "0 (0%)"
            for k in maxlen:
                if k in vals and len(vals[k]) > maxlen[k]:
                    maxlen[k] = len(vals[k])
            reportvalues.append(vals)
            if line['failed_testcases']:
                havefailed = True
            if line['machine'] not in machines:
                machines.append(line['machine'])
        reporttotalvalues = {}
        for k in cols:
            reporttotalvalues[k] = '%s' % sum([line[k] for line in test_count_reports])
        reporttotalvalues['count'] = '%s' % len(test_count_reports)
        for (machine, report) in self.ptests.items():
            for ptest in self.ptests[machine]:
                if len(ptest) > maxlen['ptest']:
                    maxlen['ptest'] = len(ptest)
        for (machine, report) in self.ltptests.items():
            for ltptest in self.ltptests[machine]:
                if len(ltptest) > maxlen['ltptest']:
                    maxlen['ltptest'] = len(ltptest)
        for (machine, report) in self.ltpposixtests.items():
            for ltpposixtest in self.ltpposixtests[machine]:
                if len(ltpposixtest) > maxlen['ltpposixtest']:
                    maxlen['ltpposixtest'] = len(ltpposixtest)
        output = template.render(reportvalues=reportvalues,
                                 reporttotalvalues=reporttotalvalues,
                                 havefailed=havefailed,
                                 machines=machines,
                                 ptests=self.ptests,
                                 ltptests=self.ltptests,
                                 ltpposixtests=self.ltpposixtests,
                                 maxlen=maxlen)
        print(output)

    def view_test_report(self, logger, source_dir, branch, commit, tag, use_regression_map, raw_test, selected_test_case_only):
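        """Load results from a directory/URL or git repository and print the summary report.

        If raw_test or selected_test_case_only is set, print the matching raw
        result or test case status instead of the full report.
        """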
        def print_selected_testcase_result(testresults, selected_test_case_only):
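            """Print the status (and log, if any) of the selected test case from each result set."""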
            for testsuite in testresults:
                for resultid in testresults[testsuite]:
                    result = testresults[testsuite][resultid]['result']
                    test_case_result = result.get(selected_test_case_only, {})
                    if test_case_result.get('status'):
                        print('Found selected test case result for %s from %s' % (selected_test_case_only, resultid))
                        print(test_case_result['status'])
                    else:
                        print('Could not find selected test case result for %s from %s' % (selected_test_case_only, resultid))
                    if test_case_result.get('log'):
                        print(test_case_result['log'])
        test_count_reports = []
        configmap = resultutils.store_map
        if use_regression_map:
            configmap = resultutils.regression_map
        if commit:
            if tag:
                logger.warning("Ignoring --tag as --commit was specified")
            tag_name = "{branch}/{commit_number}-g{commit}/{tag_number}"
            repo = GitRepo(source_dir)
            revs = gitarchive.get_test_revs(logger, repo, tag_name, branch=branch)
            rev_index = gitarchive.rev_find(revs, 'commit', commit)
            testresults = resultutils.git_get_result(repo, revs[rev_index][2], configmap=configmap)
        elif tag:
            repo = GitRepo(source_dir)
            testresults = resultutils.git_get_result(repo, [tag], configmap=configmap)
        else:
            testresults = resultutils.load_resultsdata(source_dir, configmap=configmap)
        if raw_test:
            raw_results = {}
            for testsuite in testresults:
                result = testresults[testsuite].get(raw_test, {})
                if result:
                    raw_results[testsuite] = {raw_test: result}
            if raw_results:
                if selected_test_case_only:
                    print_selected_testcase_result(raw_results, selected_test_case_only)
                else:
                    print(json.dumps(raw_results, sort_keys=True, indent=4))
            else:
                print('Could not find raw test result for %s' % raw_test)
            return 0
        if selected_test_case_only:
            print_selected_testcase_result(testresults, selected_test_case_only)
            return 0
        for testsuite in testresults:
            for resultid in testresults[testsuite]:
                skip = False
                result = testresults[testsuite][resultid]
                machine = result['configuration']['MACHINE']

                # Check to see if there are already results for these kinds of tests for the machine
                for key in result['result'].keys():
                    testtype = str(key).split('.')[0]
                    if ((machine in self.ltptests and testtype == "ltpresult" and self.ltptests[machine]) or
                        (machine in self.ltpposixtests and testtype == "ltpposixresult" and self.ltpposixtests[machine])):
                        print("Already have test results for %s on %s, skipping %s" % (testtype, machine, resultid))
                        skip = True
                        break
                if skip:
                    break

                test_count_report = self.get_aggregated_test_result(logger, result, machine)
                test_count_report['machine'] = machine
                test_count_report['testseries'] = result['configuration']['TESTSERIES']
                test_count_report['result_id'] = resultid
                test_count_reports.append(test_count_report)
        self.print_test_report('test_report_full_text.txt', test_count_reports)

def report(args, logger):
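    """Entry point for the 'resulttool report' subcommand."""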
    report = ResultsTextReport()
    report.view_test_report(logger, args.source_dir, args.branch, args.commit, args.tag, args.use_regression_map,
                            args.raw_test_only, args.selected_test_case_only)
    return 0

def register_commands(subparsers):
    """Register subcommands from this plugin"""
    parser_build = subparsers.add_parser('report', help='summarise test results',
                                         description='print a text-based summary of the test results',
                                         group='analysis')
    parser_build.set_defaults(func=report)
    parser_build.add_argument('source_dir',
                              help='source file/directory/URL containing the test result files to summarise')
    parser_build.add_argument('--branch', '-B', default='master', help="Branch to find commit in")
    parser_build.add_argument('--commit', help="Revision to report")
    parser_build.add_argument('-t', '--tag', default='',
                              help='treat source_dir as a git repository and report on the specified tag')
    parser_build.add_argument('-m', '--use_regression_map', action='store_true',
                              help='use the "regression_map" instead of the default "store_map" for the report')
    parser_build.add_argument('-r', '--raw_test_only', default='',
                              help='output only the raw test result for the user-provided test result id')
    parser_build.add_argument('-s', '--selected_test_case_only', default='',
                              help='output the selected test case result for the user-provided test case id; if both a test '
                                   'result id and a test case id are provided, output the selected test case result '
                                   'from that test result id')