#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a
    final SBoM
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )


@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    # Records alternate file names for a software_File that was deduplicated
    # by content hash (see ObjectSet.new_file)
    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    """
    Returns the MD5 hex digest over the given items; Elements contribute
    their SPDX ID, everything else its string value. Used to build stable
    SPDX ID suffixes, not for security.
    """
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    """
    Returns the reproducible document timestamp: SOURCE_DATE_EPOCH as an
    aware UTC datetime, falling back to the current UTC time if unset.
    """
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def get_alias(obj):
    """
    Returns the OEIdAliasExtension attached to obj, or None if it has none
    """
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def hash_id(_id):
    """
    Returns the SHA-256 hex digest of an SPDX ID string (used for the
    by-spdxid-hash symlink farm in the deploy directory)
    """
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    """
    Normalizes a set to a sorted list; passes lists/tuples through unchanged.

    Raises TypeError for anything else.
    """
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    """
    A SHACLObjectSet bound to a BitBake datastore, with helpers for creating
    and linking OpenEmbedded SPDX 3.0 objects
    """

    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None
        # FIX: initialize so code that checks `self.doc` (load_jsonld,
        # remove_internal_extensions) gets the intended falsy value instead
        # of an AttributeError when no SpdxDocument has been indexed yet
        self.doc = None

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                # FIX: only SHA-256 hashes belong in the sha256 lookup index.
                # The previous condition was inverted (it skipped sha256 and
                # indexed every other algorithm), which broke content
                # deduplication in new_file()
                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        """
        Yields all objects of the given type whose attributes match attr_filter
        """
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        """
        Returns the first object matching foreach_filter(), or None
        """
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        """
        Yields document root elements of the given type matching attr_filter
        """
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        """
        Returns the first matching document root element, or None
        """
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        """
        Adds obj to the set and to the document's root elements
        """
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        """
        Returns the is_native flag from the document's OEDocumentExtension,
        or False if no extension carries one
        """
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        """
        Sets the is_native flag on an existing OEDocumentExtension, creating
        one only when the flag is being set to True
        """
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        if is_native:
            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))

    def add_aliases(self):
        """
        Ensures every Element in the set has an OEIdAliasExtension alias
        """
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        """
        Derives an alias ID from obj's SPDX ID by substituting the unihash
        with the literal "UNIHASH" and the namespace with replace + PN.
        Returns None (with a warning) if the ID doesn't contain both.
        """
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace() + "/"
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        """
        Removes all extensions marked INTERNAL from every Element and the
        document itself (done before writing a final SBoM)
        """

        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        """
        Returns the recipe-specific SPDX namespace:
        <SPDX_NAMESPACE_PREFIX>/<PN>-<uuid5 of PN under SPDX_UUID_NAMESPACE>
        """
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        """
        Attaches an alias extension to e if it lacks one; warns if an
        existing alias has the wrong prefix. Blank-node IDs are skipped.
        """
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        """
        Builds a new SPDX ID under the recipe namespace (optionally including
        BB_UNIHASH), sanitizing each suffix component to [a-zA-Z0-9_-]
        """
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        """
        Adds an ExternalMap for the SPDX_IMPORTS entry named key to the
        document (idempotent) and returns its external SPDX ID
        """
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

    def new_agent(self, varname, *, creation_info=None, add=True):
        """
        Creates (or finds) an Agent described by the <varname>_* datastore
        variables. <varname>_ref delegates to another variable set,
        <varname>_import returns an external import ID, and a missing
        <varname>_name returns None.
        """
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

    def new_creation_info(self):
        """
        Creates a CreationInfo populated with the SPDX tool, the authors
        listed in SPDX_AUTHORS, and the reproducible timestamp
        """
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        """
        Returns a new CreationInfo that references (by ID) the creators and
        tools of copy, with a fresh timestamp and spec version
        """
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        """
        Creates an Annotation of the given type on subject
        """
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        """
        Creates one relationship of class cls per `from_` element, each
        pointing at all `to` elements. An empty `to` becomes NoneElement;
        an empty `from_` creates nothing. Returns the list of relationships.
        """
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.Element.NoneElement]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        """
        Creates plain Relationships (see _new_relationship)
        """
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        """
        Creates LifecycleScopedRelationships with the given scope
        """
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

    def new_license_expression(
        self, license_expression, license_data, license_text_map=None
    ):
        """
        Creates (or finds) a LicenseExpression for license_expression using
        the license list version from license_data. license_text_map maps
        custom license IDs to their text URIs.
        """
        # NOTE: default changed from a shared mutable `{}` to None; behavior
        # for all existing callers is unchanged
        if license_text_map is None:
            license_text_map = {}

        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if its missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            # Custom license text makes the expression unique, so the text
            # hash is folded into the SPDX ID
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        """
        Scans filepath for declared licenses and records them as a
        hasDeclaredLicense relationship on spdx_file. A no-op if the file
        was already scanned (marked by OELicenseScannedExtension).
        """
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            file_licenses.add(self.new_license_expression(extracted_lic, license_data))

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            file_licenses,
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=None):
        """
        Creates a software_File for path, deduplicating by SHA-256 content
        hash. When an existing file with the same hash is found, its
        purposes are merged and the new name recorded as an alias instead.
        """
        # NOTE: default changed from a shared mutable `[]` to None; behavior
        # for all existing callers is unchanged
        if purposes is None:
            purposes = []

        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        """
        Creates a security_Vulnerability for the given CVE identifier with
        external identifier locators pointing at the CVE databases
        """
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        """
        Creates VEX "fixedIn" relationships for patched vulnerabilities
        """
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        """
        Creates VEX "affects" relationships for unpatched vulnerabilities
        """
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        """
        Creates VEX "doesNotAffect" relationships with an impact statement
        for ignored vulnerabilities
        """
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

    def import_bitbake_build_objset(self):
        """
        Loads the global bitbake.spdx.json document, merging its imports and
        objects into this set. Returns the loaded ObjectSet.
        """
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        """
        Returns the bitbake build_Build element, importing the bitbake
        document if it is not already present in this set
        """

        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            # FIX: this message previously referenced an unbound local
            # `deploy_dir_spdx` which raised NameError on the failure path
            bb.fatal("No build found in %s" % self.d.getVar("DEPLOY_DIR_SPDX"))

        return build

    def new_task_build(self, name, typ):
        """
        Creates a build_Build for the current BitBake task, optionally
        linking it to the parent bitbake build and recording build variables
        (controlled by SPDX_INCLUDE_BITBAKE_PARENT_BUILD and
        SPDX_INCLUDE_BUILD_VARIABLES)
        """
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        """
        Creates a software_File with archive purpose for archive_name
        """
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        """
        Creates a new ObjectSet with a named SpdxDocument, its document
        alias, and creation info (copied from the bitbake document unless
        copy_from_bitbake_doc is False)
        """
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=None):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        # NOTE: default changed from a shared mutable `[]` to None; behavior
        # for all existing callers is unchanged
        if add_objectsets is None:
            add_objectsets = []

        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()
        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    """
    Deserializes the JSON-LD SPDX document at path into an ObjectSet.
    Returns None when the file is missing and required is False; fatal
    otherwise. The SpdxDocument element is removed from the object set
    (kept only as objset.doc).
    """
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    """
    Returns the deploy-directory path for an arch/subdir SPDX document
    """
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    """
    Returns the (subdir, name) pair for an SPDX-ID-hash symlink, sharded by
    the first two hex characters
    """
    return Path("by-spdxid-hash") / h[:2], h


