1#
2# Copyright (C) 2015 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7# Provides functions to help with exporting binaries obtained from built targets
8
9import os, re, glob as g, shutil as sh,sys
10from time import sleep
11from .commands import runCmd
12from difflib import SequenceMatcher as SM
13
try:
    import bb
except ImportError:
    # bitbake is not importable here, so fall back to a tiny console logger.
    # Only plain(), warn() and fatal() are provided by this stand-in.
    class my_log:
        """Console-only replacement for the bitbake logging calls used in this module."""

        def plain(self, msg):
            """Print msg as is; an empty message prints nothing."""
            if not msg:
                return
            print(msg)

        def warn(self, msg):
            """Print msg prefixed with "WARNING: "; an empty message prints nothing."""
            if not msg:
                return
            print("WARNING: " + msg)

        def fatal(self, msg):
            """Print msg prefixed with "FATAL:" and exit with status 1; an empty message does nothing."""
            if not msg:
                return
            print("FATAL:" + msg)
            sys.exit(1)

    bb = my_log()
31
32
def determine_if_poky_env():
    """
    Tell whether we are running inside the poky environment.

    Useful on remote machines where poky is not present. The decision is based
    on the PATH environment variable: both the "/scripts" and the "/bitbake/bin"
    markers must appear in it.

    :return: True when both markers are found in PATH, False otherwise
             (including when PATH is not set)
    """
    # An unset PATH is treated as empty instead of failing on `in None`.
    search_path = os.getenv("PATH") or ""
    # Each marker is tested on its own: `("a" and "b") in s` would only test "b".
    return all(marker in search_path for marker in ("/scripts", "/bitbake/bin"))
39
40
def get_dest_folder(tune_features, folder_list):
    """
    Pick the folder whose name is most similar to the given TUNE_FEATURES value.

    The space separated features are reversed and joined with "_" before being
    compared to every folder name with difflib's SequenceMatcher.

    :param tune_features: space separated features string
    :param folder_list: iterable of candidate folder names
    :return: the first folder with the highest similarity ratio, or None when
             there is no candidate with a ratio above zero
    """
    wanted = "_".join(reversed(tune_features.split(" ")))
    scored = [(SM(None, candidate, wanted).ratio(), candidate) for candidate in folder_list]
    # max() keeps the first of several equally good candidates, and the default
    # covers an empty candidate list.
    top_ratio, top_folder = max(scored, key=lambda pair: pair[0], default=(0, None))
    if top_ratio > 0:
        return top_folder
    return None
