#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a final
    SBoM.
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )

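# Illustrative sketch (not executed): suppressing automatic alias creation, as
# described in the docstring above. `spdx_file` stands in for any Element:
#
#   # An extension with a blank `alias` property stops set_element_alias()
#   # from generating one when the document is written out.
#   spdx_file.extension.append(OEIdAliasExtension())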

@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id

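# Illustrative sketch (not executed): cross-document references should go
# through get_element_link_id() so the stable alias is used when present.
# `objset`, `spdx_package` and `spdx_file` are hypothetical:
#
#   objset.new_relationship(
#       [spdx_package],
#       oe.spdx30.RelationshipType.contains,
#       [get_element_link_id(spdx_file)],
#   )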

def get_alias(obj):
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def hash_id(_id):
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        if is_native:
            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))

    def add_aliases(self):
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace() + "/"
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

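    # Illustrative sketch (not executed): IDs from new_spdxid() are rooted in
    # the recipe namespace plus (optionally) the task unihash; the values
    # below are hypothetical:
    #
    #   self.new_spdxid("package", "zlib")
    #   # -> "<SPDX_NAMESPACE_PREFIX>/<PN>-<uuid5>/<BB_UNIHASH>/package/zlib"
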
    def new_import(self, key):
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

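    # Illustrative sketch (not executed): new_import() is driven by
    # SPDX_IMPORTS_<key>_* variables; the key "example" and the values below
    # are hypothetical:
    #
    #   d.setVar("SPDX_IMPORTS_example_spdxid", "https://example.com/spdx/element")
    #   d.setVar("SPDX_IMPORTS_example_uri", "https://example.com/example.spdx.json")
    #   d.setVar("SPDX_IMPORTS_example_hash_sha256", "<hex digest>")
    #   spdxid = objset.new_import("example")
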
    def new_agent(self, varname, *, creation_info=None, add=True):
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

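    # Illustrative sketch (not executed): agents are described by a group of
    # datastore variables sharing a prefix; the "acme" author below is
    # hypothetical:
    #
    #   d.setVar("SPDX_AUTHORS", "acme")
    #   d.setVar("SPDX_AUTHORS_acme_name", "ACME Corp")
    #   d.setVar("SPDX_AUTHORS_acme_type", "organization")
    #   author = objset.new_agent("SPDX_AUTHORS_acme")
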
    def new_creation_info(self):
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.Element.NoneElement]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

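    # Illustrative sketch (not executed): a relationship's SPDX ID is derived
    # from a hash of its type, endpoints and extra properties. `spdx_package`
    # and `dep_package` are hypothetical:
    #
    #   objset.new_scoped_relationship(
    #       [spdx_package],
    #       oe.spdx30.RelationshipType.dependsOn,
    #       oe.spdx30.LifecycleScopeType.build,
    #       [dep_package],
    #   )
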
    def new_license_expression(
        self, license_expression, license_data, license_text_map={}
    ):
        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license list version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if it's missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            file_licenses.add(self.new_license_expression(extracted_lic, license_data))

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            file_licenses,
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=[]):
        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

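    # Illustrative sketch (not executed): linking CVE status to a package.
    # "CVE-2024-0001" and `spdx_package` are hypothetical:
    #
    #   vuln = objset.new_cve_vuln("CVE-2024-0001")
    #   objset.new_vex_patched_relationship(
    #       [vuln], [get_element_link_id(spdx_package)]
    #   )
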
    def import_bitbake_build_objset(self):
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            bb.fatal("No bitbake build found in %s" % self.d.getVar("DEPLOY_DIR_SPDX"))

        return build

    def new_task_build(self, name, typ):
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

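    # Illustrative sketch (not executed): a per-recipe document whose name
    # feeds into the document's SPDX ID and alias; the name is hypothetical:
    #
    #   objset = ObjectSet.new_objset(d, "recipe-" + d.getVar("PN"))
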
    def expand_collection(self, *, add_objectsets=[]):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()
        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    return Path("by-spdxid-hash") / h[:2], h

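# Illustrative sketch (not executed): on-disk layout produced by the helpers
# above; the arch, subdir and document name are hypothetical:
#
#   jsonld_arch_path(d, "core2-64", "recipes", "recipe-zlib")
#   # -> ${DEPLOY_DIR_SPDX}/core2-64/recipes/recipe-zlib.spdx.json
#
#   jsonld_hash_path(hash_id(spdxid))
#   # -> (Path("by-spdxid-hash") / "<first two hex chars>", "<full hash>")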

def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    if not isinstance(objset, ObjectSet):
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except:
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=[]):
    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n  "
            + "\n  ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom
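
# Illustrative sketch (not executed): assembling and writing a final SBoM; the
# document name, root elements and output path are hypothetical:
#
#   objset, sbom = create_sbom(d, "image-example", root_elements)
#   write_jsonld_doc(d, objset, Path("/path/to/image-example.spdx.json"))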