1# 2# Copyright OpenEmbedded Contributors 3# 4# SPDX-License-Identifier: MIT 5# 6 7import os 8import json 9import shutil 10 11from oeqa.core.utils.test import getCaseFile, getCaseMethod 12 13def get_package_manager(d, root_path): 14 """ 15 Returns an OE package manager that can install packages in root_path. 16 """ 17 from oe.package_manager.rpm import RpmPM 18 from oe.package_manager.ipk import OpkgPM 19 from oe.package_manager.deb import DpkgPM 20 21 pkg_class = d.getVar("IMAGE_PKGTYPE") 22 if pkg_class == "rpm": 23 pm = RpmPM(d, 24 root_path, 25 d.getVar('TARGET_VENDOR'), 26 filterbydependencies=False) 27 pm.create_configs() 28 29 elif pkg_class == "ipk": 30 pm = OpkgPM(d, 31 root_path, 32 d.getVar("IPKGCONF_TARGET"), 33 d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"), 34 filterbydependencies=False) 35 36 elif pkg_class == "deb": 37 pm = DpkgPM(d, 38 root_path, 39 d.getVar('PACKAGE_ARCHS'), 40 d.getVar('DPKG_ARCH'), 41 filterbydependencies=False) 42 43 pm.write_index() 44 pm.update() 45 46 return pm 47 48def find_packages_to_extract(test_suite): 49 """ 50 Returns packages to extract required by runtime tests. 51 """ 52 from oeqa.core.utils.test import getSuiteCasesFiles 53 54 needed_packages = {} 55 files = getSuiteCasesFiles(test_suite) 56 57 for f in set(files): 58 json_file = _get_json_file(f) 59 if json_file: 60 needed_packages.update(_get_needed_packages(json_file)) 61 62 return needed_packages 63 64def _get_json_file(module_path): 65 """ 66 Returns the path of the JSON file for a module, empty if doesn't exitst. 67 """ 68 69 json_file = '%s.json' % module_path.rsplit('.', 1)[0] 70 if os.path.isfile(module_path) and os.path.isfile(json_file): 71 return json_file 72 else: 73 return '' 74 75def _get_needed_packages(json_file, test=None): 76 """ 77 Returns a dict with needed packages based on a JSON file. 78 79 If a test is specified it will return the dict just for that test. 80 """ 81 needed_packages = {} 82 83 with open(json_file) as f: 84 test_packages = json.load(f) 85 for key,value in test_packages.items(): 86 needed_packages[key] = value 87 88 if test: 89 if test in needed_packages: 90 needed_packages = needed_packages[test] 91 else: 92 needed_packages = {} 93 94 return needed_packages 95 96def extract_packages(d, needed_packages): 97 """ 98 Extract packages that will be needed during runtime. 99 """ 100 101 import bb 102 import oe.path 103 104 extracted_path = d.getVar('TEST_EXTRACTED_DIR') 105 106 for key,value in needed_packages.items(): 107 packages = () 108 if isinstance(value, dict): 109 packages = (value, ) 110 elif isinstance(value, list): 111 packages = value 112 else: 113 bb.fatal('Failed to process needed packages for %s; ' 114 'Value must be a dict or list' % key) 115 116 for package in packages: 117 pkg = package['pkg'] 118 rm = package.get('rm', False) 119 extract = package.get('extract', True) 120 121 if extract: 122 #logger.debug('Extracting %s' % pkg) 123 dst_dir = os.path.join(extracted_path, pkg) 124 # Same package used for more than one test, 125 # don't need to extract again. 126 if os.path.exists(dst_dir): 127 continue 128 129 # Extract package and copy it to TEST_EXTRACTED_DIR 130 pkg_dir = _extract_in_tmpdir(d, pkg) 131 oe.path.copytree(pkg_dir, dst_dir) 132 shutil.rmtree(pkg_dir) 133 134 else: 135 #logger.debug('Copying %s' % pkg) 136 _copy_package(d, pkg) 137 138def _extract_in_tmpdir(d, pkg): 139 """" 140 Returns path to a temp directory where the package was 141 extracted without dependencies. 142 """ 143 144 from oeqa.utils.package_manager import get_package_manager 145 146 pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg) 147 pm = get_package_manager(d, pkg_path) 148 extract_dir = pm.extract(pkg) 149 shutil.rmtree(pkg_path) 150 151 return extract_dir 152 153def _copy_package(d, pkg): 154 """ 155 Copy the RPM, DEB or IPK package to dst_dir 156 """ 157 158 from oeqa.utils.package_manager import get_package_manager 159 160 pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg) 161 dst_dir = d.getVar('TEST_PACKAGED_DIR') 162 pm = get_package_manager(d, pkg_path) 163 pkg_info = pm.package_info(pkg) 164 file_path = pkg_info[pkg]['filepath'] 165 shutil.copy2(file_path, dst_dir) 166 shutil.rmtree(pkg_path) 167 168def install_package(test_case): 169 """ 170 Installs package in DUT if required. 171 """ 172 needed_packages = test_needs_package(test_case) 173 if needed_packages: 174 _install_uninstall_packages(needed_packages, test_case, True) 175 176def uninstall_package(test_case): 177 """ 178 Uninstalls package in DUT if required. 179 """ 180 needed_packages = test_needs_package(test_case) 181 if needed_packages: 182 _install_uninstall_packages(needed_packages, test_case, False) 183 184def test_needs_package(test_case): 185 """ 186 Checks if a test case requires to install/uninstall packages. 187 """ 188 test_file = getCaseFile(test_case) 189 json_file = _get_json_file(test_file) 190 191 if json_file: 192 test_method = getCaseMethod(test_case) 193 needed_packages = _get_needed_packages(json_file, test_method) 194 if needed_packages: 195 return needed_packages 196 197 return None 198 199def _install_uninstall_packages(needed_packages, test_case, install=True): 200 """ 201 Install/Uninstall packages in the DUT without using a package manager 202 """ 203 204 if isinstance(needed_packages, dict): 205 packages = [needed_packages] 206 elif isinstance(needed_packages, list): 207 packages = needed_packages 208 209 for package in packages: 210 pkg = package['pkg'] 211 rm = package.get('rm', False) 212 extract = package.get('extract', True) 213 src_dir = os.path.join(test_case.tc.extract_dir, pkg) 214 215 # Install package 216 if install and extract: 217 test_case.tc.target.copyDirTo(src_dir, '/') 218 219 # Uninstall package 220 elif not install and rm: 221 test_case.tc.target.deleteDirStructure(src_dir, '/') 222