1#!/usr/bin/env python3
2
3r"""
4This module provides utilities for code updates.
5"""
6
7import collections
8import os
9import re
10import sys
11import tarfile
12import time
13
14import bmc_ssh_utils as bsu
15import gen_print as gp
16import gen_robot_keyword as keyword
17import variables as var
18from robot.libraries.BuiltIn import BuiltIn
19
# Directory that contains this file, with a trailing path separator.
robot_pgm_dir_path = os.path.dirname(__file__) + os.sep
# Same path with every "/lib" occurrence replaced by "/data"
# (presumably the repository's sibling data directory -- TODO confirm).
repo_data_path = re.sub("/lib", "/data", robot_pgm_dir_path)
# Make modules located under repo_data_path importable.
sys.path.append(repo_data_path)
23
24
def get_bmc_firmware(image_type, sw_dict):
    r"""
    Return the firmware inventory entries whose image type equals image_type.

    Description of argument(s):
    image_type                     The image type to select (e.g. the BMC
                                   update or the Host update type).
    sw_dict                        A dictionary of firmware inventory
                                   entries. Each value must provide an
                                   "image_type" key.

    The result is an OrderedDict holding the matching entries in the
    iteration order of sw_dict.
    """

    matching_entries = (
        (name, properties)
        for name, properties in sw_dict.items()
        if properties["image_type"] == image_type
    )
    return collections.OrderedDict(matching_entries)
41
42
def verify_no_duplicate_image_priorities(image_purpose):
    r"""
    Fail the test if two active images of the given purpose share a priority.

    Description of argument(s):
    image_purpose                   The purpose that images must have to be
                                    checked for priority duplicates.

    Images whose "Activation" property is not var.ACTIVE are ignored.
    """

    get_objects_cmd = (
        "Get Software Objects  " + "version_type=" + image_purpose
    )
    _, image_names = keyword.run_key(get_objects_cmd)

    # Maps each priority seen so far to the active image that holds it.
    taken_priorities = {}
    for image_name in image_names:
        _, image = keyword.run_key("Get Host Software Property  " + image_name)
        if image["Activation"] == var.ACTIVE:
            image_priority = image["Priority"]
            if image_priority in taken_priorities:
                # NOTE(review): gp.sprint_vars presumably labels its output
                # using the argument expressions, so the names below are
                # kept unchanged -- confirm before renaming.
                BuiltIn().fail(
                    "Found active images with the same priority.\n"
                    + gp.sprint_vars(image, taken_priorities[image_priority])
                )
            taken_priorities[image_priority] = image
68
69
def get_non_running_bmc_software_object():
    r"""
    Return the name of a software object that is not the running BMC image.

    The first enumerated object that has both "Purpose" and "Version"
    properties, whose purpose is not var.VERSION_PURPOSE_HOST and whose
    version differs from the currently running BMC version, is returned.
    The test is failed if no such object exists.
    """

    # The running version is reported wrapped in double quotes; drop them so
    # it can be compared with the "Version" property of each object.
    _, quoted_version = keyword.run_key("Get BMC Version")
    running_version = quoted_version.replace('"', "")

    enumerate_uri = var.SOFTWARE_VERSION_URI + "enumerate"
    _, software_objects = keyword.run_key("Read Properties  " + enumerate_uri)

    for object_name in software_objects:
        _, properties = keyword.run_key(
            "Get Host Software Property  " + object_name
        )
        if not ("Purpose" in properties and "Version" in properties):
            # Objects lacking either property cannot be classified.
            continue
        if (
            properties["Purpose"] != var.VERSION_PURPOSE_HOST
            and properties["Version"] != running_version
        ):
            return object_name

    BuiltIn().fail("Did not find any non-running BMC images.")
96
97
def delete_all_pnor_images():
    r"""
    Power off the host, then delete every host (PNOR) image from the BMC.

    Each software object returned for var.VERSION_PURPOSE_HOST is removed
    with the "Delete Image And Verify" keyword.
    """

    keyword.run_key("Initiate Host PowerOff")

    host_purpose = var.VERSION_PURPOSE_HOST
    _, host_images = keyword.run_key("Get Software Objects  " + host_purpose)
    for host_image in host_images:
        # Robot keyword arguments are separated by two spaces.
        delete_cmd = "  ".join(
            ["Delete Image And Verify", host_image, host_purpose]
        )
        keyword.run_key(delete_cmd)
