#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a
    final SBoM
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )


@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    """
    Internal extension recording alternate file names under which an identical
    (same sha256) software_File was seen
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    """
    Return a stable MD5 hex digest over the given items. Elements contribute
    their SPDX ID; everything else is hashed as a UTF-8 string. Used only to
    derive deterministic SPDX ID suffixes, not for security.
    """
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    """
    Return SOURCE_DATE_EPOCH from the datastore as an aware UTC datetime,
    falling back to the current time if it is unset (keeps builds reproducible
    when SDE is defined).
    """
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def get_alias(obj):
    """Return the OEIdAliasExtension attached to obj, or None if absent."""
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def hash_id(_id):
    """Return the sha256 hex digest of an SPDX ID string."""
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    """
    Normalize l to a list/tuple: sets are sorted into a list; lists and tuples
    pass through unchanged. Raises TypeError for anything else.
    """
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    """
    A SHACLObjectSet extended with OpenEmbedded helpers: alias management,
    deterministic SPDX ID generation from the datastore, convenience
    constructors for common SPDX 3 objects, and document import/expansion.
    """

    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None
        # Set when a SpdxDocument is indexed; None means "no document yet" so
        # callers (e.g. load_jsonld) can test for it without AttributeError
        self.doc = None

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        """
        Index obj, additionally indexing its alias, its sha256 hashes, and
        capturing the SpdxDocument (and alias prefix) if obj is one.
        """
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Yield objects of typ whose attributes match all of attr_filter."""
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Return the first object matching foreach_filter(), or None."""
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        """Yield document root elements of typ matching attr_filter."""
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        """Return the first matching document root element, or None."""
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        """Add obj to the set and append it to the document root elements."""
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        """Return the document's is_native flag; False if not recorded."""
        for e in self.doc.extension:
            if not isinstance(e, OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        """Record is_native on the document, creating the extension if needed."""
        for e in self.doc.extension:
            if not isinstance(e, OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        # Only materialize the extension when the flag is actually set; the
        # absence of the extension already means "not native"
        if is_native:
            self.doc.extension.append(OEDocumentExtension(is_native=True))

    def add_aliases(self):
        """Ensure every Element in the set has an alias extension."""
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        """
        Derive an alias ID from obj._id by substituting the task unihash with
        the literal "UNIHASH" and the namespace with `replace` + PN. Returns
        None (with a warning) if obj._id does not embed both.
        """
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace()
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        """Strip all INTERNAL-marked extensions from every Element."""
        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        """Return the deterministic per-recipe SPDX ID namespace prefix."""
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        """
        Attach an alias extension to e if it has none; warn if an existing
        alias has an unexpected prefix. Blank-node IDs are skipped.
        """
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        """
        Build a new SPDX ID under this recipe's namespace. Suffix components
        are sanitized to URI-safe characters; the task unihash is included
        unless include_unihash is False.
        """
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        """
        Add an ExternalMap for the SPDX_IMPORTS entry named by key (idempotent)
        and return its external SPDX ID.
        """
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

    def new_agent(self, varname, *, creation_info=None, add=True):
        """
        Create (or find) an Agent described by the datastore variables rooted
        at varname (_name, _type, _comment, _id_*). A _ref variable redirects
        to another agent variable; an _import variable returns an external
        SPDX ID instead. Returns None if no _name is set.
        """
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            # Guard against trivial self-reference loops
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

    def new_creation_info(self):
        """
        Build a CreationInfo populated with the SPDX tool and the authors
        listed in SPDX_AUTHORS.
        """
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        # Guard against SPDX_AUTHORS being unset (getVar returns None)
        for a in (self.d.getVar("SPDX_AUTHORS") or "").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        """
        Return a new CreationInfo referencing (by ID) the authors and tools of
        `copy`, with a fresh created timestamp.
        """
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        """Create and add an Annotation of typ on subject with comment."""
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        """
        Create one relationship of class cls per element in from_, each
        pointing at all of to. Empty from_ yields []; empty to is replaced by
        the SPDX NoneElement. Extra props are passed to the constructor and
        folded into the deterministic SPDX ID.
        """
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.IndividualElement.NoneElement]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        """Create plain Relationships from each of from_ to all of to."""
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        """Create LifecycleScopedRelationships with the given scope."""
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

    def new_license_expression(
        self, license_expression, license_data, license_text_map=None
    ):
        """
        Find or create a LicenseExpression for license_expression. Custom
        license IDs may be mapped to URIs via license_text_map.
        """
        # Avoid a shared mutable default argument
        if license_text_map is None:
            license_text_map = {}

        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if its missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            # Custom text makes the expression unique; encode it in the ID
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        """
        Scan filepath for declared licenses and attach hasDeclaredLicense
        relationships to spdx_file. A marker extension makes this idempotent.
        """
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            lic = self.new_license_expression(extracted_lic, license_data)
            self.set_element_alias(lic)
            file_licenses.add(lic)

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            # Use the local helper directly; this module *is* oe.sbom30, so
            # going through the package attribute is fragile during import
            [get_element_link_id(lic) for lic in file_licenses],
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=None):
        """
        Create a software_File for path, or merge into an existing file with
        the same sha256 (recording alternate names and purposes on it).
        """
        # Avoid a shared mutable default argument
        if purposes is None:
            purposes = []

        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        """Create a security_Vulnerability for the given CVE identifier."""
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        """VEX: the vulnerabilities in from_ are fixed in to."""
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        """VEX: the vulnerabilities in from_ affect to (no known mitigation)."""
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
            security_actionStatement="Mitigation action unknown",
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        """VEX: the vulnerabilities in from_ do not affect to."""
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

    def import_bitbake_build_objset(self):
        """Load bitbake.spdx.json from DEPLOY_DIR_SPDX and merge it in."""
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        """
        Return the bitbake build_Build, importing the bitbake document if it is
        not already present in this set.
        """
        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            # Bug fix: the original referenced an undefined local
            # `deploy_dir_spdx` here, raising NameError on this error path
            bb.fatal(f"No build found in {self.d.getVar('DEPLOY_DIR_SPDX')}")

        return build

    def new_task_build(self, name, typ):
        """
        Create a build_Build for the current bitbake task, optionally linking
        the parent bitbake build and capturing build variables.
        """
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        """Create a software_File with archive purpose for archive_name."""
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        """
        Create a new ObjectSet containing a named SpdxDocument with an alias,
        copying creation info from the bitbake document unless told otherwise.
        """
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=None):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        # Avoid a shared mutable default argument
        if add_objectsets is None:
            add_objectsets = []

        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()

        # Manually go through all of the simplelicensing_customIdToUri
        # DictionaryEntry items and resolve any aliases to actual objects.
        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
            # Note: named `entry` (not `d`) to avoid shadowing the datastore
            for entry in lic.simplelicensing_customIdToUri:
                if entry.value.startswith(OE_ALIAS_PREFIX):
                    obj = self.find_by_id(entry.value)
                    if obj is not None:
                        entry.value = obj._id
                    else:
                        self.missing_ids.add(entry.value)

        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    """
    Deserialize an SPDX JSON-LD document at path into an ObjectSet. Returns
    None if the file is missing and not required (fatal otherwise).
    """
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    """Return the deploy path of the named per-arch SPDX JSON document."""
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    """Return the (subdir, name) pair for an SPDX-ID-hash symlink."""
    return Path("by-spdxid-hash") / h[:2], h


def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    """Load a per-arch SPDX document; returns (objset, path) or (None, None)."""
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    """
    Search all multilib sstate arches (most specific first) for the named SPDX
    document; returns (objset, path) or (None, None) (fatal if required).
    """
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    """Serialize objset (which must contain a SpdxDocument) to dest."""
    if not isinstance(objset, ObjectSet):
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    """
    Add aliases to objset, optionally create by-spdxid-hash symlinks for the
    document alias, and write the document under deploydir.
    """
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except OSError:
            # Narrowed from a bare `except:`; guard readlink so a failure that
            # is not "link already exists" doesn't mask the original error
            target = link_name.readlink() if link_name.is_symlink() else "<none>"
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    """Find a required document and a required root object of obj_type in it."""
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    """Load a required per-arch document and find a matching object in it."""
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    """Locate the document providing spdxid via the by-spdxid-hash links."""
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=None):
    """
    Create a final SBoM document: expand the collection, strip internal
    extensions, and de-duplicate license expressions. Returns (objset, sbom).
    """
    # Avoid a shared mutable default argument
    if add_objectsets is None:
        add_objectsets = []

    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n  "
            + "\n  ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom