import os
import json
import shutil

from oeqa.core.utils.test import getCaseFile, getCaseMethod

def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    d is the object queried through getVar() for the build configuration
    (presumably the BitBake datastore); IMAGE_PKGTYPE selects which of the
    rpm, ipk or deb package managers is instantiated.

    Raises ValueError if IMAGE_PKGTYPE is not one of "rpm", "ipk" or "deb".
    """
    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    pkg_class = d.getVar("IMAGE_PKGTYPE")
    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"))

    elif pkg_class == "deb":
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'))

    else:
        # Without this branch 'pm' would be unbound below and the failure
        # would surface as an opaque UnboundLocalError.
        raise ValueError('Unsupported IMAGE_PKGTYPE %r; '
                         'expected "rpm", "ipk" or "deb"' % pkg_class)

    pm.write_index()
    pm.update()

    return pm

def find_packages_to_extract(test_suite):
    """
    Returns packages to extract required by runtime tests.

    The result is a dict merged from the JSON files that sit next to the
    test modules of test_suite; modules without a JSON file are skipped.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    needed_packages = {}
    files = getSuiteCasesFiles(test_suite)

    # Several test cases can live in the same module, so process each
    # module file only once.
    for f in set(files):
        json_file = _get_json_file(f)
        if json_file:
            needed_packages.update(_get_needed_packages(json_file))

    return needed_packages

def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is expected next to the module, with the text after the
    last '.' of module_path replaced by 'json'.
    """

    json_file = '%s.json' % module_path.rsplit('.', 1)[0]
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file

    return ''

def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    If a test is specified it will return just the entry for that test,
    or an empty dict when the JSON file has no entry for it.
    """
    with open(json_file) as f:
        test_packages = json.load(f)

    needed_packages = dict(test_packages.items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages

def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either a single package description
    (dict) or a list of them. Each description must have a 'pkg' entry and
    may have an 'extract' entry (default True): when true the package is
    extracted under TEST_EXTRACTED_DIR, otherwise the package file itself
    is copied by _copy_package().

    Any other value type is reported through bb.fatal().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']
            extract = package.get('extract', True)

            if extract:
                dst_dir = os.path.join(extracted_path, pkg)
                # Same package used for more than one test,
                # don't need to extract again.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = _extract_in_tmpdir(d, pkg)
                oe.path.copytree(pkg_dir, dst_dir)
                shutil.rmtree(pkg_dir)

            else:
                # The package is not extracted; copy the package file.
                _copy_package(d, pkg)

def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    The per-package working directory created under TEST_INSTALL_TMP_DIR
    for the package manager is removed before returning; the caller is
    responsible for the returned extraction directory.
    """

    # NOTE(review): get_package_manager is also defined in this file; this
    # import presumably resolves to the same function - confirm.
    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    extract_dir = pm.extract(pkg)
    shutil.rmtree(pkg_path)

    return extract_dir

def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package to dst_dir

    dst_dir is read from TEST_PACKAGED_DIR; the package file location comes
    from the 'filepath' entry of the package manager's package_info().
    """

    # NOTE(review): get_package_manager is also defined in this file; this
    # import presumably resolves to the same function - confirm.
    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    pkg_info = pm.package_info(pkg)
    file_path = pkg_info[pkg]['filepath']
    shutil.copy2(file_path, dst_dir)
    shutil.rmtree(pkg_path)

def install_package(test_case):
    """
    Installs package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, True)

def uninstall_package(test_case):
    """
    Uninstalls package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, False)

def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the package description(s) found for the test method in the
    JSON file next to the test module, or None when there is no JSON file
    or it has no (non-empty) entry for the method.
    """
    test_file = getCaseFile(test_case)
    json_file = _get_json_file(test_file)

    if json_file:
        test_method = getCaseMethod(test_case)
        needed_packages = _get_needed_packages(json_file, test_method)
        if needed_packages:
            return needed_packages

    return None

def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package description (dict) or a list of
    them. On install, packages whose 'extract' entry is true (the default)
    are copied from test_case.tc.extract_dir to '/' on the target. On
    uninstall, packages whose 'rm' entry is true (default False) have
    their directory structure deleted from the target.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Without this branch 'packages' would be unbound below and the
        # failure would surface as an opaque UnboundLocalError.
        raise TypeError('Failed to process needed packages; '
                        'Value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')