xref: /openbmc/openbmc-test-automation/lib/bmc_redfish_utils.py (revision 7049fbaf5cb3f1c7a7d73833e59cea28d3ce82fb)
1#!/usr/bin/env python3
2
3r"""
4BMC redfish utility functions.
5"""
6
7import json
8import re
9from json.decoder import JSONDecodeError
10
11import gen_print as gp
12from robot.libraries.BuiltIn import BuiltIn
13
14MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}")
15
16
17class bmc_redfish_utils(object):
18    ROBOT_LIBRARY_SCOPE = "TEST SUITE"
19
20    def __init__(self):
21        r"""
22        Initialize the bmc_redfish_utils object.
23        """
24        # Obtain a reference to the global redfish object.
25        self.__inited__ = False
26        try:
27            self._redfish_ = BuiltIn().get_library_instance("redfish")
28        except RuntimeError as e:
29            BuiltIn().log_to_console(
30                "get_library_instance: No active redfish instance found."
31            )
32            # Handling init exception at worse to raise on error.
33            raise e
34
35        if MTLS_ENABLED == "True":
36            self.__inited__ = True
37        else:
38            # There is a possibility that a given driver support both redfish and
39            # legacy REST.
40            self._redfish_.login()
41            self._rest_response_ = self._redfish_.get(
42                "/xyz/openbmc_project/", valid_status_codes=[200, 404]
43            )
44
45            # If REST URL /xyz/openbmc_project/ is supported.
46            if self._rest_response_.status == 200:
47                self.__inited__ = True
48
49        BuiltIn().set_global_variable(
50            "${REDFISH_REST_SUPPORTED}", self.__inited__
51        )
52
53    def get_redfish_session_info(self):
54        r"""
55        Returns redfish sessions info dictionary.
56
57        {
58            'key': 'yLXotJnrh5nDhXj5lLiH' ,
59            'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0'
60        }
61        """
62        session_dict = {
63            "key": self._redfish_.get_session_key(),
64            "location": self._redfish_.get_session_location(),
65        }
66        return session_dict
67
68    def get_attribute(self, resource_path, attribute, verify=None):
69        r"""
70        Get resource attribute.
71
72        Description of argument(s):
73        resource_path               URI resource absolute path (e.g.
74                                    "/redfish/v1/Systems/1").
75        attribute                   Name of the attribute (e.g. 'PowerState').
76        """
77        try:
78            resp = self._redfish_.get(resource_path)
79        except JSONDecodeError as e:
80            BuiltIn().log_to_console(
81                "get_attribute: JSONDecodeError, re-trying"
82            )
83            resp = self._redfish_.get(resource_path)
84
85        if verify:
86            if resp.dict[attribute] == verify:
87                return resp.dict[attribute]
88            else:
89                raise ValueError("Attribute value is not equal")
90        elif attribute in resp.dict:
91            return resp.dict[attribute]
92
93        return None
94
95    def get_properties(self, resource_path):
96        r"""
97        Returns dictionary of attributes for the resource.
98
99        Description of argument(s):
100        resource_path               URI resource absolute path (e.g.
101                                    /redfish/v1/Systems/1").
102        """
103
104        resp = self._redfish_.get(resource_path)
105        return resp.dict
106
107    def get_members_uri(self, resource_path, attribute):
108        r"""
109        Returns the list of valid path which has a given attribute.
110
111        Description of argument(s):
112        resource_path            URI resource base path (e.g.
113                                 '/redfish/v1/Systems/',
114                                 '/redfish/v1/Chassis/').
115        attribute                Name of the attribute (e.g. 'PowerSupplies').
116        """
117
118        # Set quiet variable to keep subordinate get() calls quiet.
119        quiet = 1
120
121        # Get the member id list.
122        # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar']
123        resource_path_list = self.get_member_list(resource_path)
124
125        valid_path_list = []
126
127        for path_idx in resource_path_list:
128            # Get all the child object path under the member id e.g.
129            # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power']
130            child_path_list = self.list_request(path_idx)
131
132            # Iterate and check if path object has the attribute.
133            for child_path_idx in child_path_list:
134                if (
135                    ("JsonSchemas" in child_path_idx)
136                    or ("SessionService" in child_path_idx)
137                    or ("#" in child_path_idx)
138                ):
139                    continue
140                if self.get_attribute(child_path_idx, attribute):
141                    valid_path_list.append(child_path_idx)
142
143        BuiltIn().log_to_console(valid_path_list)
144        return valid_path_list
145
146    def get_endpoint_path_list(self, resource_path, end_point_prefix):
147        r"""
148        Returns list with entries ending in "/endpoint".
149
150        Description of argument(s):
151        resource_path      URI resource base path (e.g. "/redfish/v1/Chassis/").
152        end_point_prefix   Name of the endpoint (e.g. 'Power').
153
154        Find all list entries ending in "/endpoint" combination such as
155        /redfish/v1/Chassis/<foo>/Power
156        /redfish/v1/Chassis/<bar>/Power
157        """
158
159        end_point_list = self.list_request(resource_path)
160
161        # Regex to match entries ending in "/prefix" with optional underscore.
162        regex = ".*/" + end_point_prefix + "[_]?[0-9]*$"
163        return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)]
164
165    def get_target_actions(self, resource_path, target_attribute):
166        r"""
167        Returns resource target entry of the searched target attribute.
168
169        Description of argument(s):
170        resource_path      URI resource absolute path
171                           (e.g. "/redfish/v1/Systems/system").
172        target_attribute   Name of the attribute (e.g. 'ComputerSystem.Reset').
173
174        Example:
175        "Actions": {
176        "#ComputerSystem.Reset": {
177        "ResetType@Redfish.AllowableValues": [
178            "On",
179            "ForceOff",
180            "GracefulRestart",
181            "GracefulShutdown"
182        ],
183        "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
184        }
185        }
186        """
187
188        global target_list
189        target_list = []
190
191        resp_dict = self.get_attribute(resource_path, "Actions")
192        if resp_dict is None:
193            return None
194
195        # Recursively search the "target" key in the nested dictionary.
196        # Populate the target_list of target entries.
197        self.get_key_value_nested_dict(resp_dict, "target")
198        # Return the matching target URL entry.
199        for target in target_list:
200            # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
201            attribute_in_uri = target.rsplit("/", 1)[-1]
202            # attribute_in_uri "ComputerSystem.Reset"
203            if target_attribute == attribute_in_uri:
204                return target
205
206        return None
207
208    def get_member_list(self, resource_path):
209        r"""
210        Perform a GET list request and return available members entries.
211
212        Description of argument(s):
213        resource_path  URI resource absolute path
214                       (e.g. "/redfish/v1/SessionService/Sessions").
215
216        "Members": [
217            {
218             "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7"
219            }
220            {
221             "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H"
222            }
223        ],
224        """
225
226        member_list = []
227        resp_list_dict = self.get_attribute(resource_path, "Members")
228        if resp_list_dict is None:
229            return member_list
230
231        for member_id in range(0, len(resp_list_dict)):
232            member_list.append(resp_list_dict[member_id]["@odata.id"])
233
234        return member_list
235
236    def list_request(self, resource_path):
237        r"""
238        Perform a GET list request and return available resource paths.
239        Description of argument(s):
240        resource_path  URI resource absolute path
241                       (e.g. "/redfish/v1/SessionService/Sessions").
242        """
243        gp.qprint_executing(style=gp.func_line_style_short)
244        # Set quiet variable to keep subordinate get() calls quiet.
245        quiet = 1
246        self.__pending_enumeration = set()
247        self._rest_response_ = self._redfish_.get(
248            resource_path, valid_status_codes=[200, 404, 500]
249        )
250
251        # Return empty list.
252        if self._rest_response_.status != 200:
253            return self.__pending_enumeration
254        self.walk_nested_dict(self._rest_response_.dict)
255        if not self.__pending_enumeration:
256            return resource_path
257        for resource in self.__pending_enumeration.copy():
258            self._rest_response_ = self._redfish_.get(
259                resource, valid_status_codes=[200, 404, 500]
260            )
261
262            if self._rest_response_.status != 200:
263                continue
264            self.walk_nested_dict(self._rest_response_.dict)
265        return list(sorted(self.__pending_enumeration))
266
267    def enumerate_request(
268        self, resource_path, return_json=1, include_dead_resources=False
269    ):
270        r"""
271        Perform a GET enumerate request and return available resource paths.
272
273        Description of argument(s):
274        resource_path               URI resource absolute path (e.g.
275                                    "/redfish/v1/SessionService/Sessions").
276        return_json                 Indicates whether the result should be
277                                    returned as a json string or as a
278                                    dictionary.
279        include_dead_resources      Check and return a list of dead/broken URI
280                                    resources.
281        """
282
283        gp.qprint_executing(style=gp.func_line_style_short)
284
285        return_json = int(return_json)
286
287        # Set quiet variable to keep subordinate get() calls quiet.
288        quiet = 1
289
290        # Variable to hold enumerated data.
291        self.__result = {}
292
293        # Variable to hold the pending list of resources for which enumeration.
294        # is yet to be obtained.
295        self.__pending_enumeration = set()
296
297        self.__pending_enumeration.add(resource_path)
298
299        # Variable having resources for which enumeration is completed.
300        enumerated_resources = set()
301
302        if include_dead_resources:
303            dead_resources = {}
304
305        resources_to_be_enumerated = (resource_path,)
306
307        while resources_to_be_enumerated:
308            for resource in resources_to_be_enumerated:
309                # JsonSchemas, SessionService or URLs containing # are not
310                # required in enumeration.
311                # Example: '/redfish/v1/JsonSchemas/' and sub resources.
312                #          '/redfish/v1/SessionService'
313                #          '/redfish/v1/Managers/${MANAGER_ID}#/Oem'
314                if (
315                    ("JsonSchemas" in resource)
316                    or ("SessionService" in resource)
317                    or ("PostCodes" in resource)
318                    or ("Registries" in resource)
319                    or ("Journal" in resource)
320                    or ("#" in resource)
321                ):
322                    continue
323
324                try:
325                    self._rest_response_ = self._redfish_.get(
326                        resource, valid_status_codes=[200, 404, 405, 500]
327                    )
328                except JSONDecodeError as e:
329                    BuiltIn().log_to_console(
330                        "enumerate_request: JSONDecodeError, re-trying"
331                    )
332                    self._rest_response_ = self._redfish_.get(
333                        resource, valid_status_codes=[200, 404, 405, 500]
334                    )
335
336                # Enumeration is done for available resources ignoring the
337                # ones for which response is not obtained.
338                if self._rest_response_.status != 200:
339                    if include_dead_resources:
340                        try:
341                            dead_resources[self._rest_response_.status].append(
342                                resource
343                            )
344                        except KeyError:
345                            dead_resources[self._rest_response_.status] = [
346                                resource
347                            ]
348                    continue
349
350                self.walk_nested_dict(self._rest_response_.dict, url=resource)
351
352            enumerated_resources.update(set(resources_to_be_enumerated))
353            resources_to_be_enumerated = tuple(
354                self.__pending_enumeration - enumerated_resources
355            )
356
357        if return_json:
358            if include_dead_resources:
359                return (
360                    json.dumps(
361                        self.__result,
362                        sort_keys=True,
363                        indent=4,
364                        separators=(",", ": "),
365                    ),
366                    dead_resources,
367                )
368            else:
369                return json.dumps(
370                    self.__result,
371                    sort_keys=True,
372                    indent=4,
373                    separators=(",", ": "),
374                )
375        else:
376            if include_dead_resources:
377                return self.__result, dead_resources
378            else:
379                return self.__result
380
381    def walk_nested_dict(self, data, url=""):
382        r"""
383        Parse through the nested dictionary and get the resource id paths.
384        Description of argument(s):
385        data    Nested dictionary data from response message.
386        url     Resource for which the response is obtained in data.
387        """
388        url = url.rstrip("/")
389
390        for key, value in data.items():
391            # Recursion if nested dictionary found.
392            if isinstance(value, dict):
393                self.walk_nested_dict(value)
394            else:
395                # Value contains a list of dictionaries having member data.
396                if "Members" == key:
397                    if isinstance(value, list):
398                        for memberDict in value:
399                            if isinstance(memberDict, str):
400                                self.__pending_enumeration.add(memberDict)
401                            elif (
402                                isinstance(memberDict, dict)
403                                and "@odata.id" in memberDict
404                            ):
405                                self.__pending_enumeration.add(
406                                    memberDict["@odata.id"]
407                                )
408                            else:
409                                self.__pending_enumeration.add(memberDict[1])
410
411                if "@odata.id" == key:
412                    value = value.rstrip("/")
413                    # Data for the given url.
414                    if value == url:
415                        self.__result[url] = data
416                    # Data still needs to be looked up,
417                    else:
418                        self.__pending_enumeration.add(value)
419
420    def get_key_value_nested_dict(self, data, key):
421        r"""
422        Parse through the nested dictionary and get the searched key value.
423
424        Description of argument(s):
425        data    Nested dictionary data from response message.
426        key     Search dictionary key element.
427        """
428
429        for k, v in data.items():
430            if isinstance(v, dict):
431                self.get_key_value_nested_dict(v, key)
432
433            if k == key:
434                target_list.append(v)
435