xref: /openbmc/openbmc/poky/meta/lib/oe/cve_check.py (revision 96e4b4e121e0e2da1535d7d537d6a982a6ff5bc0)
1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import collections
8import functools
9import itertools
10import os.path
11import re
12import oe.patch
13
14_Version = collections.namedtuple(
15    "_Version", ["release", "patch_l", "pre_l", "pre_v"]
16)
17
18@functools.total_ordering
19class Version():
20
21    def __init__(self, version, suffix=None):
22
23        suffixes = ["alphabetical", "patch"]
24
25        if str(suffix) == "alphabetical":
26            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
27        elif str(suffix) == "patch":
28            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
29        else:
30            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
31        regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)
32
33        match = regex.search(version)
34        if not match:
35            raise Exception("Invalid version: '{0}'".format(version))
36
37        self._version = _Version(
38            release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
39            patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
40            pre_l=match.group("pre_l"),
41            pre_v=match.group("pre_v")
42        )
43
44        self._key = _cmpkey(
45            self._version.release,
46            self._version.patch_l,
47            self._version.pre_l,
48            self._version.pre_v
49        )
50
51    def __eq__(self, other):
52        if not isinstance(other, Version):
53            return NotImplemented
54        return self._key == other._key
55
56    def __gt__(self, other):
57        if not isinstance(other, Version):
58            return NotImplemented
59        return self._key > other._key
60
61def _cmpkey(release, patch_l, pre_l, pre_v):
62    # remove leading 0
63    _release = tuple(
64        reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
65    )
66
67    _patch = patch_l.upper()
68
69    if pre_l is None and pre_v is None:
70        _pre = float('inf')
71    else:
72        _pre = float(pre_v) if pre_v else float('-inf')
73    return _release, _patch, _pre
74
75
76def parse_cve_from_filename(patch_filename):
77    """
78    Parses CVE ID from the filename
79
80    Matches the last "CVE-YYYY-ID" in the file name, also if written
81    in lowercase. Possible to have multiple CVE IDs in a single
82    file name, but only the last one will be detected from the file name.
83
84    Returns the last CVE ID foudn in the filename. If no CVE ID is found
85    an empty string is returned.
86    """
87    cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)
88
89    # Check patch file name for CVE ID
90    fname_match = cve_file_name_match.search(patch_filename)
91    return fname_match.group(1).upper() if fname_match else ""
92
93
94def parse_cves_from_patch_contents(patch_contents):
95    """
96    Parses CVE IDs from patch contents
97
98    Matches all CVE IDs contained on a line that starts with "CVE: ". Any
99    delimiter (',', '&', "and", etc.) can be used without any issues. Multiple
100    "CVE:" lines can also exist.
101
102    Returns a set of all CVE IDs found in the patch contents.
103    """
104    cve_ids = set()
105    cve_match = re.compile(r"CVE-\d{4}-\d{4,}")
106    # Search for one or more "CVE: " lines
107    for line in patch_contents.split("\n"):
108        if not line.startswith("CVE:"):
109            continue
110        cve_ids.update(cve_match.findall(line))
111    return cve_ids
112
113
114def parse_cves_from_patch_file(patch_file):
115    """
116    Parses CVE IDs associated with a particular patch file, using both the filename
117    and patch contents.
118
119    Returns a set of all CVE IDs found in the patch filename and contents.
120    """
121    cve_ids = set()
122    filename_cve = parse_cve_from_filename(patch_file)
123    if filename_cve:
124        bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
125        cve_ids.add(parse_cve_from_filename(patch_file))
126
127    # Remote patches won't be present and compressed patches won't be
128    # unpacked, so say we're not scanning them
129    if not os.path.isfile(patch_file):
130        bb.note("%s is remote or compressed, not scanning content" % patch_file)
131        return cve_ids
132
133    with open(patch_file, "r", encoding="utf-8") as f:
134        try:
135            patch_text = f.read()
136        except UnicodeDecodeError:
137            bb.debug(
138                1,
139                "Failed to read patch %s using UTF-8 encoding"
140                " trying with iso8859-1" % patch_file,
141            )
142            f.close()
143            with open(patch_file, "r", encoding="iso8859-1") as f:
144                patch_text = f.read()
145
146    cve_ids.update(parse_cves_from_patch_contents(patch_text))
147
148    if not cve_ids:
149        bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
150    else:
151        bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))
152
153    return cve_ids
154
155
156def get_patched_cves(d):
157    """
158    Determines the CVE IDs that have been solved by either patches incuded within
159    SRC_URI or by setting CVE_STATUS.
160
161    Returns a dictionary with the CVE IDs as keys and an associated dictonary of
162    relevant metadata as the value.
163    """
164    patched_cves = {}
165    patches = oe.patch.src_patches(d)
166    bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
167
168    # Check each patch file
169    for url in patches:
170        patch_file = bb.fetch.decodeurl(url)[2]
171        for cve_id in parse_cves_from_patch_file(patch_file):
172            if cve_id not in patched_cves:
173                patched_cves[cve_id] = {
174                    "abbrev-status": "Patched",
175                    "status": "fix-file-included",
176                    "resource": [patch_file],
177                }
178            else:
179                patched_cves[cve_id]["resource"].append(patch_file)
180
181    # Search for additional patched CVEs
182    for cve_id in d.getVarFlags("CVE_STATUS") or {}:
183        decoded_status = decode_cve_status(d, cve_id)
184        products = d.getVar("CVE_PRODUCT")
185        if has_cve_product_match(decoded_status, products):
186            if cve_id in patched_cves:
187                bb.warn(
188                    'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
189                    % (
190                        cve_id,
191                        d.getVarFlag("CVE_STATUS", cve_id),
192                        patched_cves[cve_id]["abbrev-status"],
193                        patched_cves[cve_id]["status"],
194                    )
195                )
196            patched_cves[cve_id] = {
197                "abbrev-status": decoded_status["mapping"],
198                "status": decoded_status["detail"],
199                "justification": decoded_status["description"],
200                "affected-vendor": decoded_status["vendor"],
201                "affected-product": decoded_status["product"],
202            }
203
204    return patched_cves
205
206
207def get_cpe_ids(cve_product, version):
208    """
209    Get list of CPE identifiers for the given product and version
210    """
211
212    version = version.split("+git")[0]
213
214    cpe_ids = []
215    for product in cve_product.split():
216        # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
217        # use wildcard for vendor.
218        if ":" in product:
219            vendor, product = product.split(":", 1)
220        else:
221            vendor = "*"
222
223        cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version)
224        cpe_ids.append(cpe_id)
225
226    return cpe_ids
227
228def cve_check_merge_jsons(output, data):
229    """
230    Merge the data in the "package" property to the main data file
231    output
232    """
233    if output["version"] != data["version"]:
234        bb.error("Version mismatch when merging JSON outputs")
235        return
236
237    for product in output["package"]:
238        if product["name"] == data["package"][0]["name"]:
239            bb.error("Error adding the same package %s twice" % product["name"])
240            return
241
242    output["package"].append(data["package"][0])
243
244def update_symlinks(target_path, link_path):
245    """
246    Update a symbolic link link_path to point to target_path.
247    Remove the link and recreate it if exist and is different.
248    """
249    if link_path != target_path and os.path.exists(target_path):
250        if os.path.exists(os.path.realpath(link_path)):
251            os.remove(link_path)
252        os.symlink(os.path.basename(target_path), link_path)
253
254
255def convert_cve_version(version):
256    """
257    This function converts from CVE format to Yocto version format.
258    eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1
259
260    Unless it is redefined using CVE_VERSION in the recipe,
261    cve_check uses the version in the name of the recipe (${PV})
262    to check vulnerabilities against a CVE in the database downloaded from NVD.
263
264    When the version has an update, i.e.
265    "p1" in OpenSSH 8.3p1,
266    "-rc1" in linux kernel 6.2-rc1,
267    the database stores the version as version_update (8.3_p1, 6.2_rc1).
268    Therefore, we must transform this version before comparing to the
269    recipe version.
270
271    In this case, the parameter of the function is 8.3_p1.
272    If the version uses the Release Candidate format, "rc",
273    this function replaces the '_' by '-'.
274    If the version uses the Update format, "p",
275    this function removes the '_' completely.
276    """
277    import re
278
279    matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
280
281    if not matches:
282        return version
283
284    version = matches.group(1)
285    update = matches.group(2)
286
287    if matches.group(3) == "rc":
288        return version + '-' + update
289
290    return version + update
291
292def decode_cve_status(d, cve):
293    """
294    Convert CVE_STATUS into status, vendor, product, detail and description.
295    """
296    status = d.getVarFlag("CVE_STATUS", cve)
297    if not status:
298        return {}
299
300    status_split = status.split(':', 4)
301    status_out = {}
302    status_out["detail"] = status_split[0]
303    product = "*"
304    vendor = "*"
305    description = ""
306    if len(status_split) >= 4 and status_split[1].strip() == "cpe":
307        # Both vendor and product are mandatory if cpe: present, the syntax is then:
308        # detail: cpe:vendor:product:description
309        vendor = status_split[2].strip()
310        product = status_split[3].strip()
311        description = status_split[4].strip()
312    elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
313        # Malformed CPE
314        bb.warn(
315            'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
316            % (cve, status)
317        )
318    else:
319        # Other case: no CPE, the syntax is then:
320        # detail: description
321        description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""
322
323    status_out["vendor"] = vendor
324    status_out["product"] = product
325    status_out["description"] = description
326
327    detail = status_out["detail"]
328    status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
329    if status_mapping is None:
330        bb.warn(
331            'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
332            % (detail, cve, status)
333        )
334        status_mapping = "Unpatched"
335    status_out["mapping"] = status_mapping
336
337    return status_out
338
339def has_cve_product_match(detailed_status, products):
340    """
341    Check product/vendor match between detailed_status from decode_cve_status and a string of
342    products (like from CVE_PRODUCT)
343    """
344    for product in products.split():
345        vendor = "*"
346        if ":" in product:
347            vendor, product = product.split(":", 1)
348
349        if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \
350            (product == detailed_status["product"] or detailed_status["product"] == "*"):
351            return True
352
353    #if no match, return False
354    return False
355