#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a final
    SBoM
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )


@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    """
    Records alternate file names under which a (content-identical) file was
    seen, so that de-duplicated software_File objects keep track of every
    name they were added with.
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    """
    Hash together a set of items (Elements are hashed by their SPDX ID) into a
    stable hex digest suitable for embedding in a new SPDX ID.

    MD5 is used purely as a fast, non-cryptographic content mixer here.
    """
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    """
    Return the build timestamp as a timezone-aware UTC datetime, honoring
    SOURCE_DATE_EPOCH for reproducible builds when it is set.
    """
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def get_alias(obj):
    """
    Return the OEIdAliasExtension attached to obj, or None if it has none.
    """
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def hash_id(_id):
    """Return the SHA-256 hex digest of an SPDX ID string."""
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    """
    Normalize l to a list/tuple. Sets are sorted for deterministic ordering;
    anything else that is not a list or tuple raises TypeError.
    """
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    """
    A SHACLObjectSet bound to a BitBake datastore (d), with helpers for
    creating and indexing the SPDX 3 objects OE-core generates.
    """

    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None

    def create_index(self):
        # Extra index: sha256 hash value -> set of objects verified by it
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                # Make elements findable by their alias as well as their ID
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                # Fix: index only SHA-256 hashes. The previous condition was
                # inverted (== with continue), which skipped sha256 entries
                # and filed every *other* algorithm under by_sha256_hash,
                # breaking file de-duplication in new_file().
                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                # Element aliases in this document are namespaced under the
                # hash of the document's own alias
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Yield objects of typ whose attributes match attr_filter."""
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Return the first object matching foreach_filter(), or None."""
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        """Yield document root elements of typ matching attr_filter."""
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        """Return the first matching root element, or None."""
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        """Add obj to the set and register it as a document root element."""
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        """Return the document's is-native flag (False if not recorded)."""
        for e in self.doc.extension:
            # Note: OEDocumentExtension is defined in this module; no need to
            # go through the oe.sbom30 self-reference
            if not isinstance(e, OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        """Record the is-native flag on the document (added lazily)."""
        for e in self.doc.extension:
            if not isinstance(e, OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        # Only record a True flag; absence already means "not native"
        if is_native:
            self.doc.extension.append(OEDocumentExtension(is_native=True))

    def add_aliases(self):
        """Ensure every Element in the set has an alias extension."""
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        """
        Derive an alias ID from obj._id by substituting the task unihash with
        the literal "UNIHASH" and the recipe namespace with `replace` + PN.
        Returns None (with a warning) if the ID doesn't contain both.
        """
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace()
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        """Strip all INTERNAL-marked extensions before writing a final SBoM."""

        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        """Return the stable per-recipe SPDX namespace URI."""
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        """
        Attach an alias extension to e if it doesn't have one. Blank-node IDs
        ("_:...") are skipped. Warns if an existing alias has an unexpected
        prefix (SpdxDocument aliases are exempt, they use the doc prefix).
        """
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        """
        Build a new SPDX ID under this recipe's namespace. Suffix components
        are sanitized to URI-safe characters; the task unihash is included
        unless include_unihash=False.
        """
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        """
        Add an ExternalMap for the SPDX_IMPORTS entry named by key (idempotent)
        and return its external SPDX ID. Fatal if the key has no spdxid.
        """
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

    def new_agent(self, varname, *, creation_info=None, add=True):
        """
        Create (or find) an Agent described by the BitBake variables
        `<varname>_name`, `<varname>_type`, `<varname>_comment`, etc.

        `<varname>_ref` redirects to another variable set (self-reference is
        fatal); `<varname>_import` returns an imported external SPDX ID
        instead. Returns None if no name is configured.
        """
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar(f"{varname}_type")
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar(f"{varname}_comment")
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{varname}_id_{pyname}")
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

    def new_creation_info(self):
        """
        Build a CreationInfo populated with the SPDX tool and the authors
        listed in SPDX_AUTHORS. Fatal if any author cannot be resolved.
        """
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        """
        Clone a CreationInfo, replacing creators/tools with their string IDs
        (so the copy links to, rather than contains, the originals).
        """
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        """Create an Annotation of typ on subject with the given comment."""
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        """
        Create one relationship of class cls per `from_` element, each
        pointing at all of `to`. An empty `to` is replaced by the SPDX
        NoneElement; an empty `from_` creates nothing. Extra props are
        set on the relationship and folded into its SPDX ID hash.
        """
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.IndividualElement.NoneElement]

        ret = []

        for f in from_:
            # Hash everything that distinguishes this relationship so the
            # generated SPDX ID is stable and unique
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        """Create plain Relationship objects; see _new_relationship."""
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        """Create LifecycleScopedRelationship objects with the given scope."""
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

    def new_license_expression(
        self, license_expression, license_data, license_text_map=None
    ):
        """
        Find or create a simplelicensing_LicenseExpression for
        license_expression. license_text_map maps custom license IDs to URIs
        and is folded into the lookup key when present.
        """
        # Avoid a shared mutable default argument; None means "no custom IDs"
        if license_text_map is None:
            license_text_map = {}

        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if its missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        """
        Scan filepath for license identifiers and record them as
        hasDeclaredLicense relationships on spdx_file. A marker extension
        ensures each file is only scanned once.
        """
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            lic = self.new_license_expression(extracted_lic, license_data)
            self.set_element_alias(lic)
            file_licenses.add(lic)

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            [get_element_link_id(lic_alias) for lic_alias in file_licenses],
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=None):
        """
        Create a software_File for path, or return an existing one with the
        same sha256 content. On a content match, the purposes are merged and
        the new name is recorded as a filename alias.
        """
        # Avoid a shared mutable default argument
        if purposes is None:
            purposes = []

        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        """Create a security_Vulnerability identified by the given CVE."""
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        """VEX: vulnerabilities in from_ are fixed in to."""
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        """VEX: vulnerabilities in from_ affect to."""
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        """VEX: vulnerabilities in from_ do not affect to (with rationale)."""
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

    def import_bitbake_build_objset(self):
        """
        Load the global bitbake.spdx.json document and merge its objects and
        imports into this set. Returns the loaded ObjectSet.
        """
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        """
        Return the bitbake build_Build element, importing the bitbake
        document first if it is not already present in this set.
        """

        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            # Fix: deploy_dir_spdx was a local of import_bitbake_build_objset()
            # and undefined here, so the f-string raised NameError before
            # bb.fatal could run. Recompute it for the error message.
            deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
            bb.fatal(f"No build found in {deploy_dir_spdx}")

        return build

    def new_task_build(self, name, typ):
        """
        Create a build_Build for the current BitBake task. Optionally links
        the parent bitbake build and records build variables, controlled by
        SPDX_INCLUDE_BITBAKE_PARENT_BUILD and SPDX_INCLUDE_BUILD_VARIABLES.
        """
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        """Create a software_File representing an archive artifact."""
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        """
        Create a new ObjectSet with a SpdxDocument named name. CreationInfo
        is copied from the bitbake document unless copy_from_bitbake_doc is
        False, in which case a fresh one is created.
        """
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=None):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        # Avoid a shared mutable default argument
        if add_objectsets is None:
            add_objectsets = []

        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()

        # Manually go through all of the simplelicensing_customIdToUri DictionaryEntry
        # items and resolve any aliases to actual objects.
        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
            for d in lic.simplelicensing_customIdToUri:
                if d.value.startswith(OE_ALIAS_PREFIX):
                    obj = self.find_by_id(d.value)
                    if obj is not None:
                        d.value = obj._id
                    else:
                        self.missing_ids.add(d.value)

        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    """
    Deserialize an SPDX JSON-LD file into an ObjectSet. Returns None for a
    missing file unless required=True (then bb.fatal). The SpdxDocument is
    removed from the object collection and kept only as objset.doc.
    """
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    """Return the deploy-dir path of an arch-specific SPDX JSON document."""
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    """Return the (subdir, name) pair for a by-hash SPDX ID lookup link."""
    return Path("by-spdxid-hash") / h[:2], h


