xref: /openbmc/openbmc/poky/meta/lib/oe/cve_check.py (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import collections
8import functools
9import itertools
10import os.path
11import re
12import oe.patch
13
14_Version = collections.namedtuple(
15    "_Version", ["release", "patch_l", "pre_l", "pre_v"]
16)
17
@functools.total_ordering
class Version():
    """
    Version string parsed into a comparable, hashable object.

    A version consists of an optional "r"/"v" prefix, a release made of
    numbers separated by "." or "-", an optional patch level and an optional
    pre-release tag (rc, alpha, beta, pre, preview or dev, optionally followed
    by a number). Matching is case-insensitive and any trailing text after
    those parts is ignored.

    The suffix argument selects how the patch level is recognised:
      "alphabetical" - a single letter after the release, e.g. "1.0.2k"
      "patch"        - "p" or "patch" followed by digits, e.g. "8.3p1"
      anything else  - no patch level is parsed

    Raises Exception if version does not begin with a numeric release.
    """

    def __init__(self, version, suffix=None):

        suffixes = ["alphabetical", "patch"]
        suffix_name = str(suffix)

        if suffix_name == "alphabetical":
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        elif suffix_name == "patch":
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        else:
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)

        match = regex.search(version)
        if not match:
            raise Exception("Invalid version: '{0}'".format(version))

        # The default pattern has no "patch_l" group, so only look it up when
        # one of the suffix-aware patterns was used.
        patch_l = ""
        if suffix_name in suffixes and match.group("patch_l"):
            patch_l = match.group("patch_l")

        self._version = _Version(
            # "-" and "." are equivalent separators inside the release part
            release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
            patch_l=patch_l,
            pre_l=match.group("pre_l"),
            pre_v=match.group("pre_v")
        )

        # Every comparison (and the hash) is delegated to this key
        self._key = _cmpkey(
            self._version.release,
            self._version.patch_l,
            self._version.pre_l,
            self._version.pre_v
        )

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key == other._key

    def __gt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key > other._key

    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable; hash the same key
        # that __eq__ compares so equal versions hash equally.
        return hash(self._key)
60
61def _cmpkey(release, patch_l, pre_l, pre_v):
62    # remove leading 0
63    _release = tuple(
64        reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
65    )
66
67    _patch = patch_l.upper()
68
69    if pre_l is None and pre_v is None:
70        _pre = float('inf')
71    else:
72        _pre = float(pre_v) if pre_v else float('-inf')
73    return _release, _patch, _pre
74
75
def parse_cve_from_filename(patch_filename):
    """
    Parses CVE ID from the filename

    Looks for "CVE-YYYY-ID" in the file name, ignoring case. A file name may
    contain several CVE IDs, but only the last one is reported.

    Returns the last CVE ID found in the filename, in upper case. If no CVE
    ID is found an empty string is returned.
    """
    # The greedy ".*" prefix pushes the captured group to the last CVE ID
    last_cve = re.search(r".*(CVE-\d{4}-\d{4,})", patch_filename, re.IGNORECASE)
    if last_cve is None:
        return ""
    return last_cve.group(1).upper()
92
93
def parse_cves_from_patch_contents(patch_contents):
    """
    Parses CVE IDs from patch contents

    Collects every "CVE-YYYY-ID" (upper case only) found on lines that start
    with "CVE:". Whatever separates the IDs on such a line (',', '&', "and",
    etc.) is irrelevant, and any number of "CVE:" lines may be present.

    Returns a set of all CVE IDs found in the patch contents.
    """
    cve_pattern = re.compile(r"CVE-\d{4}-\d{4,}")
    return {
        cve_id
        for text_line in patch_contents.split("\n")
        if text_line.startswith("CVE:")
        for cve_id in cve_pattern.findall(text_line)
    }
112
113
def parse_cves_from_patch_file(patch_file):
    """
    Parses CVE IDs associated with a particular patch file, using both the filename
    and patch contents.

    The contents are only scanned when patch_file is an existing regular
    file. They are read as UTF-8 first, falling back to iso8859-1 when the
    file is not valid UTF-8.

    Returns a set of all CVE IDs found in the patch filename and contents.
    """
    cve_ids = set()
    filename_cve = parse_cve_from_filename(patch_file)
    if filename_cve:
        bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
        cve_ids.add(filename_cve)

    # Remote patches won't be present and compressed patches won't be
    # unpacked, so say we're not scanning them
    if not os.path.isfile(patch_file):
        bb.note("%s is remote or compressed, not scanning content" % patch_file)
        return cve_ids

    try:
        with open(patch_file, "r", encoding="utf-8") as f:
            patch_text = f.read()
    except UnicodeDecodeError:
        bb.debug(
            1,
            "Failed to read patch %s using UTF-8 encoding"
            " trying with iso8859-1" % patch_file,
        )
        # iso8859-1 maps every byte to a character, so this read cannot fail
        # on decoding
        with open(patch_file, "r", encoding="iso8859-1") as f:
            patch_text = f.read()

    cve_ids.update(parse_cves_from_patch_contents(patch_text))

    if not cve_ids:
        bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
    else:
        bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))

    return cve_ids
