xref: /openbmc/openbmc/poky/meta/lib/oe/sbom30.py (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7from pathlib import Path
8
9import oe.spdx30
10import bb
11import re
12import hashlib
13import uuid
14import os
15import oe.spdx_common
16from datetime import datetime, timezone
17
18OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"
19
20VEX_VERSION = "1.0.0"
21
22SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"
23
24OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
25OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"
26
27
28@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
29class OEIdAliasExtension(oe.spdx30.extension_Extension):
30    """
31    This extension allows an Element to provide an internal alias for the SPDX
32    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
33    created have a unique UUID namespace and the unihash of the task encoded in
34    their SPDX ID. However, this causes a problem for referencing documents
35    across recipes, since the taskhash of a dependency may not factor into the
36    taskhash of the current task and thus the current task won't rebuild and
37    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
38    tasks).
39
40    To help work around this, this extension provides a non-unique alias for an
41    Element by which it can be referenced from other tasks/recipes. When a
42    final SBoM is created, references to these aliases will be replaced with
43    the actual unique SPDX ID.
44
45    Most Elements will automatically get an alias created when they are written
46    out if they do not already have one. To suppress the creation of an alias,
47    add an extension with a blank `alias` property.
48
49
50    This is an internal extension that should be removed when writing out a final
51    SBoM.
52    """
53
54    CLOSED = True
55    INTERNAL = True
56
57    @classmethod
58    def _register_props(cls):
59        super()._register_props()
60        cls._add_property(
61            "alias",
62            oe.spdx30.StringProp(),
63            OE_SPDX_BASE + "alias",
64            max_count=1,
65        )
66
67        cls._add_property(
68            "link_name",
69            oe.spdx30.StringProp(),
70            OE_SPDX_BASE + "link-name",
71            max_count=1,
72        )
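
# Illustrative sketch (the element and object set names are hypothetical): to
# keep an Element from being given an automatic alias by
# ObjectSet.set_element_alias() when the document is written out, attach an
# OEIdAliasExtension whose `alias` property is left unset:
#   element.extension.append(OEIdAliasExtension())
#   objset.add(element)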
73
74
75@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
76class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
77    CLOSED = True
78    INTERNAL = True
79
80    @classmethod
81    def _register_props(cls):
82        super()._register_props()
83        cls._add_property(
84            "aliases",
85            oe.spdx30.ListProp(oe.spdx30.StringProp()),
86            OE_SPDX_BASE + "filename-alias",
87        )
88
89
90@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
91class OELicenseScannedExtension(oe.spdx30.extension_Extension):
92    """
93    The presence of this extension means the file has already been scanned for
94    license information
95    """
96
97    CLOSED = True
98    INTERNAL = True
99
100
101@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
102class OEDocumentExtension(oe.spdx30.extension_Extension):
103    """
104    This extension is added to a SpdxDocument to indicate various useful bits
105    of information about its contents
106    """
107
108    CLOSED = True
109
110    @classmethod
111    def _register_props(cls):
112        super()._register_props()
113        cls._add_property(
114            "is_native",
115            oe.spdx30.BooleanProp(),
116            OE_SPDX_BASE + "is-native",
117            max_count=1,
118        )
119
120
121def spdxid_hash(*items):
122    h = hashlib.md5()
123    for i in items:
124        if isinstance(i, oe.spdx30.Element):
125            h.update(i._id.encode("utf-8"))
126        else:
127            h.update(i.encode("utf-8"))
128    return h.hexdigest()
129
130
131def spdx_sde(d):
132    sde = d.getVar("SOURCE_DATE_EPOCH")
133    if not sde:
134        return datetime.now(timezone.utc)
135
136    return datetime.fromtimestamp(int(sde), timezone.utc)
137
138
139def get_element_link_id(e):
140    """
141    Get the string ID which should be used to link to an Element. If the
142    element has an alias, that will be preferred, otherwise its SPDX ID will be
143    used.
144    """
145    ext = get_alias(e)
146    if ext is not None and ext.alias:
147        return ext.alias
148    return e._id
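
# Minimal sketch (hypothetical element names): when relating elements that may
# come from another recipe's document, reference them by their link ID so the
# reference survives unihash changes and is resolved when the final SBoM is
# assembled:
#   objset.new_relationship(
#       [spdx_file],
#       oe.spdx30.RelationshipType.dependsOn,
#       [get_element_link_id(dep_element)],
#   )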
149
150
151def get_alias(obj):
152    for ext in obj.extension:
153        if not isinstance(ext, OEIdAliasExtension):
154            continue
155        return ext
156
157    return None
158
159
160def hash_id(_id):
161    return hashlib.sha256(_id.encode("utf-8")).hexdigest()
162
163
164def to_list(l):
165    if isinstance(l, set):
166        l = sorted(list(l))
167
168    if not isinstance(l, (list, tuple)):
169        raise TypeError("Must be a list or tuple. Got %s" % type(l))
170
171    return l
172
173
174class ObjectSet(oe.spdx30.SHACLObjectSet):
175    def __init__(self, d):
176        super().__init__()
177        self.d = d
178        self.alias_prefix = None
179
180    def create_index(self):
181        self.by_sha256_hash = {}
182        super().create_index()
183
184    def add_index(self, obj):
185        # Check that all elements are given an ID before being inserted
186        if isinstance(obj, oe.spdx30.Element):
187            if not obj._id:
188                raise ValueError("Element missing ID")
189
190            alias_ext = get_alias(obj)
191            if alias_ext is not None and alias_ext.alias:
192                self.obj_by_id[alias_ext.alias] = obj
193
194            for v in obj.verifiedUsing:
195                if not isinstance(v, oe.spdx30.Hash):
196                    continue
197
198                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
199                    continue
200
201                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)
202
203        super().add_index(obj)
204        if isinstance(obj, oe.spdx30.SpdxDocument):
205            self.doc = obj
206            alias_ext = get_alias(obj)
207            if alias_ext is not None and alias_ext.alias:
208                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"
209
210    def __filter_obj(self, obj, attr_filter):
211        return all(getattr(obj, k) == v for k, v in attr_filter.items())
212
213    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
214        for obj in self.foreach_type(typ, match_subclass=match_subclass):
215            if self.__filter_obj(obj, attr_filter):
216                yield obj
217
218    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
219        for obj in self.foreach_filter(
220            typ, match_subclass=match_subclass, **attr_filter
221        ):
222            return obj
223        return None
224
225    def foreach_root(self, typ, **attr_filter):
226        for obj in self.doc.rootElement:
227            if not isinstance(obj, typ):
228                continue
229
230            if self.__filter_obj(obj, attr_filter):
231                yield obj
232
233    def find_root(self, typ, **attr_filter):
234        for obj in self.foreach_root(typ, **attr_filter):
235            return obj
236        return None
237
238    def add_root(self, obj):
239        self.add(obj)
240        self.doc.rootElement.append(obj)
241        return obj
242
243    def is_native(self):
244        for e in self.doc.extension:
245            if not isinstance(e, oe.sbom30.OEDocumentExtension):
246                continue
247
248            if e.is_native is not None:
249                return e.is_native
250
251        return False
252
253    def set_is_native(self, is_native):
254        for e in self.doc.extension:
255            if not isinstance(e, oe.sbom30.OEDocumentExtension):
256                continue
257
258            e.is_native = is_native
259            return
260
261        if is_native:
262            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))
263
264    def add_aliases(self):
265        for o in self.foreach_type(oe.spdx30.Element):
266            self.set_element_alias(o)
267
268    def new_alias_id(self, obj, replace):
269        unihash = self.d.getVar("BB_UNIHASH")
270        namespace = self.get_namespace()
271        if unihash not in obj._id:
272            bb.warn(f"Unihash {unihash} not found in {obj._id}")
273            return None
274
275        if namespace not in obj._id:
276            bb.warn(f"Namespace {namespace} not found in {obj._id}")
277            return None
278
279        return obj._id.replace(unihash, "UNIHASH").replace(
280            namespace, replace + self.d.getVar("PN")
281        )
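
    # Worked example (all values are made up): if
    #   obj._id = "<namespace>/<BB_UNIHASH>/package/foo"
    # and `replace` is this document's alias prefix, the returned alias is
    #   OE_ALIAS_PREFIX + "<doc-hash>/<PN>/UNIHASH/package/foo"
    # so references to the alias stay stable even when the producing task's
    # unihash (and therefore its real SPDX ID) changes.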
282
283    def remove_internal_extensions(self):
284        def remove(o):
285            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]
286
287        for o in self.foreach_type(oe.spdx30.Element):
288            remove(o)
289
290        if self.doc:
291            remove(self.doc)
292
293    def get_namespace(self):
294        namespace_uuid = uuid.uuid5(
295            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
296        )
297        pn = self.d.getVar("PN")
298        return "%s/%s-%s" % (
299            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
300            pn,
301            str(uuid.uuid5(namespace_uuid, pn)),
302        )
303
304    def set_element_alias(self, e):
305        if not e._id or e._id.startswith("_:"):
306            return
307
308        alias_ext = get_alias(e)
309        if alias_ext is None:
310            alias_id = self.new_alias_id(e, self.alias_prefix)
311            if alias_id is not None:
312                e.extension.append(OEIdAliasExtension(alias=alias_id))
313        elif (
314            alias_ext.alias
315            and not isinstance(e, oe.spdx30.SpdxDocument)
316            and not alias_ext.alias.startswith(self.alias_prefix)
317        ):
318            bb.warn(
319                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
320            )
321
322    def new_spdxid(self, *suffix, include_unihash=True):
323        items = [self.get_namespace()]
324        if include_unihash:
325            unihash = self.d.getVar("BB_UNIHASH")
326            items.append(unihash)
327        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
328        return "/".join(items)
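
    # Illustrative result (placeholders, not real values): for suffix
    # components ("package", "foo-dbg") this returns something like
    #   "<SPDX_NAMESPACE_PREFIX>/<PN>-<uuid5>/<BB_UNIHASH>/package/foo-dbg"
    # with any character outside [a-zA-Z0-9_-] in the suffix replaced by "_".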
329
330    def new_import(self, key):
331        base = f"SPDX_IMPORTS_{key}"
332        spdxid = self.d.getVar(f"{base}_spdxid")
333        if not spdxid:
334            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")
335
336        for i in self.doc.import_:
337            if i.externalSpdxId == spdxid:
338                # Already imported
339                return spdxid
340
341        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)
342
343        uri = self.d.getVar(f"{base}_uri")
344        if uri:
345            m.locationHint = uri
346
347        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
348            value = self.d.getVar(f"{base}_hash_{pyname}")
349            if value:
350                m.verifiedUsing.append(
351                    oe.spdx30.Hash(
352                        algorithm=algorithm,
353                        hashValue=value,
354                    )
355                )
356
357        self.doc.import_.append(m)
358        return spdxid
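
    # Example configuration consumed by new_import(); the key and values are
    # illustrative, only the variable name patterns come from the code above:
    #   SPDX_IMPORTS_example_spdxid = "https://example.com/spdx#element"
    #   SPDX_IMPORTS_example_uri = "https://example.com/example.spdx.json"
    #   SPDX_IMPORTS_example_hash_sha256 = "<sha256 of the referenced document>"
    # A task can then reference the external element with:
    #   spdxid = objset.new_import("example")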
359
360    def new_agent(self, varname, *, creation_info=None, add=True):
361        ref_varname = self.d.getVar(f"{varname}_ref")
362        if ref_varname:
363            if ref_varname == varname:
364                bb.fatal(f"{varname} cannot reference itself")
365            return self.new_agent(ref_varname, creation_info=creation_info)
366
367        import_key = self.d.getVar(f"{varname}_import")
368        if import_key:
369            return self.new_import(import_key)
370
371        name = self.d.getVar(f"{varname}_name")
372        if not name:
373            return None
374
375        spdxid = self.new_spdxid("agent", name)
376        agent = self.find_by_id(spdxid)
377        if agent is not None:
378            return agent
379
380        agent_type = self.d.getVar("%s_type" % varname)
381        if agent_type == "person":
382            agent = oe.spdx30.Person()
383        elif agent_type == "software":
384            agent = oe.spdx30.SoftwareAgent()
385        elif agent_type == "organization":
386            agent = oe.spdx30.Organization()
387        elif not agent_type or agent_type == "agent":
388            agent = oe.spdx30.Agent()
389        else:
390            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))
391
392        agent._id = spdxid
393        agent.creationInfo = creation_info or self.doc.creationInfo
394        agent.name = name
395
396        comment = self.d.getVar("%s_comment" % varname)
397        if comment:
398            agent.comment = comment
399
400        for (
401            pyname,
402            idtype,
403        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
404            value = self.d.getVar("%s_id_%s" % (varname, pyname))
405            if value:
406                agent.externalIdentifier.append(
407                    oe.spdx30.ExternalIdentifier(
408                        externalIdentifierType=idtype,
409                        identifier=value,
410                    )
411                )
412
413        if add:
414            self.add(agent)
415
416        return agent
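
    # Example author configuration resolved by new_agent() (shown only as an
    # illustration; the variable name patterns come from the code above):
    #   SPDX_AUTHORS = "openembedded"
    #   SPDX_AUTHORS_openembedded_name = "OpenEmbedded"
    #   SPDX_AUTHORS_openembedded_type = "organization"
    # A "<varname>_ref" entry can point at another agent definition, and a
    # "<varname>_import" entry can point at an SPDX_IMPORTS key instead.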
417
418    def new_creation_info(self):
419        creation_info = oe.spdx30.CreationInfo()
420
421        name = "%s %s" % (
422            self.d.getVar("SPDX_TOOL_NAME"),
423            self.d.getVar("SPDX_TOOL_VERSION"),
424        )
425        tool = self.add(
426            oe.spdx30.Tool(
427                _id=self.new_spdxid("tool", name),
428                creationInfo=creation_info,
429                name=name,
430            )
431        )
432
433        authors = []
434        for a in self.d.getVar("SPDX_AUTHORS").split():
435            varname = "SPDX_AUTHORS_%s" % a
436            author = self.new_agent(varname, creation_info=creation_info)
437
438            if not author:
439                bb.fatal("Unable to find or create author %s" % a)
440
441            authors.append(author)
442
443        creation_info.created = spdx_sde(self.d)
444        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
445        creation_info.createdBy = authors
446        creation_info.createdUsing = [tool]
447
448        return creation_info
449
450    def copy_creation_info(self, copy):
451        c = oe.spdx30.CreationInfo(
452            created=spdx_sde(self.d),
453            specVersion=self.d.getVar("SPDX_VERSION"),
454        )
455
456        for author in copy.createdBy:
457            if isinstance(author, str):
458                c.createdBy.append(author)
459            else:
460                c.createdBy.append(author._id)
461
462        for tool in copy.createdUsing:
463            if isinstance(tool, str):
464                c.createdUsing.append(tool)
465            else:
466                c.createdUsing.append(tool._id)
467
468        return c
469
470    def new_annotation(self, subject, comment, typ):
471        return self.add(
472            oe.spdx30.Annotation(
473                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
474                creationInfo=self.doc.creationInfo,
475                annotationType=typ,
476                subject=subject,
477                statement=comment,
478            )
479        )
480
481    def _new_relationship(
482        self,
483        cls,
484        from_,
485        typ,
486        to,
487        *,
488        spdxid_name="relationship",
489        **props,
490    ):
491        from_ = to_list(from_)
492        to = to_list(to)
493
494        if not from_:
495            return []
496
497        if not to:
498            to = [oe.spdx30.IndividualElement.NoneElement]
499
500        ret = []
501
502        for f in from_:
503            hash_args = [typ, f]
504            for k in sorted(props.keys()):
505                hash_args.append(props[k])
506            hash_args.extend(to)
507
508            relationship = self.add(
509                cls(
510                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
511                    creationInfo=self.doc.creationInfo,
512                    from_=f,
513                    relationshipType=typ,
514                    to=to,
515                    **props,
516                )
517            )
518            ret.append(relationship)
519
520        return ret
521
522    def new_relationship(self, from_, typ, to):
523        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)
524
525    def new_scoped_relationship(self, from_, typ, scope, to):
526        return self._new_relationship(
527            oe.spdx30.LifecycleScopedRelationship,
528            from_,
529            typ,
530            to,
531            scope=scope,
532        )
533
534    def new_license_expression(
535        self, license_expression, license_data, license_text_map={}
536    ):
537        license_list_version = license_data["licenseListVersion"]
538        # SPDX 3 requires that the license list version be a semver
539        # MAJOR.MINOR.MICRO, but the actual license list version might be
540        # MAJOR.MINOR in some older versions. As such, manually append a .0
541        # micro version if it is missing to keep SPDX happy
542        if license_list_version.count(".") < 2:
543            license_list_version += ".0"
544
545        spdxid = [
546            "license",
547            license_list_version,
548            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
549        ]
550
551        license_text = [
552            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
553        ]
554
555        if not license_text:
556            lic = self.find_filter(
557                oe.spdx30.simplelicensing_LicenseExpression,
558                simplelicensing_licenseExpression=license_expression,
559                simplelicensing_licenseListVersion=license_list_version,
560            )
561            if lic is not None:
562                return lic
563        else:
564            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
565            lic = self.find_by_id(self.new_spdxid(*spdxid))
566            if lic is not None:
567                return lic
568
569        lic = self.add(
570            oe.spdx30.simplelicensing_LicenseExpression(
571                _id=self.new_spdxid(*spdxid),
572                creationInfo=self.doc.creationInfo,
573                simplelicensing_licenseExpression=license_expression,
574                simplelicensing_licenseListVersion=license_list_version,
575            )
576        )
577
578        for key, value in license_text:
579            lic.simplelicensing_customIdToUri.append(
580                oe.spdx30.DictionaryEntry(key=key, value=value)
581            )
582
583        return lic
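
    # Illustrative call (hypothetical license and URI): the licenseListVersion
    # fix-up above turns e.g. "3.21" into "3.21.0", and LicenseRef- identifiers
    # can be mapped to their text locations through license_text_map:
    #   objset.new_license_expression(
    #       "MIT & LicenseRef-custom",
    #       license_data,
    #       {"LicenseRef-custom": "https://example.com/custom-license.txt"},
    #   )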
584
585    def scan_declared_licenses(self, spdx_file, filepath, license_data):
586        for e in spdx_file.extension:
587            if isinstance(e, OELicenseScannedExtension):
588                return
589
590        file_licenses = set()
591        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
592            lic = self.new_license_expression(extracted_lic, license_data)
593            self.set_element_alias(lic)
594            file_licenses.add(lic)
595
596        self.new_relationship(
597            [spdx_file],
598            oe.spdx30.RelationshipType.hasDeclaredLicense,
599            [oe.sbom30.get_element_link_id(lic_alias) for lic_alias in file_licenses],
600        )
601        spdx_file.extension.append(OELicenseScannedExtension())
602
603    def new_file(self, _id, name, path, *, purposes=[]):
604        sha256_hash = bb.utils.sha256_file(path)
605
606        for f in self.by_sha256_hash.get(sha256_hash, []):
607            if not isinstance(f, oe.spdx30.software_File):
608                continue
609
610            if purposes:
611                new_primary = purposes[0]
612                new_additional = []
613
614                if f.software_primaryPurpose:
615                    new_additional.append(f.software_primaryPurpose)
616                new_additional.extend(f.software_additionalPurpose)
617
618                new_additional = sorted(
619                    list(set(p for p in new_additional if p != new_primary))
620                )
621
622                f.software_primaryPurpose = new_primary
623                f.software_additionalPurpose = new_additional
624
625            if f.name != name:
626                for e in f.extension:
627                    if isinstance(e, OEFileNameAliasExtension):
628                        e.aliases.append(name)
629                        break
630                else:
631                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))
632
633            return f
634
635        spdx_file = oe.spdx30.software_File(
636            _id=_id,
637            creationInfo=self.doc.creationInfo,
638            name=name,
639        )
640        if purposes:
641            spdx_file.software_primaryPurpose = purposes[0]
642            spdx_file.software_additionalPurpose = purposes[1:]
643
644        spdx_file.verifiedUsing.append(
645            oe.spdx30.Hash(
646                algorithm=oe.spdx30.HashAlgorithm.sha256,
647                hashValue=sha256_hash,
648            )
649        )
650
651        return self.add(spdx_file)
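
    # Sketch of the deduplication above (paths are hypothetical): if two
    # packaged files have the same sha256, the second call returns the existing
    # software_File and records the extra name in an OEFileNameAliasExtension
    # instead of creating a duplicate element:
    #   a = objset.new_file(objset.new_spdxid("package", "p", "usr/bin/a"),
    #                       "/usr/bin/a", pkgdir + "/usr/bin/a")
    #   b = objset.new_file(objset.new_spdxid("package", "p", "usr/bin/b"),
    #                       "/usr/bin/b", pkgdir + "/usr/bin/b")
    #   # identical contents => b is a, and "/usr/bin/b" is stored as an alias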
652
653    def new_cve_vuln(self, cve):
654        v = oe.spdx30.security_Vulnerability()
655        v._id = self.new_spdxid("vulnerability", cve)
656        v.creationInfo = self.doc.creationInfo
657
658        v.externalIdentifier.append(
659            oe.spdx30.ExternalIdentifier(
660                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
661                identifier=cve,
662                identifierLocator=[
663                    f"https://cveawg.mitre.org/api/cve/{cve}",
664                    f"https://www.cve.org/CVERecord?id={cve}",
665                ],
666            )
667        )
668        return self.add(v)
669
670    def new_vex_patched_relationship(self, from_, to):
671        return self._new_relationship(
672            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
673            from_,
674            oe.spdx30.RelationshipType.fixedIn,
675            to,
676            spdxid_name="vex-fixed",
677            security_vexVersion=VEX_VERSION,
678        )
679
680    def new_vex_unpatched_relationship(self, from_, to):
681        return self._new_relationship(
682            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
683            from_,
684            oe.spdx30.RelationshipType.affects,
685            to,
686            spdxid_name="vex-affected",
687            security_vexVersion=VEX_VERSION,
688            security_actionStatement="Mitigation action unknown",
689        )
690
691    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
692        return self._new_relationship(
693            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
694            from_,
695            oe.spdx30.RelationshipType.doesNotAffect,
696            to,
697            spdxid_name="vex-not-affected",
698            security_vexVersion=VEX_VERSION,
699            security_impactStatement=impact_statement,
700        )
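
    # Illustrative VEX usage (element names are hypothetical): link a CVE to
    # the package element it is fixed in:
    #   vuln = objset.new_cve_vuln("CVE-2024-12345")
    #   objset.new_vex_patched_relationship(
    #       [vuln], [get_element_link_id(package)]
    #   )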
701
702    def import_bitbake_build_objset(self):
703        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
704        bb_objset = load_jsonld(
705            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
706        )
707        self.doc.import_.extend(bb_objset.doc.import_)
708        self.update(bb_objset.objects)
709
710        return bb_objset
711
712    def import_bitbake_build(self):
713        def find_bitbake_build(objset):
714            return objset.find_filter(
715                oe.spdx30.build_Build,
716                build_buildType=SPDX_BUILD_TYPE,
717            )
718
719        build = find_bitbake_build(self)
720        if build:
721            return build
722
723        bb_objset = self.import_bitbake_build_objset()
724        build = find_bitbake_build(bb_objset)
725        if build is None:
726            bb.fatal(f"No build found in {deploy_dir_spdx}")
727
728        return build
729
730    def new_task_build(self, name, typ):
731        current_task = self.d.getVar("BB_CURRENTTASK")
732        pn = self.d.getVar("PN")
733
734        build = self.add(
735            oe.spdx30.build_Build(
736                _id=self.new_spdxid("build", name),
737                creationInfo=self.doc.creationInfo,
738                name=f"{pn}:do_{current_task}:{name}",
739                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
740            )
741        )
742
743        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
744            bitbake_build = self.import_bitbake_build()
745
746            self.new_relationship(
747                [bitbake_build],
748                oe.spdx30.RelationshipType.ancestorOf,
749                [build],
750            )
751
752        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
753            for varname in sorted(self.d.keys()):
754                if varname.startswith("__"):
755                    continue
756
757                value = self.d.getVar(varname, expand=False)
758
759                # TODO: Deal with non-string values
760                if not isinstance(value, str):
761                    continue
762
763                build.build_parameter.append(
764                    oe.spdx30.DictionaryEntry(key=varname, value=value)
765                )
766
767        return build
768
769    def new_archive(self, archive_name):
770        return self.add(
771            oe.spdx30.software_File(
772                _id=self.new_spdxid("archive", str(archive_name)),
773                creationInfo=self.doc.creationInfo,
774                name=str(archive_name),
775                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
776            )
777        )
778
779    @classmethod
780    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
781        objset = cls(d)
782
783        document = oe.spdx30.SpdxDocument(
784            _id=objset.new_spdxid("document", name),
785            name=name,
786        )
787
788        document.extension.append(
789            OEIdAliasExtension(
790                alias=objset.new_alias_id(
791                    document,
792                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
793                ),
794            )
795        )
796        objset.doc = document
797        objset.add_index(document)
798
799        if copy_from_bitbake_doc:
800            bb_objset = objset.import_bitbake_build_objset()
801            document.creationInfo = objset.copy_creation_info(
802                bb_objset.doc.creationInfo
803            )
804        else:
805            document.creationInfo = objset.new_creation_info()
806
807        return objset
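
    # Typical entry point for a task (sketch; the arguments are illustrative):
    # create a fresh per-recipe document and give it a root build element:
    #   objset = ObjectSet.new_objset(d, "recipe-" + d.getVar("PN"))
    #   build = objset.add_root(objset.new_task_build("recipe", "recipe"))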
808
809    def expand_collection(self, *, add_objectsets=[]):
810        """
811        Expands a collection to pull in all missing elements
812
813        Returns the set of SPDX IDs that could not be found and linked into the document
814        """
815        missing_spdxids = set()
816        imports = {e.externalSpdxId: e for e in self.doc.import_}
817
818        def merge_doc(other):
819            nonlocal imports
820
821            for e in other.doc.import_:
822                if not e.externalSpdxId in imports:
823                    imports[e.externalSpdxId] = e
824
825            self.objects |= other.objects
826
827        for o in add_objectsets:
828            merge_doc(o)
829
830        needed_spdxids = self.link()
831        provided_spdxids = set(self.obj_by_id.keys())
832
833        while True:
834            import_spdxids = set(imports.keys())
835            searching_spdxids = (
836                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
837            )
838            if not searching_spdxids:
839                break
840
841            spdxid = searching_spdxids.pop()
842            bb.debug(
843                1,
844                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
845            )
846            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)
847
848            if dep_objset:
849                dep_provided = set(dep_objset.obj_by_id.keys())
850                if spdxid not in dep_provided:
851                    bb.fatal(f"{spdxid} not found in {dep_path}")
852                provided_spdxids |= dep_provided
853                needed_spdxids |= dep_objset.missing_ids
854                merge_doc(dep_objset)
855            else:
856                missing_spdxids.add(spdxid)
857
858        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
859        bb.debug(1, "Linking...")
860        self.link()
861
862        # Manually go through all of the simplelicensing_customIdToUri DictionaryEntry
863        # items and resolve any aliases to actual objects.
864        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
865            for d in lic.simplelicensing_customIdToUri:
866                if d.value.startswith(OE_ALIAS_PREFIX):
867                    obj = self.find_by_id(d.value)
868                    if obj is not None:
869                        d.value = obj._id
870                    else:
871                        self.missing_ids.add(d.value)
872
873        self.missing_ids -= set(imports.keys())
874        return self.missing_ids
875
876
877def load_jsonld(d, path, required=False):
878    deserializer = oe.spdx30.JSONLDDeserializer()
879    objset = ObjectSet(d)
880    try:
881        with path.open("rb") as f:
882            deserializer.read(f, objset)
883    except FileNotFoundError:
884        if required:
885            bb.fatal("No SPDX document named %s found" % path)
886        return None
887
888    if not objset.doc:
889        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
890        return None
891
892    objset.objects.remove(objset.doc)
893    return objset
894
895
896def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
897    if deploydir is None:
898        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
899    return deploydir / arch / subdir / (name + ".spdx.json")
900
901
902def jsonld_hash_path(h):
903    return Path("by-spdxid-hash") / h[:2], h
904
905
906def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
907    path = jsonld_arch_path(d, arch, subdir, name)
908    objset = load_jsonld(d, path, required=required)
909    if objset is not None:
910        return (objset, path)
911    return (None, None)
912
913
914def find_jsonld(d, subdir, name, *, required=False):
915    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
916    package_archs.reverse()
917
918    for arch in package_archs:
919        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
920        if objset is not None:
921            return (objset, path)
922
923    if required:
924        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))
925
926    return (None, None)
927
928
929def write_jsonld_doc(d, objset, dest):
930    if not isinstance(objset, ObjectSet):
931        bb.fatal("Only an ObjsetSet can be serialized")
932        return
933
934    if not objset.doc:
935        bb.fatal("ObjectSet is missing a SpdxDocument")
936        return
937
938    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
939    objset.doc.profileConformance = sorted(
940        list(
941            getattr(oe.spdx30.ProfileIdentifierType, p)
942            for p in d.getVar("SPDX_PROFILES").split()
943        )
944    )
945
946    dest.parent.mkdir(exist_ok=True, parents=True)
947
948    if d.getVar("SPDX_PRETTY") == "1":
949        serializer = oe.spdx30.JSONLDSerializer(
950            indent=2,
951        )
952    else:
953        serializer = oe.spdx30.JSONLDInlineSerializer()
954
955    objset.objects.add(objset.doc)
956    with dest.open("wb") as f:
957        serializer.write(objset, f, force_at_graph=True)
958    objset.objects.remove(objset.doc)
959
960
961def write_recipe_jsonld_doc(
962    d,
963    objset,
964    subdir,
965    deploydir,
966    *,
967    create_spdx_id_links=True,
968):
969    pkg_arch = d.getVar("SSTATE_PKGARCH")
970
971    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)
972
973    def link_id(_id):
974        hash_path = jsonld_hash_path(hash_id(_id))
975
976        link_name = jsonld_arch_path(
977            d,
978            pkg_arch,
979            *hash_path,
980            deploydir=deploydir,
981        )
982        try:
983            link_name.parent.mkdir(exist_ok=True, parents=True)
984            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
985        except Exception:
986            target = link_name.readlink()
987            bb.warn(
988                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
989            )
990            raise
991
992        return hash_path[-1]
993
994    objset.add_aliases()
995
996    try:
997        if create_spdx_id_links:
998            alias_ext = get_alias(objset.doc)
999            if alias_ext is not None and alias_ext.alias:
1000                alias_ext.link_name = link_id(alias_ext.alias)
1001
1002    finally:
1003        # It is really helpful for debugging if the JSON document is written
1004        # out, so always do that even if there is an error making the links
1005        write_jsonld_doc(d, objset, dest)
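
# Typical use from a recipe task (sketch; the subdir and deploydir values are
# illustrative): write the per-recipe document and create the by-spdxid-hash
# links that find_by_spdxid() relies on:
#   write_recipe_jsonld_doc(d, objset, "recipes", deploydir)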
1006
1007
1008def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
1009    objset, fn = find_jsonld(d, subdir, fn_name, required=True)
1010
1011    spdx_obj = objset.find_root(obj_type, **attr_filter)
1012    if not spdx_obj:
1013        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))
1014
1015    return spdx_obj, objset
1016
1017
1018def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
1019    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)
1020
1021    spdx_obj = objset.find_filter(obj_type, **attr_filter)
1022    if not spdx_obj:
1023        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))
1024
1025    return spdx_obj, objset
1026
1027
1028def find_by_spdxid(d, spdxid, *, required=False):
1029    if spdxid.startswith(OE_ALIAS_PREFIX):
1030        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
1031        return find_jsonld(d, *jsonld_hash_path(h), required=required)
1032    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)
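
# Worked example (hash shortened): an alias such as
#   OE_ALIAS_PREFIX + "ab12.../<PN>/UNIHASH/package/foo"
# is resolved by loading <deploydir>/<arch>/by-spdxid-hash/ab/ab12....spdx.json,
# a symlink created by write_recipe_jsonld_doc() that points at the document
# which actually defines the aliased element.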
1033
1034
1035def create_sbom(d, name, root_elements, add_objectsets=[]):
1036    objset = ObjectSet.new_objset(d, name)
1037
1038    sbom = objset.add(
1039        oe.spdx30.software_Sbom(
1040            _id=objset.new_spdxid("sbom", name),
1041            name=name,
1042            creationInfo=objset.doc.creationInfo,
1043            software_sbomType=[oe.spdx30.software_SbomType.build],
1044            rootElement=root_elements,
1045        )
1046    )
1047
1048    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
1049    if missing_spdxids:
1050        bb.warn(
1051            "The following SPDX IDs were unable to be resolved:\n  "
1052            + "\n  ".join(sorted(list(missing_spdxids)))
1053        )
1054
1055    # Filter out internal extensions from final SBoMs
1056    objset.remove_internal_extensions()
1057
1058    # SBoM should be the only root element of the document
1059    objset.doc.rootElement = [sbom]
1060
1061    # De-duplicate licenses
1062    unique = set()
1063    dedup = {}
1064    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
1065        for u in unique:
1066            if (
1067                u.simplelicensing_licenseExpression
1068                == lic.simplelicensing_licenseExpression
1069                and u.simplelicensing_licenseListVersion
1070                == lic.simplelicensing_licenseListVersion
1071            ):
1072                dedup[lic] = u
1073                break
1074        else:
1075            unique.add(lic)
1076
1077    if dedup:
1078        for rel in objset.foreach_filter(
1079            oe.spdx30.Relationship,
1080            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
1081        ):
1082            rel.to = [dedup.get(to, to) for to in rel.to]
1083
1084        for rel in objset.foreach_filter(
1085            oe.spdx30.Relationship,
1086            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
1087        ):
1088            rel.to = [dedup.get(to, to) for to in rel.to]
1089
1090        for k, v in dedup.items():
1091            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
1092            objset.objects.remove(k)
1093
1094        objset.create_index()
1095
1096    return objset, sbom
1097