#!/usr/bin/env python3

r"""
PEL functions.
"""

import json
import os
import sys
from datetime import datetime

import bmc_ssh_utils as bsu
import func_args as fa

base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_path + "/data/")

import pel_variables  # NOQA


class PeltoolException(Exception):
    r"""
    Base class for peltool related exceptions.
    """

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


def peltool(
    option_string, peltool_extension=None, parse_json=True, **bsu_options
):
    r"""
    Run peltool on the BMC with the caller's option string and return the
    result.

    Example:

    ${pel_results}=  Peltool  -l
    Rprint Vars  pel_results

    pel_results:
      [0x50000031]:
        [CompID]:                       0x1000
        [PLID]:                         0x50000031
        [Subsystem]:                    BMC Firmware
        [Message]:                      An application had an internal failure
        [SRC]:                          BD8D1002
        [Commit Time]:                  02/25/2020 04:51:31
        [Sev]:                          Unrecoverable Error
        [CreatorID]:                    BMC

    Description of argument(s):
    option_string                   A string of options which are to be
                                    processed by the peltool command.
    peltool_extension               Provide peltool extension format.
                                    Default: None.
    parse_json                      Indicates that the raw JSON data should
                                    be parsed into python objects.  When the
                                    output is not valid JSON, the raw output
                                    is returned if it is a string; otherwise
                                    the error is printed and an empty
                                    dictionary is returned.
    bsu_options                     Options to be passed directly to
                                    bmc_execute_command.  See its prolog for
                                    details.
    """

    bsu_options = fa.args_to_objects(bsu_options)
    peltool_cmd = "peltool"
    if peltool_extension:
        peltool_cmd = peltool_cmd + peltool_extension

    out_buf, _, _ = bsu.bmc_execute_command(
        peltool_cmd + " " + option_string, **bsu_options
    )
    if not parse_json:
        return out_buf

    try:
        return json.loads(out_buf)
    except ValueError as e:
        # Not JSON: hand the raw text back to the caller when possible.
        if isinstance(out_buf, str):
            return out_buf
        print(str(e))
        return {}


def get_pel_data_from_bmc(
    include_hidden_pels=False, include_informational_pels=False
):
    r"""
    Returns PEL data from BMC else throws exception.

    Description of arguments:
    include_hidden_pels             True/False (default: False).
                                    Set True to get hidden PELs else False.
    include_informational_pels      True/False (default: False).
                                    Set True to get informational PELs else
                                    False.

    Raises PeltoolException if running or parsing the peltool command fails.
    """
    try:
        pel_cmd = " -l"
        if include_hidden_pels:
            pel_cmd = pel_cmd + " -h"
        if include_informational_pels:
            pel_cmd = pel_cmd + " -f"
        pel_data = peltool(pel_cmd)
        if not pel_data:
            print("No PEL data present in BMC ...")
    except Exception as exception:
        raise PeltoolException(
            "Failed to get PEL data from BMC : " + str(exception)
        ) from exception
    return pel_data


def verify_no_pel_exists_on_bmc():
    r"""
    Verify that no PEL exists in BMC. Raise an exception if it does.

    Returns True when no PEL is present.  Raises PeltoolException when PEL
    data is present or when the PEL data could not be retrieved.
    """

    try:
        pel_data = get_pel_data_from_bmc()
        pel_count = len(pel_data)
    except Exception as exception:
        raise PeltoolException(
            "Failed to get PEL data from BMC : " + str(exception)
        ) from exception

    if pel_count == 0:
        return True

    # Raised outside the try block so that "PEL present" is not reported
    # as a failure to fetch the PEL data.
    print("PEL data present. \n", pel_data)
    raise PeltoolException("PEL data present in BMC")


