#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: MIT
#

import collections
import os
import sys

from shutil import rmtree
from oeqa.runtime.case import OERuntimeTestCase
from oeqa.core.decorator.depends import OETestDepends

# importlib.resources.open_text in Python <3.10 doesn't search all directories
# when a package is split across multiple directories. Until we can rely on
# 3.10+, reimplement the searching logic.
if sys.version_info < (3, 10):
    def _open_text(package, resource):
        import importlib, pathlib
        module = importlib.import_module(package)
        for path in module.__path__:
            candidate = pathlib.Path(path) / resource
            if candidate.exists():
                return candidate.open(encoding='utf-8')
        raise FileNotFoundError
else:
    from importlib.resources import open_text as _open_text


class ParseLogsTest(OERuntimeTestCase):

    # Which log files should be collected
    log_locations = ["/var/log/", "/var/log/dmesg", "/tmp/dmesg_output.log"]

    # The keywords that identify error messages in the log files
    errors = ["error", "cannot", "can't", "failed"]

    # A list of error messages that should be ignored
    ignore_errors = []

    @classmethod
    def setUpClass(cls):
        # When systemd is enabled we need to notice errors on
        # circular dependencies in units.
        if 'systemd' in cls.td.get('DISTRO_FEATURES'):
            cls.errors.extend([
                'Found ordering cycle on',
                'Breaking ordering cycle by deleting job',
                'deleted to break ordering cycle',
                'Ordering cycle found, skipping',
            ])

        cls.errors = [s.casefold() for s in cls.errors]

        cls.load_machine_ignores()

    @classmethod
    def load_machine_ignores(cls):
        # Add TARGET_ARCH explicitly as not every machine has that in MACHINEOVERRIDES (e.g. qemux86-64)
        for candidate in ["common", cls.td.get("TARGET_ARCH")] + cls.td.get("MACHINEOVERRIDES").split(":"):
            try:
                name = f"parselogs-ignores-{candidate}.txt"
                for line in _open_text("oeqa.runtime.cases", name):
                    line = line.strip()
                    if line and not line.startswith("#"):
                        cls.ignore_errors.append(line.casefold())
            except FileNotFoundError:
                pass
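
    # Illustrative sketch of the ignore-file format consumed by
    # load_machine_ignores() above. The file name and pattern below are
    # hypothetical, not entries from the real ignore lists: each
    # parselogs-ignores-<candidate>.txt resource in oeqa.runtime.cases holds
    # one pattern per line; blank lines and '#' comment lines are skipped,
    # and the remaining lines are casefolded and matched as substrings
    # against log lines in parse_logs().
    #
    #   # parselogs-ignores-mymachine.txt (hypothetical)
    #   # harmless probe failure on this board
    #   mydriver mybus0: probe failed with error -22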

    # Go through the log locations provided and if it's a folder
    # create a list with all the .log files in it, if it's a file
    # just add it to that list.
    def getLogList(self, log_locations):
        logs = []
        for location in log_locations:
            status, _ = self.target.run('test -f %s' % location)
            if status == 0:
                logs.append(location)
            else:
                status, _ = self.target.run('test -d %s' % location)
                if status == 0:
                    cmd = 'find %s -name \\*.log -maxdepth 1 -type f' % location
                    status, output = self.target.run(cmd)
                    if status == 0:
                        output = output.splitlines()
                        for logfile in output:
                            logs.append(os.path.join(location, logfile))
        return logs

    # Copy the log files to be parsed locally
    def transfer_logs(self, log_list):
        workdir = self.td.get('WORKDIR')
        self.target_logs = workdir + '/' + 'target_logs'
        target_logs = self.target_logs
        if os.path.exists(target_logs):
            rmtree(self.target_logs)
        os.makedirs(target_logs)
        for f in log_list:
            self.target.copyFrom(str(f), target_logs)

    # Get the local list of logs
    def get_local_log_list(self, log_locations):
        self.transfer_logs(self.getLogList(log_locations))
        list_dir = os.listdir(self.target_logs)
        dir_files = [os.path.join(self.target_logs, f) for f in list_dir]
        logs = [f for f in dir_files if os.path.isfile(f)]
        return logs

    def get_context(self, lines, index, before=6, after=3):
        """
        Given a set of lines and the index of the line that is important, return
        a number of lines surrounding that line.
        """
        last = len(lines)

        start = index - before
        end = index + after + 1

        if start < 0:
            end -= start
            start = 0
        if end > last:
            start -= end - last
            end = last

        return lines[start:end]

    def test_get_context(self):
        """
        A test case for the test case.
        """
        lines = list(range(0, 10))
        self.assertEqual(self.get_context(lines, 0, 2, 1), [0, 1, 2, 3])
        self.assertEqual(self.get_context(lines, 5, 2, 1), [3, 4, 5, 6])
        self.assertEqual(self.get_context(lines, 9, 2, 1), [6, 7, 8, 9])

    def parse_logs(self, logs, lines_before=10, lines_after=10):
        """
        Search the log files @logs looking for error lines (marked by
        @self.errors), ignoring anything listed in @self.ignore_errors.

        Returns a dictionary of log filenames to a dictionary of error lines to
        the error context (controlled by @lines_before and @lines_after).
        """
        results = collections.defaultdict(dict)

        for log in logs:
            with open(log) as f:
                lines = f.readlines()

            for i, line in enumerate(lines):
                line = line.strip()
                line_lower = line.casefold()

                if any(keyword in line_lower for keyword in self.errors):
                    if not any(ignore in line_lower for ignore in self.ignore_errors):
                        results[log][line] = "".join(self.get_context(lines, i, lines_before, lines_after))

        return results
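
    # Illustrative shape of the dictionary returned by parse_logs() above
    # (the path and log line are hypothetical and shown only to document the
    # nesting; the real keys depend on the logs collected from the target):
    #
    #   {
    #       "<WORKDIR>/target_logs/messages": {
    #           "kernel: mydriver: probe failed with error -22": "<surrounding context lines>",
    #       },
    #   }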

    # Get the output of dmesg and write it in a file.
    # This file is added to log_locations.
    def write_dmesg(self):
        (status, dmesg) = self.target.run('dmesg > /tmp/dmesg_output.log')

    @OETestDepends(['ssh.SSHTest.test_ssh'])
    def test_parselogs(self):
        self.write_dmesg()
        log_list = self.get_local_log_list(self.log_locations)
        result = self.parse_logs(log_list)

        errcount = 0
        self.msg = ""
        for log in result:
            self.msg += 'Log: ' + log + '\n'
            self.msg += '-----------------------\n'
            for error in result[log]:
                errcount += 1
                self.msg += 'Central error: ' + error + '\n'
                self.msg += '***********************\n'
                self.msg += result[log][error] + '\n'
                self.msg += '***********************\n'
        self.msg += '%s errors found in logs.' % errcount
        self.assertEqual(errcount, 0, msg=self.msg)
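
# Usage note (assuming a standard Yocto/OpenEmbedded setup): this module is
# normally run by the testimage machinery, e.g. by adding 'parselogs' to
# TEST_SUITES and running `bitbake <image> -c testimage`.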