1#!/usr/bin/env python3
2
3r"""
4PEL functions.
5"""
6
7import json
8import os
9import sys
10from datetime import datetime
11
12import bmc_ssh_utils as bsu
13import func_args as fa
14
15base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
16sys.path.append(base_path + "/data/")
17
18import pel_variables  # NOQA
19
20
21class PeltoolException(Exception):
22    r"""
23    Base class for peltool related exceptions.
24    """
25
26    def __init__(self, message):
27        self.message = message
28        super().__init__(self.message)
29
30
31def peltool(option_string, parse_json=True, **bsu_options):
32    r"""
33    Run peltool on the BMC with the caller's option string and return the result.
34
35    Example:
36
37    ${pel_results}=  Peltool  -l
38    Rprint Vars  pel_results
39
40    pel_results:
41      [0x50000031]:
42        [CompID]:                       0x1000
43        [PLID]:                         0x50000031
44        [Subsystem]:                    BMC Firmware
45        [Message]:                      An application had an internal failure
46        [SRC]:                          BD8D1002
47        [Commit Time]:                  02/25/2020  04:51:31
48        [Sev]:                          Unrecoverable Error
49        [CreatorID]:                    BMC
50
51    Description of argument(s):
52    option_string                   A string of options which are to be
53                                    processed by the peltool command.
54    parse_json                      Indicates that the raw JSON data should
55                                    parsed into a list of dictionaries.
56    bsu_options                     Options to be passed directly to
57                                    bmc_execute_command. See its prolog for
58                                    details.
59    """
60
61    bsu_options = fa.args_to_objects(bsu_options)
62    out_buf, _, _ = bsu.bmc_execute_command(
63        "peltool " + option_string, **bsu_options
64    )
65    if parse_json:
66        try:
67            return json.loads(out_buf)
68        except ValueError:
69            return {}
70    return out_buf
71
72
73def get_pel_data_from_bmc(
74    include_hidden_pels=False, include_informational_pels=False
75):
76    r"""
77    Returns PEL data from BMC else throws exception.
78
79    Description of arguments:
80    include_hidden_pels           True/False (default: False).
81                                  Set True to get hidden PELs else False.
82    include_informational_pels    True/False (default: False).
83                                  Set True to get informational PELs else False.
84    """
85    try:
86        pel_cmd = " -l"
87        if include_hidden_pels:
88            pel_cmd = pel_cmd + " -h"
89        if include_informational_pels:
90            pel_cmd = pel_cmd + " -f"
91        pel_data = peltool(pel_cmd)
92        if not pel_data:
93            print("No PEL data present in BMC ...")
94    except Exception as exception:
95        raise PeltoolException(
96            "Failed to get PEL data from BMC : " + str(exception)
97        ) from exception
98    return pel_data
99
100
101def verify_no_pel_exists_on_bmc():
102    r"""
103    Verify that no PEL exists in BMC. Raise an exception if it does.
104    """
105
106    try:
107        pel_data = get_pel_data_from_bmc()
108
109        if len(pel_data) == 0:
110            return True
111
112        print("PEL data present. \n", pel_data)
113        raise PeltoolException("PEL data present in BMC")
114    except Exception as exception:
115        raise PeltoolException(
116            "Failed to get PEL data from BMC : " + str(exception)
117        ) from exception
118
119
120def compare_pel_and_redfish_event_log(pel_record, event_record):
121    r"""
122    Compare PEL log attributes like "SRC", "Created at" with Redfish
123    event log attributes like "EventId", "Created".
124    Return False if they do not match.
125
126    Description of arguments:
127    pel_record     PEL record.
128    event_record   Redfish event which is equivalent of PEL record.
129    """
130
131    try:
132        # Below is format of PEL record / event record and following
133        # i.e. "SRC", "Created at" from
134        # PEL record is compared with "EventId", "Created" from event record.
135
136        # PEL Log attributes
137        # SRC        : XXXXXXXX
138        # Created at : 11/14/2022 12:38:04
139
140        # Event log attributes
141        # EventId : XXXXXXXX XXXXXXXX XXXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
142
143        # Created : 2022-11-14T12:38:04+00:00
144
145        print(f"\nPEL records : {pel_record}")
146        print(f"\nEvent records : {event_record}")
147
148        pel_src = pel_record["pel_data"]["SRC"]
149        pel_created_time = pel_record["pel_detail_data"]["Private Header"][
150            "Created at"
151        ]
152
153        event_ids = (event_record["EventId"]).split(" ")
154
155        event_time_format = (event_record["Created"]).split("T")
156        event_date = (event_time_format[0]).split("-")
157        event_date = datetime(
158            int(event_date[0]), int(event_date[1]), int(event_date[2])
159        )
160        event_date = event_date.strftime("%m/%d/%Y")
161        event_sub_time_format = (event_time_format[1]).split("+")
162        event_date_time = event_date + " " + event_sub_time_format[0]
163
164        event_created_time = event_date_time.replace("-", "/")
165
166        print(f"\nPEL SRC : {pel_src} | PEL Created Time : {pel_created_time}")
167
168        print(
169            f"\nError event ID : {event_ids[0]} | Error Log Created Time "
170            + ": {event_created_time}"
171        )
172
173        if pel_src == event_ids[0] and pel_created_time == event_created_time:
174            print(
175                "\nPEL SRC and created date time match with "
176                "event ID, created time"
177            )
178        else:
179            raise PeltoolException(
180                "\nPEL SRC and created date time did not "
181                "match with event ID, created time"
182            )
183    except Exception as exception:
184        raise PeltoolException(
185            "Exception occurred during PEL and Event log "
186            "comparison for SRC or event ID and created "
187            "time : "
188            + str(exception)
189        ) from exception
190
191
192def fetch_all_pel_ids_for_src(src_id, severity, include_hidden_pels=False):
193    r"""
194    Fetch all PEL IDs for the input SRC ID based on the severity type
195    in the list format.
196
197    Description of arguments:
198    src_id                SRC ID (e.g. BCXXYYYY).
199    severity              PEL severity (e.g. "Predictive Error"
200                                             "Recovered Error").
201    include_hidden_pels   True/False (default: False).
202                          Set True to get hidden PELs else False.
203    """
204
205    try:
206        src_pel_ids = []
207        pel_data = get_pel_data_from_bmc(include_hidden_pels)
208        pel_id_list = pel_data.keys()
209        for pel_id in pel_id_list:
210            # Check if required SRC ID with severity is present
211            if src_id in pel_data[pel_id]["SRC"]:
212                if pel_data[pel_id]["Sev"] == severity:
213                    src_pel_ids.append(pel_id)
214
215        if not src_pel_ids:
216            raise PeltoolException(
217                src_id + " with severity " + severity + " not present"
218            )
219    except Exception as exception:
220        raise PeltoolException(
221            "Failed to fetch PEL ID for required SRC : " + str(exception)
222        ) from exception
223    return src_pel_ids
224
225
226def fetch_all_src(include_hidden_pels=False):
227    r"""
228    Fetch all SRC IDs from peltool in the list format.
229
230    include_hidden_pels       True/False (default: False).
231                              Set True to get hidden PELs else False.
232    """
233    try:
234        src_id = []
235        pel_data = get_pel_data_from_bmc(include_hidden_pels)
236        if pel_data:
237            pel_id_list = pel_data.keys()
238            for pel_id in pel_id_list:
239                src_id.append(pel_data[pel_id]["SRC"])
240        print("SRC IDs: " + str(src_id))
241    except Exception as exception:
242        raise PeltoolException(
243            "Failed to fetch all SRCs : " + str(exception)
244        ) from exception
245    return src_id
246
247
248def check_for_unexpected_src(
249    unexpected_src_list=None, include_hidden_pels=False
250):
251    r"""
252    From the given unexpected SRC list, check if any unexpected SRC created
253    on the BMC. Returns 0 if no SRC found else throws exception.
254
255    Description of arguments:
256    unexpected_src_list       Give unexpected SRCs in the list format.
257                              e.g.: ["BBXXYYYY", "AAXXYYYY"].
258
259    include_hidden_pels       True/False (default: False).
260                              Set True to get hidden PELs else False.
261    """
262    try:
263        unexpected_src_count = 0
264        if not unexpected_src_list:
265            print("Unexpected SRC list is empty.")
266        src_data = fetch_all_src(include_hidden_pels)
267        for src in unexpected_src_list:
268            if src in src_data:
269                print("Found an unexpected SRC : " + src)
270                unexpected_src_count = unexpected_src_count + 1
271        if unexpected_src_count >= 1:
272            raise PeltoolException("Unexpected SRC found.")
273
274    except Exception as exception:
275        raise PeltoolException(
276            "Failed to verify unexpected SRC list : " + str(exception)
277        ) from exception
278    return unexpected_src_count
279
280
281def filter_unexpected_srcs(expected_srcs=None):
282    r"""
283    Return list of SRCs found in BMC after filtering expected SRCs.
284    If expected_srcs is None then all SRCs found in system are returned.
285
286    Description of arguments:
287    expected_srcs       List of expected SRCs. E.g. ["BBXXYYYY", "AAXXYYYY"].
288    """
289
290    srcs_found = fetch_all_src()
291    if not expected_srcs:
292        expected_srcs = []
293    print(expected_srcs)
294    return list(set(srcs_found) - set(expected_srcs))
295
296
297def get_bmc_event_log_id_for_pel(pel_id):
298    r"""
299    Return BMC event log ID for the given PEL ID.
300
301    Description of arguments:
302    pel_id       PEL ID. E.g. 0x50000021.
303    """
304
305    pel_data = peltool("-i " + pel_id)
306    print(pel_data)
307    bmc_id_for_pel = pel_data["Private Header"]["BMC Event Log Id"]
308    return bmc_id_for_pel
309
310
311def get_latest_pels(number_of_pels=1):
312    r"""
313    Return latest PEL IDs.
314
315    Description of arguments:
316    number_of_pels       Number of PELS to be returned.
317    """
318
319    pel_data = peltool("-lr")
320    pel_ids = list(pel_data.keys())
321    return pel_ids[:number_of_pels]
322