115
116
def wait_for_activation_state_change(version_id, initial_state):
    r"""
    Poll the activation state of ${version_id} until it differs from
    initial_state.

    Description of argument(s):
    version_id                      The version ID whose state change we are
                                    waiting for.
    initial_state                   The activation state we expect the image
                                    to move away from.

    The state is read every 10 seconds. The function returns as soon as the
    state differs from initial_state. It also returns, without failing, once
    60 polls have seen the state unchanged. The test is failed when more
    than read_fail_threshold consecutive reads fail.
    """

    keyword.run_key_u("Open Connection And Log In")

    read_cmd = "Read Properties  " + var.SOFTWARE_VERSION_URI + str(version_id)
    max_polls = 60
    poll_interval = 10

    polls_done = 0
    num_read_errors = 0
    read_fail_threshold = 1
    while polls_done < max_polls:
        status, software_state = keyword.run_key(read_cmd, ignore=1)

        if status != "FAIL":
            if initial_state != software_state["Activation"]:
                return
            # A successful read that shows no change counts as one poll and
            # resets the read error counter.
            polls_done += 1
            num_read_errors = 0
        else:
            # Failed reads do not count as polls; they are limited only by
            # the read error threshold.
            num_read_errors += 1
            if num_read_errors > read_fail_threshold:
                # NOTE(review): gp.sprint_vars presumably labels its output
                # using the argument names, so they are kept unchanged --
                # confirm before renaming.
                message = "Read errors exceeds threshold:\n " + gp.sprint_vars(
                    num_read_errors, read_fail_threshold
                )
                BuiltIn().fail(message)

        time.sleep(poll_interval)
    return
155
156
def get_latest_file(dir_path):
    r"""
    Get the name of the most recently modified entry in a BMC directory.

    Description of argument(s):
    dir_path                        Path to the dir from which the name of the
                                    last updated file or folder will be
                                    returned to the calling function.

    The entries are listed on the BMC as "<mtime> <name>", sorted with the
    newest first, and the name from the first line is returned. dir_path is
    inserted into the shell command as is (it is not quoted).
    """

    stdout, stderr, rc = bsu.bmc_execute_command(
        "cd "
        + dir_path
        + "; stat -c '%Y %n' * |"
        + " sort -k1,1nr | head -n 1"
    )
    # Split only once, after the timestamp, so that a name which itself
    # contains spaces is returned whole instead of just its last word.
    return stdout.split(" ", 1)[-1]
174
175
def get_version_tar(tar_file_path):
    r"""
    Read the image version from the MANIFEST inside the tarball.

    Description of argument(s):
    tar_file_path                   The path to a tar file that holds the image
                                    version inside the MANIFEST.

    The value following the last "=" on the first MANIFEST line that contains
    "version=" is returned. An empty string is returned when the tarball has
    no MANIFEST member or the MANIFEST has no such line. The name of every
    member visited is logged to the console.
    """

    version = ""
    # The context manager closes the tarball even if reading a member raises.
    with tarfile.open(tar_file_path) as tar:
        for member in tar.getmembers():
            BuiltIn().log_to_console(member.name)
            if member.name != "MANIFEST":
                continue
            manifest = tar.extractfile(member)
            if manifest is None:
                # extractfile() returns None for members that are not
                # regular files or links (e.g. a directory named MANIFEST).
                continue
            content = manifest.read()
            if b"version=" not in content:
                # This tar member does not contain the version.
                continue
            # NOTE(review): a substring match also accepts keys that merely
            # end in "version=" (e.g. "extended_version="); the first such
            # line wins -- confirm the MANIFEST key order.
            version_lines = [
                line
                for line in content.decode("utf-8", "ignore").split("\n")
                if "version=" in line
            ]
            version = version_lines[0].split("=")[-1]
            break
    return version
202
203
def get_image_version(file_path):
    r"""
    Return the version recorded in a file on the BMC.

    Description of argument(s):
    file_path                       The path to a file that holds the image
                                    version.

    The file is filtered on the BMC for lines containing "version=" and the
    text after the last "=" of the first such line is returned. Command
    errors are ignored (ignore_err=1).
    """

    grep_cmd = "cat " + file_path + ' | grep "version="'
    stdout, _stderr, _rc = bsu.bmc_execute_command(grep_cmd, ignore_err=1)

    first_line, _, _ = stdout.partition("\n")
    _, _, version = first_line.rpartition("=")
    return version
