#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#

import enum
import os
import re

# A parser that can be used to identify whether a line is a test result or a section statement.
class PtestParser(object):
    """
    Parse a ptest log into per-section test results and per-section metadata.

    After parse(), self.results maps section name -> {test name -> status},
    where status is one of 'PASSED', 'FAILED' or 'SKIPPED', and self.sections
    maps section name -> dict holding the section 'log' text plus, when the
    matching lines were seen, 'duration', 'exitcode' and 'timeout'.
    """
    def __init__(self):
        self.results = {}
        self.sections = {}

    def parse(self, logfile):
        """
        Parse logfile and return the tuple (self.results, self.sections).

        The file is opened with errors='replace', so undecodable bytes do not
        abort the parse. Results accumulate on the instance.

        NOTE(review): warnings are emitted through bb.warn(), but `bb` is not
        imported in this file - presumably it is supplied by the BitBake
        runtime; confirm before using this class outside of it.
        """
        test_regex = {
            'PASSED': re.compile(r"^PASS:(.+)"),
            # Only capture up to the first "(" so that a trailing parenthesised
            # remark does not become part of the test name.
            'FAILED': re.compile(r"^FAIL:([^(]+)"),
            'SKIPPED': re.compile(r"^SKIP:(.+)"),
        }

        section_regex = {
            'begin': re.compile(r"^BEGIN: .*/(.+)/ptest"),
            'end': re.compile(r"^END: .*/(.+)/ptest"),
            'duration': re.compile(r"^DURATION: (.+)"),
            'exitcode': re.compile(r"^ERROR: Exit status is (.+)"),
            'timeout': re.compile(r"^TIMEOUT: .*/(.+)/ptest"),
        }

        # Cache markers so we don't take the re.search() hit all the time.
        markers = ("PASS:", "FAIL:", "SKIP:", "BEGIN:", "END:", "DURATION:", "ERROR: Exit", "TIMEOUT:")

        def newsection():
            return {'name': "No-section", 'log': []}

        current_section = newsection()

        with open(logfile, errors='replace') as f:
            for line in f:
                # Anything that is not a marker line is plain log output.
                if not line.startswith(markers):
                    current_section['log'].append(line)
                    continue

                result = section_regex['begin'].search(line)
                if result:
                    # Only the name changes here; log lines collected since the
                    # previous END stay with this section.
                    current_section['name'] = result.group(1)
                    if current_section['name'] not in self.results:
                        self.results[current_section['name']] = {}
                    continue

                result = section_regex['end'].search(line)
                if result:
                    if current_section['name'] != result.group(1):
                        bb.warn("Ptest END log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
                    if current_section['name'] in self.sections:
                        bb.warn("Ptest duplicate section for %s" % (current_section['name']))
                    self.sections[current_section['name']] = current_section
                    # The name is the dictionary key, so drop it from the value.
                    del self.sections[current_section['name']]['name']
                    current_section = newsection()
                    continue

                result = section_regex['timeout'].search(line)
                if result:
                    if current_section['name'] != result.group(1):
                        bb.warn("Ptest TIMEOUT log section mismatch %s vs. %s" % (current_section['name'], result.group(1)))
                    current_section['timeout'] = True
                    continue

                # Record duration / exit status. These lines deliberately fall
                # through so that they are also kept in the section log below.
                for t in ('duration', 'exitcode'):
                    result = section_regex[t].search(line)
                    if result:
                        current_section[t] = result.group(1)

                current_section['log'].append(line)

                for t, regex in test_regex.items():
                    result = regex.search(line)
                    if result:
                        try:
                            self.results[current_section['name']][result.group(1).strip()] = t
                        except KeyError:
                            # No BEGIN line has registered this section name.
                            bb.warn("Result with no section: %s - %s" % (t, result.group(1).strip()))

        # Python performance for repeatedly joining long strings is poor, do it all at once at the end.
        # For 2.1 million lines in a log this reduces 18 hours to 12s.
        for section in self.sections:
            self.sections[section]['log'] = "".join(self.sections[section]['log'])

        return self.results, self.sections

    # Log the results as files. The file name is the section name and the contents are the tests in that section.
    def results_as_files(self, target_dir):
        """
        Write one file per section into target_dir.

        Each file is named after its section ('No-section' when the name is
        empty), is truncated if it already exists, and holds one
        "<status>: <test name>" line per test, sorted by test name.

        Raises Exception if target_dir does not exist.
        """
        if not os.path.exists(target_dir):
            raise Exception("Target directory does not exist: %s" % target_dir)

        for section, tests in self.results.items():
            prefix = section or 'No-section'
            section_file = os.path.join(target_dir, prefix)
            # purge the file contents if it exists
            with open(section_file, 'w') as f:
                for test_name in sorted(tests):
                    f.write(tests[test_name] + ": " + test_name + "\n")


class LtpParser:
    """
    Parse the machine-readable LTP log output into a ptest-friendly data structure.
    """

    class LtpExitCode(enum.IntEnum):
        # Exit codes as defined in ltp/include/tst_res_flags.h
        TPASS = 0   # Test passed flag
        TFAIL = 1   # Test failed flag
        TBROK = 2   # Test broken flag
        TWARN = 4   # Test warning flag
        TINFO = 16  # Test information flag
        TCONF = 32  # Test not appropriate for configuration flag

    def parse(self, logfile):
        """
        Parse logfile and return the tuple (results, section).

        results maps each test tag to 'PASSED', 'FAILED' or 'SKIPPED'.
        section is {"duration": <sum of the dur= fields>, "log": ""}.

        Only lines starting with "tag=" are considered. A KeyError or
        ValueError propagates if such a line lacks the tag/dur/stat/exit
        fields or carries non-integer dur/stat values.
        """
        results = {}
        # Accumulate the duration here but as the log rounds quick tests down
        # to 0 seconds this is very much a lower bound. The caller can replace
        # the value.
        section = {"duration": 0, "log": ""}
        exit_codes = self.LtpExitCode

        with open(logfile, errors="replace") as f:
            # Lines look like this:
            # tag=cfs_bandwidth01 stime=1689762564 dur=0 exit=exited stat=32 core=no cu=0 cs=0
            for line in f:
                if not line.startswith("tag="):
                    continue

                # Split on the first "=" only so a value may itself contain
                # "=", and ignore stray tokens that are not key=value pairs.
                values = dict(s.split("=", 1) for s in line.split() if "=" in s)

                section["duration"] += int(values["dur"])
                exitcode = int(values["stat"])
                if values["exit"] == "exited" and exitcode == exit_codes.TCONF:
                    # Exited normally with the "invalid configuration" code
                    results[values["tag"]] = "SKIPPED"
                elif exitcode == exit_codes.TPASS:
                    # Successful exit
                    results[values["tag"]] = "PASSED"
                else:
                    # Other exit
                    results[values["tag"]] = "FAILED"

        return results, section


# LTP compliance log parsing
class LtpComplianceParser(object):
    """
    Parse an LTP compliance log into a ptest-friendly data structure.

    self.results maps test name -> 'PASSED' or 'FAILED'; self.section holds
    a 'duration' (never filled in here) and a textual 'log' summary.
    """
    def __init__(self):
        self.results = {}
        self.section = {'duration': "", 'log': ""}

    def parse(self, logfile):
        """
        Parse logfile and return the tuple (self.results, self.section).

        A line starting with "Executing" names the current test after its
        second whitespace-separated word; until such a line is seen the test
        is named after logfile itself. Any line containing "FAIL" marks the
        current test as FAILED.

        NOTE(review): the FAILED state is never reset, so every test named
        after the first failure in the same log is also recorded as FAILED.
        This matches the previous behaviour - confirm it is intended.

        One "<PASS|FAIL>: <test name>" line per known result is appended to
        self.section['log'].
        """
        with open(logfile, errors='replace') as f:
            name = logfile
            result = "PASSED"
            for line in f:
                if line.startswith("Executing"):
                    words = line.split()
                    # A bare "Executing" line carries no name; keep the
                    # previous one instead of failing with an IndexError.
                    if len(words) > 1:
                        name = words[1]

                if "FAIL" in line:
                    result = "FAILED"
                self.results[name] = result

        # Build the summary in one go rather than by repeated concatenation.
        # status[:-2] turns "PASSED"/"FAILED" into "PASS"/"FAIL".
        self.section['log'] += "".join(
            "%s: %s\n" % (status.strip()[:-2], test.strip())
            for test, status in self.results.items()
        )

        return self.results, self.section