1import os
2import json
3import shutil
4
5from oeqa.core.utils.test import getCaseFile, getCaseMethod
6
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is selected from the IMAGE_PKGTYPE variable read from d
    ("rpm", "ipk" or "deb"). Before returning, write_index() and update()
    are called on the manager (and create_configs() for rpm).

    Raises ValueError if IMAGE_PKGTYPE is not one of the supported values.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")

    # Reject unknown package types up front: without this check 'pm' would
    # never be bound and the function would fail with an UnboundLocalError
    # that says nothing about the real cause.
    if pkg_class not in ("rpm", "ipk", "deb"):
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; expected one of "
                         "'rpm', 'ipk' or 'deb'" % (pkg_class,))

    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"))

    else:
        # Only "deb" can reach this branch thanks to the check above.
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'))

    pm.write_index()
    pm.update()

    return pm
37
def find_packages_to_extract(test_suite):
    """
    Returns the packages that the runtime tests in test_suite need.

    Every distinct case file of the suite is checked for a companion JSON
    file; the contents of all JSON files found are merged into one dict
    (later files overwrite earlier entries that share a key).
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    # set() so a file shared by several cases is only processed once.
    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_path = _get_json_file(case_file)
        if not json_path:
            continue
        collected.update(_get_needed_packages(json_path))

    return collected
53
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is expected next to the module, with the module's
    extension replaced by '.json'. An empty string is also returned when
    module_path itself is not an existing file.
    """

    # splitext() only strips an extension from the last path component, so
    # a dot in a directory name can never truncate the path (which a plain
    # rsplit('.', 1) does when the file name itself has no extension).
    json_file = os.path.splitext(module_path)[0] + '.json'
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
64
def _get_needed_packages(json_file, test=None):
    """
    Loads json_file and returns the needed packages it describes.

    Without a test, the whole top-level mapping is returned as a new dict.
    With a test, only the entry stored under that key is returned, or an
    empty dict when the file has no such key.
    """
    with open(json_file) as handle:
        loaded = json.load(handle)

    # Shallow copy of the top-level mapping.
    per_test = {name: spec for name, spec in loaded.items()}

    if not test:
        return per_test

    return per_test[test] if test in per_test else {}
85
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a key (used only in the error message) to either
    a single package dict or a list of package dicts. Each package dict
    must have a 'pkg' entry and may have an 'extract' entry (default True):

    - extract true:  the package is extracted and its content copied to
                     TEST_EXTRACTED_DIR/<pkg>, unless that path already
                     exists.
    - extract false: the package is handed to _copy_package().

    bb.fatal() is called when a value is neither a dict nor a list.
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']

            if not package.get('extract', True):
                _copy_package(d, pkg)
                continue

            dst_dir = os.path.join(extracted_path, pkg)
            # Same package used for more than one test,
            # don't need to extract again.
            if os.path.exists(dst_dir):
                continue

            # Extract package and copy it to TEST_EXTRACTED_DIR
            pkg_dir = _extract_in_tmpdir(d, pkg)
            try:
                oe.path.copytree(pkg_dir, dst_dir)
            finally:
                # Remove the temporary extraction directory even when the
                # copy fails, so a failed run does not leave it behind.
                shutil.rmtree(pkg_dir)
127
def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    A package manager is set up under TEST_INSTALL_TMP_DIR/<pkg> just for
    the extraction; that directory is removed again before returning. The
    directory returned by pm.extract() is left for the caller to clean up.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    extracted = get_package_manager(d, install_root).extract(pkg)

    # The package manager root is no longer needed once extraction is done.
    shutil.rmtree(install_root)

    return extracted
142
def _copy_package(d, pkg):
    """
    Copy the package file of pkg (RPM, DEB or IPK) to TEST_PACKAGED_DIR.

    The file location is taken from the 'filepath' entry of the package
    manager's package_info() for pkg. The package manager root created
    under TEST_INSTALL_TMP_DIR/<pkg> is removed afterwards.
    """

    from oeqa.utils.package_manager import get_package_manager

    install_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    target_dir = d.getVar('TEST_PACKAGED_DIR')

    manager = get_package_manager(d, install_root)
    src_file = manager.package_info(pkg)[pkg]['filepath']

    shutil.copy2(src_file, target_dir)
    shutil.rmtree(install_root)
157
def install_package(test_case):
    """
    Installs in the DUT the packages test_case needs, if it needs any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, True)
165
def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages test_case needs, if it needs any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        return
    _install_uninstall_packages(packages, test_case, False)
173
def test_needs_package(test_case):
    """
    Checks if a test case requires to install/uninstall packages.

    Returns the needed packages recorded for the test method in the JSON
    file that accompanies the test's source file, or None when there is
    no such file or it has nothing for this method.
    """
    json_file = _get_json_file(getCaseFile(test_case))
    if not json_file:
        return None

    needed = _get_needed_packages(json_file, getCaseMethod(test_case))

    # Normalise every empty result to None.
    return needed if needed else None
188
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package dict or a list of package dicts.
    Each dict must have a 'pkg' entry and may have 'rm' (default False)
    and 'extract' (default True) entries.

    With install true, every package whose 'extract' is true has its
    directory under test_case.tc.extract_dir copied to '/' on the target.
    With install false, every package whose 'rm' is true has that
    directory structure deleted from '/' on the target.
    Packages that match neither condition are skipped.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Without this branch 'packages' would be unbound and the loop
        # below would fail with an unhelpful UnboundLocalError.
        raise TypeError('Failed to process needed packages; '
                        'value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')
212