#
# SPDX-License-Identifier: MIT
#

import os
import json
import shutil

from oeqa.core.utils.test import getCaseFile, getCaseMethod

def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    d is the datastore object (anything exposing getVar()); the backend is
    selected from its IMAGE_PKGTYPE variable ("rpm", "ipk" or "deb").  The
    returned manager has already had write_index() and update() called.

    Raises ValueError if IMAGE_PKGTYPE is not one of the supported values.
    """
    from oe.package_manager.rpm import RpmPM
    from oe.package_manager.ipk import OpkgPM
    from oe.package_manager.deb import DpkgPM

    pkg_class = d.getVar("IMAGE_PKGTYPE")
    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    elif pkg_class == "deb":
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    else:
        # Fail with a clear message instead of an UnboundLocalError on the
        # first use of 'pm' below.
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; "
                         "expected 'rpm', 'ipk' or 'deb'" % pkg_class)

    pm.write_index()
    pm.update()

    return pm

def find_packages_to_extract(test_suite):
    """
    Returns packages to extract required by runtime tests.

    The result is a dict merged from the JSON file that sits next to each
    test case file of test_suite; files without a JSON companion are skipped.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    needed_packages = {}
    files = getSuiteCasesFiles(test_suite)

    # set() so that a file shared by several test cases is read only once.
    for f in set(files):
        json_file = _get_json_file(f)
        if json_file:
            needed_packages.update(_get_needed_packages(json_file))

    return needed_packages

def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if doesn't exist.

    The JSON file is module_path with its last extension replaced by
    '.json'.  Both module_path and the JSON file must be existing files.
    """

    json_file = '%s.json' % module_path.rsplit('.', 1)[0]
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''

def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    If a test is specified it will return the entry just for that test
    (whatever value the JSON file stores under that key), or an empty dict
    when the test has no entry.

    Raises ValueError if the top level of the JSON file is not an object.
    """
    with open(json_file) as f:
        needed_packages = json.load(f)

    if not isinstance(needed_packages, dict):
        raise ValueError('%s: top-level JSON value must be an object'
                         % json_file)

    if test:
        needed_packages = needed_packages.get(test, {})

    return needed_packages

def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either one package description (a dict)
    or a list of them.  Each description must have a 'pkg' entry and may
    have an 'extract' entry (default True): when true the package content is
    extracted under TEST_EXTRACTED_DIR/<pkg>, otherwise the package file
    itself is copied via _copy_package().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']
            extract = package.get('extract', True)

            if extract:
                dst_dir = os.path.join(extracted_path, pkg)
                # Same package used for more than one test,
                # don't need to extract again.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = _extract_in_tmpdir(d, pkg)
                oe.path.copytree(pkg_dir, dst_dir)
                shutil.rmtree(pkg_dir)

            else:
                _copy_package(d, pkg)

def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    The package manager root created under TEST_INSTALL_TMP_DIR/<pkg> is
    removed before returning; the caller owns the returned directory.
    """

    # NOTE(review): a function with this name is also defined in this file;
    # the import is kept as-is -- confirm whether oeqa.utils.package_manager
    # is this very module, in which case the import is redundant.
    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    extract_dir = pm.extract(pkg)
    shutil.rmtree(pkg_path)

    return extract_dir

def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package to TEST_PACKAGED_DIR

    The package file is located through the package manager's
    package_info(); the temporary package manager root under
    TEST_INSTALL_TMP_DIR/<pkg> is removed afterwards.
    """

    # NOTE(review): see _extract_in_tmpdir -- import kept unchanged; confirm
    # whether it refers to this module.
    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    pkg_info = pm.package_info(pkg)
    file_path = pkg_info[pkg]['filepath']
    shutil.copy2(file_path, dst_dir)
    shutil.rmtree(pkg_path)

def install_package(test_case):
    """
    Installs package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, True)

def uninstall_package(test_case):
    """
    Uninstalls package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, False)

def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the entry stored for the test method in the JSON file next to
    the test case file, or None when there is no JSON file or the entry is
    missing or empty.
    """
    test_file = getCaseFile(test_case)
    json_file = _get_json_file(test_file)
    if not json_file:
        return None

    test_method = getCaseMethod(test_case)
    # Normalise "nothing needed" (missing or empty entry) to None.
    return _get_needed_packages(json_file, test_method) or None

def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is one package description (dict) or a list of them.
    On install, descriptions with 'extract' true (the default) have their
    extracted directory copied to '/' on the target.  On uninstall, only
    descriptions with 'rm' true (default False) are removed from the target.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Fail with a clear message instead of an UnboundLocalError on
        # 'packages' in the loop below.
        raise TypeError('needed_packages must be a dict or list, not %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')