xref: /openbmc/openbmc-test-automation/lib/bmc_redfish_utils.py (revision c487392554ef951ebb9a8944d54b493d9b487a4d)
1#!/usr/bin/env python3
2
3r"""
4BMC redfish utility functions.
5"""
6
7import json
8import re
9from json.decoder import JSONDecodeError
10
11import gen_print as gp
12from robot.libraries.BuiltIn import BuiltIn
13
14MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}")
15
16
17class bmc_redfish_utils(object):
18    ROBOT_LIBRARY_SCOPE = "TEST SUITE"
19
20    def __init__(self):
21        r"""
22        Initialize the bmc_redfish_utils object.
23        """
24        # Obtain a reference to the global redfish object.
25        self.__inited__ = False
26        self._redfish_ = BuiltIn().get_library_instance("redfish")
27
28        if MTLS_ENABLED == "True":
29            self.__inited__ = True
30        else:
31            # There is a possibility that a given driver support both redfish and
32            # legacy REST.
33            self._redfish_.login()
34            self._rest_response_ = self._redfish_.get(
35                "/xyz/openbmc_project/", valid_status_codes=[200, 404]
36            )
37
38            # If REST URL /xyz/openbmc_project/ is supported.
39            if self._rest_response_.status == 200:
40                self.__inited__ = True
41
42        BuiltIn().set_global_variable(
43            "${REDFISH_REST_SUPPORTED}", self.__inited__
44        )
45
46    def get_redfish_session_info(self):
47        r"""
48        Returns redfish sessions info dictionary.
49
50        {
51            'key': 'yLXotJnrh5nDhXj5lLiH' ,
52            'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0'
53        }
54        """
55        session_dict = {
56            "key": self._redfish_.get_session_key(),
57            "location": self._redfish_.get_session_location(),
58        }
59        return session_dict
60
61    def get_attribute(self, resource_path, attribute, verify=None):
62        r"""
63        Get resource attribute.
64
65        Description of argument(s):
66        resource_path               URI resource absolute path (e.g.
67                                    "/redfish/v1/Systems/1").
68        attribute                   Name of the attribute (e.g. 'PowerState').
69        """
70        try:
71            resp = self._redfish_.get(resource_path)
72        except JSONDecodeError as e:
73            BuiltIn().log_to_console(
74                "get_attribute: JSONDecodeError, re-trying"
75            )
76            resp = self._redfish_.get(resource_path)
77
78        if verify:
79            if resp.dict[attribute] == verify:
80                return resp.dict[attribute]
81            else:
82                raise ValueError("Attribute value is not equal")
83        elif attribute in resp.dict:
84            return resp.dict[attribute]
85
86        return None
87
88    def get_properties(self, resource_path):
89        r"""
90        Returns dictionary of attributes for the resource.
91
92        Description of argument(s):
93        resource_path               URI resource absolute path (e.g.
94                                    /redfish/v1/Systems/1").
95        """
96
97        resp = self._redfish_.get(resource_path)
98        return resp.dict
99
100    def get_members_uri(self, resource_path, attribute):
101        r"""
102        Returns the list of valid path which has a given attribute.
103
104        Description of argument(s):
105        resource_path            URI resource base path (e.g.
106                                 '/redfish/v1/Systems/',
107                                 '/redfish/v1/Chassis/').
108        attribute                Name of the attribute (e.g. 'PowerSupplies').
109        """
110
111        # Set quiet variable to keep subordinate get() calls quiet.
112        quiet = 1
113
114        # Get the member id list.
115        # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar']
116        resource_path_list = self.get_member_list(resource_path)
117
118        valid_path_list = []
119
120        for path_idx in resource_path_list:
121            # Get all the child object path under the member id e.g.
122            # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power']
123            child_path_list = self.list_request(path_idx)
124
125            # Iterate and check if path object has the attribute.
126            for child_path_idx in child_path_list:
127                if (
128                    ("JsonSchemas" in child_path_idx)
129                    or ("SessionService" in child_path_idx)
130                    or ("#" in child_path_idx)
131                ):
132                    continue
133                if self.get_attribute(child_path_idx, attribute):
134                    valid_path_list.append(child_path_idx)
135
136        BuiltIn().log_to_console(valid_path_list)
137        return valid_path_list
138
139    def get_endpoint_path_list(self, resource_path, end_point_prefix):
140        r"""
141        Returns list with entries ending in "/endpoint".
142
143        Description of argument(s):
144        resource_path      URI resource base path (e.g. "/redfish/v1/Chassis/").
145        end_point_prefix   Name of the endpoint (e.g. 'Power').
146
147        Find all list entries ending in "/endpoint" combination such as
148        /redfish/v1/Chassis/<foo>/Power
149        /redfish/v1/Chassis/<bar>/Power
150        """
151
152        end_point_list = self.list_request(resource_path)
153
154        # Regex to match entries ending in "/prefix" with optional underscore.
155        regex = ".*/" + end_point_prefix + "[_]?[0-9]*$"
156        return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)]
157
158    def get_target_actions(self, resource_path, target_attribute):
159        r"""
160        Returns resource target entry of the searched target attribute.
161
162        Description of argument(s):
163        resource_path      URI resource absolute path
164                           (e.g. "/redfish/v1/Systems/system").
165        target_attribute   Name of the attribute (e.g. 'ComputerSystem.Reset').
166
167        Example:
168        "Actions": {
169        "#ComputerSystem.Reset": {
170        "ResetType@Redfish.AllowableValues": [
171            "On",
172            "ForceOff",
173            "GracefulRestart",
174            "GracefulShutdown"
175        ],
176        "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
177        }
178        }
179        """
180
181        global target_list
182        target_list = []
183
184        resp_dict = self.get_attribute(resource_path, "Actions")
185        if resp_dict is None:
186            return None
187
188        # Recursively search the "target" key in the nested dictionary.
189        # Populate the target_list of target entries.
190        self.get_key_value_nested_dict(resp_dict, "target")
191        # Return the matching target URL entry.
192        for target in target_list:
193            # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
194            attribute_in_uri = target.rsplit("/", 1)[-1]
195            # attribute_in_uri "ComputerSystem.Reset"
196            if target_attribute == attribute_in_uri:
197                return target
198
199        return None
200
201    def get_member_list(self, resource_path):
202        r"""
203        Perform a GET list request and return available members entries.
204
205        Description of argument(s):
206        resource_path  URI resource absolute path
207                       (e.g. "/redfish/v1/SessionService/Sessions").
208
209        "Members": [
210            {
211             "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7"
212            }
213            {
214             "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H"
215            }
216        ],
217        """
218
219        member_list = []
220        resp_list_dict = self.get_attribute(resource_path, "Members")
221        if resp_list_dict is None:
222            return member_list
223
224        for member_id in range(0, len(resp_list_dict)):
225            member_list.append(resp_list_dict[member_id]["@odata.id"])
226
227        return member_list
228
229    def list_request(self, resource_path):
230        r"""
231        Perform a GET list request and return available resource paths.
232        Description of argument(s):
233        resource_path  URI resource absolute path
234                       (e.g. "/redfish/v1/SessionService/Sessions").
235        """
236        gp.qprint_executing(style=gp.func_line_style_short)
237        # Set quiet variable to keep subordinate get() calls quiet.
238        quiet = 1
239        self.__pending_enumeration = set()
240        self._rest_response_ = self._redfish_.get(
241            resource_path, valid_status_codes=[200, 404, 500]
242        )
243
244        # Return empty list.
245        if self._rest_response_.status != 200:
246            return self.__pending_enumeration
247        self.walk_nested_dict(self._rest_response_.dict)
248        if not self.__pending_enumeration:
249            return resource_path
250        for resource in self.__pending_enumeration.copy():
251            self._rest_response_ = self._redfish_.get(
252                resource, valid_status_codes=[200, 404, 500]
253            )
254
255            if self._rest_response_.status != 200:
256                continue
257            self.walk_nested_dict(self._rest_response_.dict)
258        return list(sorted(self.__pending_enumeration))
259
260    def enumerate_request(
261        self, resource_path, return_json=1, include_dead_resources=False
262    ):
263        r"""
264        Perform a GET enumerate request and return available resource paths.
265
266        Description of argument(s):
267        resource_path               URI resource absolute path (e.g.
268                                    "/redfish/v1/SessionService/Sessions").
269        return_json                 Indicates whether the result should be
270                                    returned as a json string or as a
271                                    dictionary.
272        include_dead_resources      Check and return a list of dead/broken URI
273                                    resources.
274        """
275
276        gp.qprint_executing(style=gp.func_line_style_short)
277
278        return_json = int(return_json)
279
280        # Set quiet variable to keep subordinate get() calls quiet.
281        quiet = 1
282
283        # Variable to hold enumerated data.
284        self.__result = {}
285
286        # Variable to hold the pending list of resources for which enumeration.
287        # is yet to be obtained.
288        self.__pending_enumeration = set()
289
290        self.__pending_enumeration.add(resource_path)
291
292        # Variable having resources for which enumeration is completed.
293        enumerated_resources = set()
294
295        if include_dead_resources:
296            dead_resources = {}
297
298        resources_to_be_enumerated = (resource_path,)
299
300        while resources_to_be_enumerated:
301            for resource in resources_to_be_enumerated:
302                # JsonSchemas, SessionService or URLs containing # are not
303                # required in enumeration.
304                # Example: '/redfish/v1/JsonSchemas/' and sub resources.
305                #          '/redfish/v1/SessionService'
306                #          '/redfish/v1/Managers/${MANAGER_ID}#/Oem'
307                if (
308                    ("JsonSchemas" in resource)
309                    or ("SessionService" in resource)
310                    or ("PostCodes" in resource)
311                    or ("Registries" in resource)
312                    or ("Journal" in resource)
313                    or ("#" in resource)
314                ):
315                    continue
316
317                try:
318                    self._rest_response_ = self._redfish_.get(
319                        resource, valid_status_codes=[200, 404, 405, 500]
320                    )
321                except JSONDecodeError as e:
322                    BuiltIn().log_to_console(
323                        "enumerate_request: JSONDecodeError, re-trying"
324                    )
325                    self._rest_response_ = self._redfish_.get(
326                        resource, valid_status_codes=[200, 404, 405, 500]
327                    )
328
329                # Enumeration is done for available resources ignoring the
330                # ones for which response is not obtained.
331                if self._rest_response_.status != 200:
332                    if include_dead_resources:
333                        try:
334                            dead_resources[self._rest_response_.status].append(
335                                resource
336                            )
337                        except KeyError:
338                            dead_resources[self._rest_response_.status] = [
339                                resource
340                            ]
341                    continue
342
343                self.walk_nested_dict(self._rest_response_.dict, url=resource)
344
345            enumerated_resources.update(set(resources_to_be_enumerated))
346            resources_to_be_enumerated = tuple(
347                self.__pending_enumeration - enumerated_resources
348            )
349
350        if return_json:
351            if include_dead_resources:
352                return (
353                    json.dumps(
354                        self.__result,
355                        sort_keys=True,
356                        indent=4,
357                        separators=(",", ": "),
358                    ),
359                    dead_resources,
360                )
361            else:
362                return json.dumps(
363                    self.__result,
364                    sort_keys=True,
365                    indent=4,
366                    separators=(",", ": "),
367                )
368        else:
369            if include_dead_resources:
370                return self.__result, dead_resources
371            else:
372                return self.__result
373
374    def walk_nested_dict(self, data, url=""):
375        r"""
376        Parse through the nested dictionary and get the resource id paths.
377        Description of argument(s):
378        data    Nested dictionary data from response message.
379        url     Resource for which the response is obtained in data.
380        """
381        url = url.rstrip("/")
382
383        for key, value in data.items():
384            # Recursion if nested dictionary found.
385            if isinstance(value, dict):
386                self.walk_nested_dict(value)
387            else:
388                # Value contains a list of dictionaries having member data.
389                if "Members" == key:
390                    if isinstance(value, list):
391                        for memberDict in value:
392                            if isinstance(memberDict, str):
393                                self.__pending_enumeration.add(memberDict)
394                            elif (
395                                isinstance(memberDict, dict)
396                                and "@odata.id" in memberDict
397                            ):
398                                self.__pending_enumeration.add(
399                                    memberDict["@odata.id"]
400                                )
401                            else:
402                                self.__pending_enumeration.add(memberDict[1])
403
404                if "@odata.id" == key:
405                    value = value.rstrip("/")
406                    # Data for the given url.
407                    if value == url:
408                        self.__result[url] = data
409                    # Data still needs to be looked up,
410                    else:
411                        self.__pending_enumeration.add(value)
412
413    def get_key_value_nested_dict(self, data, key):
414        r"""
415        Parse through the nested dictionary and get the searched key value.
416
417        Description of argument(s):
418        data    Nested dictionary data from response message.
419        key     Search dictionary key element.
420        """
421
422        for k, v in data.items():
423            if isinstance(v, dict):
424                self.get_key_value_nested_dict(v, key)
425
426            if k == key:
427                target_list.append(v)
428