def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    """
    Loads an SPDX document for a specific architecture. Returns
    (objset, path) or (None, None).
    """
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    """
    Searches all multilib sstate architectures (most specific first) for an
    SPDX document. Returns (objset, path) or (None, None); fatal if
    required and not found.
    """
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    """
    Serializes objset to dest as JSON-LD (pretty-printed when SPDX_PRETTY
    is "1"), normalizing root elements and profile conformance first
    """
    if not isinstance(objset, ObjectSet):
        # FIX: message previously said "ObjsetSet"
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    # The document is added only for serialization, then removed again
    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    """
    Writes objset under deploydir for the current SSTATE_PKGARCH, creating
    aliases for all elements and (optionally) a by-spdxid-hash symlink for
    the document alias
    """
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except Exception:
            # FIX: was a bare `except:`; narrowed so KeyboardInterrupt et al.
            # propagate without the misleading warning. Still re-raises.
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    """
    Finds a required SPDX document and its matching root element. Fatal if
    either is missing. Returns (spdx_obj, objset).
    """
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    """
    Loads a required arch-specific SPDX document and finds a matching
    object anywhere in it. Fatal if either is missing. Returns
    (spdx_obj, objset).
    """
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    """
    Finds the SPDX document providing spdxid via the by-spdxid-hash symlink
    farm. Alias IDs carry their document hash in the ID itself; regular IDs
    are hashed. Returns (objset, path) or (None, None).
    """
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=None):
    """
    Creates a final SBoM document named name rooted at root_elements,
    expanding all references, stripping internal extensions, and
    de-duplicating license expressions. Returns (objset, sbom).
    """
    # NOTE: default changed from a shared mutable `[]` to None; behavior
    # for all existing callers is unchanged
    if add_objectsets is None:
        add_objectsets = []

    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n  "
            + "\n  ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        # Rewrite relationship targets to point at the canonical license
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom