1#
2# SPDX-License-Identifier: MIT
3#
4
5import os
6import json
7import shutil
8
9from oeqa.core.utils.test import getCaseFile, getCaseMethod
10
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable read from d:
    "rpm" gives an RpmPM, "ipk" an OpkgPM and "deb" a DpkgPM.  Before the
    manager is returned, write_index() and update() are called on it.

    Raises ValueError when IMAGE_PKGTYPE is none of the values above.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Reject unknown package types up front. Without this check the code
    # below would skip every branch and die with an UnboundLocalError on
    # 'pm', which says nothing about the real cause.
    if pkg_class not in ("rpm", "ipk", "deb"):
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; expected one of "
                         "'rpm', 'ipk' or 'deb'" % (pkg_class,))

    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    else:
        # Only "deb" is left after the validation above.
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    pm.write_index()
    pm.update()

    return pm
43
def find_packages_to_extract(test_suite):
    """
    Collects the packages declared by the runtime tests of test_suite.

    Every distinct case file of the suite is checked for a companion JSON
    file; the contents of all JSON files found are merged into one dict,
    which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    # A set avoids parsing the same JSON file once per test case.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_file = _get_json_file(case_file)
        if not json_file:
            continue
        collected.update(_get_needed_packages(json_file))

    return collected
59
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is looked up next to the module, under the module's path
    with its extension replaced by '.json'.  An empty string is returned
    when module_path is empty or None, when module_path is not an existing
    file, or when the JSON file is not an existing file.
    """
    if not module_path:
        return ''

    # splitext() only strips an extension from the last path component, so
    # a dot in a directory name is never mistaken for the extension.
    json_file = os.path.splitext(module_path)[0] + '.json'

    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
70
def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    Without test, a dict holding every top-level entry of the JSON file is
    returned.  If a test is specified, only the entry stored under that
    name is returned, or an empty dict when the file has no such entry.
    """
    # JSON text is UTF-8; do not let the locale pick another decoding.
    with open(json_file, encoding='utf-8') as f:
        needed_packages = dict(json.load(f).items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages
91
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either a single package description
    (a dict) or a list of them; any other value type is reported with
    bb.fatal().  Each description must provide 'pkg' and may provide
    'extract' (default True):

    - with 'extract' true, the package is extracted and its content copied
      to <TEST_EXTRACTED_DIR>/<pkg>, unless that path already exists;
    - otherwise the package is handed to _copy_package().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            packages = ()
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR. The
            # temporary extraction directory is removed even if the copy
            # fails, so it cannot pile up across failed runs.
            pkg_dir = _extract_in_tmpdir(d, pkg)
            try:
                oe.path.copytree(pkg_dir, dst_dir)
            finally:
                shutil.rmtree(pkg_dir)
133
def _extract_in_tmpdir(d, pkg):
    """
    Extracts pkg, without its dependencies, into a temp directory and
    returns the path to that directory.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    manager = get_package_manager(d, install_root)
    extracted_to = manager.extract(pkg)

    # The root handed to the package manager is dropped once extract()
    # has returned; only the extraction directory is passed on.
    shutil.rmtree(install_root)

    return extracted_to
148
def _copy_package(d, pkg):
    """
    Copies the package file (RPM, DEB or IPK) of pkg into the directory
    named by TEST_PACKAGED_DIR.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    packaged_dir = d.getVar('TEST_PACKAGED_DIR')
    manager = get_package_manager(d, install_root)

    # package_info() is indexed by package name; 'filepath' holds the
    # location of the package file to copy.
    package_file = manager.package_info(pkg)[pkg]['filepath']
    shutil.copy2(package_file, packaged_dir)

    shutil.rmtree(install_root)
163
def install_package(test_case):
    """
    Installs in the DUT the packages test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, True)
171
def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, False)
179
def test_needs_package(test_case):
    """
    Returns the packages a test case needs installed/uninstalled.

    The packages are read from the JSON file belonging to the test case's
    file, under the entry for the test method.  None is returned when
    there is no JSON file or no (non-empty) entry for the method.
    """
    json_file = _get_json_file(getCaseFile(test_case))
    if not json_file:
        return None

    packages = _get_needed_packages(json_file, getCaseMethod(test_case))
    return packages if packages else None
194
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package description (a dict) or a list of
    them.  Each description must provide 'pkg' and may provide 'extract'
    (default True) and 'rm' (default False).  The package content is taken
    from <test_case.tc.extract_dir>/<pkg>.

    When install is true, packages with 'extract' set are copied to '/' on
    the target with copyDirTo().  When install is false, packages with
    'rm' set are removed from the target with deleteDirStructure().

    Raises TypeError when needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Previously this case surfaced as an UnboundLocalError on
        # 'packages' in the loop below.
        raise TypeError('needed_packages must be a dict or list, not %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        if install:
            # Install package
            if package.get('extract', True):
                test_case.tc.target.copyDirTo(src_dir, '/')
        elif package.get('rm', False):
            # Uninstall package
            test_case.tc.target.deleteDirStructure(src_dir, '/')
218