1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import sys
8from unittest.case import TestCase
9from contextlib import contextmanager
10from io import StringIO
11from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
12
class TestPackagesFilterOutSystem(TestCase):
    def test_filter(self):
        """
        Check oe.utils.packages_filter_out_system against several PACKAGES
        values, with PN set to "foo".
        """
        try:
            import bb
        except ImportError:
            self.skipTest("Cannot import bb")

        datastore = bb.data_smart.DataSmart()
        datastore.setVar("PN", "foo")

        # Each entry is (PACKAGES value, expected filtered list); the entries
        # are checked in order against the same datastore.
        cases = (
            ("foo foo-doc foo-dev", []),
            ("foo foo-doc foo-data foo-dev", ["foo-data"]),
            ("foo foo-locale-en-gb", []),
            ("foo foo-data foo-locale-en-gb", ["foo-data"]),
        )
        for packages, expected in cases:
            datastore.setVar("PACKAGES", packages)
            self.assertEqual(packages_filter_out_system(datastore), expected)
41
42
class TestTrimVersion(TestCase):
    def test_version_exception(self):
        """Passing None or a tuple as the version raises TypeError."""
        for bad_version in (None, (1, 2, 3)):
            with self.assertRaises(TypeError):
                trim_version(bad_version, 2)

    def test_num_exception(self):
        """A component count of zero or below raises ValueError."""
        for bad_num in (0, -1):
            with self.assertRaises(ValueError):
                trim_version("1.2.3", bad_num)

    def test_valid(self):
        """Trimming "1.2.3" keeps at most the requested number of components."""
        # (requested component count, expected trimmed version)
        expectations = (
            (1, "1"),
            (2, "1.2"),
            (3, "1.2.3"),
            (4, "1.2.3"),
        )
        for num, trimmed in expectations:
            self.assertEqual(trim_version("1.2.3", num), trimmed)
61
62
class TestMultiprocessLaunch(TestCase):

    def test_multiprocesslaunch(self):
        """
        Test that oe.utils.multiprocess_launch returns the values produced by
        the launched function and reports an exception raised by it.
        """
        # Skip instead of erroring out when bitbake is not importable, as
        # TestPackagesFilterOutSystem does.
        try:
            import bb
        except ImportError:
            self.skipTest("Cannot import bb")

        def testfunction(item, d):
            # Item "2" is the one deliberately failing input.
            if item == "2":
                raise KeyError("Invalid number %s" % item)
            return "Found %s" % item

        def dummyerror(msg):
            print("ERROR: %s" % msg)

        def dummyfatal(msg):
            print("ERROR: %s" % msg)
            raise bb.BBHandledException()

        @contextmanager
        def captured_output():
            """Temporarily swap sys.stdout/sys.stderr for StringIO buffers."""
            new_out, new_err = StringIO(), StringIO()
            old_out, old_err = sys.stdout, sys.stderr
            try:
                sys.stdout, sys.stderr = new_out, new_err
                yield sys.stdout, sys.stderr
            finally:
                sys.stdout, sys.stderr = old_out, old_err

        d = bb.data_smart.DataSmart()

        # Replace the bb reporting functions with stubs that print to stdout.
        # The originals are captured here and put back once the test ends, so
        # the stubs do not leak into tests that run later in the same process.
        self.addCleanup(setattr, bb, "error", bb.error)
        self.addCleanup(setattr, bb, "fatal", bb.fatal)
        bb.error = dummyerror
        bb.fatal = dummyfatal

        # Assert the function returns the right results
        result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
        for item in ("3", "4", "5", "6"):
            self.assertIn("Found %s" % item, result)
        self.assertEqual(len(result), 4)

        # Assert the function prints exceptions
        with captured_output() as (out, err):
            self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
        self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())
105