1#
2# SPDX-License-Identifier: MIT
3#
4
5import sys
6from unittest.case import TestCase
7from contextlib import contextmanager
8from io import StringIO
9from oe.utils import packages_filter_out_system, trim_version, multiprocess_launch
10
11class TestPackagesFilterOutSystem(TestCase):
12    def test_filter(self):
13        """
14        Test that oe.utils.packages_filter_out_system works.
15        """
16        try:
17            import bb
18        except ImportError:
19            self.skipTest("Cannot import bb")
20
21        d = bb.data_smart.DataSmart()
22        d.setVar("PN", "foo")
23
24        d.setVar("PACKAGES", "foo foo-doc foo-dev")
25        pkgs = packages_filter_out_system(d)
26        self.assertEqual(pkgs, [])
27
28        d.setVar("PACKAGES", "foo foo-doc foo-data foo-dev")
29        pkgs = packages_filter_out_system(d)
30        self.assertEqual(pkgs, ["foo-data"])
31
32        d.setVar("PACKAGES", "foo foo-locale-en-gb")
33        pkgs = packages_filter_out_system(d)
34        self.assertEqual(pkgs, [])
35
36        d.setVar("PACKAGES", "foo foo-data foo-locale-en-gb")
37        pkgs = packages_filter_out_system(d)
38        self.assertEqual(pkgs, ["foo-data"])
39
40
41class TestTrimVersion(TestCase):
42    def test_version_exception(self):
43        with self.assertRaises(TypeError):
44            trim_version(None, 2)
45        with self.assertRaises(TypeError):
46            trim_version((1, 2, 3), 2)
47
48    def test_num_exception(self):
49        with self.assertRaises(ValueError):
50            trim_version("1.2.3", 0)
51        with self.assertRaises(ValueError):
52            trim_version("1.2.3", -1)
53
54    def test_valid(self):
55        self.assertEqual(trim_version("1.2.3", 1), "1")
56        self.assertEqual(trim_version("1.2.3", 2), "1.2")
57        self.assertEqual(trim_version("1.2.3", 3), "1.2.3")
58        self.assertEqual(trim_version("1.2.3", 4), "1.2.3")
59
60
61class TestMultiprocessLaunch(TestCase):
62
63    def test_multiprocesslaunch(self):
64        import bb
65
66        def testfunction(item, d):
67            if item == "2" or item == "1":
68                raise KeyError("Invalid number %s" % item)
69            return "Found %s" % item
70
71        def dummyerror(msg):
72            print("ERROR: %s" % msg)
73        def dummyfatal(msg):
74            print("ERROR: %s" % msg)
75            raise bb.BBHandledException()
76
77        @contextmanager
78        def captured_output():
79            new_out, new_err = StringIO(), StringIO()
80            old_out, old_err = sys.stdout, sys.stderr
81            try:
82                sys.stdout, sys.stderr = new_out, new_err
83                yield sys.stdout, sys.stderr
84            finally:
85                sys.stdout, sys.stderr = old_out, old_err
86
87        d = bb.data_smart.DataSmart()
88        bb.error = dummyerror
89        bb.fatal = dummyfatal
90
91        # Assert the function returns the right results
92        result = multiprocess_launch(testfunction, ["3", "4", "5", "6"], d, extraargs=(d,))
93        self.assertIn("Found 3", result)
94        self.assertIn("Found 4", result)
95        self.assertIn("Found 5", result)
96        self.assertIn("Found 6", result)
97        self.assertEqual(len(result), 4)
98
99        # Assert the function prints exceptions
100        with captured_output() as (out, err):
101            self.assertRaises(bb.BBHandledException, multiprocess_launch, testfunction, ["1", "2", "3", "4", "5", "6"], d, extraargs=(d,))
102        self.assertIn("KeyError: 'Invalid number 1'", out.getvalue())
103        self.assertIn("KeyError: 'Invalid number 2'", out.getvalue())
104