def compare_pel_and_redfish_event_log(pel_record, event_record):
    r"""
    Compare PEL log attributes like "SRC", "Created at" with Redfish
    event log attributes like "EventId", "Created".
    Raise PeltoolException if they do not match or cannot be compared.

    Description of arguments:
    pel_record                      PEL record.
    event_record                    Redfish event which is equivalent of PEL
                                    record.
    """

    try:
        # Below is format of PEL record / event record and following
        # i.e. "SRC", "Created at" from
        # PEL record is compared with "EventId", "Created" from event record.

        # PEL Log attributes
        # SRC        : XXXXXXXX
        # Created at : 11/14/2022 12:38:04

        # Event log attributes
        # EventId : XXXXXXXX XXXXXXXX XXXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX

        # Created : 2022-11-14T12:38:04+00:00

        print(f"\nPEL records : {pel_record}")
        print(f"\nEvent records : {event_record}")

        pel_src = pel_record["pel_data"]["SRC"]
        pel_created_time = pel_record["pel_detail_data"]["Private Header"][
            "Created at"
        ]

        event_ids = event_record["EventId"].split(" ")

        # Convert "YYYY-MM-DDTHH:MM:SS+ZZ:ZZ" into "MM/DD/YYYY HH:MM:SS",
        # the layout used by the PEL "Created at" field.
        event_time_format = event_record["Created"].split("T")
        year, month, day = event_time_format[0].split("-")[:3]
        event_date = datetime(int(year), int(month), int(day)).strftime(
            "%m/%d/%Y"
        )
        event_sub_time_format = event_time_format[1].split("+")
        event_date_time = event_date + " " + event_sub_time_format[0]

        event_created_time = event_date_time.replace("-", "/")

        print(f"\nPEL SRC : {pel_src} | PEL Created Time : {pel_created_time}")

        # Both halves must be f-strings so the created time is interpolated.
        print(
            f"\nError event ID : {event_ids[0]} | Error Log Created Time "
            f": {event_created_time}"
        )

        if pel_src == event_ids[0] and pel_created_time == event_created_time:
            print(
                "\nPEL SRC and created date time match with "
                "event ID, created time"
            )
        else:
            raise PeltoolException(
                "\nPEL SRC and created date time did not "
                "match with event ID, created time"
            )
    except Exception as exception:
        raise PeltoolException(
            "Exception occurred during PEL and Event log "
            "comparison for SRC or event ID and created "
            "time : " + str(exception)
        ) from exception


def fetch_all_pel_ids_for_src(src_id, severity, include_hidden_pels=False):
    r"""
    Fetch all PEL IDs for the input SRC ID based on the severity type
    in the list format.

    Description of arguments:
    src_id                          SRC ID (e.g. BCXXYYYY).
    severity                        PEL severity (e.g. "Predictive Error"
                                    "Recovered Error").
    include_hidden_pels             True/False (default: False).
                                    Set True to get hidden PELs else False.

    Raises PeltoolException if no matching PEL is found or the PEL data
    cannot be fetched.
    """

    try:
        pel_data = get_pel_data_from_bmc(include_hidden_pels)
        # Keep PELs whose SRC contains src_id and whose severity matches.
        src_pel_ids = [
            pel_id
            for pel_id, pel_entry in pel_data.items()
            if src_id in pel_entry["SRC"] and pel_entry["Sev"] == severity
        ]

        if not src_pel_ids:
            raise PeltoolException(
                src_id + " with severity " + severity + " not present"
            )
    except Exception as exception:
        raise PeltoolException(
            "Failed to fetch PEL ID for required SRC : " + str(exception)
        ) from exception
    return src_pel_ids


def fetch_all_src(include_hidden_pels=False):
    r"""
    Fetch all SRC IDs from peltool in the list format.

    Description of arguments:
    include_hidden_pels             True/False (default: False).
                                    Set True to get hidden PELs else False.

    Returns an empty list when no PEL data is present.
    """
    try:
        src_id = []
        pel_data = get_pel_data_from_bmc(include_hidden_pels)
        if pel_data:
            src_id = [pel_entry["SRC"] for pel_entry in pel_data.values()]
        print("SRC IDs: " + str(src_id))
    except Exception as exception:
        raise PeltoolException(
            "Failed to fetch all SRCs : " + str(exception)
        ) from exception
    return src_id


