1#!/usr/bin/env python3 2 3r""" 4BMC redfish utility functions. 5""" 6 7import json 8import re 9from json.decoder import JSONDecodeError 10 11import gen_print as gp 12from robot.libraries.BuiltIn import BuiltIn 13 14MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}") 15 16 17class bmc_redfish_utils(object): 18 ROBOT_LIBRARY_SCOPE = "TEST SUITE" 19 20 def __init__(self): 21 r""" 22 Initialize the bmc_redfish_utils object. 23 """ 24 # Obtain a reference to the global redfish object. 25 self.__inited__ = False 26 try: 27 self._redfish_ = BuiltIn().get_library_instance("redfish") 28 except RuntimeError as e: 29 BuiltIn().log_to_console( 30 "get_library_instance: No active redfish instance found." 31 ) 32 # Handling init exception at worse to raise on error. 33 raise e 34 35 if MTLS_ENABLED == "True": 36 self.__inited__ = True 37 else: 38 # initialized if only login succeed. 39 self._redfish_.login() 40 self.__inited__ = True 41 42 def get_redfish_session_info(self): 43 r""" 44 Returns redfish sessions info dictionary. 45 46 { 47 'key': 'yLXotJnrh5nDhXj5lLiH' , 48 'location': '/redfish/v1/SessionService/Sessions/nblYY4wlz0' 49 } 50 """ 51 session_dict = { 52 "key": self._redfish_.get_session_key(), 53 "location": self._redfish_.get_session_location(), 54 } 55 return session_dict 56 57 def get_attribute(self, resource_path, attribute, verify=None): 58 r""" 59 Get resource attribute. 60 61 Description of argument(s): 62 resource_path URI resource absolute path (e.g. 63 "/redfish/v1/Systems/1"). 64 attribute Name of the attribute (e.g. 'PowerState'). 65 """ 66 try: 67 resp = self._redfish_.get(resource_path) 68 except JSONDecodeError as e: 69 BuiltIn().log_to_console( 70 "get_attribute: JSONDecodeError, re-trying" 71 ) 72 resp = self._redfish_.get(resource_path) 73 74 if verify: 75 if resp.dict[attribute] == verify: 76 return resp.dict[attribute] 77 else: 78 raise ValueError("Attribute value is not equal") 79 elif attribute in resp.dict: 80 return resp.dict[attribute] 81 82 return None 83 84 def get_properties(self, resource_path): 85 r""" 86 Returns dictionary of attributes for the resource. 87 88 Description of argument(s): 89 resource_path URI resource absolute path (e.g. 90 /redfish/v1/Systems/1"). 91 """ 92 93 resp = self._redfish_.get(resource_path) 94 return resp.dict 95 96 def get_members_uri(self, resource_path, attribute): 97 r""" 98 Returns the list of valid path which has a given attribute. 99 100 Description of argument(s): 101 resource_path URI resource base path (e.g. 102 '/redfish/v1/Systems/', 103 '/redfish/v1/Chassis/'). 104 attribute Name of the attribute (e.g. 'PowerSupplies'). 105 """ 106 107 # Set quiet variable to keep subordinate get() calls quiet. 108 quiet = 1 109 110 # Get the member id list. 111 # e.g. ['/redfish/v1/Chassis/foo', '/redfish/v1/Chassis/bar'] 112 resource_path_list = self.get_member_list(resource_path) 113 114 valid_path_list = [] 115 116 for path_idx in resource_path_list: 117 # Get all the child object path under the member id e.g. 118 # ['/redfish/v1/Chassis/foo/Power','/redfish/v1/Chassis/bar/Power'] 119 child_path_list = self.list_request(path_idx) 120 121 # Iterate and check if path object has the attribute. 122 for child_path_idx in child_path_list: 123 if ( 124 ("JsonSchemas" in child_path_idx) 125 or ("SessionService" in child_path_idx) 126 or ("#" in child_path_idx) 127 ): 128 continue 129 if self.get_attribute(child_path_idx, attribute): 130 valid_path_list.append(child_path_idx) 131 132 BuiltIn().log_to_console(valid_path_list) 133 return valid_path_list 134 135 def get_endpoint_path_list(self, resource_path, end_point_prefix): 136 r""" 137 Returns list with entries ending in "/endpoint". 138 139 Description of argument(s): 140 resource_path URI resource base path (e.g. "/redfish/v1/Chassis/"). 141 end_point_prefix Name of the endpoint (e.g. 'Power'). 142 143 Find all list entries ending in "/endpoint" combination such as 144 /redfish/v1/Chassis/<foo>/Power 145 /redfish/v1/Chassis/<bar>/Power 146 """ 147 148 end_point_list = self.list_request(resource_path) 149 150 # Regex to match entries ending in "/prefix" with optional underscore. 151 regex = ".*/" + end_point_prefix + "[_]?[0-9]*$" 152 return [x for x in end_point_list if re.match(regex, x, re.IGNORECASE)] 153 154 def get_target_actions(self, resource_path, target_attribute): 155 r""" 156 Returns resource target entry of the searched target attribute. 157 158 Description of argument(s): 159 resource_path URI resource absolute path 160 (e.g. "/redfish/v1/Systems/system"). 161 target_attribute Name of the attribute (e.g. 'ComputerSystem.Reset'). 162 163 Example: 164 "Actions": { 165 "#ComputerSystem.Reset": { 166 "ResetType@Redfish.AllowableValues": [ 167 "On", 168 "ForceOff", 169 "GracefulRestart", 170 "GracefulShutdown" 171 ], 172 "target": "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset" 173 } 174 } 175 """ 176 177 global target_list 178 target_list = [] 179 180 resp_dict = self.get_attribute(resource_path, "Actions") 181 if resp_dict is None: 182 return None 183 184 # Recursively search the "target" key in the nested dictionary. 185 # Populate the target_list of target entries. 186 self.get_key_value_nested_dict(resp_dict, "target") 187 # Return the matching target URL entry. 188 for target in target_list: 189 # target "/redfish/v1/Systems/system/Actions/ComputerSystem.Reset" 190 attribute_in_uri = target.rsplit("/", 1)[-1] 191 # attribute_in_uri "ComputerSystem.Reset" 192 if target_attribute == attribute_in_uri: 193 return target 194 195 return None 196 197 def get_member_list(self, resource_path): 198 r""" 199 Perform a GET list request and return available members entries. 200 201 Description of argument(s): 202 resource_path URI resource absolute path 203 (e.g. "/redfish/v1/SessionService/Sessions"). 204 205 "Members": [ 206 { 207 "@odata.id": "/redfish/v1/SessionService/Sessions/Z5HummWPZ7" 208 } 209 { 210 "@odata.id": "/redfish/v1/SessionService/Sessions/46CmQmEL7H" 211 } 212 ], 213 """ 214 215 member_list = [] 216 resp_list_dict = self.get_attribute(resource_path, "Members") 217 if resp_list_dict is None: 218 return member_list 219 220 for member_id in range(0, len(resp_list_dict)): 221 member_list.append(resp_list_dict[member_id]["@odata.id"]) 222 223 return member_list 224 225 def list_request(self, resource_path): 226 r""" 227 Perform a GET list request and return available resource paths. 228 Description of argument(s): 229 resource_path URI resource absolute path 230 (e.g. "/redfish/v1/SessionService/Sessions"). 231 """ 232 gp.qprint_executing(style=gp.func_line_style_short) 233 # Set quiet variable to keep subordinate get() calls quiet. 234 quiet = 1 235 self.__pending_enumeration = set() 236 self._rest_response_ = self._redfish_.get( 237 resource_path, valid_status_codes=[200, 404, 500] 238 ) 239 240 # Return empty list. 241 if self._rest_response_.status != 200: 242 return self.__pending_enumeration 243 self.walk_nested_dict(self._rest_response_.dict) 244 if not self.__pending_enumeration: 245 return resource_path 246 for resource in self.__pending_enumeration.copy(): 247 self._rest_response_ = self._redfish_.get( 248 resource, valid_status_codes=[200, 404, 500] 249 ) 250 251 if self._rest_response_.status != 200: 252 continue 253 self.walk_nested_dict(self._rest_response_.dict) 254 return list(sorted(self.__pending_enumeration)) 255 256 def enumerate_request( 257 self, resource_path, return_json=1, include_dead_resources=False 258 ): 259 r""" 260 Perform a GET enumerate request and return available resource paths. 261 262 Description of argument(s): 263 resource_path URI resource absolute path (e.g. 264 "/redfish/v1/SessionService/Sessions"). 265 return_json Indicates whether the result should be 266 returned as a json string or as a 267 dictionary. 268 include_dead_resources Check and return a list of dead/broken URI 269 resources. 270 """ 271 272 gp.qprint_executing(style=gp.func_line_style_short) 273 274 return_json = int(return_json) 275 276 # Set quiet variable to keep subordinate get() calls quiet. 277 quiet = 1 278 279 # Variable to hold enumerated data. 280 self.__result = {} 281 282 # Variable to hold the pending list of resources for which enumeration. 283 # is yet to be obtained. 284 self.__pending_enumeration = set() 285 286 self.__pending_enumeration.add(resource_path) 287 288 # Variable having resources for which enumeration is completed. 289 enumerated_resources = set() 290 291 if include_dead_resources: 292 dead_resources = {} 293 294 resources_to_be_enumerated = (resource_path,) 295 296 while resources_to_be_enumerated: 297 for resource in resources_to_be_enumerated: 298 # JsonSchemas, SessionService or URLs containing # are not 299 # required in enumeration. 300 # Example: '/redfish/v1/JsonSchemas/' and sub resources. 301 # '/redfish/v1/SessionService' 302 # '/redfish/v1/Managers/${MANAGER_ID}#/Oem' 303 if ( 304 ("JsonSchemas" in resource) 305 or ("SessionService" in resource) 306 or ("PostCodes" in resource) 307 or ("Registries" in resource) 308 or ("Journal" in resource) 309 or ("#" in resource) 310 ): 311 continue 312 313 try: 314 self._rest_response_ = self._redfish_.get( 315 resource, valid_status_codes=[200, 404, 405, 500] 316 ) 317 except JSONDecodeError as e: 318 BuiltIn().log_to_console( 319 "enumerate_request: JSONDecodeError, re-trying" 320 ) 321 self._rest_response_ = self._redfish_.get( 322 resource, valid_status_codes=[200, 404, 405, 500] 323 ) 324 325 # Enumeration is done for available resources ignoring the 326 # ones for which response is not obtained. 327 if self._rest_response_.status != 200: 328 if include_dead_resources: 329 try: 330 dead_resources[self._rest_response_.status].append( 331 resource 332 ) 333 except KeyError: 334 dead_resources[self._rest_response_.status] = [ 335 resource 336 ] 337 continue 338 339 self.walk_nested_dict(self._rest_response_.dict, url=resource) 340 341 enumerated_resources.update(set(resources_to_be_enumerated)) 342 resources_to_be_enumerated = tuple( 343 self.__pending_enumeration - enumerated_resources 344 ) 345 346 if return_json: 347 if include_dead_resources: 348 return ( 349 json.dumps( 350 self.__result, 351 sort_keys=True, 352 indent=4, 353 separators=(",", ": "), 354 ), 355 dead_resources, 356 ) 357 else: 358 return json.dumps( 359 self.__result, 360 sort_keys=True, 361 indent=4, 362 separators=(",", ": "), 363 ) 364 else: 365 if include_dead_resources: 366 return self.__result, dead_resources 367 else: 368 return self.__result 369 370 def walk_nested_dict(self, data, url=""): 371 r""" 372 Parse through the nested dictionary and get the resource id paths. 373 Description of argument(s): 374 data Nested dictionary data from response message. 375 url Resource for which the response is obtained in data. 376 """ 377 url = url.rstrip("/") 378 379 for key, value in data.items(): 380 # Recursion if nested dictionary found. 381 if isinstance(value, dict): 382 self.walk_nested_dict(value) 383 else: 384 # Value contains a list of dictionaries having member data. 385 if "Members" == key: 386 if isinstance(value, list): 387 for memberDict in value: 388 if isinstance(memberDict, str): 389 self.__pending_enumeration.add(memberDict) 390 elif ( 391 isinstance(memberDict, dict) 392 and "@odata.id" in memberDict 393 ): 394 self.__pending_enumeration.add( 395 memberDict["@odata.id"] 396 ) 397 else: 398 self.__pending_enumeration.add(memberDict[1]) 399 400 if "@odata.id" == key: 401 value = value.rstrip("/") 402 # Data for the given url. 403 if value == url: 404 self.__result[url] = data 405 # Data still needs to be looked up, 406 else: 407 self.__pending_enumeration.add(value) 408 409 def get_key_value_nested_dict(self, data, key): 410 r""" 411 Parse through the nested dictionary and get the searched key value. 412 413 Description of argument(s): 414 data Nested dictionary data from response message. 415 key Search dictionary key element. 416 """ 417 418 for k, v in data.items(): 419 if isinstance(v, dict): 420 self.get_key_value_nested_dict(v, key) 421 422 if k == key: 423 target_list.append(v) 424