1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import collections
8import os
9import sys
10
11from shutil import rmtree
12from oeqa.runtime.case import OERuntimeTestCase
13from oeqa.core.decorator.depends import OETestDepends
14
15# importlib.resources.open_text in Python <3.10 doesn't search all directories
16# when a package is split across multiple directories. Until we can rely on
17# 3.10+, reimplement the searching logic.
18if sys.version_info < (3, 10):
19    def _open_text(package, resource):
20        import importlib, pathlib
21        module = importlib.import_module(package)
22        for path in module.__path__:
23            candidate = pathlib.Path(path) / resource
24            if candidate.exists():
25                return candidate.open(encoding='utf-8')
26        raise FileNotFoundError
27else:
28    from importlib.resources import open_text as _open_text
29
30
31class ParseLogsTest(OERuntimeTestCase):
32
33    # Which log files should be collected
34    log_locations = ["/var/log/", "/var/log/dmesg", "/tmp/dmesg_output.log"]
35
36    # The keywords that identify error messages in the log files
37    errors = ["error", "cannot", "can't", "failed"]
38
39    # A list of error messages that should be ignored
40    ignore_errors = []
41
42    @classmethod
43    def setUpClass(cls):
44        # When systemd is enabled we need to notice errors on
45        # circular dependencies in units.
46        if 'systemd' in cls.td.get('DISTRO_FEATURES'):
47            cls.errors.extend([
48                'Found ordering cycle on',
49                'Breaking ordering cycle by deleting job',
50                'deleted to break ordering cycle',
51                'Ordering cycle found, skipping',
52                ])
53
54        cls.errors = [s.casefold() for s in cls.errors]
55
56        cls.load_machine_ignores()
57
58    @classmethod
59    def load_machine_ignores(cls):
60        # Add TARGET_ARCH explicitly as not every machine has that in MACHINEOVERRDES (eg qemux86-64)
61        for candidate in ["common", cls.td.get("TARGET_ARCH")] + cls.td.get("MACHINEOVERRIDES").split(":"):
62            try:
63                name = f"parselogs-ignores-{candidate}.txt"
64                for line in _open_text("oeqa.runtime.cases", name):
65                    line = line.strip()
66                    if line and not line.startswith("#"):
67                        cls.ignore_errors.append(line.casefold())
68            except FileNotFoundError:
69                pass
70
71    # Go through the log locations provided and if it's a folder
72    # create a list with all the .log files in it, if it's a file
73    # just add it to that list.
74    def getLogList(self, log_locations):
75        logs = []
76        for location in log_locations:
77            status, _ = self.target.run('test -f %s' % location)
78            if status == 0:
79                logs.append(location)
80            else:
81                status, _ = self.target.run('test -d %s' % location)
82                if status == 0:
83                    cmd = 'find %s -name \\*.log -maxdepth 1 -type f' % location
84                    status, output = self.target.run(cmd)
85                    if status == 0:
86                        output = output.splitlines()
87                        for logfile in output:
88                            logs.append(os.path.join(location, logfile))
89        return logs
90
91    # Copy the log files to be parsed locally
92    def transfer_logs(self, log_list):
93        workdir = self.td.get('WORKDIR')
94        self.target_logs = workdir + '/' + 'target_logs'
95        target_logs = self.target_logs
96        if os.path.exists(target_logs):
97            rmtree(self.target_logs)
98        os.makedirs(target_logs)
99        for f in log_list:
100            self.target.copyFrom(str(f), target_logs)
101
102    # Get the local list of logs
103    def get_local_log_list(self, log_locations):
104        self.transfer_logs(self.getLogList(log_locations))
105        list_dir = os.listdir(self.target_logs)
106        dir_files = [os.path.join(self.target_logs, f) for f in list_dir]
107        logs = [f for f in dir_files if os.path.isfile(f)]
108        return logs
109
110    def get_context(self, lines, index, before=6, after=3):
111        """
112        Given a set of lines and the index of the line that is important, return
113        a number of lines surrounding that line.
114        """
115        last = len(lines)
116
117        start = index - before
118        end = index + after + 1
119
120        if start < 0:
121            end -= start
122            start = 0
123        if end > last:
124            start -= end - last
125            end = last
126
127        return lines[start:end]
128
129    def test_get_context(self):
130        """
131        A test case for the test case.
132        """
133        lines = list(range(0,10))
134        self.assertEqual(self.get_context(lines, 0, 2, 1), [0, 1, 2, 3])
135        self.assertEqual(self.get_context(lines, 5, 2, 1), [3, 4, 5, 6])
136        self.assertEqual(self.get_context(lines, 9, 2, 1), [6, 7, 8, 9])
137
138    def parse_logs(self, logs, lines_before=10, lines_after=10):
139        """
140        Search the log files @logs looking for error lines (marked by
141        @self.errors), ignoring anything listed in @self.ignore_errors.
142
143        Returns a dictionary of log filenames to a dictionary of error lines to
144        the error context (controlled by @lines_before and @lines_after).
145        """
146        results = collections.defaultdict(dict)
147
148        for log in logs:
149            with open(log) as f:
150                lines = f.readlines()
151
152            for i, line in enumerate(lines):
153                line = line.strip()
154                line_lower = line.casefold()
155
156                if any(keyword in line_lower for keyword in self.errors):
157                    if not any(ignore in line_lower for ignore in self.ignore_errors):
158                        results[log][line] = "".join(self.get_context(lines, i, lines_before, lines_after))
159
160        return results
161
162    # Get the output of dmesg and write it in a file.
163    # This file is added to log_locations.
164    def write_dmesg(self):
165        (status, dmesg) = self.target.run('dmesg > /tmp/dmesg_output.log')
166
167    @OETestDepends(['ssh.SSHTest.test_ssh'])
168    def test_parselogs(self):
169        self.write_dmesg()
170        log_list = self.get_local_log_list(self.log_locations)
171        result = self.parse_logs(log_list)
172
173        errcount = 0
174        self.msg = ""
175        for log in result:
176            self.msg += 'Log: ' + log + '\n'
177            self.msg += '-----------------------\n'
178            for error in result[log]:
179                errcount += 1
180                self.msg += 'Central error: ' + error + '\n'
181                self.msg +=  '***********************\n'
182                self.msg +=  result[log][error] + '\n'
183                self.msg +=  '***********************\n'
184        self.msg += '%s errors found in logs.' % errcount
185        self.assertEqual(errcount, 0, msg=self.msg)
186