1import os
2import json
3import shutil
4
5from oeqa.core.utils.test import getCaseFile, getCaseMethod
6
def get_package_manager(d, root_path):
    """
    Returns an OE package manager that can install packages in root_path.

    The backend is chosen from the IMAGE_PKGTYPE variable read from d and
    must be one of "rpm", "ipk" or "deb". Before the manager is returned
    its package index is written and update() is called on it.

    Raises ValueError if IMAGE_PKGTYPE is not a supported package type.
    """
    pkg_class = d.getVar("IMAGE_PKGTYPE")
    if pkg_class not in ("rpm", "ipk", "deb"):
        # Without this check an unknown type would only surface later as
        # an UnboundLocalError on 'pm'.
        raise ValueError("Unsupported IMAGE_PKGTYPE %r; "
                         "expected 'rpm', 'ipk' or 'deb'" % (pkg_class,))

    from oe.package_manager import RpmPM, OpkgPM, DpkgPM

    if pkg_class == "rpm":
        pm = RpmPM(d,
                   root_path,
                   d.getVar('TARGET_VENDOR'),
                   filterbydependencies=False)
        # Only the RPM backend needs its configuration created explicitly.
        pm.create_configs()

    elif pkg_class == "ipk":
        pm = OpkgPM(d,
                    root_path,
                    d.getVar("IPKGCONF_TARGET"),
                    d.getVar("ALL_MULTILIB_PACKAGE_ARCHS"),
                    filterbydependencies=False)

    else:
        # pkg_class == "deb" (validated above)
        pm = DpkgPM(d,
                    root_path,
                    d.getVar('PACKAGE_ARCHS'),
                    d.getVar('DPKG_ARCH'),
                    filterbydependencies=False)

    pm.write_index()
    pm.update()

    return pm
39
def find_packages_to_extract(test_suite):
    """
    Collects the packages that the runtime tests in test_suite need.

    Each distinct test case file of the suite is checked for a matching
    JSON file; the contents of every JSON file found are merged into a
    single dict, which is returned.
    """
    from oeqa.core.utils.test import getSuiteCasesFiles

    collected = {}

    for case_file in set(getSuiteCasesFiles(test_suite)):
        json_path = _get_json_file(case_file)
        if not json_path:
            # No package description for this test module.
            continue
        collected.update(_get_needed_packages(json_path))

    return collected
55
def _get_json_file(module_path):
    """
    Returns the path of the JSON file for a module, empty if it doesn't exist.

    The JSON file is looked up next to the module, using the module's
    path with its extension replaced by ".json". An empty string is also
    returned when module_path itself is not an existing file.
    """

    # splitext() only strips an extension from the last path component,
    # so a dot in a directory name is never mistaken for one.
    json_file = os.path.splitext(module_path)[0] + '.json'
    if os.path.isfile(module_path) and os.path.isfile(json_file):
        return json_file
    return ''
66
def _get_needed_packages(json_file, test=None):
    """
    Returns a dict with needed packages based on a JSON file.

    The JSON document must be an object mapping test names to their
    package description. If a test is specified, only the entry for that
    test is returned, or an empty dict when the file has no entry for it.

    Raises TypeError if the top level of the JSON document is not an
    object.
    """
    # JSON is UTF-8 by specification; don't depend on the locale encoding.
    with open(json_file, encoding="utf-8") as f:
        needed_packages = json.load(f)

    if not isinstance(needed_packages, dict):
        raise TypeError("%s: top-level JSON value must be an object, got %s"
                        % (json_file, type(needed_packages).__name__))

    if test:
        return needed_packages.get(test, {})

    return needed_packages
87
def extract_packages(d, needed_packages):
    """
    Extract packages that will be needed during runtime.

    needed_packages maps a test name to either a single package
    description (dict) or a list of them. Each description must have a
    'pkg' entry and may have an 'extract' entry (default True):

    - extract is true: the package is extracted and its content copied
      to TEST_EXTRACTED_DIR/<pkg>, unless that directory already exists.
    - extract is false: the package file itself is copied using
      _copy_package().

    Any other value type is reported through bb.fatal().
    """

    import bb
    import oe.path

    extracted_path = d.getVar('TEST_EXTRACTED_DIR')

    for key, value in needed_packages.items():
        packages = ()
        if isinstance(value, dict):
            packages = (value, )
        elif isinstance(value, list):
            packages = value
        else:
            bb.fatal('Failed to process needed packages for %s; '
                     'Value must be a dict or list' % key)

        for package in packages:
            pkg = package['pkg']
            extract = package.get('extract', True)

            if extract:
                dst_dir = os.path.join(extracted_path, pkg)
                # Same package used for more than one test,
                # don't need to extract again.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = _extract_in_tmpdir(d, pkg)
                try:
                    oe.path.copytree(pkg_dir, dst_dir)
                finally:
                    # Don't leave the temporary extraction behind, even
                    # when the copy fails.
                    shutil.rmtree(pkg_dir)

            else:
                # Package is installed as-is; just stage the package file.
                _copy_package(d, pkg)
129
def _extract_in_tmpdir(d, pkg):
    """
    Returns path to a temp directory where the package was
    extracted without dependencies.

    The package manager used for the extraction is rooted in a
    per-package directory under TEST_INSTALL_TMP_DIR, which is removed
    again once the extraction is done.
    """

    from oeqa.utils.package_manager import get_package_manager

    scratch_root = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    extracted = get_package_manager(d, scratch_root).extract(pkg)
    shutil.rmtree(scratch_root)

    return extracted
144
def _copy_package(d, pkg):
    """
    Copy the RPM, DEB or IPK package file to TEST_PACKAGED_DIR.

    The file path is taken from the 'filepath' entry of the package
    manager's package_info() for pkg. The package manager is rooted in
    a per-package directory under TEST_INSTALL_TMP_DIR, which is removed
    after the copy.
    """

    from oeqa.utils.package_manager import get_package_manager

    pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg)
    dst_dir = d.getVar('TEST_PACKAGED_DIR')
    pm = get_package_manager(d, pkg_path)
    pkg_info = pm.package_info(pkg)
    file_path = pkg_info[pkg]['filepath']
    # copy2() treats a destination that does not exist as a file name,
    # so make sure the directory is there before copying into it.
    os.makedirs(dst_dir, exist_ok=True)
    shutil.copy2(file_path, dst_dir)
    shutil.rmtree(pkg_path)
159
def install_package(test_case):
    """
    Installs in the DUT the packages test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        # Nothing declared for this test.
        return
    _install_uninstall_packages(packages, test_case, True)
167
def uninstall_package(test_case):
    """
    Uninstalls from the DUT the packages test_case declares, if any.
    """
    packages = test_needs_package(test_case)
    if not packages:
        # Nothing declared for this test.
        return
    _install_uninstall_packages(packages, test_case, False)
175
def test_needs_package(test_case):
    """
    Returns the packages a test case needs installed/uninstalled.

    The packages come from the JSON file that belongs to the test case's
    module, looked up by the test method. None is returned when there is
    no JSON file or it has nothing for this test.
    """
    json_path = _get_json_file(getCaseFile(test_case))
    if not json_path:
        return None

    packages = _get_needed_packages(json_path, getCaseMethod(test_case))
    # Normalise any empty result to None.
    return packages if packages else None
190
def _install_uninstall_packages(needed_packages, test_case, install=True):
    """
    Install/Uninstall packages in the DUT without using a package manager

    needed_packages is a single package description (dict) or a list of
    them. Each description must have a 'pkg' entry and may have 'extract'
    (default True) and 'rm' (default False) entries.

    When install is true, packages with 'extract' set are copied from
    <test_case.tc.extract_dir>/<pkg> to '/' on the target. When install
    is false, packages with 'rm' set have that directory structure
    deleted from the target. Other packages are left untouched.

    Raises TypeError if needed_packages is neither a dict nor a list.
    """

    if isinstance(needed_packages, dict):
        packages = [needed_packages]
    elif isinstance(needed_packages, list):
        packages = needed_packages
    else:
        # Previously this fell through to an UnboundLocalError below.
        raise TypeError('Failed to process needed packages; '
                        'Value must be a dict or list, got %s'
                        % type(needed_packages).__name__)

    for package in packages:
        pkg = package['pkg']
        rm = package.get('rm', False)
        extract = package.get('extract', True)
        src_dir = os.path.join(test_case.tc.extract_dir, pkg)

        # Install package
        if install and extract:
            test_case.tc.target.copyDirTo(src_dir, '/')

        # Uninstall package
        elif not install and rm:
            test_case.tc.target.deleteDirStructure(src_dir, '/')
214