1#!/usr/bin/env python3
2
3r"""
4This module provides utilities for code updates.
5"""
6
7import collections
8import os
9import re
10import sys
11import tarfile
12import time
13
14from robot.libraries.BuiltIn import BuiltIn
15
# Directory that contains this module, with a trailing path separator.
robot_pgm_dir_path = os.path.dirname(__file__) + os.sep
# Derive the data directory path by substituting "/data" for "/lib" in the
# module directory path.  Note that re.sub replaces every occurrence of
# "/lib" in the path, not only the final path component.
repo_data_path = re.sub("/lib", "/data", robot_pgm_dir_path)
# Make modules in the data directory importable.  Presumably this is what
# allows the "variables" import further down to resolve - TODO confirm.
sys.path.append(repo_data_path)
19
20import bmc_ssh_utils as bsu  # NOQA
21import gen_print as gp  # NOQA
22import gen_robot_keyword as keyword  # NOQA
23import variables as var  # NOQA
24
25
def get_bmc_firmware(image_type, sw_dict):
    r"""
    Return the firmware inventory entries whose image type matches.

    Description of argument(s):
    image_type                     The image type to select.  It is compared
                                   for equality against the "image_type"
                                   value of every entry.
    sw_dict                        A dictionary of firmware inventory
                                   entries.  Each value must itself be a
                                   dictionary holding an "image_type" key.

    The matching entries are returned in an OrderedDict that preserves the
    iteration order of sw_dict.
    """

    return collections.OrderedDict(
        (image_id, properties)
        for image_id, properties in sw_dict.items()
        if properties["image_type"] == image_type
    )
42
43
def verify_no_duplicate_image_priorities(image_purpose):
    r"""
    Fail the test if two active images of the given purpose share a priority.

    Description of argument(s):
    image_purpose                   The purpose that images must have to be
                                    checked for priority duplicates.  It is
                                    passed as the version_type argument of
                                    the "Get Software Objects" keyword.
    """

    get_objects_cmd = (
        "Get Software Objects  " + "version_type=" + image_purpose
    )
    _, image_names = keyword.run_key(get_objects_cmd)

    # Maps each priority seen so far to the active image that holds it.
    taken_priorities = {}
    for image_name in image_names:
        _, image = keyword.run_key("Get Host Software Property  " + image_name)
        # Only active images take part in the duplicate check.
        if image["Activation"] == var.ACTIVE:
            image_priority = image["Priority"]
            if image_priority in taken_priorities:
                BuiltIn().fail(
                    "Found active images with the same priority.\n"
                    + gp.sprint_vars(image, taken_priorities[image_priority])
                )
            taken_priorities[image_priority] = image
69
70
def get_non_running_bmc_software_object():
    r"""
    Return the first enumerated software object that is not a host image and
    whose version differs from the version currently running on the BMC.

    The test is failed if no such object is found.
    """

    # The running version is reported wrapped in double quotes; strip them so
    # it can be compared against the "Version" property of each image.
    _, running_version = keyword.run_key("Get BMC Version")
    running_version = running_version.replace('"', "")

    enumerate_uri = var.SOFTWARE_VERSION_URI + "enumerate"
    _, images = keyword.run_key("Read Properties  " + enumerate_uri)

    for image_name in images:
        _, properties = keyword.run_key(
            "Get Host Software Property  " + image_name
        )
        # Skip objects that do not carry both properties needed to judge them.
        if "Purpose" not in properties or "Version" not in properties:
            continue
        # Host images are not BMC images.
        if properties["Purpose"] == var.VERSION_PURPOSE_HOST:
            continue
        if properties["Version"] != running_version:
            return image_name

    BuiltIn().fail("Did not find any non-running BMC images.")
97
98
def delete_all_pnor_images():
    r"""
    Power off the host, then delete every host-purpose (PNOR) image that the
    "Get Software Objects" keyword reports, verifying each deletion.
    """

    keyword.run_key("Initiate Host PowerOff")

    _, host_images = keyword.run_key(
        "Get Software Objects  " + var.VERSION_PURPOSE_HOST
    )
    for image_name in host_images:
        delete_cmd = (
            "Delete Image And Verify  "
            + image_name
            + "  "
            + var.VERSION_PURPOSE_HOST
        )
        keyword.run_key(delete_cmd)