def check_for_unexpected_src(
    unexpected_src_list=None, include_hidden_pels=False
):
    r"""
    From the given unexpected SRC list, check if any unexpected SRC created
    on the BMC. Returns 0 if no SRC found else throws exception.

    Description of arguments:
    unexpected_src_list             Give unexpected SRCs in the list format.
                                    e.g.: ["BBXXYYYY", "AAXXYYYY"].
                                    None (the default) is treated as an
                                    empty list.
    include_hidden_pels             True/False (default: False).
                                    Set True to get hidden PELs else False.
    """
    try:
        unexpected_src_count = 0
        if not unexpected_src_list:
            print("Unexpected SRC list is empty.")
            # Avoid iterating over None below.
            unexpected_src_list = []
        src_data = fetch_all_src(include_hidden_pels)
        for src in unexpected_src_list:
            if src in src_data:
                print("Found an unexpected SRC : " + src)
                unexpected_src_count = unexpected_src_count + 1
        if unexpected_src_count >= 1:
            raise PeltoolException("Unexpected SRC found.")

    except Exception as exception:
        raise PeltoolException(
            "Failed to verify unexpected SRC list : " + str(exception)
        ) from exception
    return unexpected_src_count


def filter_unexpected_srcs(expected_srcs=None):
    r"""
    Return list of SRCs found in BMC after filtering expected SRCs.
    If expected_srcs is None then all SRCs found in system are returned.

    Description of arguments:
    expected_srcs                   List of expected SRCs. E.g. ["BBXXYYYY",
                                    "AAXXYYYY"].
    """

    temp_srcs_found = fetch_all_src()

    if not expected_srcs:
        expected_srcs = []
    # Remove any extra space after SRC ID by keeping only the first token.
    srcs_found = [item.split()[0] for item in temp_srcs_found]
    print(srcs_found)
    print(expected_srcs)
    return list(set(srcs_found) - set(expected_srcs))


def get_bmc_event_log_id_for_pel(pel_id):
    r"""
    Return BMC event log ID for the given PEL ID.

    Description of arguments:
    pel_id                          PEL ID. E.g. 0x50000021.
    """

    pel_data = peltool("-i " + pel_id)
    print(pel_data)
    bmc_id_for_pel = pel_data["Private Header"]["BMC Event Log Id"]
    return bmc_id_for_pel


def get_latest_pels(number_of_pels=1):
    r"""
    Return latest PEL IDs.

    The IDs are taken, in order, from the output of "peltool -lr".

    Description of arguments:
    number_of_pels                  Number of PELS to be returned.
    """

    pel_data = peltool("-lr")
    pel_ids = list(pel_data.keys())
    return pel_ids[:number_of_pels]


def fetch_all_pel_ids_based_on_error_message(
    error_msg, include_hidden_pels=False, include_informational_pels=False
):
    r"""
    Fetch all PEL IDs based on the input error message and return
    in the list format.

    Description of arguments:
    error_msg                       Error message
    include_hidden_pels             True/False (default: False).
                                    Set True to get hidden PELs else False.
    include_informational_pels      True/False (default: False).
                                    Set True to get informational PELs else
                                    False.

    Raises PeltoolException if no matching PEL is found or the PEL data
    cannot be fetched.
    """

    try:
        pel_data = get_pel_data_from_bmc(
            include_hidden_pels, include_informational_pels
        )
        # Keep PELs whose "Message" field contains the error message.
        err_pel_ids = [
            pel_id
            for pel_id, pel_entry in pel_data.items()
            if error_msg in pel_entry["Message"]
        ]

        if not err_pel_ids:
            raise PeltoolException(
                "Failed to get PEL ID with error message : " + error_msg
            )
    except Exception as exception:
        raise PeltoolException(
            "Failed to fetch PEL ID for required error message : "
            + str(exception)
        ) from exception
    return err_pel_ids


def check_if_pel_transmitted_to_host(pel_id):
    r"""
    Return True if PEL is transmitted to Host else False.

    The PEL counts as transmitted when its "Host Transmission" field in the
    "User Header" section equals "Acked".

    Description of arguments:
    pel_id                          PEL ID. E.g. 0x50000021.
    """

    try:
        pel_data = peltool("-i " + pel_id)
        print(pel_data)
        host_state = pel_data["User Header"]["Host Transmission"]
    except Exception as exception:
        raise PeltoolException(
            "Failed to parse PEL data : " + str(exception)
        ) from exception
    return host_state == "Acked"