#!/usr/bin/env python3 r""" See class prolog below for details. """ import json import re import sys from json.decoder import JSONDecodeError import func_args as fa import gen_print as gp from redfish.rest.v1 import InvalidCredentialsError from redfish_plus import redfish_plus from robot.libraries.BuiltIn import BuiltIn MTLS_ENABLED = BuiltIn().get_variable_value("${MTLS_ENABLED}") class bmc_redfish(redfish_plus): r""" bmc_redfish is a child class of redfish_plus that is designed to provide benefits specifically for using redfish to communicate with an OpenBMC. See the prologs of the methods below for details. """ def __init__(self, *args, **kwargs): r""" Do BMC-related redfish initialization. Presently, older versions of BMC code may not support redfish requests. This can lead to unsightly error text being printed out for programs that may use lib/bmc_redfish_resource.robot even though they don't necessarily intend to make redfish requests. This class method will make an attempt to tolerate this situation. At some future point, when all BMCs can be expected to support redfish, this class method may be considered for deletion. If it is deleted, the self.__inited__ test code in the login() class method below should likewise be deleted. """ self.__inited__ = False try: if MTLS_ENABLED == "True": self.__inited__ = True else: super(bmc_redfish, self).__init__(*args, **kwargs) self.__inited__ = True except ValueError as get_exception: except_type, except_value, except_traceback = sys.exc_info() regex = r"The HTTP status code was not valid:[\r\n]+status:[ ]+502" result = re.match(regex, str(except_value), flags=re.MULTILINE) if not result: gp.lprint_var(except_type) gp.lprint_varx("except_value", str(except_value)) raise (get_exception) BuiltIn().set_global_variable("${REDFISH_SUPPORTED}", self.__inited__) BuiltIn().set_global_variable("${REDFISH_REST_SUPPORTED}", True) def login(self, *args, **kwargs): r""" Assign BMC default values for username, password and auth arguments and call parent class login method. Description of argument(s): args See parent class method prolog for details. kwargs See parent class method prolog for details. """ if MTLS_ENABLED == "True": return None if not self.__inited__: message = "bmc_redfish.__init__() was never successfully run. It " message += "is likely that the target BMC firmware code level " message += "does not support redfish.\n" raise ValueError(message) # Assign default values for username, password, auth where necessary. openbmc_username = BuiltIn().get_variable_value("${OPENBMC_USERNAME}") openbmc_password = BuiltIn().get_variable_value("${OPENBMC_PASSWORD}") username, args, kwargs = fa.pop_arg(openbmc_username, *args, **kwargs) password, args, kwargs = fa.pop_arg(openbmc_password, *args, **kwargs) auth, args, kwargs = fa.pop_arg("session", *args, **kwargs) try: super(bmc_redfish, self).login( username, password, auth, *args, **kwargs ) # Handle InvalidCredentialsError. # (raise redfish.rest.v1.InvalidCredentialsError if not [200, 201, 202, 204]) except InvalidCredentialsError: except_type, except_value, except_traceback = sys.exc_info() BuiltIn().log_to_console(str(except_type)) BuiltIn().log_to_console(str(except_value)) e_message = "Re-try login due to exception and " e_message += "it is likely error response from server side." BuiltIn().log_to_console(e_message) super(bmc_redfish, self).login( username, password, auth, *args, **kwargs ) # Handle JSONDecodeError and others. except JSONDecodeError: except_type, except_value, except_traceback = sys.exc_info() BuiltIn().log_to_console(str(except_type)) BuiltIn().log_to_console(str(except_value)) e_message = "Re-try login due to JSONDecodeError exception and " e_message += "it is likely error response from server side." BuiltIn().log_to_console(e_message) super(bmc_redfish, self).login( username, password, auth, *args, **kwargs ) except ValueError: except_type, except_value, except_traceback = sys.exc_info() BuiltIn().log_to_console(str(except_type)) BuiltIn().log_to_console(str(except_value)) e_message = "Unexpected exception." BuiltIn().log_to_console(e_message) def logout(self): if MTLS_ENABLED == "True": return None else: super(bmc_redfish, self).logout() def get_properties(self, *args, **kwargs): r""" Return dictionary of attributes for a given path. The difference between calling this function and calling get() directly is that this function returns ONLY the dictionary portion of the response object. Example robot code: ${properties}= Get Properties /redfish/v1/Systems/system/ Rprint Vars properties Output: properties: [PowerState]: Off [Processors]: [@odata.id]: /redfish/v1/Systems/system/Processors [SerialNumber]: 1234567 ... Description of argument(s): args See parent class get() prolog for details. kwargs See parent class get() prolog for details. """ resp = self.get(*args, **kwargs) return resp.dict if hasattr(resp, "dict") else {} def get_attribute(self, path, attribute, default=None, *args, **kwargs): r""" Get and return the named attribute from the properties for a given path. This method has the following advantages over calling get_properties directly: - The caller can specify a default value to be returned if the attribute does not exist. Example robot code: ${attribute}= Get Attribute /redfish/v1/AccountService ... MaxPasswordLength default=600 Rprint Vars attribute Output: attribute: 31 Description of argument(s): path The path (e.g. "/redfish/v1/AccountService"). attribute The name of the attribute to be retrieved (e.g. "MaxPasswordLength"). default The default value to be returned if the attribute does not exist (e.g. ""). args See parent class get() prolog for details. kwargs See parent class get() prolog for details. """ return self.get_properties(path, *args, **kwargs).get( attribute, default ) def get_session_info(self): r""" Get and return session info as a tuple consisting of session_key and session_location. """ return self.get_session_key(), self.get_session_location() def enumerate( self, resource_path, return_json=1, include_dead_resources=False ): r""" Perform a GET enumerate request and return available resource paths. Description of argument(s): resource_path URI resource absolute path (e.g. "/redfish/v1/SessionService/Sessions"). return_json Indicates whether the result should be returned as a json string or as a dictionary. include_dead_resources Check and return a list of dead/broken URI resources. """ gp.qprint_executing(style=gp.func_line_style_short) # Set quiet variable to keep subordinate get() calls quiet. quiet = 1 self.__result = {} # Variable to hold the pending list of resources for which enumeration is yet to be obtained. self.__pending_enumeration = set() self.__pending_enumeration.add(resource_path) # Variable having resources for which enumeration is completed. enumerated_resources = set() dead_resources = {} resources_to_be_enumerated = (resource_path,) while resources_to_be_enumerated: for resource in resources_to_be_enumerated: # JsonSchemas, SessionService or URLs containing # are not required in enumeration. # Example: '/redfish/v1/JsonSchemas/' and sub resources. # '/redfish/v1/SessionService' # '/redfish/v1/Managers/${MANAGER_ID}#/Oem' if ( ("JsonSchemas" in resource) or ("SessionService" in resource) or ("#" in resource) ): continue self._rest_response_ = self.get( resource, valid_status_codes=[200, 404, 500] ) # Enumeration is done for available resources ignoring the ones for which response is not # obtained. if self._rest_response_.status != 200: if include_dead_resources: try: dead_resources[self._rest_response_.status].append( resource ) except KeyError: dead_resources[self._rest_response_.status] = [ resource ] continue self.walk_nested_dict(self._rest_response_.dict, url=resource) enumerated_resources.update(set(resources_to_be_enumerated)) resources_to_be_enumerated = tuple( self.__pending_enumeration - enumerated_resources ) if return_json: if include_dead_resources: return ( json.dumps( self.__result, sort_keys=True, indent=4, separators=(",", ": "), ), dead_resources, ) else: return json.dumps( self.__result, sort_keys=True, indent=4, separators=(",", ": "), ) else: if include_dead_resources: return self.__result, dead_resources else: return self.__result def walk_nested_dict(self, data, url=""): r""" Parse through the nested dictionary and get the resource id paths. Description of argument(s): data Nested dictionary data from response message. url Resource for which the response is obtained in data. """ url = url.rstrip("/") for key, value in data.items(): # Recursion if nested dictionary found. if isinstance(value, dict): self.walk_nested_dict(value) else: # Value contains a list of dictionaries having member data. if "Members" == key: if isinstance(value, list): for memberDict in value: self.__pending_enumeration.add( memberDict["@odata.id"] ) if "@odata.id" == key: value = value.rstrip("/") # Data for the given url. if value == url: self.__result[url] = data # Data still needs to be looked up, else: self.__pending_enumeration.add(value) def get_members_list(self, resource_path, filter=None): r""" Return members list in a given URL. Description of argument(s): resource_path URI resource absolute path (e.g. "/redfish/v1/AccountService/Accounts"). filter strings or regex /redfish/v1/AccountService/Accounts/ { "@odata.id": "/redfish/v1/AccountService/Accounts", "@odata.type": "#ManagerAccountCollection.ManagerAccountCollection", "Description": "BMC User Accounts", "Members": [ { "@odata.id": "/redfish/v1/AccountService/Accounts/root" }, { "@odata.id": "/redfish/v1/AccountService/Accounts/admin" } ], "Members@odata.count": 2, "Name": "Accounts Collection" } Return list of members if no filter is applied as: ['/redfish/v1/AccountService/Accounts/root', "/redfish/v1/AccountService/Accounts/admin"] Return list of members if filter (e.g "root") is applied as: ['/redfish/v1/AccountService/Accounts/root'] Calling from robot code: ${resp}= Redfish.Get Members List /redfish/v1/AccountService/Accounts ${resp}= Redfish.Get Members List /redfish/v1/AccountService/Accounts filter=root """ member_list = [] self._rest_response_ = self.get( resource_path, valid_status_codes=[200] ) try: for member in self._rest_response_.dict["Members"]: member_list.append(member["@odata.id"]) except KeyError: # Non Members child objects at the top level, ignore. pass # Filter elements in the list and return matched elements. if filter is not None: regex = ".*/" + filter + "[^/]*$" return [x for x in member_list if re.match(regex, x)] return member_list