1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import os
8import json
9import shutil
10
11from oeqa.core.utils.test import getCaseFile, getCaseMethod
12
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE value returned by
    d.getVar(): "rpm", "ipk" or "deb". Before the manager is returned its
    write_index() and update() methods are called.

    Raises ValueError if IMAGE_PKGTYPE is none of the supported values.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Each backend is imported only in the branch that uses it, so an
    # unsupported package type is reported before any backend is loaded.
    if pkg_class == "rpm":
        from oe.package_manager.rpm import RpmPM

        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        from oe.package_manager.ipk import OpkgPM

        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    elif pkg_class == "deb":
        from oe.package_manager.deb import DpkgPM

        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    else:
        # Without this branch 'pm' would be unbound below and the caller
        # would only see an UnboundLocalError with no hint about the cause.
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; expected 'rpm', "
                         "'ipk' or 'deb'" % (pkg_class,))

    pm.write_index()
    pm.update()

    return pm
47
def find_packages_to_extract(test_suite):
    """
    Collects the packages required by the runtime tests of a suite.

    Each distinct case file of test_suite is checked for a matching JSON
    file; the contents of every JSON file found are merged into a single
    dict, which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    # A set is used so a file shared by several cases is read only once.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_path = _get_json_file(case_file)
        if not json_path:
            continue
        collected.update(_get_needed_packages(json_path))

    return collected
63
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is expected next to the module, with the module's
    extension replaced by '.json'. An empty string is also returned when
    module_path itself is not an existing file.
    """

    # splitext() only looks at the last path component, so a dot in a
    # directory name is never mistaken for the start of the extension.
    json_file = '%s.json' % os.path.splitext(module_path)[0]
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
74
def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    If a test is specified it will return just the entry stored under that
    test name, or an empty dict when the file has no entry for it.
    """
    # JSON text is UTF-8; do not depend on the locale's default encoding.
    with open(json_file, encoding='utf-8') as f:
        test_packages = json.load(f)

    # The top-level JSON value must be an object; .items() fails otherwise.
    needed_packages = dict(test_packages.items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages
95
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either one package description (a dict)
    or a list of them; any other value is reported through bb.fatal().
    Every description must contain 'pkg'. When its 'extract' entry is true
    (the default) the package is extracted into TEST_EXTRACTED_DIR/<pkg>,
    otherwise it is handed to _copy_package().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            pkg_dir = _extract_in_tmpdir(d, pkg)
            try:
                oe.path.copytree(pkg_dir, dst_dir)
            finally:
                # Remove the temporary extraction even if the copy fails.
                shutil.rmtree(pkg_dir)
137
def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager is created with TEST_INSTALL_TMP_DIR/<pkg> as its
    root; that directory is removed again before returning, also when the
    extraction itself fails.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    try:
        return pm.extract(pkg)
    finally:
        # The install root is only scaffolding for the package manager.
        shutil.rmtree(pkg_path)
152
def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package to TEST_PACKAGED_DIR.

    A package manager is created with TEST_INSTALL_TMP_DIR/<pkg> as its
    root and asked for the package's 'filepath'; that directory is removed
    again afterwards, also when the lookup or the copy fails.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    try:
        pkg_info = pm.package_info(pkg)
        file_path = pkg_info[pkg]['filepath']
        shutil.copy2(file_path, dst_dir)
    finally:
        # The install root is only scaffolding for the package manager.
        shutil.rmtree(pkg_path)
167
def install_package(test_case):
    """
    Installs in the DUT the packages the test case declares, if any.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, True)
175
def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages the test case declares, if any.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, False)
183
def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the package data stored for the test method in the JSON file
    that belongs to the test case's file, or None when there is no such
    JSON file or it holds nothing for this method.
    """
    json_path = _get_json_file(getCaseFile(test_case))
    if not json_path:
        return None

    method_name = getCaseMethod(test_case)
    # An empty result is reported as None, same as a missing JSON file.
    return _get_needed_packages(json_path, method_name) or None
198
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is one package description (a dict) or a list of them.
    Each description must contain 'pkg'; 'extract' (default True) enables
    installation and 'rm' (default False) enables removal. The package's
    files are taken from <test_case.tc.extract_dir>/<pkg>.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise TypeError('Needed packages must be a dict or list, not %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')
222