import os
import json
import shutil

from oeqa.core.utils.test import getCaseFile, getCaseMethod

def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable read from d
    ("rpm", "ipk" or "deb"). The package index is written and the manager
    is updated before it is returned. Any other IMAGE_PKGTYPE value is
    reported through bb.fatal.
    """
    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    pkg_class = d.getVar("IMAGE_PKGTYPE")
    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    elif pkg_class == "deb":
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    else:
        # Without this branch 'pm' would be unbound below and the failure
        # would surface as an unrelated UnboundLocalError.
        import bb
        bb.fatal('Unsupported IMAGE_PKGTYPE "%s"; '
                 'expected one of: rpm, ipk, deb' % pkg_class)

    pm.write_index()
    pm.update()

    return pm

def find_packages_to_extract(test_suite):
    """
    Returns packages to extract required by runtime tests.

    Collects the case files of test_suite and, for each one that has a
    JSON file next to it, merges the content of that JSON file into a
    single dict, which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    needed_packages = {}
    files = getSuiteCasesFiles(test_suite)

    # set() so a file shared by several cases is only parsed once.
    for f in set(files):
        json_file = _get_json_file(f)
        if json_file:
            needed_packages.update(_get_needed_packages(json_file))

    return needed_packages

def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON path is module_path with its last extension replaced by
    ".json". Both module_path and the JSON file must be existing files,
    otherwise an empty string is returned.
    """

    json_file = '%s.json' % module_path.rsplit('.', 1)[0]
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    else:
        return ''

def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    If a test is specified it will return just the entry stored under
    that test name, or an empty dict when the file has no such entry.
    """
    with open(json_file, encoding='utf-8') as f:
        # Going through items() keeps the requirement that the top level
        # of the JSON document is an object.
        needed_packages = dict(json.load(f).items())

    if test:
        return needed_packages.get(test, {})

    return needed_packages

def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key to either a single package dict or a list
    of package dicts; any other value type is reported with bb.fatal.
    Each package dict must have a 'pkg' entry and may have an 'extract'
    entry (defaults to True). Packages with 'extract' set are extracted
    and copied under TEST_EXTRACTED_DIR/<pkg>; the others are handed to
    _copy_package.
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']
            extract = package.get('extract', True)

            if extract:
                dst_dir = os.path.join(extracted_path, pkg)
                # Same package used for more than one test,
                # don't need to extract again.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = _extract_in_tmpdir(d, pkg)
                oe.path.copytree(pkg_dir, dst_dir)
                shutil.rmtree(pkg_dir)

            else:
                # Not extracted: the package file itself is copied instead.
                _copy_package(d, pkg)

def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager is created on TEST_INSTALL_TMP_DIR/<pkg> for the
    extraction and that directory is removed afterwards; removing the
    returned directory is left to the caller.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    pm = get_package_manager(d, pkg_path)
    extract_dir = pm.extract(pkg)
    shutil.rmtree(pkg_path)

    return extract_dir

def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package to dst_dir

    dst_dir is the value of TEST_PACKAGED_DIR. The package file is
    located through the 'filepath' entry of the package manager's
    package_info() for pkg. The temporary package manager directory
    TEST_INSTALL_TMP_DIR/<pkg> is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    pkg_info = pm.package_info(pkg)
    file_path = pkg_info[pkg]['filepath']
    shutil.copy2(file_path, dst_dir)
    shutil.rmtree(pkg_path)

def install_package(test_case):
    """
    Installs package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, True)

def uninstall_package(test_case):
    """
    Uninstalls package in DUT if required.
    """
    needed_packages = test_needs_package(test_case)
    if needed_packages:
        _install_uninstall_packages(needed_packages, test_case, False)

def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the entry for the test method found in the JSON file that
    sits next to the test case file, or None when there is no JSON file
    or the entry is missing or empty.
    """
    test_file = getCaseFile(test_case)
    json_file = _get_json_file(test_file)

    if json_file:
        test_method = getCaseMethod(test_case)
        needed_packages = _get_needed_packages(json_file, test_method)
        if needed_packages:
            return needed_packages

    return None

def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package dict or a list of package dicts;
    anything else raises TypeError. When installing, packages whose
    'extract' entry is true (the default) are copied from
    test_case.tc.extract_dir/<pkg> to '/' on the target. When
    uninstalling, only packages whose 'rm' entry is true (default False)
    have that directory structure deleted from the target.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Previously 'packages' was left unbound here, which failed later
        # with an UnboundLocalError that hid the real problem.
        raise TypeError('Failed to process needed packages; '
                        'Value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')