1# 2# Copyright OpenEmbedded Contributors 3# 4# SPDX-License-Identifier: MIT 5# 6 7import collections 8import functools 9import itertools 10import os.path 11import re 12import oe.patch 13 14_Version = collections.namedtuple( 15 "_Version", ["release", "patch_l", "pre_l", "pre_v"] 16) 17 18@functools.total_ordering 19class Version(): 20 21 def __init__(self, version, suffix=None): 22 23 suffixes = ["alphabetical", "patch"] 24 25 if str(suffix) == "alphabetical": 26 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 27 elif str(suffix) == "patch": 28 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 29 else: 30 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?""" 31 regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE) 32 33 match = regex.search(version) 34 if not match: 35 raise Exception("Invalid version: '{0}'".format(version)) 36 37 self._version = _Version( 38 release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")), 39 patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "", 40 pre_l=match.group("pre_l"), 41 pre_v=match.group("pre_v") 42 ) 43 44 self._key = _cmpkey( 45 self._version.release, 46 self._version.patch_l, 47 self._version.pre_l, 48 self._version.pre_v 49 ) 50 51 def __eq__(self, other): 52 if not isinstance(other, Version): 53 return NotImplemented 54 return self._key == other._key 55 56 def __gt__(self, other): 57 if not isinstance(other, Version): 58 return NotImplemented 59 return self._key > other._key 60 61def _cmpkey(release, patch_l, pre_l, pre_v): 62 # remove leading 0 63 _release = tuple( 64 reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release)))) 65 ) 66 67 _patch = patch_l.upper() 68 69 if pre_l is None and pre_v is None: 70 _pre = float('inf') 71 else: 72 _pre = float(pre_v) if pre_v else float('-inf') 73 return _release, _patch, _pre 74 75 76def parse_cve_from_filename(patch_filename): 77 """ 78 Parses CVE ID from the filename 79 80 Matches the last "CVE-YYYY-ID" in the file name, also if written 81 in lowercase. Possible to have multiple CVE IDs in a single 82 file name, but only the last one will be detected from the file name. 83 84 Returns the last CVE ID foudn in the filename. If no CVE ID is found 85 an empty string is returned. 86 """ 87 cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE) 88 89 # Check patch file name for CVE ID 90 fname_match = cve_file_name_match.search(patch_filename) 91 return fname_match.group(1).upper() if fname_match else "" 92 93 94def parse_cves_from_patch_contents(patch_contents): 95 """ 96 Parses CVE IDs from patch contents 97 98 Matches all CVE IDs contained on a line that starts with "CVE: ". Any 99 delimiter (',', '&', "and", etc.) can be used without any issues. Multiple 100 "CVE:" lines can also exist. 101 102 Returns a set of all CVE IDs found in the patch contents. 103 """ 104 cve_ids = set() 105 cve_match = re.compile(r"CVE-\d{4}-\d{4,}") 106 # Search for one or more "CVE: " lines 107 for line in patch_contents.split("\n"): 108 if not line.startswith("CVE:"): 109 continue 110 cve_ids.update(cve_match.findall(line)) 111 return cve_ids 112 113 114def parse_cves_from_patch_file(patch_file): 115 """ 116 Parses CVE IDs associated with a particular patch file, using both the filename 117 and patch contents. 118 119 Returns a set of all CVE IDs found in the patch filename and contents. 120 """ 121 cve_ids = set() 122 filename_cve = parse_cve_from_filename(patch_file) 123 if filename_cve: 124 bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file)) 125 cve_ids.add(parse_cve_from_filename(patch_file)) 126 127 # Remote patches won't be present and compressed patches won't be 128 # unpacked, so say we're not scanning them 129 if not os.path.isfile(patch_file): 130 bb.note("%s is remote or compressed, not scanning content" % patch_file) 131 return cve_ids 132 133 with open(patch_file, "r", encoding="utf-8") as f: 134 try: 135 patch_text = f.read() 136 except UnicodeDecodeError: 137 bb.debug( 138 1, 139 "Failed to read patch %s using UTF-8 encoding" 140 " trying with iso8859-1" % patch_file, 141 ) 142 f.close() 143 with open(patch_file, "r", encoding="iso8859-1") as f: 144 patch_text = f.read() 145 146 cve_ids.update(parse_cves_from_patch_contents(patch_text)) 147 148 if not cve_ids: 149 bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file) 150 else: 151 bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids)))) 152 153 return cve_ids 154 155 156def get_patched_cves(d): 157 """ 158 Determines the CVE IDs that have been solved by either patches incuded within 159 SRC_URI or by setting CVE_STATUS. 160 161 Returns a dictionary with the CVE IDs as keys and an associated dictonary of 162 relevant metadata as the value. 163 """ 164 patched_cves = {} 165 patches = oe.patch.src_patches(d) 166 bb.debug(2, "Scanning %d patches for CVEs" % len(patches)) 167 168 # Check each patch file 169 for url in patches: 170 patch_file = bb.fetch.decodeurl(url)[2] 171 for cve_id in parse_cves_from_patch_file(patch_file): 172 if cve_id not in patched_cves: 173 patched_cves[cve_id] = { 174 "abbrev-status": "Patched", 175 "status": "fix-file-included", 176 "resource": [patch_file], 177 } 178 else: 179 patched_cves[cve_id]["resource"].append(patch_file) 180 181 # Search for additional patched CVEs 182 for cve_id in d.getVarFlags("CVE_STATUS") or {}: 183 decoded_status = decode_cve_status(d, cve_id) 184 products = d.getVar("CVE_PRODUCT") 185 if has_cve_product_match(decoded_status, products): 186 if cve_id in patched_cves: 187 bb.warn( 188 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"' 189 % ( 190 cve_id, 191 d.getVarFlag("CVE_STATUS", cve_id), 192 patched_cves[cve_id]["abbrev-status"], 193 patched_cves[cve_id]["status"], 194 ) 195 ) 196 patched_cves[cve_id] = { 197 "abbrev-status": decoded_status["mapping"], 198 "status": decoded_status["detail"], 199 "justification": decoded_status["description"], 200 "affected-vendor": decoded_status["vendor"], 201 "affected-product": decoded_status["product"], 202 } 203 204 return patched_cves 205 206 207def get_cpe_ids(cve_product, version): 208 """ 209 Get list of CPE identifiers for the given product and version 210 """ 211 212 version = version.split("+git")[0] 213 214 cpe_ids = [] 215 for product in cve_product.split(): 216 # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not, 217 # use wildcard for vendor. 218 if ":" in product: 219 vendor, product = product.split(":", 1) 220 else: 221 vendor = "*" 222 223 cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version) 224 cpe_ids.append(cpe_id) 225 226 return cpe_ids 227 228def cve_check_merge_jsons(output, data): 229 """ 230 Merge the data in the "package" property to the main data file 231 output 232 """ 233 if output["version"] != data["version"]: 234 bb.error("Version mismatch when merging JSON outputs") 235 return 236 237 for product in output["package"]: 238 if product["name"] == data["package"][0]["name"]: 239 bb.error("Error adding the same package %s twice" % product["name"]) 240 return 241 242 output["package"].append(data["package"][0]) 243 244def update_symlinks(target_path, link_path): 245 """ 246 Update a symbolic link link_path to point to target_path. 247 Remove the link and recreate it if exist and is different. 248 """ 249 if link_path != target_path and os.path.exists(target_path): 250 if os.path.exists(os.path.realpath(link_path)): 251 os.remove(link_path) 252 os.symlink(os.path.basename(target_path), link_path) 253 254 255def convert_cve_version(version): 256 """ 257 This function converts from CVE format to Yocto version format. 258 eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1 259 260 Unless it is redefined using CVE_VERSION in the recipe, 261 cve_check uses the version in the name of the recipe (${PV}) 262 to check vulnerabilities against a CVE in the database downloaded from NVD. 263 264 When the version has an update, i.e. 265 "p1" in OpenSSH 8.3p1, 266 "-rc1" in linux kernel 6.2-rc1, 267 the database stores the version as version_update (8.3_p1, 6.2_rc1). 268 Therefore, we must transform this version before comparing to the 269 recipe version. 270 271 In this case, the parameter of the function is 8.3_p1. 272 If the version uses the Release Candidate format, "rc", 273 this function replaces the '_' by '-'. 274 If the version uses the Update format, "p", 275 this function removes the '_' completely. 276 """ 277 import re 278 279 matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version) 280 281 if not matches: 282 return version 283 284 version = matches.group(1) 285 update = matches.group(2) 286 287 if matches.group(3) == "rc": 288 return version + '-' + update 289 290 return version + update 291 292def decode_cve_status(d, cve): 293 """ 294 Convert CVE_STATUS into status, vendor, product, detail and description. 295 """ 296 status = d.getVarFlag("CVE_STATUS", cve) 297 if not status: 298 return {} 299 300 status_split = status.split(':', 4) 301 status_out = {} 302 status_out["detail"] = status_split[0] 303 product = "*" 304 vendor = "*" 305 description = "" 306 if len(status_split) >= 4 and status_split[1].strip() == "cpe": 307 # Both vendor and product are mandatory if cpe: present, the syntax is then: 308 # detail: cpe:vendor:product:description 309 vendor = status_split[2].strip() 310 product = status_split[3].strip() 311 description = status_split[4].strip() 312 elif len(status_split) >= 2 and status_split[1].strip() == "cpe": 313 # Malformed CPE 314 bb.warn( 315 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE' 316 % (cve, status) 317 ) 318 else: 319 # Other case: no CPE, the syntax is then: 320 # detail: description 321 description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else "" 322 323 status_out["vendor"] = vendor 324 status_out["product"] = product 325 status_out["description"] = description 326 327 detail = status_out["detail"] 328 status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail) 329 if status_mapping is None: 330 bb.warn( 331 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched' 332 % (detail, cve, status) 333 ) 334 status_mapping = "Unpatched" 335 status_out["mapping"] = status_mapping 336 337 return status_out 338 339def has_cve_product_match(detailed_status, products): 340 """ 341 Check product/vendor match between detailed_status from decode_cve_status and a string of 342 products (like from CVE_PRODUCT) 343 """ 344 for product in products.split(): 345 vendor = "*" 346 if ":" in product: 347 vendor, product = product.split(":", 1) 348 349 if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \ 350 (product == detailed_status["product"] or detailed_status["product"] == "*"): 351 return True 352 353 #if no match, return False 354 return False 355