217
218
def get_image_purpose(file_path):
    r"""
    Return the purpose recorded in a file on the BMC.

    Description of argument(s):
    file_path                       The path to a file that holds the image
                                    purpose.

    The file is filtered on the BMC for lines containing "purpose=" and the
    text after the last "=" in that output is returned. Command errors are
    ignored (ignore_err=1).
    """

    grep_cmd = "cat " + file_path + ' | grep "purpose="'
    stdout, _stderr, _rc = bsu.bmc_execute_command(grep_cmd, ignore_err=1)

    _, _, purpose = stdout.rpartition("=")
    return purpose
232
233
def get_image_path(image_version):
    r"""
    Query the upload image dir for the presence of an image matching
    the version that was read from the MANIFEST before uploading
    the image, and return the path of that image's directory.

    Description of argument(s):
    image_version                   The version of the image that should match
                                    one of the images in the upload dir.

    The upload dir is searched up to 10 times, 10 seconds apart. The matching
    directory path (as listed by "ls -d", i.e. with a trailing "/") is
    returned, or None when no image matched in any attempt.
    """

    max_attempts = 10
    for _ in range(max_attempts):
        # List the directories on every attempt so that an image unpacked
        # after the first look is still found.
        stdout, stderr, rc = bsu.bmc_execute_command(
            "ls -d " + var.IMAGE_UPLOAD_DIR_PATH + "*/"
        )
        for image_dir in stdout.split("\n"):
            if not image_dir:
                # Skip blank entries produced by empty output.
                continue
            if get_image_version(image_dir + "MANIFEST") == image_version:
                return image_dir
        time.sleep(10)

    return None
259
260
def verify_image_upload(image_version, timeout=3):
    r"""
    Verify the image was uploaded correctly and that it created
    a valid d-bus object. If the first check for the image
    fails, try again until we reach the timeout.

    Description of argument(s):
    image_version                   The version from the image's manifest file
                                    (e.g. "v2.2-253-g00050f1").
    timeout                         How long, in minutes, to keep polling the
                                    activation state of the image on the BMC.
                                    Default is 3 minutes.

    Returns the tuple (True, <image version id>) when the activation state
    becomes READY, INVALID or ACTIVE. Returns (False, None) when the image
    is not found in the upload dir, when its purpose is neither BMC nor
    host, or when the timeout is reached.
    """

    image_path = get_image_path(image_version)
    if not image_path:
        # No uploaded image matched the version. Report it the same way as
        # the other failure paths instead of raising AttributeError below.
        gp.print_var(image_version)
        return False, None
    # The path ends with "/", so the version id is the second-to-last field.
    image_version_id = image_path.split("/")[-2]

    keyword.run_key_u("Open Connection And Log In")
    image_purpose = get_image_purpose(image_path + "MANIFEST")
    if image_purpose not in (
        var.VERSION_PURPOSE_BMC,
        var.VERSION_PURPOSE_HOST,
    ):
        gp.print_var(image_purpose)
        return False, None

    uri = var.SOFTWARE_VERSION_URI + image_version_id
    ret_values = ""
    # Poll every 30 seconds, i.e. twice per minute of timeout.
    for _ in range(timeout * 2):
        status, ret_values = keyword.run_key(
            "Read Attribute  " + uri + "  Activation"
        )
        if ret_values in (var.READY, var.INVALID, var.ACTIVE):
            return True, image_version_id
        time.sleep(30)

    # If we exit the for loop, the timeout has been reached
    gp.print_var(ret_values)
    return False, None
305
306
def verify_image_not_in_bmc_uploads_dir(image_version, timeout=3):
    r"""
    Check that no image with the given version is unpacked inside of the
    BMCs image uploads directory. The check is repeated every 30 seconds
    until the given timeout is hit, in case the BMC takes time
    unpacking the image. If such an image is found, its directory is
    deleted and the test is failed.

    Description of argument(s):
    image_version                   The version of the image to look for on
                                    the BMC.
    timeout                         How long, in minutes, to try to find an
                                    image on the BMC. Default is 3 minutes.
    """

    attempts = timeout * 2
    for _ in range(attempts):
        # List the MANIFEST files that mention the version being looked for.
        search_cmd = (
            "ls "
            + var.IMAGE_UPLOAD_DIR_PATH
            + "*/MANIFEST 2>/dev/null "
            + '| xargs grep -rl "version='
            + image_version
            + '"'
        )
        stdout, _stderr, _rc = bsu.bmc_execute_command(search_cmd)

        first_match = stdout.split("\n")[0]
        image_dir = os.path.dirname(first_match)
        if image_dir:
            # Remove the unexpected image before reporting the failure.
            bsu.bmc_execute_command("rm -rf " + image_dir)
            BuiltIn().fail("Found invalid BMC Image: " + image_dir)
        time.sleep(30)
335