def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    """Load a document for one arch; returns (objset, path) or (None, None)."""
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    """
    Search all multilib sstate archs (most specific first) for a document.
    Returns (objset, path), or (None, None) / bb.fatal when not found.
    """
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    """
    Serialize objset to dest as JSON-LD. Root elements are de-duplicated and
    sorted, profile conformance comes from SPDX_PROFILES, and SPDX_PRETTY
    selects indented output.
    """
    if not isinstance(objset, ObjectSet):
        # Fix: error message typo ("ObjsetSet")
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    # The document is only part of the object collection while serializing
    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    """
    Write a recipe's SPDX document to the deploy directory, creating the
    by-spdxid-hash symlink for its document alias when requested. The JSON
    is written even if link creation fails (helps debugging).
    """
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        # Create a stable hash-named symlink pointing at dest so other
        # recipes can resolve this SPDX ID without knowing the file name
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except Exception:
            # Narrowed from a bare except; still re-raises after warning
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    """Find a required root element of obj_type in a named document."""
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    """Find a required object of obj_type in a named per-arch document."""
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    """
    Resolve an SPDX ID (or OE alias) to its document via the hash symlinks.
    Alias IDs embed the document hash directly; plain IDs are hashed.
    """
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=None):
    """
    Build a final software_Sbom document named name rooted at root_elements,
    expanding the collection to pull in all referenced elements, stripping
    internal extensions, and de-duplicating license expressions.
    """
    # Avoid a shared mutable default argument
    if add_objectsets is None:
        add_objectsets = []

    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n  "
            + "\n  ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        # Repoint license relationships at the canonical license objects
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom