1# test result tool - report text based test results
2#
3# Copyright (c) 2019, Intel Corporation.
4# Copyright (c) 2019, Linux Foundation
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9import os
10import glob
11import json
12import resulttool.resultutils as resultutils
13from oeqa.utils.git import GitRepo
14import oeqa.utils.gitarchive as gitarchive
15
16
17class ResultsTextReport(object):
18    def __init__(self):
19        self.ptests = {}
20        self.ltptests = {}
21        self.ltpposixtests = {}
22        self.result_types = {'passed': ['PASSED', 'passed'],
23                             'failed': ['FAILED', 'failed', 'ERROR', 'error', 'UNKNOWN'],
24                             'skipped': ['SKIPPED', 'skipped']}
25
26
27    def handle_ptest_result(self, k, status, result, machine):
28        if machine not in self.ptests:
29            self.ptests[machine] = {}
30
31        if k == 'ptestresult.sections':
32            # Ensure tests without any test results still show up on the report
33            for suite in result['ptestresult.sections']:
34                if suite not in self.ptests[machine]:
35                    self.ptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
36                if 'duration' in result['ptestresult.sections'][suite]:
37                    self.ptests[machine][suite]['duration'] = result['ptestresult.sections'][suite]['duration']
38                if 'timeout' in result['ptestresult.sections'][suite]:
39                    self.ptests[machine][suite]['duration'] += " T"
40            return
41        try:
42            _, suite, test = k.split(".", 2)
43        except ValueError:
44            return
45        # Handle 'glib-2.0'
46        if 'ptestresult.sections' in result and suite not in result['ptestresult.sections']:
47            try:
48                _, suite, suite1, test = k.split(".", 3)
49                if suite + "." + suite1 in result['ptestresult.sections']:
50                    suite = suite + "." + suite1
51            except ValueError:
52                pass
53        if suite not in self.ptests[machine]:
54            self.ptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
55        for tk in self.result_types:
56            if status in self.result_types[tk]:
57                self.ptests[machine][suite][tk] += 1
58
59    def handle_ltptest_result(self, k, status, result, machine):
60        if machine not in self.ltptests:
61            self.ltptests[machine] = {}
62
63        if k == 'ltpresult.sections':
64            # Ensure tests without any test results still show up on the report
65            for suite in result['ltpresult.sections']:
66                if suite not in self.ltptests[machine]:
67                    self.ltptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
68                if 'duration' in result['ltpresult.sections'][suite]:
69                    self.ltptests[machine][suite]['duration'] = result['ltpresult.sections'][suite]['duration']
70                if 'timeout' in result['ltpresult.sections'][suite]:
71                    self.ltptests[machine][suite]['duration'] += " T"
72            return
73        try:
74            _, suite, test = k.split(".", 2)
75        except ValueError:
76            return
77        # Handle 'glib-2.0'
78        if 'ltpresult.sections' in result and suite not in result['ltpresult.sections']:
79            try:
80                _, suite, suite1, test = k.split(".", 3)
81                print("split2: %s %s %s" % (suite, suite1, test))
82                if suite + "." + suite1 in result['ltpresult.sections']:
83                    suite = suite + "." + suite1
84            except ValueError:
85                pass
86        if suite not in self.ltptests[machine]:
87            self.ltptests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
88        for tk in self.result_types:
89            if status in self.result_types[tk]:
90                self.ltptests[machine][suite][tk] += 1
91
92    def handle_ltpposixtest_result(self, k, status, result, machine):
93        if machine not in self.ltpposixtests:
94            self.ltpposixtests[machine] = {}
95
96        if k == 'ltpposixresult.sections':
97            # Ensure tests without any test results still show up on the report
98            for suite in result['ltpposixresult.sections']:
99                if suite not in self.ltpposixtests[machine]:
100                    self.ltpposixtests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
101                if 'duration' in result['ltpposixresult.sections'][suite]:
102                    self.ltpposixtests[machine][suite]['duration'] = result['ltpposixresult.sections'][suite]['duration']
103            return
104        try:
105            _, suite, test = k.split(".", 2)
106        except ValueError:
107            return
108        # Handle 'glib-2.0'
109        if 'ltpposixresult.sections' in result and suite not in result['ltpposixresult.sections']:
110            try:
111                _, suite, suite1, test = k.split(".", 3)
112                if suite + "." + suite1 in result['ltpposixresult.sections']:
113                    suite = suite + "." + suite1
114            except ValueError:
115                pass
116        if suite not in self.ltpposixtests[machine]:
117            self.ltpposixtests[machine][suite] = {'passed': 0, 'failed': 0, 'skipped': 0, 'duration' : '-', 'failed_testcases': []}
118        for tk in self.result_types:
119            if status in self.result_types[tk]:
120                self.ltpposixtests[machine][suite][tk] += 1
121
122    def get_aggregated_test_result(self, logger, testresult, machine):
123        test_count_report = {'passed': 0, 'failed': 0, 'skipped': 0, 'failed_testcases': []}
124        result = testresult.get('result', [])
125        for k in result:
126            test_status = result[k].get('status', [])
127            for tk in self.result_types:
128                if test_status in self.result_types[tk]:
129                    test_count_report[tk] += 1
130            if test_status in self.result_types['failed']:
131                test_count_report['failed_testcases'].append(k)
132            if k.startswith("ptestresult."):
133                self.handle_ptest_result(k, test_status, result, machine)
134            if k.startswith("ltpresult."):
135                self.handle_ltptest_result(k, test_status, result, machine)
136            if k.startswith("ltpposixresult."):
137                self.handle_ltpposixtest_result(k, test_status, result, machine)
138        return test_count_report
139
140    def print_test_report(self, template_file_name, test_count_reports):
141        from jinja2 import Environment, FileSystemLoader
142        script_path = os.path.dirname(os.path.realpath(__file__))
143        file_loader = FileSystemLoader(script_path + '/template')
144        env = Environment(loader=file_loader, trim_blocks=True)
145        template = env.get_template(template_file_name)
146        havefailed = False
147        reportvalues = []
148        machines = []
149        cols = ['passed', 'failed', 'skipped']
150        maxlen = {'passed' : 0, 'failed' : 0, 'skipped' : 0, 'result_id': 0, 'testseries' : 0, 'ptest' : 0 ,'ltptest': 0, 'ltpposixtest': 0}
151        for line in test_count_reports:
152            total_tested = line['passed'] + line['failed'] + line['skipped']
153            vals = {}
154            vals['result_id'] = line['result_id']
155            vals['testseries'] = line['testseries']
156            vals['sort'] = line['testseries'] + "_" + line['result_id']
157            vals['failed_testcases'] = line['failed_testcases']
158            for k in cols:
159                vals[k] = "%d (%s%%)" % (line[k], format(line[k] / total_tested * 100, '.0f'))
160            for k in maxlen:
161                if k in vals and len(vals[k]) > maxlen[k]:
162                    maxlen[k] = len(vals[k])
163            reportvalues.append(vals)
164            if line['failed_testcases']:
165                havefailed = True
166            if line['machine'] not in machines:
167                machines.append(line['machine'])
168        for (machine, report) in self.ptests.items():
169            for ptest in self.ptests[machine]:
170                if len(ptest) > maxlen['ptest']:
171                    maxlen['ptest'] = len(ptest)
172        for (machine, report) in self.ltptests.items():
173            for ltptest in self.ltptests[machine]:
174                if len(ltptest) > maxlen['ltptest']:
175                    maxlen['ltptest'] = len(ltptest)
176        for (machine, report) in self.ltpposixtests.items():
177            for ltpposixtest in self.ltpposixtests[machine]:
178                if len(ltpposixtest) > maxlen['ltpposixtest']:
179                    maxlen['ltpposixtest'] = len(ltpposixtest)
180        output = template.render(reportvalues=reportvalues,
181                                 havefailed=havefailed,
182                                 machines=machines,
183                                 ptests=self.ptests,
184                                 ltptests=self.ltptests,
185                                 ltpposixtests=self.ltpposixtests,
186                                 maxlen=maxlen)
187        print(output)
188
189    def view_test_report(self, logger, source_dir, branch, commit, tag):
190        test_count_reports = []
191        if commit:
192            if tag:
193                logger.warning("Ignoring --tag as --commit was specified")
194            tag_name = "{branch}/{commit_number}-g{commit}/{tag_number}"
195            repo = GitRepo(source_dir)
196            revs = gitarchive.get_test_revs(logger, repo, tag_name, branch=branch)
197            rev_index = gitarchive.rev_find(revs, 'commit', commit)
198            testresults = resultutils.git_get_result(repo, revs[rev_index][2])
199        elif tag:
200            repo = GitRepo(source_dir)
201            testresults = resultutils.git_get_result(repo, [tag])
202        else:
203            testresults = resultutils.load_resultsdata(source_dir)
204        for testsuite in testresults:
205            for resultid in testresults[testsuite]:
206                result = testresults[testsuite][resultid]
207                machine = result['configuration']['MACHINE']
208                test_count_report = self.get_aggregated_test_result(logger, result, machine)
209                test_count_report['machine'] = machine
210                test_count_report['testseries'] = result['configuration']['TESTSERIES']
211                test_count_report['result_id'] = resultid
212                test_count_reports.append(test_count_report)
213        self.print_test_report('test_report_full_text.txt', test_count_reports)
214
def report(args, logger):
    """Entry point of the 'report' subcommand.

    Builds a ResultsTextReport and prints the report for the source,
    branch, commit and tag given on the command line. Always returns 0.
    """
    text_report = ResultsTextReport()
    text_report.view_test_report(
        logger,
        args.source_dir,
        args.branch,
        args.commit,
        args.tag,
    )
    return 0
219
def register_commands(subparsers):
    """Register subcommands from this plugin"""
    # The 'report' subcommand is dispatched to report() via the 'func' default.
    report_parser = subparsers.add_parser(
        'report',
        help='summarise test results',
        description='print a text-based summary of the test results',
        group='analysis',
    )
    report_parser.set_defaults(func=report)

    # Positional: where the results come from.
    report_parser.add_argument(
        'source_dir',
        help='source file/directory/URL that contain the test result files to summarise',
    )
    # Optional selectors used when source_dir is a git repository.
    report_parser.add_argument(
        '--branch', '-B',
        default='master',
        help="Branch to find commit in",
    )
    report_parser.add_argument(
        '--commit',
        help="Revision to report",
    )
    report_parser.add_argument(
        '-t', '--tag',
        default='',
        help='source_dir is a git repository, report on the tag specified from that repository',
    )
232