56
57
def process_binaries(d, params):
    """
    Make the binary described by params available for testing.

    Inside the poky environment the matching rpm is copied or extracted from the
    rpm deploy dir. On a remote machine the rpm is looked up in the directory
    named by the "bin_dir" environment variable and extracted if needed.

    :param d: object whose getVar() is used to read TEST_EXPORT_ONLY, TEST_EXPORT_DIR,
              DEPLOY_DIR, DEPLOY_DIR_RPM, TUNE_FEATURES and BUILD_SYS
    :param params: indexable sequence where
                   [0] is the package name,
                   [1] is the wanted version (empty when any version will do),
                   [2] is "rpm" when the package must be delivered packaged instead of extracted,
                   [3] is "native" for a native package (anything else means target),
                   [4] is the removal marker (only reported in a warning here)
    :return: poky environment: "copied" or "extracted" for target packages, "" when no
             suitable package file was found, None otherwise for native packages.
             Remote machine: a short status message for target packages, None for
             native packages and when the target package could not be found.
    """
    param_list = params
    export_env = d.getVar("TEST_EXPORT_ONLY")

    def extract_binary(pth_to_pkg, dest_pth=None):
        """
        Extract the rpm at pth_to_pkg with "rpm2cpio | cpio -idm".

        NOTE: this changes the current working directory of the process to
        dest_pth (or to the filesystem root when dest_pth is not given) and
        does not restore it.

        :return: whatever runCmd returns for the extraction pipeline
        """
        cpio_command = runCmd("which cpio")
        rpm2cpio_command = runCmd("ls /usr/bin/rpm2cpio")
        # The pipeline below needs both tools, so a single missing one is already fatal.
        if (cpio_command.status != 0) or (rpm2cpio_command.status != 0):
            bb.fatal("Either \"rpm2cpio\" or \"cpio\" tools are not available on your system."
                    "All binaries extraction processes will not be available, crashing all related tests."
                    "Please install them according to your OS recommendations") # will exit here
        if dest_pth:
            os.chdir(dest_pth)
        else:
            os.chdir("%s" % os.sep)# this is for native package
        extract_bin_command = runCmd("%s %s | %s -idm" % (rpm2cpio_command.output, pth_to_pkg, cpio_command.output)) # semi-hardcoded because of a bug on poky's rpm2cpio
        return extract_bin_command

    if determine_if_poky_env(): # machine with poky environment
        exportpath = d.getVar("TEST_EXPORT_DIR") if export_env else d.getVar("DEPLOY_DIR")
        rpm_deploy_dir = d.getVar("DEPLOY_DIR_RPM")
        arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(rpm_deploy_dir))
        arch_rpm_dir = os.path.join(rpm_deploy_dir, arch)
        extracted_bin_dir = os.path.join(exportpath,"binaries", arch, "extracted_binaries")
        packaged_bin_dir = os.path.join(exportpath,"binaries", arch, "packaged_binaries")
        # creating necessary directory structure in case testing is done in poky env.
        if export_env == "0":
            if not os.path.exists(extracted_bin_dir): bb.utils.mkdirhier(extracted_bin_dir)
            if not os.path.exists(packaged_bin_dir): bb.utils.mkdirhier(packaged_bin_dir)

        if param_list[3] == "native":
            if export_env == "1": #this is a native package and we only need to copy it. no need for extraction
                native_rpm_dir = os.path.join(rpm_deploy_dir, get_dest_folder("{} nativesdk".format(d.getVar("BUILD_SYS")), os.listdir(rpm_deploy_dir)))
                native_rpm_file_list = [item for item in os.listdir(native_rpm_dir) if re.search("nativesdk-" + param_list[0] + r"-([0-9]+\.*)", item)]
                if not native_rpm_file_list:
                    bb.warn("Couldn't find any version of {} native package. Related tests will most probably fail.".format(param_list[0]))
                    return ""
                for item in native_rpm_file_list:# will copy all versions of package. Used version will be selected on remote machine
                    bb.plain("Copying native package file: %s" % item)
                    sh.copy(os.path.join(rpm_deploy_dir, native_rpm_dir, item), os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries", "native"))
            else: # nothing to do here; running tests under bitbake, so we assume native binaries are in sysroots dir.
                if param_list[1] or param_list[4]:
                    bb.warn("Native binary %s %s%s. Running tests under bitbake environment. Version can't be checked except when the test itself does it"
                            " and binary can't be removed."%(param_list[0],"has assigned ver. " + param_list[1] if param_list[1] else "",
                            ", is marked for removal" if param_list[4] else ""))
        else:# the package is target aka DUT intended and it is either required to be delivered in an extracted form or in a packaged version
            target_rpm_file_list = [item for item in os.listdir(arch_rpm_dir) if re.search(param_list[0] + r"-([0-9]+\.*)", item)]
            if not target_rpm_file_list:
                bb.warn("Couldn't find any version of target package %s. Please ensure it was built. "
                        "Related tests will probably fail." % param_list[0])
                return ""
            if param_list[2] == "rpm": # binary should be deployed as rpm; (other, .deb, .ipk? ;  in the near future)
                for item in target_rpm_file_list: # copying all related rpm packages. "Intuition" reasons, someone may need other versions too. Deciding later on version
                    bb.plain("Copying target specific packaged file: %s" % item)
                    sh.copy(os.path.join(arch_rpm_dir, item), packaged_bin_dir)
                # report success only once every matching package has been copied
                return "copied"
            else: # it is required to extract the binary
                if param_list[1]: # the package is versioned
                    for item in target_rpm_file_list:
                        if re.match(r".*-{}-.*\.rpm".format(param_list[1]), item):
                            destination = os.path.join(extracted_bin_dir,param_list[0], param_list[1])
                            bb.utils.mkdirhier(destination)
                            extract_binary(os.path.join(arch_rpm_dir, item), destination)
                            break
                    else:
                        bb.warn("Couldn't find the desired version %s for target binary %s. Related test cases will probably fail." % (param_list[1], param_list[0]))
                        return ""
                    return "extracted"
                else: # no version provided, just extract one binary
                    version_match = re.search(r".*-([0-9]+\.[0-9]+)-.*rpm", target_rpm_file_list[0])
                    if not version_match:
                        # the file name carries no "<major>.<minor>" version to name the destination after
                        bb.warn("Couldn't determine the version of target package file %s. "
                                "Related tests will probably fail." % target_rpm_file_list[0])
                        return ""
                    destination = os.path.join(extracted_bin_dir,param_list[0], version_match.group(1))
                    bb.utils.mkdirhier(destination)
                    extract_binary(os.path.join(arch_rpm_dir, target_rpm_file_list[0]), destination)
                    return "extracted"
    else: # remote machine
        binaries_path = os.getenv("bin_dir")# in order to know where the binaries are, bin_dir is set as env. variable
        if param_list[3] == "native": #need to extract the native pkg here
            native_rpm_dir = os.path.join(binaries_path, "native")
            native_pkg_found = False
            for item in os.listdir(native_rpm_dir):
                if param_list[1]: # native package has version: only that exact version is acceptable
                    if re.match(r"nativesdk-{}-{}-.*\.rpm".format(param_list[0], param_list[1]), item):
                        extract_binary(os.path.join(native_rpm_dir, item))
                        native_pkg_found = True
                        break
                elif re.match(r"nativesdk-{}-([0-9]+\.[0-9]+)-".format(param_list[0]), item):
                    # no version requested: every related native package is extracted;
                    # files of other packages are simply skipped
                    extract_binary(os.path.join(native_rpm_dir, item))
                    native_pkg_found = True
            if not native_pkg_found:
                bb.warn("Couldn't find native package %s%s. Related test cases will be influenced." %
                        (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
                return

        else: # this is for target device
            if param_list[2] == "rpm":
                return "No need to extract, this is an .rpm file"
            arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(binaries_path))
            extracted_bin_path = os.path.join(binaries_path, arch, "extracted_binaries")
            packaged_bin_path = os.path.join(binaries_path, arch, "packaged_binaries")
            packaged_bin_file_list = os.listdir(packaged_bin_path)
            # see if the package is already in the extracted ones; maybe it was deployed when exported the env.
            if os.path.exists(os.path.join(extracted_bin_path, param_list[0], param_list[1] if param_list[1] else "")):
                return "binary %s is already extracted" % param_list[0]
            else: # we need to search for it in the packaged binaries directory. It may have been shipped after export
                for item in packaged_bin_file_list:
                    if param_list[1]:
                        if re.match("%s-%s.*rpm" % (param_list[0], param_list[1]), item): # package with version
                            if not os.path.exists(os.path.join(extracted_bin_path, param_list[0],param_list[1])):
                                os.makedirs(os.path.join(extracted_bin_path, param_list[0], param_list[1]))
                                extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0],param_list[1]))
                                bb.plain("Using {} for {}".format(os.path.join(packaged_bin_path, item), param_list[0]))
                            break
                    else:
                        if re.match("%s-.*rpm" % param_list[0], item):
                            version_match = re.match(r".*-([0-9]+\.[0-9]+)-", item)
                            if not version_match:
                                # no usable version in the file name; try the next candidate
                                continue
                            found_version = version_match.group(1)
                            if not os.path.exists(os.path.join(extracted_bin_path, param_list[0], found_version)):
                                os.makedirs(os.path.join(extracted_bin_path, param_list[0], found_version))
                                bb.plain("Used ver. %s for %s" % (found_version, param_list[0]))
                                extract_binary(os.path.join(packaged_bin_path, item), os.path.join(extracted_bin_path, param_list[0], found_version))
                            break
                else:
                    bb.warn("Couldn't find target package %s%s. Please ensure it is available "
                            "in either of these directories: extracted_binaries or packaged_binaries. "
                            "Related tests will probably fail." % (param_list[0], " with version " + param_list[1] if param_list[1] else ""))
                    return
                return "Binary %s extracted successfully." % param_list[0]
