#
# SPDX-License-Identifier: MIT
#

import os
import json
import shutil

from oeqa.core.utils.test import getCaseFile, getCaseMethod

def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable ("rpm", "ipk" or
    "deb"). Before it is returned, the package index is written and the
    package manager is updated.

    Any other IMAGE_PKGTYPE value is reported through bb.fatal() instead of
    failing later with an unbound local variable.
    """
    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    pkg_class = d.getVar("IMAGE_PKGTYPE")
    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    elif pkg_class == "deb":
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    else:
        import bb
        bb.fatal('Unsupported IMAGE_PKGTYPE "%s"; '
                 'expected one of rpm, ipk or deb' % pkg_class)

    pm.write_index()
    pm.update()

    return pm

def find_packages_to_extract(test_suite):
    """
    Returns packages to extract required by runtime tests.

    Every distinct case file of test_suite is checked for a JSON file next
    to it; the contents of all JSON files found are merged into one dict.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    needed_packages = {}

    # A file can hold several test cases; process each file only once.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_file = _get_json_file(case_file)
        if json_file:
            needed_packages.update(_get_needed_packages(json_file))

    return needed_packages

def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if doesn't exist.

    The JSON file is expected at the module path with its extension
    replaced by ".json". Both the module and the JSON file must be regular
    files, otherwise an empty string is returned.
    """

    json_file = '%s.json' % module_path.rsplit('.', 1)[0]
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''

def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    If a test is specified it will return the dict just for that test
    (the value stored under that key), or an empty dict when the JSON file
    has no entry for it.
    """
    with open(json_file) as f:
        needed_packages = dict(json.load(f).items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages

def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either a single package description
    (dict) or a list of them; any other value type is reported with
    bb.fatal(). Each description must have a 'pkg' entry and may have an
    'extract' entry (default True):

    - extract true: the package is extracted into
      TEST_EXTRACTED_DIR/<pkg>, unless that directory already exists.
    - extract false: the package file itself is copied with _copy_package().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            pkg_dir = _extract_in_tmpdir(d, pkg)
            oe.path.copytree(pkg_dir, dst_dir)
            shutil.rmtree(pkg_dir)

def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager rooted at TEST_INSTALL_TMP_DIR/<pkg> is used for the
    extraction; that root directory is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    extract_dir = pm.extract(pkg)
    shutil.rmtree(pkg_path)

    return extract_dir

def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package to dst_dir

    dst_dir is TEST_PACKAGED_DIR. The package file is located through the
    'filepath' entry of the package manager's package_info() for pkg; the
    package manager root (TEST_INSTALL_TMP_DIR/<pkg>) is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    pkg_info = pm.package_info(pkg)
    file_path = pkg_info[pkg]['filepath']
    shutil.copy2(file_path, dst_dir)
    shutil.rmtree(pkg_path)

def install_package(test_case):
    """
    Installs package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, True)

def uninstall_package(test_case):
    """
    Uninstalls package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, False)

def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the entry for the test case's method from the JSON file that
    accompanies the test case's file, or None when there is no JSON file
    or no (non-empty) entry for that method.
    """
    json_file = _get_json_file(getCaseFile(test_case))
    if not json_file:
        return None

    needed_packages = _get_needed_packages(json_file, getCaseMethod(test_case))

    # Normalise "nothing needed" (empty dict/list) to None.
    return needed_packages or None

def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package description (dict) or a list of
    them; anything else raises TypeError. For each description:

    - when installing, the extracted package directory
      (test_case.tc.extract_dir/<pkg>) is copied to '/' on the target if
      'extract' is true (default True);
    - when uninstalling, that directory structure is deleted from '/' on
      the target if 'rm' is true (default False).
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        raise TypeError('Needed packages must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')