154
155
@bb.parse.vardeps("CVE_STATUS")
def get_patched_cves(d):
    """
    Determines the CVE IDs that have been solved by either patches included within
    SRC_URI or by setting CVE_STATUS.

    A CVE_STATUS entry is only taken into account when its vendor/product
    matches CVE_PRODUCT; when it is, it replaces any status derived from a
    patch file for the same CVE (a warning is emitted in that case).

    Returns a dictionary with the CVE IDs as keys and an associated dictionary of
    relevant metadata as the value.
    """
    patched_cves = {}
    patches = oe.patch.src_patches(d)
    bb.debug(2, "Scanning %d patches for CVEs" % len(patches))

    # Check each patch file
    for url in patches:
        patch_file = bb.fetch.decodeurl(url)[2]
        # Sorted so the insertion order of patched_cves does not depend on
        # set iteration order
        for cve_id in sorted(parse_cves_from_patch_file(patch_file)):
            if cve_id not in patched_cves:
                patched_cves[cve_id] = {
                    "abbrev-status": "Patched",
                    "status": "fix-file-included",
                    "resource": [patch_file],
                }
            else:
                patched_cves[cve_id]["resource"].append(patch_file)

    # CVE_PRODUCT is the same for every CVE_STATUS entry, look it up once
    products = d.getVar("CVE_PRODUCT")

    # Search for additional patched CVEs
    for cve_id in d.getVarFlags("CVE_STATUS") or {}:
        decoded_status = decode_cve_status(d, cve_id)
        if not has_cve_product_match(decoded_status, products):
            continue

        if cve_id in patched_cves:
            bb.warn(
                'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
                % (
                    cve_id,
                    d.getVarFlag("CVE_STATUS", cve_id),
                    patched_cves[cve_id]["abbrev-status"],
                    patched_cves[cve_id]["status"],
                )
            )
        patched_cves[cve_id] = {
            "abbrev-status": decoded_status["mapping"],
            "status": decoded_status["detail"],
            "justification": decoded_status["description"],
            "affected-vendor": decoded_status["vendor"],
            "affected-product": decoded_status["product"],
        }

    return patched_cves
206
207
def get_cpe_ids(cve_product, version):
    """
    Get list of CPE identifiers for the given product and version

    cve_product is a whitespace-separated list of "product" or
    "vendor:product" entries. Anything from "+git" onwards is dropped from
    version before it is placed in the identifiers.
    """
    base_version = version.split("+git")[0]

    cpe_ids = []
    for entry in cve_product.split():
        # CVE_PRODUCT in recipes may include vendor information for CPE
        # identifiers. If not, use wildcard for vendor.
        vendor, separator, name = entry.partition(":")
        if not separator:
            vendor, name = "*", entry

        cpe_ids.append(
            'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, name, base_version)
        )

    return cpe_ids
228
def cve_check_merge_jsons(output, data):
    """
    Merge the first entry of data's "package" list into the "package" list
    of the main data file output.

    Nothing is merged, and an error is logged, when the "version" fields
    differ or when output already holds a package with the same name.
    """
    if output["version"] != data["version"]:
        bb.error("Version mismatch when merging JSON outputs")
        return

    packages = output["package"]
    for existing in packages:
        if existing["name"] != data["package"][0]["name"]:
            continue
        bb.error("Error adding the same package %s twice" % existing["name"])
        return

    packages.append(data["package"][0])
244
def update_symlinks(target_path, link_path):
    """
    Update a symbolic link link_path to point to target_path.

    Nothing is done when both paths are equal or when target_path does not
    exist. Otherwise whatever currently sits at link_path (including a
    symlink whose target has disappeared) is removed and the link is
    recreated.

    The link is created relative, pointing at the base name of target_path.
    """
    if link_path == target_path or not os.path.exists(target_path):
        return

    # lexists() is also true for a dangling symlink; checking the resolved
    # path instead would leave a stale link in place and make os.symlink()
    # fail with FileExistsError.
    if os.path.lexists(link_path):
        os.remove(link_path)
    os.symlink(os.path.basename(target_path), link_path)