184
185
def files_to_copy(base_dir):
    """
    Produces a list of all regular files found under base_dir, at any depth.

    Directories are visited breadth-first. Entries that are neither a directory
    nor a regular file are skipped.

    :param base_dir: directory to scan
    :return: list of file paths, each one built by joining base_dir with the
             location of the file below it; empty when base_dir holds no files
    """
    from collections import deque

    files_list = []
    pending_dirs = deque([base_dir])
    # A plain work queue terminates for any directory layout, including a
    # base_dir that has no sub-directories at all.
    while pending_dirs:
        current_dir = pending_dirs.popleft()
        for entry in os.listdir(current_dir):
            entry_path = os.path.join(current_dir, entry)
            if os.path.isdir(entry_path):
                pending_dirs.append(entry_path)
            elif os.path.isfile(entry_path):
                files_list.append(entry_path)
    return files_list
205
206
def send_bin_to_DUT(d,params):
    """
    Copy a binary to the target device, either as an .rpm file or as the set
    of files previously extracted from it.

    :param d: object whose getVar() is used to read TEST_EXPORT_DIR, TUNE_FEATURES and MACHINE
    :param params: indexable sequence where
                   [0] is the package name,
                   [1] is the wanted version (empty when any version will do),
                   [2] is "rpm" when the .rpm file itself must be sent,
                   [4] is "rm" when the sent files must be recorded for later removal
    :return: list of remote paths to remove later (only filled when params[4] is "rm"),
             or None when nothing suitable was found or a copy to the target failed
    """
    from oeqa.oetest import oeRuntimeTest
    param_list = params
    cleanup_list = list()
    bins_dir = os.path.join(d.getVar("TEST_EXPORT_DIR"), "binaries") if determine_if_poky_env() \
                    else os.getenv("bin_dir")
    arch = get_dest_folder(d.getVar("TUNE_FEATURES"), os.listdir(bins_dir))
    arch_rpms_dir = os.path.join(bins_dir, arch, "packaged_binaries")
    extracted_bin_dir = os.path.join(bins_dir, arch, "extracted_binaries", param_list[0])

    def send_extracted_binary():
        """Copy every extracted file to the same path on the target."""
        bin_local_dir = os.path.join(extracted_bin_dir, param_list[1] if param_list[1] else os.listdir(extracted_bin_dir)[0])
        for item in files_to_copy(bin_local_dir):
            # every item starts with bin_local_dir, so dropping that prefix leaves the path below it
            split_path = item[len(bin_local_dir):]
            path_on_DUT = split_path if split_path.startswith("/") else "/" + split_path # create the path as on DUT; eg. /usr/bin/bin_file
            (status, output) = oeRuntimeTest.tc.target.copy_to(item, path_on_DUT)
            if status != 0:
                bb.warn("Failed to copy %s binary file %s on the remote target: %s" %
                        (param_list[0], "ver. " + param_list[1] if param_list[1] else "", d.getVar("MACHINE")))
                return
            if param_list[4] == "rm":
                cleanup_list.append(path_on_DUT)
        return cleanup_list

    def send_rpm(remote_path): # if it is not required to have an extracted binary, but to send an .rpm file
        """Copy the matching .rpm file into remote_path on the target."""
        # When a version is requested only that version is acceptable; otherwise
        # any "<major>.<minor>" versioned package file of that name will do.
        if param_list[1]:
            rpm_pattern = "%s-%s-.*rpm" % (param_list[0], param_list[1])
        else:
            rpm_pattern = r"%s-[0-9]+\.[0-9]+-.*rpm" % param_list[0]
        rpm_to_send = ""
        for item in os.listdir(arch_rpms_dir):
            if re.match(rpm_pattern, item):
                rpm_to_send = item
                break
        else:
            bb.warn("No rpm package found for %s %s in .rpm files dir %s. Skipping deployment." %
                    (param_list[0], "ver. " + param_list[1] if param_list[1] else "", arch_rpms_dir) )
            return
        (status, output) = oeRuntimeTest.tc.target.copy_to(os.path.join(arch_rpms_dir, rpm_to_send), remote_path)
        if status != 0:
            bb.warn("Failed to copy %s on the remote target: %s" %(param_list[0], d.getVar("MACHINE")))
            return
        if param_list[4] == "rm":
            cleanup_list.append(os.path.join(remote_path, rpm_to_send))
        # a successful send always yields the (possibly empty) list, like send_extracted_binary()
        return cleanup_list

    if param_list[2] == "rpm": # send an .rpm file
        return send_rpm("/home/root") # rpms will be sent on home dir of remote machine
    else:
        return send_extracted_binary()
256
257
def rm_bin(removal_list): # need to know both if the binary is sent archived and the path where it is sent if archived
    """
    Remove the given paths from the target device by running "rm" on it.

    A warning is issued for every path whose removal reports a non-zero status.

    :param removal_list: iterable of remote paths; None or an empty list means
                         there is nothing to remove
    """
    if not removal_list:
        # nothing was recorded for removal, so there is no need to reach the target
        return
    from oeqa.oetest import oeRuntimeTest
    for item in removal_list:
        (status,output) = oeRuntimeTest.tc.target.run("rm " + item)
        if status != 0:
            bb.warn("Failed to remove: %s. Please ensure connection with the target device is up and running and "
                     "you have the needed rights." % item)
265
266