1# 2# Copyright OpenEmbedded Contributors 3# 4# SPDX-License-Identifier: MIT 5# 6 7import collections 8import functools 9import itertools 10import os.path 11import re 12import oe.patch 13 14_Version = collections.namedtuple( 15 "_Version", ["release", "patch_l", "pre_l", "pre_v"] 16) 17 18@functools.total_ordering 19class Version(): 20 21 def __init__(self, version, suffix=None): 22 23 suffixes = ["alphabetical", "patch"] 24 25 if str(suffix) == "alphabetical": 26 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 27 elif str(suffix) == "patch": 28 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 29 else: 30 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 31 regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE) 32 33 match = regex.search(version) 34 if not match: 35 raise Exception("Invalid version: '{0}'".format(version)) 36 37 self._version = _Version( 38 release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")), 39 patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "", 40 pre_l=match.group("pre_l"), 41 pre_v=match.group("pre_v") 42 ) 43 44 self._key = _cmpkey( 45 self._version.release, 46 self._version.patch_l, 47 self._version.pre_l, 48 self._version.pre_v 49 ) 50 51 def __eq__(self, other): 52 if not isinstance(other, Version): 53 return NotImplemented 54 return self._key == other._key 55 56 def __gt__(self, other): 57 if not isinstance(other, Version): 58 return NotImplemented 59 return self._key > other._key 60 61def _cmpkey(release, patch_l, pre_l, pre_v): 62 # remove leading 0 63 _release = tuple( 64 reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release)))) 65 ) 66 67 _patch = patch_l.upper() 68 69 if pre_l is None and pre_v is None: 70 _pre = float('inf') 71 else: 72 _pre = float(pre_v) if pre_v else float('-inf') 73 return _release, _patch, _pre 74 75 76def parse_cve_from_filename(patch_filename): 77 """ 78 Parses CVE ID from the filename 79 80 Matches the last "CVE-YYYY-ID" in the file name, also if written 81 in lowercase. Possible to have multiple CVE IDs in a single 82 file name, but only the last one will be detected from the file name. 83 84 Returns the last CVE ID foudn in the filename. If no CVE ID is found 85 an empty string is returned. 86 """ 87 cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE) 88 89 # Check patch file name for CVE ID 90 fname_match = cve_file_name_match.search(patch_filename) 91 return fname_match.group(1).upper() if fname_match else "" 92 93 94def parse_cves_from_patch_contents(patch_contents): 95 """ 96 Parses CVE IDs from patch contents 97 98 Matches all CVE IDs contained on a line that starts with "CVE: ". Any 99 delimiter (',', '&', "and", etc.) can be used without any issues. Multiple 100 "CVE:" lines can also exist. 101 102 Returns a set of all CVE IDs found in the patch contents. 103 """ 104 cve_ids = set() 105 cve_match = re.compile(r"CVE-\d{4}-\d{4,}") 106 # Search for one or more "CVE: " lines 107 for line in patch_contents.split("\n"): 108 if not line.startswith("CVE:"): 109 continue 110 cve_ids.update(cve_match.findall(line)) 111 return cve_ids 112 113 114def parse_cves_from_patch_file(patch_file): 115 """ 116 Parses CVE IDs associated with a particular patch file, using both the filename 117 and patch contents. 118 119 Returns a set of all CVE IDs found in the patch filename and contents. 120 """ 121 cve_ids = set() 122 filename_cve = parse_cve_from_filename(patch_file) 123 if filename_cve: 124 bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file)) 125 cve_ids.add(parse_cve_from_filename(patch_file)) 126 127 # Remote patches won't be present and compressed patches won't be 128 # unpacked, so say we're not scanning them 129 if not os.path.isfile(patch_file): 130 bb.note("%s is remote or compressed, not scanning content" % patch_file) 131 return cve_ids 132 133 with open(patch_file, "r", encoding="utf-8") as f: 134 try: 135 patch_text = f.read() 136 except UnicodeDecodeError: 137 bb.debug( 138 1, 139 "Failed to read patch %s using UTF-8 encoding" 140 " trying with iso8859-1" % patch_file, 141 ) 142 f.close() 143 with open(patch_file, "r", encoding="iso8859-1") as f: 144 patch_text = f.read() 145 146 cve_ids.update(parse_cves_from_patch_contents(patch_text)) 147 148 if not cve_ids: 149 bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file) 150 else: 151 bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids)))) 152 153 return cve_ids 154 155 156@bb.parse.vardeps("CVE_STATUS") 157def get_patched_cves(d): 158 """ 159 Determines the CVE IDs that have been solved by either patches incuded within 160 SRC_URI or by setting CVE_STATUS. 161 162 Returns a dictionary with the CVE IDs as keys and an associated dictonary of 163 relevant metadata as the value. 164 """ 165 patched_cves = {} 166 patches = oe.patch.src_patches(d) 167 bb.debug(2, "Scanning %d patches for CVEs" % len(patches)) 168 169 # Check each patch file 170 for url in patches: 171 patch_file = bb.fetch.decodeurl(url)[2] 172 for cve_id in parse_cves_from_patch_file(patch_file): 173 if cve_id not in patched_cves: 174 patched_cves[cve_id] = { 175 "abbrev-status": "Patched", 176 "status": "fix-file-included", 177 "resource": [patch_file], 178 } 179 else: 180 patched_cves[cve_id]["resource"].append(patch_file) 181 182 # Search for additional patched CVEs 183 for cve_id in d.getVarFlags("CVE_STATUS") or {}: 184 decoded_status = decode_cve_status(d, cve_id) 185 products = d.getVar("CVE_PRODUCT") 186 if has_cve_product_match(decoded_status, products): 187 if cve_id in patched_cves: 188 bb.warn( 189 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"' 190 % ( 191 cve_id, 192 d.getVarFlag("CVE_STATUS", cve_id), 193 patched_cves[cve_id]["abbrev-status"], 194 patched_cves[cve_id]["status"], 195 ) 196 ) 197 patched_cves[cve_id] = { 198 "abbrev-status": decoded_status["mapping"], 199 "status": decoded_status["detail"], 200 "justification": decoded_status["description"], 201 "affected-vendor": decoded_status["vendor"], 202 "affected-product": decoded_status["product"], 203 } 204 205 return patched_cves 206 207 208def get_cpe_ids(cve_product, version): 209 """ 210 Get list of CPE identifiers for the given product and version 211 """ 212 213 version = version.split("+git")[0] 214 215 cpe_ids = [] 216 for product in cve_product.split(): 217 # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not, 218 # use wildcard for vendor. 219 if ":" in product: 220 vendor, product = product.split(":", 1) 221 else: 222 vendor = "*" 223 224 cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version) 225 cpe_ids.append(cpe_id) 226 227 return cpe_ids 228 229def cve_check_merge_jsons(output, data): 230 """ 231 Merge the data in the "package" property to the main data file 232 output 233 """ 234 if output["version"] != data["version"]: 235 bb.error("Version mismatch when merging JSON outputs") 236 return 237 238 for product in output["package"]: 239 if product["name"] == data["package"][0]["name"]: 240 bb.error("Error adding the same package %s twice" % product["name"]) 241 return 242 243 output["package"].append(data["package"][0]) 244 245def update_symlinks(target_path, link_path): 246 """ 247 Update a symbolic link link_path to point to target_path. 248 Remove the link and recreate it if exist and is different. 249 """ 250 if link_path != target_path and os.path.exists(target_path): 251 if os.path.exists(os.path.realpath(link_path)): 252 os.remove(link_path) 253 os.symlink(os.path.basename(target_path), link_path) 254 255 256def convert_cve_version(version): 257 """ 258 This function converts from CVE format to Yocto version format. 259 eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1 260 261 Unless it is redefined using CVE_VERSION in the recipe, 262 cve_check uses the version in the name of the recipe (${PV}) 263 to check vulnerabilities against a CVE in the database downloaded from NVD. 264 265 When the version has an update, i.e. 266 "p1" in OpenSSH 8.3p1, 267 "-rc1" in linux kernel 6.2-rc1, 268 the database stores the version as version_update (8.3_p1, 6.2_rc1). 269 Therefore, we must transform this version before comparing to the 270 recipe version. 271 272 In this case, the parameter of the function is 8.3_p1. 273 If the version uses the Release Candidate format, "rc", 274 this function replaces the '_' by '-'. 275 If the version uses the Update format, "p", 276 this function removes the '_' completely. 277 """ 278 import re 279 280 matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version) 281 282 if not matches: 283 return version 284 285 version = matches.group(1) 286 update = matches.group(2) 287 288 if matches.group(3) == "rc": 289 return version + '-' + update 290 291 return version + update 292 293@bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP") 294def decode_cve_status(d, cve): 295 """ 296 Convert CVE_STATUS into status, vendor, product, detail and description. 297 """ 298 status = d.getVarFlag("CVE_STATUS", cve) 299 if not status: 300 return {} 301 302 status_split = status.split(':', 4) 303 status_out = {} 304 status_out["detail"] = status_split[0] 305 product = "*" 306 vendor = "*" 307 description = "" 308 if len(status_split) >= 4 and status_split[1].strip() == "cpe": 309 # Both vendor and product are mandatory if cpe: present, the syntax is then: 310 # detail: cpe:vendor:product:description 311 vendor = status_split[2].strip() 312 product = status_split[3].strip() 313 description = status_split[4].strip() 314 elif len(status_split) >= 2 and status_split[1].strip() == "cpe": 315 # Malformed CPE 316 bb.warn( 317 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE' 318 % (cve, status) 319 ) 320 else: 321 # Other case: no CPE, the syntax is then: 322 # detail: description 323 description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else "" 324 325 status_out["vendor"] = vendor 326 status_out["product"] = product 327 status_out["description"] = description 328 329 detail = status_out["detail"] 330 status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail) 331 if status_mapping is None: 332 bb.warn( 333 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched' 334 % (detail, cve, status) 335 ) 336 status_mapping = "Unpatched" 337 status_out["mapping"] = status_mapping 338 339 return status_out 340 341def has_cve_product_match(detailed_status, products): 342 """ 343 Check product/vendor match between detailed_status from decode_cve_status and a string of 344 products (like from CVE_PRODUCT) 345 """ 346 for product in products.split(): 347 vendor = "*" 348 if ":" in product: 349 vendor, product = product.split(":", 1) 350 351 if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \ 352 (product == detailed_status["product"] or detailed_status["product"] == "*"): 353 return True 354 355 #if no match, return False 356 return False 357 358def extend_cve_status(d): 359 # do this only once in case multiple classes use this 360 if d.getVar("CVE_STATUS_EXTENDED"): 361 return 362 d.setVar("CVE_STATUS_EXTENDED", "1") 363 364 # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS 365 cve_check_ignore = d.getVar("CVE_CHECK_IGNORE") 366 if cve_check_ignore: 367 bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS") 368 for cve in (d.getVar("CVE_CHECK_IGNORE") or "").split(): 369 d.setVarFlag("CVE_STATUS", cve, "ignored") 370 371 # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once 372 for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split(): 373 cve_group = d.getVar(cve_status_group) 374 if cve_group is not None: 375 for cve in cve_group.split(): 376 d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status")) 377 else: 378 bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group) 379