xref: /openbmc/openbmc-test-automation/lib/bmc_redfish_utils.py (revision 42c84ea5d0dd320e1a1d57bcba34fcb788c7788c)
1#!/usr/bin/env python3
2
3r"""
4BMC redfish utility functions.
5"""
6
7import json
8import re
9
10import gen_print as gp
11from robot.libraries.BuiltIn import BuiltIn
12
# Value of the Robot Framework variable ${MTLS_ENABLED}, read once when this
# module is imported. bmc_redfish_utils.__init__ compares it against the
# string "True" to decide whether to skip the login/REST probe.
MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}")
14
15
16class bmc_redfish_utils(object):
17    ROBOT_LIBRARY_SCOPE = "TEST SUITE"
18
19    def __init__(self):
20        r"""
21        Initialize the bmc_redfish_utils object.
22        """
23        # Obtain a reference to the global redfish object.
24        self.__inited__ = False
25        self._redfish_ = BuiltIn().get_library_instance("redfish")
26
27        if MTLS_ENABLED == "True":
28            self.__inited__ = True
29        else:
30            # There is a possibility that a given driver support both redfish and
31            # legacy REST.
32            self._redfish_.login()
33            self._rest_response_ = self._redfish_.get(
34                "/xyz/openbmc_project/", valid_status_codes=[200, 404]
35            )
36
37            # If REST URL /xyz/openbmc_project/ is supported.
38            if self._rest_response_.status == 200:
39                self.__inited__ = True
40
41        BuiltIn().set_global_variable(
42            "${REDFISH_REST_SUPPORTED}", self.__inited__
43        )
44
45    def get_redfish_session_info(self):
46        r"""
47        Returns redfish sessions info dictionary.
48
49        {
50            'key': 'yLXotJnrh5nDhXj5lLiH' ,
51            'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0'
52        }
53        """
54        session_dict = {
55            "key": self._redfish_.get_session_key(),
56            "location": self._redfish_.get_session_location(),
57        }
58        return session_dict
59
60    def get_attribute(self, resource_path, attribute, verify=None):
61        r"""
62        Get resource attribute.
63
64        Description of argument(s):
65        resource_path               URI resource absolute path (e.g.
66                                    "/redfish/v1/Systems/1").
67        attribute                   Name of the attribute (e.g. 'PowerState').
68        """
69
70        resp = self._redfish_.get(resource_path)
71
72        if verify:
73            if resp.dict[attribute] == verify:
74                return resp.dict[attribute]
75            else:
76                raise ValueError("Attribute value is not equal")
77        elif attribute in resp.dict:
78            return resp.dict[attribute]
79
80        return None
81
82    def get_properties(self, resource_path):
83        r"""
84        Returns dictionary of attributes for the resource.
85
86        Description of argument(s):
87        resource_path               URI resource absolute path (e.g.
88                                    /redfish/v1/Systems/1").
89        """
90
91        resp = self._redfish_.get(resource_path)
92        return resp.dict
93
94    def get_members_uri(self, resource_path, attribute):
95        r"""
96        Returns the list of valid path which has a given attribute.
97
98        Description of argument(s):
99        resource_path            URI resource base path (e.g.
100                                 '/redfish/v1/Systems/',
101                                 '/redfish/v1/Chassis/').
102        attribute                Name of the attribute (e.g. 'PowerSupplies').
103        """
104
105        # Set quiet variable to keep subordinate get() calls quiet.
106        quiet = 1
107
108        # Get the member id list.
109        # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar']
110        resource_path_list = self.get_member_list(resource_path)
111
112        valid_path_list = []
113
114        for path_idx in resource_path_list:
115            # Get all the child object path under the member id e.g.
116            # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power']
117            child_path_list = self.list_request(path_idx)
118
119            # Iterate and check if path object has the attribute.
120            for child_path_idx in child_path_list:
121                if (
122                    ("JsonSchemas" in child_path_idx)
123                    or ("SessionService" in child_path_idx)
124                    or ("#" in child_path_idx)
125                ):
126                    continue
127                if self.get_attribute(child_path_idx, attribute):
128                    valid_path_list.append(child_path_idx)
129
130        BuiltIn().log_to_console(valid_path_list)
131        return valid_path_list
132
133    def get_endpoint_path_list(self, resource_path, end_point_prefix):
134        r"""
135        Returns list with entries ending in "/endpoint".
136
137        Description of argument(s):
138        resource_path      URI resource base path (e.g. "/redfish/v1/Chassis/").
139        end_point_prefix   Name of the endpoint (e.g. 'Power').
140
141        Find all list entries ending in "/endpoint" combination such as
142        /redfish/v1/Chassis/<foo>/Power
143        /redfish/v1/Chassis/<bar>/Power
144        """
145
146        end_point_list = self.list_request(resource_path)
147
148        # Regex to match entries ending in "/prefix" with optional underscore.
149        regex = ".*/" + end_point_prefix + "[_]?[0-9]*$"
150        return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)]
151
152    def get_target_actions(self, resource_path, target_attribute):
153        r"""
154        Returns resource target entry of the searched target attribute.
155
156        Description of argument(s):
157        resource_path      URI resource absolute path
158                           (e.g. "/redfish/v1/Systems/system").
159        target_attribute   Name of the attribute (e.g. 'ComputerSystem.Reset').
160
161        Example:
162        "Actions": {
163        "#ComputerSystem.Reset": {
164        "ResetType@Redfish.AllowableValues": [
165            "On",
166            "ForceOff",
167            "GracefulRestart",
168            "GracefulShutdown"
169        ],
170        "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
171        }
172        }
173        """
174
175        global target_list
176        target_list = []
177
178        resp_dict = self.get_attribute(resource_path, "Actions")
179        if resp_dict is None:
180            return None
181
182        # Recursively search the "target" key in the nested dictionary.
183        # Populate the target_list of target entries.
184        self.get_key_value_nested_dict(resp_dict, "target")
185        # Return the matching target URL entry.
186        for target in target_list:
187            # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
188            attribute_in_uri = target.rsplit("/", 1)[-1]
189            # attribute_in_uri "ComputerSystem.Reset"
190            if target_attribute == attribute_in_uri:
191                return target
192
193        return None
194
195    def get_member_list(self, resource_path):
196        r"""
197        Perform a GET list request and return available members entries.
198
199        Description of argument(s):
200        resource_path  URI resource absolute path
201                       (e.g. "/redfish/v1/SessionService/Sessions").
202
203        "Members": [
204            {
205             "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7"
206            }
207            {
208             "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H"
209            }
210        ],
211        """
212
213        member_list = []
214        resp_list_dict = self.get_attribute(resource_path, "Members")
215        if resp_list_dict is None:
216            return member_list
217
218        for member_id in range(0, len(resp_list_dict)):
219            member_list.append(resp_list_dict[member_id]["@odata.id"])
220
221        return member_list
222
223    def list_request(self, resource_path):
224        r"""
225        Perform a GET list request and return available resource paths.
226        Description of argument(s):
227        resource_path  URI resource absolute path
228                       (e.g. "/redfish/v1/SessionService/Sessions").
229        """
230        gp.qprint_executing(style=gp.func_line_style_short)
231        # Set quiet variable to keep subordinate get() calls quiet.
232        quiet = 1
233        self.__pending_enumeration = set()
234        self._rest_response_ = self._redfish_.get(
235            resource_path, valid_status_codes=[200, 404, 500]
236        )
237
238        # Return empty list.
239        if self._rest_response_.status != 200:
240            return self.__pending_enumeration
241        self.walk_nested_dict(self._rest_response_.dict)
242        if not self.__pending_enumeration:
243            return resource_path
244        for resource in self.__pending_enumeration.copy():
245            self._rest_response_ = self._redfish_.get(
246                resource, valid_status_codes=[200, 404, 500]
247            )
248
249            if self._rest_response_.status != 200:
250                continue
251            self.walk_nested_dict(self._rest_response_.dict)
252        return list(sorted(self.__pending_enumeration))
253
254    def enumerate_request(
255        self, resource_path, return_json=1, include_dead_resources=False
256    ):
257        r"""
258        Perform a GET enumerate request and return available resource paths.
259
260        Description of argument(s):
261        resource_path               URI resource absolute path (e.g.
262                                    "/redfish/v1/SessionService/Sessions").
263        return_json                 Indicates whether the result should be
264                                    returned as a json string or as a
265                                    dictionary.
266        include_dead_resources      Check and return a list of dead/broken URI
267                                    resources.
268        """
269
270        gp.qprint_executing(style=gp.func_line_style_short)
271
272        return_json = int(return_json)
273
274        # Set quiet variable to keep subordinate get() calls quiet.
275        quiet = 1
276
277        # Variable to hold enumerated data.
278        self.__result = {}
279
280        # Variable to hold the pending list of resources for which enumeration.
281        # is yet to be obtained.
282        self.__pending_enumeration = set()
283
284        self.__pending_enumeration.add(resource_path)
285
286        # Variable having resources for which enumeration is completed.
287        enumerated_resources = set()
288
289        if include_dead_resources:
290            dead_resources = {}
291
292        resources_to_be_enumerated = (resource_path,)
293
294        while resources_to_be_enumerated:
295            for resource in resources_to_be_enumerated:
296                # JsonSchemas, SessionService or URLs containing # are not
297                # required in enumeration.
298                # Example: '/redfish/v1/JsonSchemas/' and sub resources.
299                #          '/redfish/v1/SessionService'
300                #          '/redfish/v1/Managers/${MANAGER_ID}#/Oem'
301                if (
302                    ("JsonSchemas" in resource)
303                    or ("SessionService" in resource)
304                    or ("PostCodes" in resource)
305                    or ("Registries" in resource)
306                    or ("Journal" in resource)
307                    or ("#" in resource)
308                ):
309                    continue
310
311                self._rest_response_ = self._redfish_.get(
312                    resource, valid_status_codes=[200, 404, 405, 500]
313                )
314                # Enumeration is done for available resources ignoring the
315                # ones for which response is not obtained.
316                if self._rest_response_.status != 200:
317                    if include_dead_resources:
318                        try:
319                            dead_resources[self._rest_response_.status].append(
320                                resource
321                            )
322                        except KeyError:
323                            dead_resources[self._rest_response_.status] = [
324                                resource
325                            ]
326                    continue
327
328                self.walk_nested_dict(self._rest_response_.dict, url=resource)
329
330            enumerated_resources.update(set(resources_to_be_enumerated))
331            resources_to_be_enumerated = tuple(
332                self.__pending_enumeration - enumerated_resources
333            )
334
335        if return_json:
336            if include_dead_resources:
337                return (
338                    json.dumps(
339                        self.__result,
340                        sort_keys=True,
341                        indent=4,
342                        separators=(",", ": "),
343                    ),
344                    dead_resources,
345                )
346            else:
347                return json.dumps(
348                    self.__result,
349                    sort_keys=True,
350                    indent=4,
351                    separators=(",", ": "),
352                )
353        else:
354            if include_dead_resources:
355                return self.__result, dead_resources
356            else:
357                return self.__result
358
359    def walk_nested_dict(self, data, url=""):
360        r"""
361        Parse through the nested dictionary and get the resource id paths.
362        Description of argument(s):
363        data    Nested dictionary data from response message.
364        url     Resource for which the response is obtained in data.
365        """
366        url = url.rstrip("/")
367
368        for key, value in data.items():
369            # Recursion if nested dictionary found.
370            if isinstance(value, dict):
371                self.walk_nested_dict(value)
372            else:
373                # Value contains a list of dictionaries having member data.
374                if "Members" == key:
375                    if isinstance(value, list):
376                        for memberDict in value:
377                            if isinstance(memberDict, str):
378                                self.__pending_enumeration.add(memberDict)
379                            else:
380                                self.__pending_enumeration.add(
381                                    memberDict["@odata.id"]
382                                )
383
384                if "@odata.id" == key:
385                    value = value.rstrip("/")
386                    # Data for the given url.
387                    if value == url:
388                        self.__result[url] = data
389                    # Data still needs to be looked up,
390                    else:
391                        self.__pending_enumeration.add(value)
392
393    def get_key_value_nested_dict(self, data, key):
394        r"""
395        Parse through the nested dictionary and get the searched key value.
396
397        Description of argument(s):
398        data    Nested dictionary data from response message.
399        key     Search dictionary key element.
400        """
401
402        for k, v in data.items():
403            if isinstance(v, dict):
404                self.get_key_value_nested_dict(v, key)
405
406            if k == key:
407                target_list.append(v)
408