116
117
def wait_for_activation_state_change(version_id, initial_state):
    r"""
    Poll the activation state of ${version_id} until it differs from the
    state provided by the calling function.

    The state is read every 10 seconds.  The function returns as soon as the
    state differs from initial_state, and also returns (without failing) once
    60 reads have still shown initial_state.  The test is failed if more than
    one consecutive read of the software object fails.

    Description of argument(s):
    version_id                      The version ID whose state change we are
                                    waiting for.
    initial_state                   The activation state the image is expected
                                    to move away from.
    """

    keyword.run_key_u("Open Connection And Log In")

    read_fail_threshold = 1
    num_read_errors = 0
    retry = 0
    while retry < 60:
        status, software_state = keyword.run_key(
            "Read Properties  " + var.SOFTWARE_VERSION_URI + str(version_id),
            ignore=1,
        )
        if status == "FAIL":
            # Failed reads do not count against the retry budget, but too many
            # of them in a row abort the wait.
            num_read_errors += 1
            if num_read_errors > read_fail_threshold:
                message = "Read errors exceeds threshold:\n " + gp.sprint_vars(
                    num_read_errors, read_fail_threshold
                )
                BuiltIn().fail(message)
        elif initial_state != software_state["Activation"]:
            # The state has changed; nothing more to wait for.
            return
        else:
            # Still in the initial state: a good read resets the error count.
            retry += 1
            num_read_errors = 0
        time.sleep(10)
156
157
def get_latest_file(dir_path):
    r"""
    Get the name of the most recently modified file or folder in a directory
    on the BMC.

    Description of argument(s):
    dir_path                        Path to the dir from which the name of the
                                    last updated file or folder will be
                                    returned to the calling function.

    The name is returned exactly as printed by the command (relative to
    dir_path).  If the command output contains no space, the whole output is
    returned.
    """

    # "stat -c '%Y %n'" prints "<mtime-in-seconds> <name>" for each entry.
    # Sorting numerically in descending order on the first field and keeping
    # the first line selects the most recently modified entry.
    stdout, _, _ = bsu.bmc_execute_command(
        "cd "
        + dir_path
        + "; stat -c '%Y %n' * |"
        + " sort -k1,1nr | head -n 1"
    )
    # Split on the first space only: it separates the mtime from the name, so
    # a name that itself contains spaces is returned whole rather than being
    # truncated to its last word.
    return stdout.split(" ", 1)[-1]
175
176
def get_version_tar(tar_file_path):
    r"""
    Read the image version from the MANIFEST inside the tarball.

    Description of argument(s):
    tar_file_path                   The path to a tar file that holds the image
                                    version inside the MANIFEST.

    Returns the text following the last "=" on the first MANIFEST line that
    contains "version=", or an empty string if the tarball has no member named
    "MANIFEST" containing "version=".  The name of every member examined is
    logged to the console.
    """

    version = ""
    # The context manager guarantees the archive is closed even if reading or
    # parsing a member raises.
    with tarfile.open(tar_file_path) as tar:
        for member in tar.getmembers():
            BuiltIn().log_to_console(member.name)
            if member.name != "MANIFEST":
                continue
            manifest = tar.extractfile(member)
            if manifest is None:
                # extractfile() returns None for members that are not regular
                # files or links (e.g. a directory named MANIFEST).
                continue
            with manifest:
                content = manifest.read()
            if b"version=" not in content:
                # This tar member does not contain the version.
                continue
            version_lines = [
                line
                for line in content.decode("utf-8", "ignore").split("\n")
                if "version=" in line
            ]
            version = version_lines[0].split("=")[-1]
            break
    return version
203
204
def get_image_version(file_path):
    r"""
    Read the version value from a file on the BMC.

    Description of argument(s):
    file_path                       The path to a file that holds the image
                                    version.

    Returns the text following the last "=" on the first line of the grep
    output.  Command errors are ignored (ignore_err=1), so empty output
    yields an empty string.
    """

    grep_cmd = "cat " + file_path + ' | grep "version="'
    stdout, _, _ = bsu.bmc_execute_command(grep_cmd, ignore_err=1)
    # Only the first matching line is considered.
    first_line = stdout.partition("\n")[0]
    return first_line.rsplit("=", 1)[-1]
218
219
def get_image_purpose(file_path):
    r"""
    Read the purpose value from a file on the BMC.

    Description of argument(s):
    file_path                       The path to a file that holds the image
                                    purpose.

    Returns the text following the last "=" in the grep output.  Command
    errors are ignored (ignore_err=1), so empty output yields an empty
    string.
    """

    grep_cmd = "cat " + file_path + ' | grep "purpose="'
    stdout, _, _ = bsu.bmc_execute_command(grep_cmd, ignore_err=1)
    return stdout.rsplit("=", 1)[-1]
