xref: /openbmc/openbmc/poky/meta/lib/oe/cve_check.py (revision 96e4b4e121e0e2da1535d7d537d6a982a6ff5bc0)
1 #
2 # Copyright OpenEmbedded Contributors
3 #
4 # SPDX-License-Identifier: MIT
5 #
6 
7 import collections
8 import functools
9 import itertools
10 import os.path
11 import re
12 import oe.patch
13 
14 _Version = collections.namedtuple(
15     "_Version", ["release", "patch_l", "pre_l", "pre_v"]
16 )
17 
18 @functools.total_ordering
19 class Version():
20 
21     def __init__(self, version, suffix=None):
22 
23         suffixes = ["alphabetical", "patch"]
24 
25         if str(suffix) == "alphabetical":
26             version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
27         elif str(suffix) == "patch":
28             version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
29         else:
30             version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
31         regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)
32 
33         match = regex.search(version)
34         if not match:
35             raise Exception("Invalid version: '{0}'".format(version))
36 
37         self._version = _Version(
38             release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
39             patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
40             pre_l=match.group("pre_l"),
41             pre_v=match.group("pre_v")
42         )
43 
44         self._key = _cmpkey(
45             self._version.release,
46             self._version.patch_l,
47             self._version.pre_l,
48             self._version.pre_v
49         )
50 
51     def __eq__(self, other):
52         if not isinstance(other, Version):
53             return NotImplemented
54         return self._key == other._key
55 
56     def __gt__(self, other):
57         if not isinstance(other, Version):
58             return NotImplemented
59         return self._key > other._key
60 
61 def _cmpkey(release, patch_l, pre_l, pre_v):
62     # remove leading 0
63     _release = tuple(
64         reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
65     )
66 
67     _patch = patch_l.upper()
68 
69     if pre_l is None and pre_v is None:
70         _pre = float('inf')
71     else:
72         _pre = float(pre_v) if pre_v else float('-inf')
73     return _release, _patch, _pre
74 
75 
76 def parse_cve_from_filename(patch_filename):
77     """
78     Parses CVE ID from the filename
79 
80     Matches the last "CVE-YYYY-ID" in the file name, also if written
81     in lowercase. Possible to have multiple CVE IDs in a single
82     file name, but only the last one will be detected from the file name.
83 
84     Returns the last CVE ID foudn in the filename. If no CVE ID is found
85     an empty string is returned.
86     """
87     cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)
88 
89     # Check patch file name for CVE ID
90     fname_match = cve_file_name_match.search(patch_filename)
91     return fname_match.group(1).upper() if fname_match else ""
92 
93 
94 def parse_cves_from_patch_contents(patch_contents):
95     """
96     Parses CVE IDs from patch contents
97 
98     Matches all CVE IDs contained on a line that starts with "CVE: ". Any
99     delimiter (',', '&', "and", etc.) can be used without any issues. Multiple
100     "CVE:" lines can also exist.
101 
102     Returns a set of all CVE IDs found in the patch contents.
103     """
104     cve_ids = set()
105     cve_match = re.compile(r"CVE-\d{4}-\d{4,}")
106     # Search for one or more "CVE: " lines
107     for line in patch_contents.split("\n"):
108         if not line.startswith("CVE:"):
109             continue
110         cve_ids.update(cve_match.findall(line))
111     return cve_ids
112 
113 
114 def parse_cves_from_patch_file(patch_file):
115     """
116     Parses CVE IDs associated with a particular patch file, using both the filename
117     and patch contents.
118 
119     Returns a set of all CVE IDs found in the patch filename and contents.
120     """
121     cve_ids = set()
122     filename_cve = parse_cve_from_filename(patch_file)
123     if filename_cve:
124         bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
125         cve_ids.add(parse_cve_from_filename(patch_file))
126 
127     # Remote patches won't be present and compressed patches won't be
128     # unpacked, so say we're not scanning them
129     if not os.path.isfile(patch_file):
130         bb.note("%s is remote or compressed, not scanning content" % patch_file)
131         return cve_ids
132 
133     with open(patch_file, "r", encoding="utf-8") as f:
134         try:
135             patch_text = f.read()
136         except UnicodeDecodeError:
137             bb.debug(
138                 1,
139                 "Failed to read patch %s using UTF-8 encoding"
140                 " trying with iso8859-1" % patch_file,
141             )
142             f.close()
143             with open(patch_file, "r", encoding="iso8859-1") as f:
144                 patch_text = f.read()
145 
146     cve_ids.update(parse_cves_from_patch_contents(patch_text))
147 
148     if not cve_ids:
149         bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
150     else:
151         bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))
152 
153     return cve_ids
154 
155 
156 def get_patched_cves(d):
157     """
158     Determines the CVE IDs that have been solved by either patches incuded within
159     SRC_URI or by setting CVE_STATUS.
160 
161     Returns a dictionary with the CVE IDs as keys and an associated dictonary of
162     relevant metadata as the value.
163     """
164     patched_cves = {}
165     patches = oe.patch.src_patches(d)
166     bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
167 
168     # Check each patch file
169     for url in patches:
170         patch_file = bb.fetch.decodeurl(url)[2]
171         for cve_id in parse_cves_from_patch_file(patch_file):
172             if cve_id not in patched_cves:
173                 patched_cves[cve_id] = {
174                     "abbrev-status": "Patched",
175                     "status": "fix-file-included",
176                     "resource": [patch_file],
177                 }
178             else:
179                 patched_cves[cve_id]["resource"].append(patch_file)
180 
181     # Search for additional patched CVEs
182     for cve_id in d.getVarFlags("CVE_STATUS") or {}:
183         decoded_status = decode_cve_status(d, cve_id)
184         products = d.getVar("CVE_PRODUCT")
185         if has_cve_product_match(decoded_status, products):
186             if cve_id in patched_cves:
187                 bb.warn(
188                     'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
189                     % (
190                         cve_id,
191                         d.getVarFlag("CVE_STATUS", cve_id),
192                         patched_cves[cve_id]["abbrev-status"],
193                         patched_cves[cve_id]["status"],
194                     )
195                 )
196             patched_cves[cve_id] = {
197                 "abbrev-status": decoded_status["mapping"],
198                 "status": decoded_status["detail"],
199                 "justification": decoded_status["description"],
200                 "affected-vendor": decoded_status["vendor"],
201                 "affected-product": decoded_status["product"],
202             }
203 
204     return patched_cves
205 
206 
207 def get_cpe_ids(cve_product, version):
208     """
209     Get list of CPE identifiers for the given product and version
210     """
211 
212     version = version.split("+git")[0]
213 
214     cpe_ids = []
215     for product in cve_product.split():
216         # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
217         # use wildcard for vendor.
218         if ":" in product:
219             vendor, product = product.split(":", 1)
220         else:
221             vendor = "*"
222 
223         cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version)
224         cpe_ids.append(cpe_id)
225 
226     return cpe_ids
227 
228 def cve_check_merge_jsons(output, data):
229     """
230     Merge the data in the "package" property to the main data file
231     output
232     """
233     if output["version"] != data["version"]:
234         bb.error("Version mismatch when merging JSON outputs")
235         return
236 
237     for product in output["package"]:
238         if product["name"] == data["package"][0]["name"]:
239             bb.error("Error adding the same package %s twice" % product["name"])
240             return
241 
242     output["package"].append(data["package"][0])
243 
244 def update_symlinks(target_path, link_path):
245     """
246     Update a symbolic link link_path to point to target_path.
247     Remove the link and recreate it if exist and is different.
248     """
249     if link_path != target_path and os.path.exists(target_path):
250         if os.path.exists(os.path.realpath(link_path)):
251             os.remove(link_path)
252         os.symlink(os.path.basename(target_path), link_path)
253 
254 
255 def convert_cve_version(version):
256     """
257     This function converts from CVE format to Yocto version format.
258     eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1
259 
260     Unless it is redefined using CVE_VERSION in the recipe,
261     cve_check uses the version in the name of the recipe (${PV})
262     to check vulnerabilities against a CVE in the database downloaded from NVD.
263 
264     When the version has an update, i.e.
265     "p1" in OpenSSH 8.3p1,
266     "-rc1" in linux kernel 6.2-rc1,
267     the database stores the version as version_update (8.3_p1, 6.2_rc1).
268     Therefore, we must transform this version before comparing to the
269     recipe version.
270 
271     In this case, the parameter of the function is 8.3_p1.
272     If the version uses the Release Candidate format, "rc",
273     this function replaces the '_' by '-'.
274     If the version uses the Update format, "p",
275     this function removes the '_' completely.
276     """
277     import re
278 
279     matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
280 
281     if not matches:
282         return version
283 
284     version = matches.group(1)
285     update = matches.group(2)
286 
287     if matches.group(3) == "rc":
288         return version + '-' + update
289 
290     return version + update
291 
292 def decode_cve_status(d, cve):
293     """
294     Convert CVE_STATUS into status, vendor, product, detail and description.
295     """
296     status = d.getVarFlag("CVE_STATUS", cve)
297     if not status:
298         return {}
299 
300     status_split = status.split(':', 4)
301     status_out = {}
302     status_out["detail"] = status_split[0]
303     product = "*"
304     vendor = "*"
305     description = ""
306     if len(status_split) >= 4 and status_split[1].strip() == "cpe":
307         # Both vendor and product are mandatory if cpe: present, the syntax is then:
308         # detail: cpe:vendor:product:description
309         vendor = status_split[2].strip()
310         product = status_split[3].strip()
311         description = status_split[4].strip()
312     elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
313         # Malformed CPE
314         bb.warn(
315             'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
316             % (cve, status)
317         )
318     else:
319         # Other case: no CPE, the syntax is then:
320         # detail: description
321         description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""
322 
323     status_out["vendor"] = vendor
324     status_out["product"] = product
325     status_out["description"] = description
326 
327     detail = status_out["detail"]
328     status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
329     if status_mapping is None:
330         bb.warn(
331             'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
332             % (detail, cve, status)
333         )
334         status_mapping = "Unpatched"
335     status_out["mapping"] = status_mapping
336 
337     return status_out
338 
339 def has_cve_product_match(detailed_status, products):
340     """
341     Check product/vendor match between detailed_status from decode_cve_status and a string of
342     products (like from CVE_PRODUCT)
343     """
344     for product in products.split():
345         vendor = "*"
346         if ":" in product:
347             vendor, product = product.split(":", 1)
348 
349         if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \
350             (product == detailed_status["product"] or detailed_status["product"] == "*"):
351             return True
352 
353     #if no match, return False
354     return False
355