xref: /openbmc/openbmc-test-automation/lib/bmc_redfish_utils.py (revision b4e3ad47639e6aa11987957b3cf597a39836d22e)
1#!/usr/bin/env python3
2
3r"""
4BMC redfish utility functions.
5"""
6
7import json
8import re
9from json.decoder import JSONDecodeError
10
11import gen_print as gp
12from robot.libraries.BuiltIn import BuiltIn
13
# Value of the Robot Framework variable ${MTLS_ENABLED}, read once when this
# module is imported.  bmc_redfish_utils.__init__ skips the redfish login
# when this value is the string "True".
MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}")
15
16
17class bmc_redfish_utils(object):
18    ROBOT_LIBRARY_SCOPE = "TEST SUITE"
19
20    def __init__(self):
21        r"""
22        Initialize the bmc_redfish_utils object.
23        """
24        # Obtain a reference to the global redfish object.
25        self.__inited__ = False
26        try:
27            self._redfish_ = BuiltIn().get_library_instance("redfish")
28        except RuntimeError as e:
29            BuiltIn().log_to_console(
30                "get_library_instance: No active redfish instance found."
31            )
32            # Handling init exception at worse to raise on error.
33            raise e
34
35        if MTLS_ENABLED == "True":
36            self.__inited__ = True
37        else:
38            # initialized if only login succeed.
39            self._redfish_.login()
40            self.__inited__ = True
41
42    def get_redfish_session_info(self):
43        r"""
44        Returns redfish sessions info dictionary.
45
46        {
47            'key': 'yLXotJnrh5nDhXj5lLiH' ,
48            'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0'
49        }
50        """
51        session_dict = {
52            "key": self._redfish_.get_session_key(),
53            "location": self._redfish_.get_session_location(),
54        }
55        return session_dict
56
57    def get_attribute(self, resource_path, attribute, verify=None):
58        r"""
59        Get resource attribute.
60
61        Description of argument(s):
62        resource_path               URI resource absolute path (e.g.
63                                    "/redfish/v1/Systems/1").
64        attribute                   Name of the attribute (e.g. 'PowerState').
65        """
66        try:
67            resp = self._redfish_.get(resource_path)
68        except JSONDecodeError as e:
69            BuiltIn().log_to_console(
70                "get_attribute: JSONDecodeError, re-trying"
71            )
72            resp = self._redfish_.get(resource_path)
73
74        if verify:
75            if resp.dict[attribute] == verify:
76                return resp.dict[attribute]
77            else:
78                raise ValueError("Attribute value is not equal")
79        elif attribute in resp.dict:
80            return resp.dict[attribute]
81
82        return None
83
84    def get_properties(self, resource_path):
85        r"""
86        Returns dictionary of attributes for the resource.
87
88        Description of argument(s):
89        resource_path               URI resource absolute path (e.g.
90                                    /redfish/v1/Systems/1").
91        """
92
93        resp = self._redfish_.get(resource_path)
94        return resp.dict
95
96    def get_members_uri(self, resource_path, attribute):
97        r"""
98        Returns the list of valid path which has a given attribute.
99
100        Description of argument(s):
101        resource_path            URI resource base path (e.g.
102                                 '/redfish/v1/Systems/',
103                                 '/redfish/v1/Chassis/').
104        attribute                Name of the attribute (e.g. 'PowerSupplies').
105        """
106
107        # Set quiet variable to keep subordinate get() calls quiet.
108        quiet = 1
109
110        # Get the member id list.
111        # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar']
112        resource_path_list = self.get_member_list(resource_path)
113
114        valid_path_list = []
115
116        for path_idx in resource_path_list:
117            # Get all the child object path under the member id e.g.
118            # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power']
119            child_path_list = self.list_request(path_idx)
120
121            # Iterate and check if path object has the attribute.
122            for child_path_idx in child_path_list:
123                if (
124                    ("JsonSchemas" in child_path_idx)
125                    or ("SessionService" in child_path_idx)
126                    or ("#" in child_path_idx)
127                ):
128                    continue
129                if self.get_attribute(child_path_idx, attribute):
130                    valid_path_list.append(child_path_idx)
131
132        BuiltIn().log_to_console(valid_path_list)
133        return valid_path_list
134
135    def get_endpoint_path_list(self, resource_path, end_point_prefix):
136        r"""
137        Returns list with entries ending in "/endpoint".
138
139        Description of argument(s):
140        resource_path      URI resource base path (e.g. "/redfish/v1/Chassis/").
141        end_point_prefix   Name of the endpoint (e.g. 'Power').
142
143        Find all list entries ending in "/endpoint" combination such as
144        /redfish/v1/Chassis/<foo>/Power
145        /redfish/v1/Chassis/<bar>/Power
146        """
147
148        end_point_list = self.list_request(resource_path)
149
150        # Regex to match entries ending in "/prefix" with optional underscore.
151        regex = ".*/" + end_point_prefix + "[_]?[0-9]*$"
152        return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)]
153
154    def get_target_actions(self, resource_path, target_attribute):
155        r"""
156        Returns resource target entry of the searched target attribute.
157
158        Description of argument(s):
159        resource_path      URI resource absolute path
160                           (e.g. "/redfish/v1/Systems/system").
161        target_attribute   Name of the attribute (e.g. 'ComputerSystem.Reset').
162
163        Example:
164        "Actions": {
165        "#ComputerSystem.Reset": {
166        "ResetType@Redfish.AllowableValues": [
167            "On",
168            "ForceOff",
169            "GracefulRestart",
170            "GracefulShutdown"
171        ],
172        "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
173        }
174        }
175        """
176
177        global target_list
178        target_list = []
179
180        resp_dict = self.get_attribute(resource_path, "Actions")
181        if resp_dict is None:
182            return None
183
184        # Recursively search the "target" key in the nested dictionary.
185        # Populate the target_list of target entries.
186        self.get_key_value_nested_dict(resp_dict, "target")
187        # Return the matching target URL entry.
188        for target in target_list:
189            # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset"
190            attribute_in_uri = target.rsplit("/", 1)[-1]
191            # attribute_in_uri "ComputerSystem.Reset"
192            if target_attribute == attribute_in_uri:
193                return target
194
195        return None
196
197    def get_member_list(self, resource_path):
198        r"""
199        Perform a GET list request and return available members entries.
200
201        Description of argument(s):
202        resource_path  URI resource absolute path
203                       (e.g. "/redfish/v1/SessionService/Sessions").
204
205        "Members": [
206            {
207             "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7"
208            }
209            {
210             "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H"
211            }
212        ],
213        """
214
215        member_list = []
216        resp_list_dict = self.get_attribute(resource_path, "Members")
217        if resp_list_dict is None:
218            return member_list
219
220        for member_id in range(0, len(resp_list_dict)):
221            member_list.append(resp_list_dict[member_id]["@odata.id"])
222
223        return member_list
224
225    def list_request(self, resource_path):
226        r"""
227        Perform a GET list request and return available resource paths.
228        Description of argument(s):
229        resource_path  URI resource absolute path
230                       (e.g. "/redfish/v1/SessionService/Sessions").
231        """
232        gp.qprint_executing(style=gp.func_line_style_short)
233        # Set quiet variable to keep subordinate get() calls quiet.
234        quiet = 1
235        self.__pending_enumeration = set()
236        self._rest_response_ = self._redfish_.get(
237            resource_path, valid_status_codes=[200, 404, 500]
238        )
239
240        # Return empty list.
241        if self._rest_response_.status != 200:
242            return self.__pending_enumeration
243        self.walk_nested_dict(self._rest_response_.dict)
244        if not self.__pending_enumeration:
245            return resource_path
246        for resource in self.__pending_enumeration.copy():
247            self._rest_response_ = self._redfish_.get(
248                resource, valid_status_codes=[200, 404, 500]
249            )
250
251            if self._rest_response_.status != 200:
252                continue
253            self.walk_nested_dict(self._rest_response_.dict)
254        return list(sorted(self.__pending_enumeration))
255
256    def enumerate_request(
257        self, resource_path, return_json=1, include_dead_resources=False
258    ):
259        r"""
260        Perform a GET enumerate request and return available resource paths.
261
262        Description of argument(s):
263        resource_path               URI resource absolute path (e.g.
264                                    "/redfish/v1/SessionService/Sessions").
265        return_json                 Indicates whether the result should be
266                                    returned as a json string or as a
267                                    dictionary.
268        include_dead_resources      Check and return a list of dead/broken URI
269                                    resources.
270        """
271
272        gp.qprint_executing(style=gp.func_line_style_short)
273
274        return_json = int(return_json)
275
276        # Set quiet variable to keep subordinate get() calls quiet.
277        quiet = 1
278
279        # Variable to hold enumerated data.
280        self.__result = {}
281
282        # Variable to hold the pending list of resources for which enumeration.
283        # is yet to be obtained.
284        self.__pending_enumeration = set()
285
286        self.__pending_enumeration.add(resource_path)
287
288        # Variable having resources for which enumeration is completed.
289        enumerated_resources = set()
290
291        if include_dead_resources:
292            dead_resources = {}
293
294        resources_to_be_enumerated = (resource_path,)
295
296        while resources_to_be_enumerated:
297            for resource in resources_to_be_enumerated:
298                # JsonSchemas, SessionService or URLs containing # are not
299                # required in enumeration.
300                # Example: '/redfish/v1/JsonSchemas/' and sub resources.
301                #          '/redfish/v1/SessionService'
302                #          '/redfish/v1/Managers/${MANAGER_ID}#/Oem'
303                if (
304                    ("JsonSchemas" in resource)
305                    or ("SessionService" in resource)
306                    or ("PostCodes" in resource)
307                    or ("Registries" in resource)
308                    or ("Journal" in resource)
309                    or ("#" in resource)
310                ):
311                    continue
312
313                try:
314                    self._rest_response_ = self._redfish_.get(
315                        resource, valid_status_codes=[200, 404, 405, 500]
316                    )
317                except JSONDecodeError as e:
318                    BuiltIn().log_to_console(
319                        "enumerate_request: JSONDecodeError, re-trying"
320                    )
321                    self._rest_response_ = self._redfish_.get(
322                        resource, valid_status_codes=[200, 404, 405, 500]
323                    )
324
325                # Enumeration is done for available resources ignoring the
326                # ones for which response is not obtained.
327                if self._rest_response_.status != 200:
328                    if include_dead_resources:
329                        try:
330                            dead_resources[self._rest_response_.status].append(
331                                resource
332                            )
333                        except KeyError:
334                            dead_resources[self._rest_response_.status] = [
335                                resource
336                            ]
337                    continue
338
339                self.walk_nested_dict(self._rest_response_.dict, url=resource)
340
341            enumerated_resources.update(set(resources_to_be_enumerated))
342            resources_to_be_enumerated = tuple(
343                self.__pending_enumeration - enumerated_resources
344            )
345
346        if return_json:
347            if include_dead_resources:
348                return (
349                    json.dumps(
350                        self.__result,
351                        sort_keys=True,
352                        indent=4,
353                        separators=(",", ": "),
354                    ),
355                    dead_resources,
356                )
357            else:
358                return json.dumps(
359                    self.__result,
360                    sort_keys=True,
361                    indent=4,
362                    separators=(",", ": "),
363                )
364        else:
365            if include_dead_resources:
366                return self.__result, dead_resources
367            else:
368                return self.__result
369
370    def walk_nested_dict(self, data, url=""):
371        r"""
372        Parse through the nested dictionary and get the resource id paths.
373        Description of argument(s):
374        data    Nested dictionary data from response message.
375        url     Resource for which the response is obtained in data.
376        """
377        url = url.rstrip("/")
378
379        for key, value in data.items():
380            # Recursion if nested dictionary found.
381            if isinstance(value, dict):
382                self.walk_nested_dict(value)
383            else:
384                # Value contains a list of dictionaries having member data.
385                if "Members" == key:
386                    if isinstance(value, list):
387                        for memberDict in value:
388                            if isinstance(memberDict, str):
389                                self.__pending_enumeration.add(memberDict)
390                            elif (
391                                isinstance(memberDict, dict)
392                                and "@odata.id" in memberDict
393                            ):
394                                self.__pending_enumeration.add(
395                                    memberDict["@odata.id"]
396                                )
397                            else:
398                                self.__pending_enumeration.add(memberDict[1])
399
400                if "@odata.id" == key:
401                    value = value.rstrip("/")
402                    # Data for the given url.
403                    if value == url:
404                        self.__result[url] = data
405                    # Data still needs to be looked up,
406                    else:
407                        self.__pending_enumeration.add(value)
408
409    def get_key_value_nested_dict(self, data, key):
410        r"""
411        Parse through the nested dictionary and get the searched key value.
412
413        Description of argument(s):
414        data    Nested dictionary data from response message.
415        key     Search dictionary key element.
416        """
417
418        for k, v in data.items():
419            if isinstance(v, dict):
420                self.get_key_value_nested_dict(v, key)
421
422            if k == key:
423                target_list.append(v)
424