1#
2# SPDX-License-Identifier: MIT
3#
4
5import os
6import json
7import shutil
8
9from oeqa.core.utils.test import getCaseFile, getCaseMethod
10
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable read from d
    ("rpm", "ipk" or "deb"). Before the manager is returned, its
    write_index() and update() methods are called.

    Raises ValueError if IMAGE_PKGTYPE does not name a supported backend.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Each backend is imported inside its own branch so that only the
    # selected one has to be loaded.
    if pkg_class == "rpm":
        from oe.package_manager.rpm import RpmPM

        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        from oe.package_manager.ipk import OpkgPM

        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    elif pkg_class == "deb":
        from oe.package_manager.deb import DpkgPM

        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    else:
        # Without this branch 'pm' would be unbound below and the caller
        # would only see an UnboundLocalError with no hint at the cause.
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; expected 'rpm', "
                         "'ipk' or 'deb'" % (pkg_class,))

    pm.write_index()
    pm.update()

    return pm
45
def find_packages_to_extract(test_suite):
    """
    Returns packages to extract required by runtime tests.

    Every distinct case file of test_suite that has a companion JSON file
    contributes the contents of that JSON file; all of them are merged
    into a single dict, so equal keys from different files overwrite
    each other.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    # A set is used so each case file is inspected only once.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_path = _get_json_file(case_file)
        if not json_path:
            # No JSON description next to this case file.
            continue
        collected.update(_get_needed_packages(json_path))

    return collected
61
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is looked up next to the module, using the module path
    with its extension replaced by ".json". An empty string is also
    returned when module_path itself is not an existing regular file.
    """

    # splitext only strips an extension from the last path component, so a
    # dot in a directory name can never truncate the path (a plain
    # rsplit('.') would turn "suite.d/case" into "suite.json").
    json_file = os.path.splitext(module_path)[0] + '.json'
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
72
def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    Without a test, the top-level entries of the JSON file are returned
    as a new dict. If a test is specified, only the entry stored under
    that key is returned, or an empty dict when the file has no such key.
    """
    with open(json_file) as handle:
        loaded = json.load(handle)

    # Shallow copy of the top-level mapping read from the file.
    per_test = dict(loaded.items())

    if not test:
        return per_test

    return per_test[test] if test in per_test else {}
93
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either one package description (dict)
    or a list of them; any other value type is reported with bb.fatal().
    A description with a true 'extract' value (the default) is extracted
    and copied below TEST_EXTRACTED_DIR; otherwise the package file
    itself is copied via _copy_package().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, spec in needed_packages.items():
        if isinstance(spec, dict):
            entries = (spec, )
        elif isinstance(spec, list):
            entries = spec
        else:
            entries = ()
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for entry in entries:
            pkg = entry['pkg']

            if not entry.get('extract', True):
                # Only the package file itself is wanted.
                _copy_package(d, pkg)
                continue

            target_dir = os.path.join(extracted_path, pkg)
            if os.path.exists(target_dir):
                # Same package used for more than one test,
                # don't need to extract again.
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            unpacked_dir = _extract_in_tmpdir(d, pkg)
            oe.path.copytree(unpacked_dir, target_dir)
            shutil.rmtree(unpacked_dir)
135
def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager is set up in a per-package directory below
    TEST_INSTALL_TMP_DIR; that directory is removed again once the
    package has been extracted.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    unpacked_dir = get_package_manager(d, install_root).extract(pkg)

    # The package manager root is not needed after the extraction.
    shutil.rmtree(install_root)

    return unpacked_dir
150
def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package file to TEST_PACKAGED_DIR.

    The file location is taken from the 'filepath' entry the package
    manager reports for pkg. The per-package directory created below
    TEST_INSTALL_TMP_DIR for the package manager is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    destination = d.getVar('TEST_PACKAGED_DIR')

    manager = get_package_manager(d, install_root)
    source_file = manager.package_info(pkg)[pkg]['filepath']

    shutil.copy2(source_file, destination)
    shutil.rmtree(install_root)
165
def install_package(test_case):
    """
    Installs package in DUT if required.

    Does nothing when test_needs_package() reports no packages for
    test_case.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, True)
173
def uninstall_package(test_case):
    """
    Uninstalls package in DUT if required.

    Does nothing when test_needs_package() reports no packages for
    test_case.
    """
    required = test_needs_package(test_case)
    if not required:
        return
    _install_uninstall_packages(required, test_case, False)
181
def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the entry stored for the test method in the JSON file that
    accompanies the test case file, or None when there is no such file
    or the entry is missing or empty.
    """
    json_path = _get_json_file(getCaseFile(test_case))
    if not json_path:
        return None

    required = _get_needed_packages(json_path, getCaseMethod(test_case))
    return required if required else None
196
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is one package description (dict) or a list of them.
    Each description must provide 'pkg' and may provide 'rm' (default
    False) and 'extract' (default True).

    With install=True, the directory <test_case.tc.extract_dir>/<pkg> is
    passed to the target's copyDirTo() with '/' as destination for every
    description whose 'extract' is true. With install=False, the same
    directory is passed to the target's deleteDirStructure() for every
    description whose 'rm' is true.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Previously 'packages' stayed unbound here and the loop below
        # failed with an unrelated-looking UnboundLocalError.
        raise TypeError('Failed to process needed packages; '
                        'Value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')
220