233
234
def get_image_path(image_version):
    r"""
    Search the image upload directory on the BMC for an unpacked image whose
    MANIFEST version matches the given version, and return the path of the
    directory that holds it.

    The sub-directories of the upload directory are listed once.  Their
    MANIFEST files are then checked up to 10 times, 10 seconds apart, against
    that same listing.  None is returned if no match is found.

    Description of argument(s):
    image_version                   The version of the image that should match
                                    one of the images in the upload dir.
    """

    stdout, _, _ = bsu.bmc_execute_command(
        "ls -d " + var.IMAGE_UPLOAD_DIR_PATH + "*/"
    )
    image_dirs = stdout.split("\n")

    for _attempt in range(10):
        for image_dir in image_dirs:
            # Each listed directory ends with "/", so appending the file name
            # gives the MANIFEST path.
            if get_image_version(image_dir + "MANIFEST") == image_version:
                return image_dir
        time.sleep(10)

    return None
260
261
def verify_image_upload(image_version, timeout=3):
    r"""
    Verify the image was uploaded correctly and that it created
    a valid d-bus object. If the first check for the image
    fails, try again until we reach the timeout.

    Description of argument(s):
    image_version                   The version from the image's manifest file
                                    (e.g. "v2.2-253-g00050f1").
    timeout                         How long, in minutes, to keep trying to
                                    find the image on the BMC. Default is 3 minutes.

    Returns a (status, version_id) tuple.  On success it is
    (True, <image version id>), where the id is the name of the image's
    directory in the upload dir.  It is (False, None) when no uploaded image
    matches image_version, when the image purpose is neither BMC nor host, or
    when the activation state is not READY, INVALID or ACTIVE before the
    timeout expires.
    """

    image_path = get_image_path(image_version)
    if image_path is None:
        # get_image_path() gave up without finding a matching image.  Report
        # this the same way as the other failure paths instead of raising
        # AttributeError on None below.
        gp.print_var(image_path)
        return False, None
    # image_path ends with "/", so the directory name is the second to last
    # element of the split.
    image_version_id = image_path.split("/")[-2]

    keyword.run_key_u("Open Connection And Log In")
    image_purpose = get_image_purpose(image_path + "MANIFEST")
    if image_purpose not in (
        var.VERSION_PURPOSE_BMC,
        var.VERSION_PURPOSE_HOST,
    ):
        gp.print_var(image_purpose)
        return False, None

    uri = var.SOFTWARE_VERSION_URI + image_version_id
    ret_values = ""
    # The state is polled every 30 seconds, i.e. twice per minute of timeout.
    for _ in range(timeout * 2):
        status, ret_values = keyword.run_key(
            "Read Attribute  " + uri + "  Activation"
        )
        if ret_values in (var.READY, var.INVALID, var.ACTIVE):
            return True, image_version_id
        time.sleep(30)

    # If we exit the for loop, the timeout has been reached
    gp.print_var(ret_values)
    return False, None
306
307
def verify_image_not_in_bmc_uploads_dir(image_version, timeout=3):
    r"""
    Check that no image with the given version is unpacked inside the BMC's
    image uploads directory.  The check is repeated every 30 seconds until
    the timeout is hit, in case the BMC takes time unpacking the image.

    If a matching image is found, its directory is removed from the BMC and
    the test is failed.

    Description of argument(s):
    image_version                   The version of the image to look for on
                                    the BMC.
    timeout                         How long, in minutes, to try to find an
                                    image on the BMC. Default is 3 minutes.
    """

    for _ in range(timeout * 2):
        # List the MANIFEST of every uploaded image and keep the ones that
        # mention the requested version.
        find_cmd = (
            "ls "
            + var.IMAGE_UPLOAD_DIR_PATH
            + "*/MANIFEST 2>/dev/null "
            + '| xargs grep -rl "version='
            + image_version
            + '"'
        )
        stdout, _, _ = bsu.bmc_execute_command(find_cmd)
        first_match = stdout.split("\n")[0]
        image_dir = os.path.dirname(first_match)
        if image_dir:
            bsu.bmc_execute_command("rm -rf " + image_dir)
            BuiltIn().fail("Found invalid BMC Image: " + image_dir)
        time.sleep(30)
336