1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import enum
8import os
9import re
10
11# A parser that can be used to identify weather a line is a test result or a section statement.
12class PtestParser(object):
13    def __init__(self):
14        self.results = {}
15        self.sections = {}
16
17    def parse(self, logfile):
18        test_regex = {}
19        test_regex['PASSED'] = re.compile(r"^PASS:(.+)")
20        test_regex['FAILED'] = re.compile(r"^FAIL:([^(]+)")
21        test_regex['SKIPPED'] = re.compile(r"^SKIP:(.+)")
22
23        section_regex = {}
24        section_regex['begin'] = re.compile(r"^BEGIN: .*/(.+)/ptest")
25        section_regex['end'] = re.compile(r"^END: .*/(.+)/ptest")
26        section_regex['duration'] = re.compile(r"^DURATION: (.+)")
27        section_regex['exitcode'] = re.compile(r"^ERROR: Exit status is (.+)")
28        section_regex['timeout'] = re.compile(r"^TIMEOUT: .*/(.+)/ptest")
29
30        # Cache markers so we don't take the re.search() hit all the time.
31        markers = ("PASS:", "FAIL:", "SKIP:", "BEGIN:", "END:", "DURATION:", "ERROR: Exit", "TIMEOUT:")
32
33        def newsection():
34            return { 'name': "No-section", 'log': [] }
35
36        current_section = newsection()
37
38        with open(logfile, errors='replace') as f:
39            for line in f:
40                if not line.startswith(markers):
41                    current_section['log'].append(line)
42                    continue
43
44                result = section_regex['begin'].search(line)
45                if result:
46                    current_section['name'] = result.group(1)
47                    if current_section['name'] not in self.results:
48                        self.results[current_section['name']] = {}
49                    continue
50
51                result = section_regex['end'].search(line)
52                if result:
53                    if current_section['name'] != result.group(1):
54                        bb.warn("Ptest END log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
55                    if current_section['name'] in self.sections:
56                        bb.warn("Ptest duplicate section for %s" % (current_section['name']))
57                    self.sections[current_section['name']] = current_section
58                    del self.sections[current_section['name']]['name']
59                    current_section = newsection()
60                    continue
61
62                result = section_regex['timeout'].search(line)
63                if result:
64                    if current_section['name'] != result.group(1):
65                        bb.warn("Ptest TIMEOUT log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
66                    current_section['timeout'] = True
67                    continue
68
69                for t in ['duration', 'exitcode']:
70                    result = section_regex[t].search(line)
71                    if result:
72                        current_section[t] = result.group(1)
73                        continue
74
75                current_section['log'].append(line)
76
77                for t in test_regex:
78                    result = test_regex[t].search(line)
79                    if result:
80                        try:
81                            self.results[current_section['name']][result.group(1).strip()] = t
82                        except KeyError:
83                            bb.warn("Result with no section: %s - %s" % (t, result.group(1).strip()))
84
85        # Python performance for repeatedly joining long strings is poor, do it all at once at the end.
86        # For 2.1 million lines in a log this reduces 18 hours to 12s.
87        for section in self.sections:
88            self.sections[section]['log'] = "".join(self.sections[section]['log'])
89
90        return self.results, self.sections
91
92    # Log the results as files. The file name is the section name and the contents are the tests in that section.
93    def results_as_files(self, target_dir):
94        if not os.path.exists(target_dir):
95            raise Exception("Target directory does not exist: %s" % target_dir)
96
97        for section in self.results:
98            prefix = 'No-section'
99            if section:
100                prefix = section
101            section_file = os.path.join(target_dir, prefix)
102            # purge the file contents if it exists
103            with open(section_file, 'w') as f:
104                for test_name in sorted(self.results[section]):
105                    status = self.results[section][test_name]
106                    f.write(status + ": " + test_name + "\n")
107
108
109class LtpParser:
110    """
111    Parse the machine-readable LTP log output into a ptest-friendly data structure.
112    """
113    def parse(self, logfile):
114        results = {}
115        # Aaccumulate the duration here but as the log rounds quick tests down
116        # to 0 seconds this is very much a lower bound. The caller can replace
117        # the value.
118        section = {"duration": 0, "log": ""}
119
120        class LtpExitCode(enum.IntEnum):
121            # Exit codes as defined in ltp/include/tst_res_flags.h
122            TPASS = 0  # Test passed flag
123            TFAIL = 1  # Test failed flag
124            TBROK = 2  # Test broken flag
125            TWARN = 4  # Test warning flag
126            TINFO = 16 # Test information flag
127            TCONF = 32 # Test not appropriate for configuration flag
128
129        with open(logfile, errors="replace") as f:
130            # Lines look like this:
131            # tag=cfs_bandwidth01 stime=1689762564 dur=0 exit=exited stat=32 core=no cu=0 cs=0
132            for line in f:
133                if not line.startswith("tag="):
134                    continue
135
136                values = dict(s.split("=") for s in line.strip().split())
137
138                section["duration"] += int(values["dur"])
139                exitcode = int(values["stat"])
140                if values["exit"] == "exited" and exitcode == LtpExitCode.TCONF:
141                    # Exited normally with the "invalid configuration" code
142                    results[values["tag"]] = "SKIPPED"
143                elif exitcode == LtpExitCode.TPASS:
144                    # Successful exit
145                    results[values["tag"]] = "PASSED"
146                else:
147                    # Other exit
148                    results[values["tag"]] = "FAILED"
149
150        return results, section
151
152
153# ltp Compliance log parsing
154class LtpComplianceParser(object):
155    def __init__(self):
156        self.results = {}
157        self.section = {'duration': "", 'log': ""}
158
159    def parse(self, logfile):
160        test_regex = {}
161        test_regex['FAILED'] = re.compile(r"FAIL")
162
163        section_regex = {}
164        section_regex['test'] = re.compile(r"^Executing")
165
166        with open(logfile, errors='replace') as f:
167            name = logfile
168            result = "PASSED"
169            for line in f:
170                regex_result = section_regex['test'].search(line)
171                if regex_result:
172                    name = line.split()[1].strip()
173
174                regex_result = test_regex['FAILED'].search(line)
175                if regex_result:
176                    result = "FAILED"
177            self.results[name] = result
178
179        for test in self.results:
180            result = self.results[test]
181            print (self.results)
182            self.section['log'] = self.section['log'] + ("%s: %s\n" % (result.strip()[:-2], test.strip()))
183
184        return self.results, self.section
185