254
255
def convert_cve_version(version):
    """
    This function converts from CVE format to Yocto version format.
    eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1

    Unless CVE_VERSION is set in the recipe, cve_check compares the recipe
    version (${PV}) against the versions recorded for a CVE in the database
    downloaded from NVD.

    When a version carries an update, such as
    "p1" in OpenSSH 8.3p1 or
    "-rc1" in linux kernel 6.2-rc1,
    the database stores it as version_update (8.3_p1, 6.2_rc1), so it has to
    be transformed before it can be compared with the recipe version.

    For the Release Candidate format ("rc") the '_' is replaced by '-'.
    For the Update format ("p") the '_' is removed completely.
    Any version that does not have one of these two forms is returned
    unchanged.
    """
    parsed = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
    if parsed is None:
        return version

    base, update, kind = parsed.groups()
    separator = '-' if kind == "rc" else ''
    return base + separator + update
292
@bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP")
def decode_cve_status(d, cve):
    """
    Convert CVE_STATUS into status, vendor, product, detail and description.

    Two syntaxes are understood for CVE_STATUS[cve]:
      detail: description
      detail: cpe:vendor:product:description
    In the second form the description may be left out.

    Returns an empty dictionary when CVE_STATUS[cve] is unset or empty.
    Otherwise returns a dictionary with the keys "detail", "vendor",
    "product", "description" and "mapping", where vendor and product default
    to "*" and mapping is the CVE_CHECK_STATUSMAP flag for the detail
    ("Unpatched", with a warning, when there is no such flag).
    """
    status = d.getVarFlag("CVE_STATUS", cve)
    if not status:
        return {}

    status_split = status.split(':', 4)
    status_out = {}
    status_out["detail"] = status_split[0]
    product = "*"
    vendor = "*"
    description = ""
    if len(status_split) >= 4 and status_split[1].strip() == "cpe":
        # Both vendor and product are mandatory if cpe: present, the syntax is then:
        # detail: cpe:vendor:product:description
        vendor = status_split[2].strip()
        product = status_split[3].strip()
        # With exactly four fields there is no description to read
        if len(status_split) > 4:
            description = status_split[4].strip()
    elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
        # Malformed CPE
        bb.warn(
            'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
            % (cve, status)
        )
    else:
        # Other case: no CPE, the syntax is then:
        # detail: description
        description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""

    status_out["vendor"] = vendor
    status_out["product"] = product
    status_out["description"] = description

    detail = status_out["detail"]
    status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
    if status_mapping is None:
        bb.warn(
            'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
            % (detail, cve, status)
        )
        status_mapping = "Unpatched"
    status_out["mapping"] = status_mapping

    return status_out
340
def has_cve_product_match(detailed_status, products):
    """
    Check product/vendor match between detailed_status from decode_cve_status and a string of
    products (like from CVE_PRODUCT)

    products is a whitespace-separated list of "product" or "vendor:product"
    entries; an entry without a vendor gets the vendor "*". A "*" vendor or
    product in detailed_status matches anything.

    Returns True on the first matching entry, False otherwise. An empty
    detailed_status (what decode_cve_status returns for an unset status) or
    an empty/None products never matches.
    """
    # Nothing was decoded, so there is no vendor/product to compare against
    if not detailed_status:
        return False

    for product in (products or "").split():
        vendor = "*"
        if ":" in product:
            vendor, product = product.split(":", 1)

        vendor_ok = detailed_status["vendor"] in (vendor, "*")
        product_ok = detailed_status["product"] in (product, "*")
        if vendor_ok and product_ok:
            return True

    #if no match, return False
    return False
357
def extend_cve_status(d):
    """
    Fold CVE_CHECK_IGNORE and CVE_STATUS_GROUPS into CVE_STATUS flags.

    Every CVE listed in the deprecated CVE_CHECK_IGNORE gets the status
    "ignored". Every CVE listed in a variable named by CVE_STATUS_GROUPS gets
    that variable's "status" flag as its status. Groups are processed after
    CVE_CHECK_IGNORE, so they win when a CVE appears in both.

    The work is done once per datastore: CVE_STATUS_EXTENDED is set as a
    marker and later calls return immediately.
    """
    # do this only once in case multiple classes use this
    if d.getVar("CVE_STATUS_EXTENDED"):
        return
    d.setVar("CVE_STATUS_EXTENDED", "1")

    # Fall back: move all CVEs from CVE_CHECK_IGNORE to CVE_STATUS
    cve_check_ignore = d.getVar("CVE_CHECK_IGNORE")
    if cve_check_ignore:
        bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS")
        for cve in cve_check_ignore.split():
            d.setVarFlag("CVE_STATUS", cve, "ignored")

    # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once
    for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split():
        cve_group = d.getVar(cve_status_group)
        if cve_group is None:
            bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group)
            continue

        # One status applies to the whole group, so read the flag once
        group_status = d.getVarFlag(cve_status_group, "status")
        for cve in cve_group.split():
            d.setVarFlag("CVE_STATUS", cve, group_status)
379