1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import os
8import re
9import shutil
10import datetime
11
12import oeqa.utils.ftools as ftools
13from oeqa.selftest.case import OESelftestTestCase
14from oeqa.utils.commands import runCmd, bitbake, get_bb_var
15from oeqa.utils.network import get_free_port
16
17import bb.utils
18
19class BitbakePrTests(OESelftestTestCase):
20
21    @classmethod
22    def setUpClass(cls):
23        super(BitbakePrTests, cls).setUpClass()
24        cls.pkgdata_dir = get_bb_var('PKGDATA_DIR')
25
26        cls.exported_db_path = os.path.join(cls.builddir, 'export.inc')
27        cls.current_db_path = os.path.join(get_bb_var('PERSISTENT_DIR'), 'prserv.sqlite3')
28
29    def cleanup(self):
30        # Ensure any memory resident bitbake is stopped
31        bitbake("-m")
32        # Remove any existing export file or prserv database
33        bb.utils.remove(self.exported_db_path)
34        bb.utils.remove(self.current_db_path + "*")
35
36    def get_pr_version(self, package_name):
37        package_data_file = os.path.join(self.pkgdata_dir, 'runtime', package_name)
38        package_data = ftools.read_file(package_data_file)
39        find_pr = re.search(r"PKGR: r[0-9]+\.([0-9]+)", package_data)
40        self.assertTrue(find_pr, "No PKG revision found via regex 'PKGR: r[0-9]+\.([0-9]+)' in %s" % package_data_file)
41        return int(find_pr.group(1))
42
43    def get_task_stamp(self, package_name, recipe_task):
44        stampdata = get_bb_var('STAMP', target=package_name).split('/')
45        prefix = stampdata[-1]
46        package_stamps_path = "/".join(stampdata[:-1])
47        stamps = []
48        for stamp in os.listdir(package_stamps_path):
49            find_stamp = re.match(r"%s\.%s\.([a-z0-9]{32})" % (re.escape(prefix), recipe_task), stamp)
50            if find_stamp:
51                stamps.append(find_stamp.group(1))
52        self.assertFalse(len(stamps) == 0, msg="Cound not find stamp for task %s for recipe %s" % (recipe_task, package_name))
53        self.assertFalse(len(stamps) > 1, msg="Found multiple %s stamps for the %s recipe in the %s directory." % (recipe_task, package_name, package_stamps_path))
54        return str(stamps[0])
55
56    def increment_package_pr(self, package_name):
57        inc_data = "do_package:append() {\n    bb.build.exec_func('do_test_prserv', d)\n}\ndo_test_prserv() {\necho \"The current date is: %s\" > ${PKGDESTWORK}/${PN}.datestamp\n}" % datetime.datetime.now()
58        self.write_recipeinc(package_name, inc_data)
59        res = bitbake(package_name, ignore_status=True)
60        self.delete_recipeinc(package_name)
61        self.assertEqual(res.status, 0, msg=res.output)
62
63    def config_pr_tests(self, package_name, package_type='rpm', pr_socket='localhost:0'):
64        self.cleanup()
65        config_package_data = 'PACKAGE_CLASSES = "package_%s"' % package_type
66        self.write_config(config_package_data)
67        config_server_data = 'PRSERV_HOST = "%s"' % pr_socket
68        self.append_config(config_server_data)
69
70    def run_test_pr_service(self, package_name, package_type='rpm', track_task='do_package', pr_socket='localhost:0'):
71        self.config_pr_tests(package_name, package_type, pr_socket)
72
73        self.increment_package_pr(package_name)
74        pr_1 = self.get_pr_version(package_name)
75        stamp_1 = self.get_task_stamp(package_name, track_task)
76
77        self.increment_package_pr(package_name)
78        pr_2 = self.get_pr_version(package_name)
79        stamp_2 = self.get_task_stamp(package_name, track_task)
80
81        self.assertTrue(pr_2 - pr_1 == 1, "New PR %s did not increment as expected (from %s), difference should be 1" % (pr_2, pr_1))
82        self.assertTrue(stamp_1 != stamp_2, "Different pkg rev. but same stamp: %s" % stamp_1)
83
84        self.cleanup()
85
86    def run_test_pr_export_import(self, package_name, replace_current_db=True):
87        self.config_pr_tests(package_name)
88
89        self.increment_package_pr(package_name)
90        pr_1 = self.get_pr_version(package_name)
91
92        export_result = runCmd("bitbake-prserv-tool export %s" % self.exported_db_path, ignore_status=True)
93        self.assertEqual(export_result.status, 0, msg="PR Service database export failed: %s" % export_result.output)
94        self.assertTrue(os.path.exists(self.exported_db_path), msg="%s didn't exist, tool output %s" % (self.exported_db_path, export_result.output))
95
96        if replace_current_db:
97            self.assertTrue(os.path.exists(self.current_db_path), msg="Path to current PR Service database is invalid: %s" % self.current_db_path)
98            os.remove(self.current_db_path)
99
100        import_result = runCmd("bitbake-prserv-tool import %s" % self.exported_db_path, ignore_status=True)
101        #os.remove(self.exported_db_path)
102        self.assertEqual(import_result.status, 0, msg="PR Service database import failed: %s" % import_result.output)
103
104        self.increment_package_pr(package_name)
105        pr_2 = self.get_pr_version(package_name)
106
107        self.assertTrue(pr_2 - pr_1 == 1, "New PR %s did not increment as expected (from %s), difference should be 1" % (pr_2, pr_1))
108
109        self.cleanup()
110
111    def test_import_export_replace_db(self):
112        self.run_test_pr_export_import('m4')
113
114    def test_import_export_override_db(self):
115        self.run_test_pr_export_import('m4', replace_current_db=False)
116
117    def test_pr_service_rpm_arch_dep(self):
118        self.run_test_pr_service('m4', 'rpm', 'do_package')
119
120    def test_pr_service_deb_arch_dep(self):
121        self.run_test_pr_service('m4', 'deb', 'do_package')
122
123    def test_pr_service_ipk_arch_dep(self):
124        self.run_test_pr_service('m4', 'ipk', 'do_package')
125
126    def test_pr_service_rpm_arch_indep(self):
127        self.run_test_pr_service('xcursor-transparent-theme', 'rpm', 'do_package')
128
129    def test_pr_service_deb_arch_indep(self):
130        self.run_test_pr_service('xcursor-transparent-theme', 'deb', 'do_package')
131
132    def test_pr_service_ipk_arch_indep(self):
133        self.run_test_pr_service('xcursor-transparent-theme', 'ipk', 'do_package')
134
135    def test_stopping_prservice_message(self):
136        port = get_free_port()
137
138        runCmd('bitbake-prserv --host localhost --port %s --loglevel=DEBUG --start' % port)
139        ret = runCmd('bitbake-prserv --host localhost --port %s --loglevel=DEBUG --stop' % port)
140
141        self.assertEqual(ret.status, 0)
142
143