#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for its SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem when referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task; the current task then won't rebuild and see
    the new SPDX ID when the dependency changes (e.g. ABI-safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases are replaced with the
    actual unique SPDX IDs.

    Most Elements automatically get an alias created when they are written out
    if they do not already have one. To suppress the creation of an alias, add
    an extension with a blank `alias` property.

    This is an internal extension that is removed when writing out a final
    SBoM.
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )

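# Illustrative sketch (not executed as part of this module): suppressing alias
# creation for an Element, as described in the docstring above. Adding an
# OEIdAliasExtension with no alias set makes ObjectSet.set_element_alias()
# below leave the Element alone. `some_file` is a hypothetical Element that
# was already added to a hypothetical `objset`.
#
#   some_file.extension.append(OEIdAliasExtension())
#   objset.add_aliases()  # some_file keeps no alias; other Elements get one
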
@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    # Non-cryptographic hash used only to build stable SPDX ID suffixes;
    # Elements hash by their SPDX ID, everything else must be a string
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    # Return SOURCE_DATE_EPOCH as an aware UTC datetime, falling back to the
    # current time if it is unset
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that is preferred; otherwise its SPDX ID is used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def get_alias(obj):
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None

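# Illustrative sketch (assumed objects, not executed): how linking prefers the
# alias. `pkg` is a hypothetical Element that already carries an
# OEIdAliasExtension.
#
#   alias_ext = get_alias(pkg)
#   link_id = get_element_link_id(pkg)
#   assert link_id == (alias_ext.alias if alias_ext and alias_ext.alias else pkg._id)
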

def hash_id(_id):
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                # Only SHA-256 hashes belong in the by_sha256_hash index;
                # new_file() looks objects up by their sha256 value
                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        if is_native:
            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))

    def add_aliases(self):
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace()
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

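    # Illustrative sketch of the variable convention new_import() reads. The
    # variable suffixes (_spdxid, _uri, _hash_<algorithm>) come from the
    # f-strings above; the "foo" key and values are hypothetical:
    #
    #   SPDX_IMPORTS_foo_spdxid = "https://example.com/spdx/foo#SPDXRef-foo"
    #   SPDX_IMPORTS_foo_uri = "https://example.com/spdx/foo.spdx.json"
    #   SPDX_IMPORTS_foo_hash_sha256 = "<sha256 of the referenced document>"
    #
    # new_import("foo") would then add a matching ExternalMap to the document.
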
    def new_agent(self, varname, *, creation_info=None, add=True):
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

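    # Illustrative sketch of the variable convention new_agent() reads (the
    # suffixes _name/_type/_comment/_id_* and the _ref/_import indirections
    # come from the code above; the author key and values are hypothetical):
    #
    #   SPDX_AUTHORS:append = " maintainer"
    #   SPDX_AUTHORS_maintainer_name = "Example Maintainer"
    #   SPDX_AUTHORS_maintainer_type = "person"
    #   SPDX_AUTHORS_maintainer_id_email = "maintainer@example.com"
    #
    # new_creation_info() below would then call new_agent("SPDX_AUTHORS_maintainer").
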
    def new_creation_info(self):
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.IndividualElement.NoneElement]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

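    # Illustrative sketch (assumed objects, seen from a caller): recording that
    # a package contains a file. `objset`, `pkg` and `spdx_file` are
    # hypothetical; one Relationship is created per `from_` element.
    #
    #   objset.new_relationship(
    #       [pkg],
    #       oe.spdx30.RelationshipType.contains,
    #       [spdx_file],
    #   )
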
    def new_license_expression(
        self, license_expression, license_data, license_text_map={}
    ):
        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license list version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if it is missing (e.g. "3.20" becomes "3.20.0") to
        # keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            lic = self.new_license_expression(extracted_lic, license_data)
            self.set_element_alias(lic)
            file_licenses.add(lic)

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            [oe.sbom30.get_element_link_id(lic_alias) for lic_alias in file_licenses],
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=[]):
        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

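    # Illustrative sketch (assumed objects, seen from a caller): marking a
    # placeholder CVE as fixed in a package. `objset` and `pkg` are
    # hypothetical, and "CVE-2024-0000" is a placeholder identifier.
    #
    #   vuln = objset.new_cve_vuln("CVE-2024-0000")
    #   objset.new_vex_patched_relationship(
    #       [vuln], [get_element_link_id(pkg)]
    #   )
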
    def import_bitbake_build_objset(self):
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            bb.fatal("No bitbake build found in %s" % self.d.getVar("DEPLOY_DIR_SPDX"))

        return build

    def new_task_build(self, name, typ):
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=[]):
        """
        Expands a collection to pull in all missing elements.

        Returns the set of SPDX IDs that could not be found and linked into
        the document.
        """
        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()

        # Manually go through all of the simplelicensing_customIdToUri
        # DictionaryEntry items and resolve any aliases to actual objects.
        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
            for d in lic.simplelicensing_customIdToUri:
                if d.value.startswith(OE_ALIAS_PREFIX):
                    obj = self.find_by_id(d.value)
                    if obj is not None:
                        d.value = obj._id
                    else:
                        self.missing_ids.add(d.value)

        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SpdxDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    return Path("by-spdxid-hash") / h[:2], h

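# Illustrative sketch: hash_id() and jsonld_hash_path() together map an SPDX ID
# to its symlink location under DEPLOY_DIR_SPDX (the ID below is hypothetical):
#
#   h = hash_id("http://spdx.org/spdxdocs/example#SPDXRef-example")
#   subdir, name = jsonld_hash_path(h)
#   # subdir is Path("by-spdxid-hash") / h[:2], name is h, so
#   # jsonld_arch_path(d, arch, subdir, name) yields
#   # DEPLOY_DIR_SPDX/<arch>/by-spdxid-hash/<h[:2]>/<h>.spdx.json
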
def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    if not isinstance(objset, ObjectSet):
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except:
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=[]):
    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs could not be resolved:\n  "
            + "\n  ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom
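
# Illustrative sketch of the overall flow (hypothetical names; normally this
# module is driven by the create-spdx-3.0 bitbake classes rather than called
# directly):
#
#   objset = ObjectSet.new_objset(d, "recipe-example")
#   pkg = objset.add_root(...)          # e.g. a software_Package Element
#   write_recipe_jsonld_doc(d, objset, "recipes", deploydir)
#   ...
#   final_objset, sbom = create_sbom(d, "image-example", [get_element_link_id(pkg)])
#   write_jsonld_doc(d, final_objset, Path("image-example.spdx.json"))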