xref: /openbmc/openbmc-test-automation/ffdc/ffdc_collector.py (revision 473643eef1bf6d7f229e0f6037d7b05da1d90872)
1#!/usr/bin/env python3
2
3r"""
4See class prolog below for details.
5"""
6
7import importlib
8import json
9import logging
10import os
11import platform
12import re
13import subprocess
14import sys
15import time
16from errno import EACCES, EPERM
17from typing import Any
18
19import yaml
20
21sys.dont_write_bytecode = True
22
23
24script_dir = os.path.dirname(os.path.abspath(__file__))
25sys.path.append(script_dir)
26# Walk path and append to sys.path
27for root, dirs, files in os.walk(script_dir):
28    for dir in dirs:
29        sys.path.append(os.path.join(root, dir))
30
31from ssh_utility import SSHRemoteclient  # NOQA
32from telnet_utility import TelnetRemoteclient  # NOQA
33
34r"""
35This is for plugin functions returning data or responses to the caller
36in YAML plugin setup.
37
38Example:
39
40    - plugin:
41      - plugin_name: plugin.ssh_execution
42      - plugin_function: version = ssh_execute_cmd
43      - plugin_args:
44        - ${hostname}
45        - ${username}
46        - ${password}
47        - "cat /etc/os-release | grep VERSION_ID | awk -F'=' '{print $2}'"
48     - plugin:
49        - plugin_name: plugin.print_vars
50        - plugin_function: print_vars
51        - plugin_args:
52          - version
53
54where first plugin "version" var is used by another plugin in the YAML
55block or plugin
56
57"""
58# Global variables for storing plugin return values, plugin return variables,
59# and log storage path.
60global global_log_store_path
61global global_plugin_dict
62global global_plugin_list
63global global_plugin_type_list
64global global_plugin_error_dict
65
66# Hold the plugin return values in a dictionary and plugin return variables in
67# a list. The dictionary is used for referencing and updating variables during
68# parsing in the parser, while the list is used for storing current variables
69# from the plugin block that need processing.
70global_plugin_dict = {}
71global_plugin_list = []
72
73# Hold the plugin return named variables if the function returned values are
74# lists or dictionaries. This list is used to reference the plugin dictionary
75# for python function execute arguments.
76# Example: ['version']
77global_plugin_type_list = []
78
79# Path where logs are to be stored or written.
80global_log_store_path = ""
81
82# Plugin error state defaults.
83global_plugin_error_dict = {
84    "exit_on_error": False,
85    "continue_on_error": False,
86}
87
88
89def execute_python_function(module_name, function_name, *args, **kwargs):
90    r"""
91    Execute a Python function from a module dynamically.
92
93    This function dynamically imports a module and executes a specified
94    function from that module with the provided arguments. The function takes
95    the module name, function name, and arguments as input. The function
96    returns the result of the executed function.
97
98    If an ImportError or AttributeError occurs, the function prints an error
99    message and returns None.
100
101    Parameters:
102        module_name (str):   The name of the module containing the function.
103        function_name (str): The name of the function to execute.
104        *args:               Positional arguments to pass to the function.
105        **kwargs:            Keyword arguments to pass to the function.
106
107    Returns:
108        Any: The result of the executed function or None if an error occurs.
109    """
110    try:
111        # Dynamically import the module.
112        module = importlib.import_module(module_name)
113
114        # Get the function from the module.
115        func = getattr(module, function_name)
116
117        # Call the function with the provided arguments.
118        result = func(*args, **kwargs)
119
120    except (ImportError, AttributeError) as e:
121        print(f"\tERROR: execute_python_function: {e}")
122        # Set the plugin error state.
123        global_plugin_error_dict["exit_on_error"] = True
124        print("\treturn: PLUGIN_EXEC_ERROR")
125        return "PLUGIN_EXEC_ERROR"
126
127    return result
128
129
130class ffdc_collector:
131    r"""
132    Execute commands from a configuration file to collect log files and store
133    the generated files at the specified location.
134
135    This class is designed to execute commands specified in a configuration
136    YAML file to collect log files from a remote host.
137
138    The class establishes connections using SSH, Telnet, or other protocols
139    based on the configuration. It fetches and stores the generated files at
140    the specified location. The class provides methods for initializing the
141    collector, executing commands, and handling errors.
142    """
143
144    def __init__(
145        self,
146        hostname,
147        username,
148        password,
149        port_ssh,
150        port_https,
151        port_ipmi,
152        ffdc_config,
153        location,
154        remote_type,
155        remote_protocol,
156        env_vars,
157        econfig,
158        log_level,
159    ):
160        r"""
161        Initialize the FFDCCollector object with the provided parameters.
162
163        This method initializes an FFDCCollector object with the given
164        attributes. The attributes represent the configuration for connecting
165        to a remote system, collecting log data, and storing the collected
166        data.
167
168        Parameters:
169            hostname (str):             Name or IP address of the targeted
170                                        (remote) system.
171            username (str):             User on the targeted system with access
172                                        to log files.
173            password (str):             Password for the user on the targeted
174                                        system.
175            port_ssh (int, optional):   SSH port value. Defaults to 22.
176            port_https (int, optional): HTTPS port value. Defaults to 443.
177            port_ipmi (int, optional):  IPMI port value. Defaults to 623.
178            ffdc_config (str):          Configuration file listing commands
179                                        and files for FFDC.
180            location (str):             Where to store collected log data.
181            remote_type (str):          Block YAML type name of the remote
182                                        host.
183            remote_protocol (str):      Protocol to use to collect data.
184            env_vars (dict, optional):  User-defined CLI environment variables.
185                                        Defaults to None.
186            econfig (str, optional):    User-defined environment variables
187                                        YAML file. Defaults to None.
188            log_level (str, optional):  Log level for the collector.
189                                        Defaults to "INFO".
190        """
191
192        self.hostname = hostname
193        self.username = username
194        self.password = password
195        self.port_ssh = str(port_ssh)
196        self.port_https = str(port_https)
197        self.port_ipmi = str(port_ipmi)
198        self.ffdc_config = ffdc_config
199        self.location = location + "/" + remote_type.upper()
200        self.ssh_remoteclient = None
201        self.telnet_remoteclient = None
202        self.ffdc_dir_path = ""
203        self.ffdc_prefix = ""
204        self.target_type = remote_type.upper()
205        self.remote_protocol = remote_protocol.upper()
206        self.env_vars = env_vars if env_vars else {}
207        self.econfig = econfig if econfig else {}
208        self.start_time = 0
209        self.elapsed_time = ""
210        self.env_dict = {}
211        self.logger = None
212
213        """
214        Set prefix values for SCP files and directories.
215        Since the time stamp is at second granularity, these values are set
216        here to be sure that all files for this run will have the same
217        timestamps and be saved in the same directory.
218        self.location == local system for now
219        """
220        self.set_ffdc_default_store_path()
221
222        # Logger for this run.  Need to be after set_ffdc_default_store_path()
223        self.script_logging(getattr(logging, log_level.upper()))
224
225        # Verify top level directory exists for storage
226        self.validate_local_store(self.location)
227
228        if self.verify_script_env():
229            try:
230                with open(self.ffdc_config, "r") as file:
231                    self.ffdc_actions = yaml.safe_load(file)
232            except yaml.YAMLError as e:
233                self.logger.error(e)
234                sys.exit(-1)
235
236            if self.target_type not in self.ffdc_actions:
237                self.logger.error(
238                    "\n\tERROR: %s is not listed in %s.\n\n"
239                    % (self.target_type, self.ffdc_config)
240                )
241                sys.exit(-1)
242
243            self.logger.info("\n\tENV: User define input YAML variables")
244            self.load_env()
245        else:
246            sys.exit(-1)
247
248    def verify_script_env(self):
249        r"""
250        Verify that all required environment variables are set.
251
252        This method checks if all required environment variables are set.
253        If any required variable is missing, the method returns False.
254        Otherwise, it returns True.
255
256        Returns:
257            bool: True if all required environment variables are set,
258            False otherwise.
259        """
260        # Import to log version
261        import click
262        import paramiko
263
264        run_env_ok = True
265
266        try:
267            redfishtool_version = (
268                self.run_tool_cmd("redfishtool -V").split(" ")[2].strip("\n")
269            )
270        except Exception as e:
271            self.logger.error("\tEXCEPTION redfishtool: %s", e)
272            redfishtool_version = "Not Installed (optional)"
273
274        try:
275            ipmitool_version = self.run_tool_cmd("ipmitool -V").split(" ")[2]
276        except Exception as e:
277            self.logger.error("\tEXCEPTION ipmitool: %s", e)
278            ipmitool_version = "Not Installed (optional)"
279
280        self.logger.info("\n\t---- Script host environment ----")
281        self.logger.info(
282            "\t{:<10}  {:<10}".format("Script hostname", os.uname()[1])
283        )
284        self.logger.info(
285            "\t{:<10}  {:<10}".format("Script host os", platform.platform())
286        )
287        self.logger.info(
288            "\t{:<10}  {:>10}".format("Python", platform.python_version())
289        )
290        self.logger.info("\t{:<10}  {:>10}".format("PyYAML", yaml.__version__))
291        self.logger.info("\t{:<10}  {:>10}".format("click", click.__version__))
292        self.logger.info(
293            "\t{:<10}  {:>10}".format("paramiko", paramiko.__version__)
294        )
295        self.logger.info(
296            "\t{:<10}  {:>9}".format("redfishtool", redfishtool_version)
297        )
298        self.logger.info(
299            "\t{:<10}  {:>12}".format("ipmitool", ipmitool_version)
300        )
301
302        if eval(yaml.__version__.replace(".", ",")) < (5, 3, 0):
303            self.logger.error(
304                "\n\tERROR: Python or python packages do not meet minimum"
305                " version requirement."
306            )
307            self.logger.error(
308                "\tERROR: PyYAML version 5.3.0 or higher is needed.\n"
309            )
310            run_env_ok = False
311
312        self.logger.info("\t---- End script host environment ----")
313        return run_env_ok
314
315    def script_logging(self, log_level_attr):
316        """
317        Create a logger for the script with the specified log level.
318
319        This method creates a logger for the script with the specified
320        log level. The logger is configured to write log messages to a file
321        and the console.
322
323        self.logger = logging.getLogger(__name__)
324
325        Setting logger with __name__ will add the trace
326        Example:
327
328            INFO:ffdc_collector:    System Type: OPENBMC
329
330        Currently, set to empty purposely to log as
331            System Type: OPENBMC
332
333        Parameters:
334            log_level_attr (str):   The log level for the logger
335                                    (e.g., "DEBUG", "INFO", "WARNING",
336                                    "ERROR", "CRITICAL").
337
338        Returns:
339            None
340        """
341        self.logger = logging.getLogger()
342        self.logger.setLevel(log_level_attr)
343
344        log_file_handler = logging.FileHandler(
345            self.ffdc_dir_path + "collector.log"
346        )
347        stdout_handler = logging.StreamHandler(sys.stdout)
348
349        self.logger.addHandler(log_file_handler)
350        self.logger.addHandler(stdout_handler)
351
352        # Turn off paramiko INFO logging
353        logging.getLogger("paramiko").setLevel(logging.WARNING)
354
355    def target_is_pingable(self):
356        r"""
357        Check if the target system is ping-able.
358
359        This method checks if the target system is reachable by sending an
360        ICMP echo request (ping). If the target system responds to the ping,
361        the method returns True. Otherwise, it returns False.
362
363        Returns:
364            bool: True if the target system is ping-able, False otherwise.
365        """
366        response = os.system("ping -c 2 %s  2>&1 >/dev/null" % self.hostname)
367        if response == 0:
368            self.logger.info(
369                "\n\t[Check] %s is ping-able.\t\t [OK]" % self.hostname
370            )
371            return True
372        else:
373            self.logger.error(
374                "\n\tERROR: %s is not ping-able. FFDC collection aborted.\n"
375                % self.hostname
376            )
377            sys.exit(-1)
378        return False
379
380    def collect_ffdc(self):
381        r"""
382        Initiate FFDC collection based on the requested protocol.
383
384        This method initiates FFDC (First Failure Data Capture) collection
385        based on the requested protocol (SSH,SCP, TELNET, REDFISH, IPMI).
386        The method establishes a connection to the target system using the
387        specified protocol and collects the required FFDC data.
388
389        Returns:
390            None
391        """
392        self.logger.info(
393            "\n\t---- Start communicating with %s ----" % self.hostname
394        )
395        self.start_time = time.time()
396
397        # Find the list of target and protocol supported.
398        check_protocol_list = []
399        config_dict = self.ffdc_actions
400
401        for target_type in config_dict.keys():
402            if self.target_type != target_type:
403                continue
404
405            for k, v in config_dict[target_type].items():
406                if v["PROTOCOL"][0] not in check_protocol_list:
407                    check_protocol_list.append(v["PROTOCOL"][0])
408
409        self.logger.info(
410            "\n\t %s protocol type: %s"
411            % (self.target_type, check_protocol_list)
412        )
413
414        verified_working_protocol = self.verify_protocol(check_protocol_list)
415
416        if verified_working_protocol:
417            self.logger.info(
418                "\n\t---- Completed protocol pre-requisite check ----\n"
419            )
420
421        # Verify top level directory exists for storage
422        self.validate_local_store(self.location)
423
424        if (self.remote_protocol not in verified_working_protocol) and (
425            self.remote_protocol != "ALL"
426        ):
427            self.logger.info(
428                "\n\tWorking protocol list: %s" % verified_working_protocol
429            )
430            self.logger.error(
431                "\tERROR: Requested protocol %s is not in working protocol"
432                " list.\n" % self.remote_protocol
433            )
434            sys.exit(-1)
435        else:
436            self.generate_ffdc(verified_working_protocol)
437
438    def ssh_to_target_system(self):
439        r"""
440        Establish an SSH connection to the target system.
441
442        This method establishes an SSH connection to the target system using
443        the provided hostname, username, password, and SSH port. If the
444        connection is successful, the method returns True. Otherwise, it logs
445        an error message and returns False.
446
447        Returns:
448            bool: True if the connection is successful, False otherwise.
449        """
450
451        self.ssh_remoteclient = SSHRemoteclient(
452            self.hostname, self.username, self.password, self.port_ssh
453        )
454
455        if self.ssh_remoteclient.ssh_remoteclient_login():
456            self.logger.info(
457                "\n\t[Check] %s SSH connection established.\t [OK]"
458                % self.hostname
459            )
460
461            # Check scp connection.
462            # If scp connection fails,
463            # continue with FFDC generation but skip scp files to local host.
464            self.ssh_remoteclient.scp_connection()
465            return True
466        else:
467            self.logger.info(
468                "\n\t[Check] %s SSH connection.\t [NOT AVAILABLE]"
469                % self.hostname
470            )
471            return False
472
473    def telnet_to_target_system(self):
474        r"""
475        Establish a Telnet connection to the target system.
476
477        This method establishes a Telnet connection to the target system using
478        the provided hostname, username, and Telnet port. If the connection is
479        successful, the method returns True. Otherwise, it logs an error
480        message and returns False.
481
482        Returns:
483            bool: True if the connection is successful, False otherwise.
484        """
485        self.telnet_remoteclient = TelnetRemoteclient(
486            self.hostname, self.username, self.password
487        )
488        if self.telnet_remoteclient.tn_remoteclient_login():
489            self.logger.info(
490                "\n\t[Check] %s Telnet connection established.\t [OK]"
491                % self.hostname
492            )
493            return True
494        else:
495            self.logger.info(
496                "\n\t[Check] %s Telnet connection.\t [NOT AVAILABLE]"
497                % self.hostname
498            )
499            return False
500
501    def generate_ffdc(self, working_protocol_list):
502        r"""
503        Generate FFDC (First Failure Data Capture) based on the remote host
504        type and working protocols.
505
506        This method determines the actions to be performed for generating FFDC
507        based on the remote host type and the list of confirmed working
508        protocols. The method iterates through the available actions for the
509        remote host type and checks if any of the working protocols are
510        supported. If a supported protocol is found, the method executes the
511        corresponding FFDC generation action.
512
513        Parameters:
514            working_protocol_list (list):  A list of confirmed working
515                                           protocols to connect to the
516                                           remote host.
517
518        Returns:
519            None
520        """
521        self.logger.info(
522            "\n\t---- Executing commands on " + self.hostname + " ----"
523        )
524        self.logger.info(
525            "\n\tWorking protocol list: %s" % working_protocol_list
526        )
527
528        config_dict = self.ffdc_actions
529        for target_type in config_dict.keys():
530            if self.target_type != target_type:
531                continue
532
533            self.logger.info("\n\tFFDC Path: %s " % self.ffdc_dir_path)
534            global_plugin_dict["global_log_store_path"] = self.ffdc_dir_path
535            self.logger.info("\tSystem Type: %s" % target_type)
536            for k, v in config_dict[target_type].items():
537                protocol = v["PROTOCOL"][0]
538
539                if (
540                    self.remote_protocol not in working_protocol_list
541                    and self.remote_protocol != "ALL"
542                ) or protocol not in working_protocol_list:
543                    continue
544
545                if protocol in working_protocol_list:
546                    if protocol in ["SSH", "SCP"]:
547                        self.protocol_ssh(protocol, target_type, k)
548                    elif protocol == "TELNET":
549                        self.protocol_telnet(target_type, k)
550                    elif protocol in ["REDFISH", "IPMI", "SHELL"]:
551                        self.protocol_service_execute(protocol, target_type, k)
552                else:
553                    self.logger.error(
554                        "\n\tERROR: %s is not available for %s."
555                        % (protocol, self.hostname)
556                    )
557
558        # Close network connection after collecting all files
559        self.elapsed_time = time.strftime(
560            "%H:%M:%S", time.gmtime(time.time() - self.start_time)
561        )
562        self.logger.info("\n\tTotal time taken: %s" % self.elapsed_time)
563        if self.ssh_remoteclient:
564            self.ssh_remoteclient.ssh_remoteclient_disconnect()
565        if self.telnet_remoteclient:
566            self.telnet_remoteclient.tn_remoteclient_disconnect()
567
568    def protocol_ssh(self, protocol, target_type, sub_type):
569        r"""
570        Perform actions using SSH and SCP protocols.
571
572        This method executes a set of commands using the SSH protocol to
573        connect to the target system and collect FFDC data. The method takes
574        the protocol, target type, and sub-type as arguments and performs the
575        corresponding actions based on the provided parameters.
576
577        Parameters:
578            protocol (str):    The protocol to execute (SSH or SCP).
579            target_type (str): The type group of the remote host.
580            sub_type (str):    The group type of commands to execute.
581
582        Returns:
583            None
584        """
585        if protocol == "SCP":
586            self.group_copy(self.ffdc_actions[target_type][sub_type])
587        else:
588            self.collect_and_copy_ffdc(
589                self.ffdc_actions[target_type][sub_type]
590            )
591
592    def protocol_telnet(self, target_type, sub_type):
593        r"""
594        Perform actions using the Telnet protocol.
595
596        This method executes a set of commands using the Telnet protocol to
597        connect to the target system and collect FFDC data. The method takes
598        the target type and sub-type as arguments and performs the
599        corresponding actions based on the provided parameters.
600
601        Parameters:
602            target_type (str): The type group of the remote host.
603            sub_type (str):    The group type of commands to execute.
604
605        Returns:
606            None
607        """
608        self.logger.info(
609            "\n\t[Run] Executing commands on %s using %s"
610            % (self.hostname, "TELNET")
611        )
612        telnet_files_saved = []
613        progress_counter = 0
614        list_of_commands = self.ffdc_actions[target_type][sub_type]["COMMANDS"]
615        for index, each_cmd in enumerate(list_of_commands, start=0):
616            command_txt, command_timeout = self.unpack_command(each_cmd)
617            result = self.telnet_remoteclient.execute_command(
618                command_txt, command_timeout
619            )
620            if result:
621                try:
622                    targ_file = self.ffdc_actions[target_type][sub_type][
623                        "FILES"
624                    ][index]
625                except IndexError:
626                    targ_file = command_txt
627                    self.logger.warning(
628                        "\n\t[WARN] Missing filename to store data from"
629                        " telnet %s." % each_cmd
630                    )
631                    self.logger.warning(
632                        "\t[WARN] Data will be stored in %s." % targ_file
633                    )
634                targ_file_with_path = (
635                    self.ffdc_dir_path + self.ffdc_prefix + targ_file
636                )
637                # Creates a new file
638                with open(targ_file_with_path, "w") as fp:
639                    fp.write(result)
640                    fp.close
641                    telnet_files_saved.append(targ_file)
642            progress_counter += 1
643            self.print_progress(progress_counter)
644        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
645        for file in telnet_files_saved:
646            self.logger.info("\n\t\tSuccessfully save file " + file + ".")
647
648    def protocol_service_execute(self, protocol, target_type, sub_type):
649        r"""
650        Perform actions for a given protocol.
651
652        This method executes a set of commands using the specified protocol to
653        connect to the target system and collect FFDC data. The method takes
654        the protocol, target type, and sub-type as arguments and performs the
655        corresponding actions based on the provided parameters.
656
657        Parameters:
658            protocol (str):    The protocol to execute
659                               (REDFISH, IPMI, or SHELL).
660            target_type (str): The type group of the remote host.
661            sub_type (str):    The group type of commands to execute.
662
663        Returns:
664            None
665        """
666        self.logger.info(
667            "\n\t[Run] Executing commands to %s using %s"
668            % (self.hostname, protocol)
669        )
670        executed_files_saved = []
671        progress_counter = 0
672        list_of_cmd = self.get_command_list(
673            self.ffdc_actions[target_type][sub_type]
674        )
675        for index, each_cmd in enumerate(list_of_cmd, start=0):
676            plugin_call = False
677            if isinstance(each_cmd, dict):
678                if "plugin" in each_cmd:
679                    # If the error is set and plugin explicitly
680                    # requested to skip execution on error..
681                    if global_plugin_error_dict[
682                        "exit_on_error"
683                    ] and self.plugin_error_check(each_cmd["plugin"]):
684                        self.logger.info(
685                            "\n\t[PLUGIN-ERROR] exit_on_error: %s"
686                            % global_plugin_error_dict["exit_on_error"]
687                        )
688                        self.logger.info(
689                            "\t[PLUGIN-SKIP] %s" % each_cmd["plugin"][0]
690                        )
691                        continue
692                    plugin_call = True
693                    # call the plugin
694                    self.logger.info("\n\t[PLUGIN-START]")
695                    result = self.execute_plugin_block(each_cmd["plugin"])
696                    self.logger.info("\t[PLUGIN-END]\n")
697            else:
698                each_cmd = self.yaml_env_and_plugin_vars_populate(each_cmd)
699
700            if not plugin_call:
701                result = self.run_tool_cmd(each_cmd)
702            if result:
703                try:
704                    file_name = self.get_file_list(
705                        self.ffdc_actions[target_type][sub_type]
706                    )[index]
707                    # If file is specified as None.
708                    if file_name == "None":
709                        continue
710                    targ_file = self.yaml_env_and_plugin_vars_populate(
711                        file_name
712                    )
713                except IndexError:
714                    targ_file = each_cmd.split("/")[-1]
715                    self.logger.warning(
716                        "\n\t[WARN] Missing filename to store data from %s."
717                        % each_cmd
718                    )
719                    self.logger.warning(
720                        "\t[WARN] Data will be stored in %s." % targ_file
721                    )
722
723                targ_file_with_path = (
724                    self.ffdc_dir_path + self.ffdc_prefix + targ_file
725                )
726
727                # Creates a new file
728                with open(targ_file_with_path, "w") as fp:
729                    if isinstance(result, dict):
730                        fp.write(json.dumps(result))
731                    else:
732                        fp.write(result)
733                    fp.close
734                    executed_files_saved.append(targ_file)
735
736            progress_counter += 1
737            self.print_progress(progress_counter)
738
739        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
740
741        for file in executed_files_saved:
742            self.logger.info("\n\t\tSuccessfully save file " + file + ".")
743
744    def collect_and_copy_ffdc(
745        self, ffdc_actions_for_target_type, form_filename=False
746    ):
747        r"""
748        Send commands and collect FFDC data from the targeted system.
749
750        This method sends a set of commands and collects FFDC data from the
751        targeted system based on the provided ffdc_actions_for_target_type
752        dictionary. The method also has an optional form_filename parameter,
753        which, if set to True, prepends the target type to the output file
754        name.
755
756        Parameters:
757            ffdc_actions_for_target_type (dict): A dictionary containing
758                                                 commands and files for the
759                                                 selected remote host type.
760            form_filename (bool, optional):      If True, prepends the target
761                                                 type to the output file name.
762                                                 Defaults to False.
763
764        Returns:
765            None
766        """
767        # Executing commands, if any
768        self.ssh_execute_ffdc_commands(
769            ffdc_actions_for_target_type, form_filename
770        )
771
772        # Copying files
773        if self.ssh_remoteclient.scpclient:
774            self.logger.info(
775                "\n\n\tCopying FFDC files from remote system %s.\n"
776                % self.hostname
777            )
778
779            # Retrieving files from target system
780            list_of_files = self.get_file_list(ffdc_actions_for_target_type)
781            self.scp_ffdc(
782                self.ffdc_dir_path,
783                self.ffdc_prefix,
784                form_filename,
785                list_of_files,
786            )
787        else:
788            self.logger.info(
789                "\n\n\tSkip copying FFDC files from remote system %s.\n"
790                % self.hostname
791            )
792
793    def get_command_list(self, ffdc_actions_for_target_type):
794        r"""
795        Fetch a list of commands from the configuration file.
796
797        This method retrieves a list of commands from the
798        ffdc_actions_for_target_type dictionary, which contains commands and
799        files for the selected remote host type. The method returns the list
800        of commands.
801
802        Parameters:
803            ffdc_actions_for_target_type (dict): A dictionary containing
804                                                 commands and files for the
805                                                 selected remote host type.
806
807        Returns:
808            list: A list of commands.
809        """
810        try:
811            list_of_commands = ffdc_actions_for_target_type["COMMANDS"]
812            # Update any global reserved variable name with value in dict.
813            list_of_commands = self.update_vars_with_env_values(
814                global_plugin_dict, list_of_commands
815            )
816        except KeyError:
817            list_of_commands = []
818        return list_of_commands
819
820    def get_file_list(self, ffdc_actions_for_target_type):
821        r"""
822        Fetch a list of files from the configuration file.
823
824        This method retrieves a list of files from the
825        ffdc_actions_for_target_type dictionary, which contains commands and
826        files for the selected remote host type. The method returns the list
827        of files.
828
829        Parameters:
830            ffdc_actions_for_target_type (dict): A dictionary containing
831                                                 commands and files for the
832                                                 selected remote host type.
833
834        Returns:
835            list: A list of files.
836        """
837        try:
838            list_of_files = ffdc_actions_for_target_type["FILES"]
839        except KeyError:
840            list_of_files = []
841        return list_of_files
842
843    def unpack_command(self, command):
844        r"""
845        Unpack a command from the configuration file, handling both dictionary
846        and string inputs.
847
848        This method takes a command from the configuration file, which can be
849        either a dictionary or a string. If the input is a dictionary, the
850        method extracts the command text and timeout from the dictionary.
851        If the input is a string, the method assumes a default timeout of
852        60 seconds.
853        The method returns a tuple containing the command text and timeout.
854
855        Parameters:
856            command (dict or str): A command from the configuration file,
857                                   which can be either a dictionary or a
858                                   string.
859
860        Returns:
861            tuple: A tuple containing the command text and timeout.
862        """
863        if isinstance(command, dict):
864            command_txt = next(iter(command))
865            command_timeout = next(iter(command.values()))
866        elif isinstance(command, str):
867            command_txt = command
868            # Default command timeout 60 seconds
869            command_timeout = 60
870
871        return command_txt, command_timeout
872
873    def ssh_execute_ffdc_commands(
874        self, ffdc_actions_for_target_type, form_filename=False
875    ):
876        r"""
877        Send commands in the ffdc_config file to the targeted system using SSH.
878
879        This method sends a set of commands and collects FFDC data from the
880        targeted system using the SSH protocol. The method takes the
881        ffdc_actions_for_target_type dictionary and an optional
882        form_filename parameter as arguments.
883
884        If form_filename is set to True, the method prepends the target type
885        to the output file name. The method returns the output of the executed
886        commands.
887
888        It also prints the progress counter string + on the console.
889
890        Parameters:
891            ffdc_actions_for_target_type (dict): A dictionary containing
892                                                 commands and files for the
893                                                 selected remote host type.
894            form_filename (bool, optional):      If True, prepends the target
895                                                 type to the output file name.
896                                                 Defaults to False.
897
898        Returns:
899            None
900        """
901        self.logger.info(
902            "\n\t[Run] Executing commands on %s using %s"
903            % (self.hostname, ffdc_actions_for_target_type["PROTOCOL"][0])
904        )
905
906        list_of_commands = self.get_command_list(ffdc_actions_for_target_type)
907        # If command list is empty, returns
908        if not list_of_commands:
909            return
910
911        progress_counter = 0
912        for command in list_of_commands:
913            command_txt, command_timeout = self.unpack_command(command)
914
915            if form_filename:
916                command_txt = str(command_txt % self.target_type)
917
918            (
919                cmd_exit_code,
920                err,
921                response,
922            ) = self.ssh_remoteclient.execute_command(
923                command_txt, command_timeout
924            )
925
926            if cmd_exit_code:
927                self.logger.warning(
928                    "\n\t\t[WARN] %s exits with code %s."
929                    % (command_txt, str(cmd_exit_code))
930                )
931                self.logger.warning("\t\t[WARN] %s " % err)
932
933            progress_counter += 1
934            self.print_progress(progress_counter)
935
936        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
937
938    def group_copy(self, ffdc_actions_for_target_type):
939        r"""
940        SCP a group of files (wildcard) from the remote host.
941
942        This method copies a group of files from the remote host using the SCP
943        protocol. The method takes the fdc_actions_for_target_type dictionary
944        as an argument, which contains commands and files for the selected
945        remote host type.
946
947        Parameters:
948            fdc_actions_for_target_type (dict): A dictionary containing
949                                                commands and files for the
950                                                selected remote host type.
951
952        Returns:
953            None
954        """
955        if self.ssh_remoteclient.scpclient:
956            self.logger.info(
957                "\n\tCopying files from remote system %s via SCP.\n"
958                % self.hostname
959            )
960
961            list_of_commands = self.get_command_list(
962                ffdc_actions_for_target_type
963            )
964            # If command list is empty, returns
965            if not list_of_commands:
966                return
967
968            for command in list_of_commands:
969                try:
970                    command = self.yaml_env_and_plugin_vars_populate(command)
971                except IndexError:
972                    self.logger.error("\t\tInvalid command %s" % command)
973                    continue
974
975                (
976                    cmd_exit_code,
977                    err,
978                    response,
979                ) = self.ssh_remoteclient.execute_command(command)
980
981                # If file does not exist, code take no action.
982                # cmd_exit_code is ignored for this scenario.
983                if response:
984                    scp_result = self.ssh_remoteclient.scp_file_from_remote(
985                        response.split("\n"), self.ffdc_dir_path
986                    )
987                    if scp_result:
988                        self.logger.info(
989                            "\t\tSuccessfully copied from "
990                            + self.hostname
991                            + ":"
992                            + command
993                        )
994                else:
995                    self.logger.info("\t\t%s has no result" % command)
996
997        else:
998            self.logger.info(
999                "\n\n\tSkip copying files from remote system %s.\n"
1000                % self.hostname
1001            )
1002
1003    def scp_ffdc(
1004        self,
1005        targ_dir_path,
1006        targ_file_prefix,
1007        form_filename,
1008        file_list=None,
1009        quiet=None,
1010    ):
1011        r"""
1012        SCP all files in the file_dict to the indicated directory on the local
1013        system.
1014
1015        This method copies all files specified in the file_dict dictionary
1016        from the targeted system to the local system using the SCP protocol.
1017        The method takes the target directory path, target file prefix, and a
1018        boolean flag form_filename as required arguments.
1019
1020        The file_dict argument is optional and contains the files to be copied.
1021        The quiet argument is also optional and, if set to True, suppresses
1022        the output of the SCP operation.
1023
1024        Parameters:
1025            targ_dir_path (str):        The path of the directory to receive
1026                                        the files on the local system.
1027            targ_file_prefix (str):     Prefix which will be prepended to each
1028                                        target file's name.
1029            form_filename (bool):       If True, prepends the target type to
1030                                        the file names.
1031            file_dict (dict, optional): A dictionary containing the files to
1032                                        be copied. Defaults to None.
1033            quiet (bool, optional):     If True, suppresses the output of the
1034                                        SCP operation. Defaults to None.
1035
1036        Returns:
1037            None
1038        """
1039        progress_counter = 0
1040        for filename in file_list:
1041            if form_filename:
1042                filename = str(filename % self.target_type)
1043            source_file_path = filename
1044            targ_file_path = (
1045                targ_dir_path + targ_file_prefix + filename.split("/")[-1]
1046            )
1047
1048            # If source file name contains wild card, copy filename as is.
1049            if "*" in source_file_path:
1050                scp_result = self.ssh_remoteclient.scp_file_from_remote(
1051                    source_file_path, self.ffdc_dir_path
1052                )
1053            else:
1054                scp_result = self.ssh_remoteclient.scp_file_from_remote(
1055                    source_file_path, targ_file_path
1056                )
1057
1058            if not quiet:
1059                if scp_result:
1060                    self.logger.info(
1061                        "\t\tSuccessfully copied from "
1062                        + self.hostname
1063                        + ":"
1064                        + source_file_path
1065                        + ".\n"
1066                    )
1067                else:
1068                    self.logger.info(
1069                        "\t\tFail to copy from "
1070                        + self.hostname
1071                        + ":"
1072                        + source_file_path
1073                        + ".\n"
1074                    )
1075            else:
1076                progress_counter += 1
1077                self.print_progress(progress_counter)
1078
1079    def set_ffdc_default_store_path(self):
1080        r"""
1081        Set default values for self.ffdc_dir_path and self.ffdc_prefix.
1082
1083        This method sets default values for the self.ffdc_dir_path and
1084        self.ffdc_prefix class variables.
1085
1086        The collected FFDC files will be stored in the directory
1087        /self.location/hostname_timestr/, with individual files having the
1088        format timestr_filename where timestr is in %Y%m%d-%H%M%S.
1089
1090        Returns:
1091            None
1092        """
1093        timestr = time.strftime("%Y%m%d-%H%M%S")
1094        self.ffdc_dir_path = (
1095            self.location + "/" + self.hostname + "_" + timestr + "/"
1096        )
1097        self.ffdc_prefix = timestr + "_"
1098        self.validate_local_store(self.ffdc_dir_path)
1099
1100    # Need to verify local store path exists prior to instantiate this class.
1101    # This class method to validate log path before referencing this class.
1102    @classmethod
1103    def validate_local_store(cls, dir_path):
1104        r"""
1105        Ensure the specified directory exists to store FFDC files locally.
1106
1107        This method checks if the provided dir_path exists. If the directory
1108        does not exist, the method creates it. The method does not return any
1109        value.
1110
1111        Parameters:
1112            dir_path (str): The directory path where collected FFDC data files
1113                            will be stored.
1114
1115        Returns:
1116            None
1117        """
1118        if not os.path.exists(dir_path):
1119            try:
1120                os.makedirs(dir_path, 0o755)
1121            except (IOError, OSError) as e:
1122                # PermissionError
1123                if e.errno == EPERM or e.errno == EACCES:
1124                    print(
1125                        "\tERROR: os.makedirs %s failed with"
1126                        " PermissionError.\n" % dir_path
1127                    )
1128                else:
1129                    print(
1130                        "\tERROR: os.makedirs %s failed with %s.\n"
1131                        % (dir_path, e.strerror)
1132                    )
1133                sys.exit(-1)
1134
1135    def print_progress(self, progress):
1136        r"""
1137        Print the current activity progress.
1138
1139        This method prints the current activity progress using the provided
1140        progress counter. The method does not return any value.
1141
1142        Parameters:
1143            progress (int): The current activity progress counter.
1144
1145        Returns:
1146            None
1147        """
1148        sys.stdout.write("\r\t" + "+" * progress)
1149        sys.stdout.flush()
1150        time.sleep(0.1)
1151
1152    def verify_redfish(self):
1153        r"""
1154        Verify if the remote host has the Redfish service active.
1155
1156        This method checks if the remote host has the Redfish service active
1157        by sending a GET request to the Redfish base URL /redfish/v1/.
1158        If the request is successful (status code 200), the method returns
1159        stdout output of the run else error message.
1160
1161        Returns:
1162            str: Redfish service executed output.
1163        """
1164        redfish_parm = (
1165            "redfishtool -r "
1166            + self.hostname
1167            + ":"
1168            + self.port_https
1169            + " -S Always raw GET /redfish/v1/"
1170        )
1171        return self.run_tool_cmd(redfish_parm, True)
1172
1173    def verify_ipmi(self):
1174        r"""
1175        Verify if the remote host has the IPMI LAN service active.
1176
1177        This method checks if the remote host has the IPMI LAN service active
1178        by sending an IPMI "power status" command.
1179
1180        If the command is successful (returns a non-empty response),
1181        else error message.
1182
1183        Returns:
1184            str: IPMI LAN service executed output.
1185        """
1186        if self.target_type == "OPENBMC":
1187            ipmi_parm = (
1188                "ipmitool -I lanplus -C 17  -U "
1189                + self.username
1190                + " -P "
1191                + self.password
1192                + " -H "
1193                + self.hostname
1194                + " -p "
1195                + str(self.port_ipmi)
1196                + " power status"
1197            )
1198        else:
1199            ipmi_parm = (
1200                "ipmitool -I lanplus  -P "
1201                + self.password
1202                + " -H "
1203                + self.hostname
1204                + " -p "
1205                + str(self.port_ipmi)
1206                + " power status"
1207            )
1208
1209        return self.run_tool_cmd(ipmi_parm, True)
1210
1211    def run_tool_cmd(self, parms_string, quiet=False):
1212        r"""
1213        Run a CLI standard tool or script with the provided command options.
1214
1215        This method runs a CLI standard tool or script with the provided
1216        parms_string command options. If the quiet parameter is set to True,
1217        the method suppresses the output of the command.
1218        The method returns the output of the command as a string.
1219
1220        Parameters:
1221            parms_string (str):     The command options for the CLI tool or
1222                                    script.
1223            quiet (bool, optional): If True, suppresses the output of the
1224                                    command. Defaults to False.
1225
1226        Returns:
1227            str: The output of the command as a string.
1228        """
1229
1230        result = subprocess.run(
1231            [parms_string],
1232            stdout=subprocess.PIPE,
1233            stderr=subprocess.PIPE,
1234            shell=True,
1235            universal_newlines=True,
1236        )
1237
1238        if result.stderr and not quiet:
1239            if self.password in parms_string:
1240                parms_string = parms_string.replace(self.password, "********")
1241            self.logger.error("\n\t\tERROR with %s " % parms_string)
1242            self.logger.error("\t\t" + result.stderr)
1243
1244        return result.stdout
1245
1246    def verify_protocol(self, protocol_list):
1247        r"""
1248        Perform a working check for the provided list of protocols.
1249
1250        This method checks if the specified protocols are available on the
1251        remote host. The method iterates through the protocol_list and
1252        attempts to establish a connection using each protocol.
1253
1254        If a connection is successfully established, the method append to the
1255        list and if any protocol fails to connect, the method ignores it.
1256
1257        Parameters:
1258            protocol_list (list):   A list of protocols to check.
1259
1260        Returns:
1261            list: All protocols are available list.
1262        """
1263
1264        tmp_list = []
1265        if self.target_is_pingable():
1266            tmp_list.append("SHELL")
1267
1268        for protocol in protocol_list:
1269            if self.remote_protocol != "ALL":
1270                if self.remote_protocol != protocol:
1271                    continue
1272
1273            # Only check SSH/SCP once for both protocols
1274            if (
1275                protocol == "SSH"
1276                or protocol == "SCP"
1277                and protocol not in tmp_list
1278            ):
1279                if self.ssh_to_target_system():
1280                    # Add only what user asked.
1281                    if self.remote_protocol != "ALL":
1282                        tmp_list.append(self.remote_protocol)
1283                    else:
1284                        tmp_list.append("SSH")
1285                        tmp_list.append("SCP")
1286
1287            if protocol == "TELNET":
1288                if self.telnet_to_target_system():
1289                    tmp_list.append(protocol)
1290
1291            if protocol == "REDFISH":
1292                if self.verify_redfish():
1293                    tmp_list.append(protocol)
1294                    self.logger.info(
1295                        "\n\t[Check] %s Redfish Service.\t\t [OK]"
1296                        % self.hostname
1297                    )
1298                else:
1299                    self.logger.info(
1300                        "\n\t[Check] %s Redfish Service.\t\t [NOT AVAILABLE]"
1301                        % self.hostname
1302                    )
1303
1304            if protocol == "IPMI":
1305                if self.verify_ipmi():
1306                    tmp_list.append(protocol)
1307                    self.logger.info(
1308                        "\n\t[Check] %s IPMI LAN Service.\t\t [OK]"
1309                        % self.hostname
1310                    )
1311                else:
1312                    self.logger.info(
1313                        "\n\t[Check] %s IPMI LAN Service.\t\t [NOT AVAILABLE]"
1314                        % self.hostname
1315                    )
1316
1317        return tmp_list
1318
1319    def load_env(self):
1320        r"""
1321        Load the user environment variables from a YAML file.
1322
1323        This method reads the environment variables from a YAML file specified
1324        in the ENV_FILE environment variable. If the file is not found or
1325        there is an error reading the file, an exception is raised.
1326
1327        The YAML file should have the following format:
1328
1329        .. code-block:: yaml
1330
1331            VAR_NAME: VAR_VALUE
1332
1333        Where VAR_NAME is the name of the environment variable, and
1334        VAR_VALUE is its value.
1335
1336        After loading the environment variables, they are stored in the
1337        self.env attribute for later use.
1338        """
1339
1340        tmp_env_vars = {
1341            "hostname": self.hostname,
1342            "username": self.username,
1343            "password": self.password,
1344            "port_ssh": self.port_ssh,
1345            "port_https": self.port_https,
1346            "port_ipmi": self.port_ipmi,
1347        }
1348
1349        # Updatae default Env and Dict var for both so that it can be
1350        # verified when referencing it throughout the code.
1351        for key, value in tmp_env_vars.items():
1352            os.environ[key] = value
1353            self.env_dict[key] = value
1354
1355        try:
1356            tmp_env_dict = {}
1357            if self.env_vars:
1358                tmp_env_dict = json.loads(self.env_vars)
1359                # Export ENV vars default.
1360                for key, value in tmp_env_dict.items():
1361                    os.environ[key] = value
1362                    self.env_dict[key] = str(value)
1363
1364            # Load user specified ENV config YAML.
1365            if self.econfig:
1366                with open(self.econfig, "r") as file:
1367                    try:
1368                        tmp_env_dict = yaml.load(file, Loader=yaml.SafeLoader)
1369                    except yaml.YAMLError as e:
1370                        self.logger.error(e)
1371                        sys.exit(-1)
1372                # Export ENV vars.
1373                for key, value in tmp_env_dict["env_params"].items():
1374                    os.environ[key] = str(value)
1375                    self.env_dict[key] = str(value)
1376        except json.decoder.JSONDecodeError as e:
1377            self.logger.error("\n\tERROR: %s " % e)
1378            sys.exit(-1)
1379        except FileNotFoundError as e:
1380            self.logger.error("\n\tERROR: %s " % e)
1381            sys.exit(-1)
1382
1383        # This to mask the password from displaying on the console.
1384        mask_dict = self.env_dict.copy()
1385        for k, v in mask_dict.items():
1386            if k.lower().find("password") != -1:
1387                hidden_text = []
1388                hidden_text.append(v)
1389                password_regex = (
1390                    "(" + "|".join([re.escape(x) for x in hidden_text]) + ")"
1391                )
1392                mask_dict[k] = re.sub(password_regex, "********", v)
1393
1394        self.logger.info(json.dumps(mask_dict, indent=8, sort_keys=False))
1395
1396    def execute_plugin_block(self, plugin_cmd_list):
1397        r"""
1398        Pack the plugin commands into qualified Python string objects.
1399
1400        This method processes the plugin_cmd_list argument, which is expected
1401        to contain a list of plugin commands read from a YAML file. The method
1402        iterates through the list, constructs a qualified Python string object
1403        for each plugin command, and returns a list of these string objects.
1404
1405        Parameters:
1406            plugin_cmd_list (list): A list of plugin commands containing
1407                                    plugin names and arguments.
1408                                    Plugin block read from YAML
1409                                    [
1410                                     {'plugin_name':'plugin.foo_func.my_func'},
1411                                     {'plugin_args':[10]},
1412                                    ]
1413
1414            Example:
1415                Execute and no return response
1416                - plugin:
1417                  - plugin_name: plugin.foo_func.my_func
1418                  - plugin_args:
1419                      - arg1
1420                      - arg2
1421
1422                Execute and return a response
1423                - plugin:
1424                    - plugin_name: result = plugin.foo_func.my_func
1425                    - plugin_args:
1426                        - arg1
1427                        - arg2
1428
1429                Execute and return multiple values response
1430                - plugin:
1431                    - plugin_name: result1,result2 = plugin.foo_func.my_func
1432                    - plugin_args:
1433                        - arg1
1434                        - arg2
1435
1436        Returns:
1437            str: Execute and not response or a string value(s) responses,
1438
1439        """
1440
1441        # Declare a variable plugin resp that can accept any data type.
1442        resp: Any = ""
1443        args_string = ""
1444
1445        try:
1446            idx = self.key_index_list_dict("plugin_name", plugin_cmd_list)
1447            # Get plugin module name
1448            plugin_name = plugin_cmd_list[idx]["plugin_name"]
1449
1450            # Get plugin function name
1451            idx = self.key_index_list_dict("plugin_function", plugin_cmd_list)
1452            plugin_function = plugin_cmd_list[idx]["plugin_function"]
1453
1454            # Equal separator means plugin function returns result.
1455            if " = " in plugin_function:
1456                # Ex. ['result', 'plugin.foo_func.my_func']
1457                plugin_function_args = plugin_function.split(" = ")
1458                # plugin func return data.
1459                for arg in plugin_function_args:
1460                    if arg == plugin_function_args[-1]:
1461                        plugin_function = arg
1462                    else:
1463                        plugin_resp = arg.split(",")
1464                        # ['result1','result2']
1465                        for x in plugin_resp:
1466                            global_plugin_list.append(x)
1467                            global_plugin_dict[x] = ""
1468
1469            # Walk the plugin args ['arg1,'arg2']
1470            # If the YAML plugin statement 'plugin_args' is not declared.
1471            plugin_args = []
1472            if any("plugin_args" in d for d in plugin_cmd_list):
1473                idx = self.key_index_list_dict("plugin_args", plugin_cmd_list)
1474                if idx is not None:
1475                    plugin_args = plugin_cmd_list[idx].get("plugin_args", [])
1476                    plugin_args = self.yaml_args_populate(plugin_args)
1477                else:
1478                    plugin_args = self.yaml_args_populate([])
1479
1480            plugin_args = self.update_vars_with_env_values(
1481                global_plugin_dict, plugin_args
1482            )
1483
1484            """
1485            Example of plugin_func:
1486                plugin.redfish.enumerate_request(
1487                         "xx.xx.xx.xx:443",
1488                         "root",
1489                         "********",
1490                         "/redfish/v1/",
1491                         "json")
1492            """
1493            # For logging purpose to mask password.
1494            # List should be string element to join else gives TypeError
1495            args_string = self.print_plugin_args_string(plugin_args)
1496
1497            # If user wants to debug plugins.
1498            self.logger.debug(
1499                f"\tDebug Plugin function: \n\t\t{plugin_name}."
1500                f"{plugin_function}{args_string}"
1501            )
1502
1503            # For generic logging plugin info.
1504            self.logger.info(
1505                f"\tPlugin function: \n\t\t{plugin_name}."
1506                f"{plugin_function}()"
1507            )
1508
1509            # Execute the plugins function with args.
1510            resp = execute_python_function(
1511                plugin_name, plugin_function, *plugin_args
1512            )
1513            self.logger.info(f"\tPlugin response = {resp}")
1514            # Update plugin vars dict if there is any.
1515            if resp != "PLUGIN_EXEC_ERROR":
1516                self.process_response_args_data(resp)
1517        except Exception as e:
1518            # Set the plugin error state.
1519            global_plugin_error_dict["exit_on_error"] = True
1520            self.logger.error("\tERROR: execute_plugin_block: %s" % e)
1521            pass
1522
1523        # There is a real error executing the plugin function.
1524        if resp == "PLUGIN_EXEC_ERROR":
1525            return resp
1526
1527        # Check if plugin_expects_return (int, string, list,dict etc)
1528        if any("plugin_expects_return" in d for d in plugin_cmd_list):
1529            idx = self.key_index_list_dict(
1530                "plugin_expects_return", plugin_cmd_list
1531            )
1532            plugin_expects = plugin_cmd_list[idx]["plugin_expects_return"]
1533            if plugin_expects:
1534                if resp:
1535                    if (
1536                        self.plugin_expect_type(plugin_expects, resp)
1537                        == "INVALID"
1538                    ):
1539                        self.logger.error("\tWARN: Plugin error check skipped")
1540                    elif not self.plugin_expect_type(plugin_expects, resp):
1541                        self.logger.error(
1542                            "\tERROR: Plugin expects return data: %s"
1543                            % plugin_expects
1544                        )
1545                        global_plugin_error_dict["exit_on_error"] = True
1546                elif not resp:
1547                    self.logger.error(
1548                        "\tERROR: Plugin func failed to return data"
1549                    )
1550                    global_plugin_error_dict["exit_on_error"] = True
1551
1552        return resp
1553
1554    def update_vars_with_env_values(self, ref_dict, args_list):
1555        r"""
1556        Update list elements with environment or gloable variable values.
1557
1558        This method updates the list arguments in the provided list with the
1559        corresponding values from the reference dictionary.
1560
1561        The method iterates through the dictionary and checks if each of the
1562        key matches an element in the list. If a match is found, the method
1563        replaces the key with its corresponding value in the list element. If
1564        the value is a string, the method replaces the key in the list element.
1565        If the value is not a string, the method assigns the value to the list
1566        element.
1567        The method handles exceptions and continues processing the remaining
1568        elements in the list.
1569
1570        Example:
1571
1572        Input (dict, list):
1573        {'global_log_store_path': 'LOG_PATH/BMC/system_20250523-000337'}
1574        ['ls  global_log_store_path/*.txt']
1575
1576        Output(list):
1577            ['ls  LOG_PATH/BMC/system_20250523-000337/*.txt']
1578
1579        Parameters:
1580            ref_dict (dict):  A dictionary containing the environment or
1581                              global variable values.
1582            args_list (list): A list of arguments to update.
1583
1584        Returns:
1585            list: The update list with global variables values.
1586        """
1587        # Replace keys in the string with their corresponding
1588        # values from the dictionary.
1589        for key, value in ref_dict.items():
1590            # Iterate through the list and check if each element matched
1591            # exact or in the string. If matches update the plugin element
1592            # in the list.
1593            for index, element in enumerate(args_list):
1594                try:
1595                    if isinstance(element, str):
1596                        # If the key is not in the list element string,
1597                        # then continue for the next element in the list.
1598                        if str(key) not in str(element):
1599                            continue
1600                        if isinstance(value, str):
1601                            args_list[index] = element.replace(key, value)
1602                        else:
1603                            args_list[index] = ref_dict[element]
1604                except KeyError as e:
1605                    print(f"Exception {e}")
1606                    pass
1607        return args_list
1608
1609    def print_plugin_args_string(self, plugin_args):
1610        r"""
1611        Generate a string representation of plugin arguments, replacing the
1612        password if necessary.
1613
1614        This method generates a string representation of the provided plugin
1615        arguments, joining them with commas. If the password is present in the
1616        arguments, it is replaced with "********".
1617        The method returns the generated string. If an exception occurs during
1618        the process, the method logs a debug log and returns "(None)".
1619
1620        Parameters:
1621            plugin_args (list): A list of plugin arguments.
1622
1623        Returns:
1624            str: The generated string representation of the plugin arguments.
1625        """
1626        try:
1627            plugin_args_str = "(" + ", ".join(map(str, plugin_args)) + ")"
1628            if self.password in plugin_args_str:
1629                args_string = plugin_args_str.replace(
1630                    self.password, "********"
1631                )
1632            else:
1633                args_string = plugin_args_str
1634        except Exception as e:
1635            self.logger.debug("\tWARN:Print args string : %s" % e)
1636            return "(None)"
1637
1638        return args_string
1639
1640    def process_response_args_data(self, plugin_resp):
1641        r"""
1642        Parse the plugin function response and update plugin return variables.
1643
1644        This method parses the response data from a plugin function and
1645        updates the plugin return variables accordingly. The method takes the
1646        plugin_resp argument, which is expected to be the response data from a
1647        plugin function.
1648
1649        The method handles various data types (string, bytes,
1650        tuple, list, int, float) and updates the global global_plugin_dict
1651        dictionary with the parsed response data. If there is an error during
1652        the process, the method logs a warning and continues with the next
1653        plugin block execution.
1654
1655        Parameters:
1656           plugin_resp (Any): The response data from the plugin function.
1657
1658        Returns:
1659            None
1660        """
1661        resp_list = []
1662        resp_data = ""
1663
1664        # There is nothing to update the plugin response.
1665        if len(global_plugin_list) == 0 or plugin_resp == "None":
1666            return
1667
1668        if isinstance(plugin_resp, str):
1669            resp_data = plugin_resp.strip("\r\n\t")
1670            resp_list.append(resp_data)
1671        elif isinstance(plugin_resp, bytes):
1672            resp_data = str(plugin_resp, "UTF-8").strip("\r\n\t")
1673            resp_list.append(resp_data)
1674        elif isinstance(plugin_resp, tuple):
1675            if len(global_plugin_list) == 1:
1676                resp_list.append(list(plugin_resp))
1677            else:
1678                resp_list = list(plugin_resp)
1679                resp_list = [x for x in resp_list]
1680        elif isinstance(plugin_resp, list):
1681            if len(global_plugin_list) == 1:
1682                resp_list.append([x.strip("\r\n\t") for x in plugin_resp])
1683            else:
1684                resp_list = [x.strip("\r\n\t") for x in plugin_resp]
1685        elif isinstance(plugin_resp, int) or isinstance(plugin_resp, float):
1686            resp_list.append(plugin_resp)
1687
1688        # Iterate if there is a list of plugin return vars to update.
1689        for idx, item in enumerate(resp_list, start=0):
1690            # Exit loop, done required loop.
1691            if idx >= len(global_plugin_list):
1692                break
1693            # Find the index of the return func in the list and
1694            # update the global func return dictionary.
1695            try:
1696                dict_idx = global_plugin_list[idx]
1697                global_plugin_dict[dict_idx] = item
1698            except (IndexError, ValueError) as e:
1699                self.logger.warn("\tWARN: process_response_args_data: %s" % e)
1700                pass
1701
1702        # Done updating plugin dict irrespective of pass or failed,
1703        # clear all the list element for next plugin block execute.
1704        global_plugin_list.clear()
1705
1706    def yaml_args_populate(self, yaml_arg_list):
1707        r"""
1708        Decode environment and plugin variables and populate the argument list.
1709
1710        This method processes the yaml_arg_list argument, which is expected to
1711        contain a list of arguments read from a YAML file. The method iterates
1712        through the list, decodes environment and plugin variables, and
1713        returns a populated list of arguments.
1714
1715        .. code-block:: yaml
1716
1717          - plugin_args:
1718            - arg1
1719            - arg2
1720
1721        ['${hostname}:${port_https}', '${username}', '/redfish/v1/', 'json']
1722
1723        Returns the populated plugin list
1724            ['xx.xx.xx.xx:443', 'root', '/redfish/v1/', 'json']
1725
1726        Parameters:
1727            yaml_arg_list (list):   A list of arguments containing environment
1728                                    and plugin variables.
1729
1730        Returns:
1731            list:   A populated list of arguments with decoded environment and
1732                    plugin variables.
1733        """
1734        if isinstance(yaml_arg_list, list):
1735            populated_list = []
1736            for arg in yaml_arg_list:
1737                if isinstance(arg, (int, float)):
1738                    populated_list.append(arg)
1739                elif isinstance(arg, str):
1740                    arg_str = self.yaml_env_and_plugin_vars_populate(arg)
1741                    populated_list.append(arg_str)
1742                else:
1743                    populated_list.append(arg)
1744
1745            return populated_list
1746
1747    def yaml_env_and_plugin_vars_populate(self, yaml_arg_str):
1748        r"""
1749        Update environment variables and plugin variables based on the
1750        provided YAML argument string.
1751
1752        This method processes the yaml_arg_str argument, which is expected
1753        to contain a string representing environment variables and plugin
1754        variables in the format:
1755
1756        .. code-block:: yaml
1757
1758            - cat ${MY_VAR}
1759            - ls -AX my_plugin_var
1760
1761        The method parses the string, extracts the variable names, and updates
1762        the corresponding environment variables and plugin variables.
1763
1764        Parameters:
1765            yaml_arg_str (str):   A string containing environment and plugin
1766                                  variable definitions in YAML format.
1767
1768        Returns:
1769            str:   The updated YAML argument string with plugin variables
1770                   replaced.
1771        """
1772
1773        # Parse and convert the Plugin YAML vars string to python vars
1774        # Example:
1775        #   ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
1776        try:
1777            # Example, list of matching
1778            # env vars ['username', 'password', 'hostname']
1779            # Extra escape \ for special symbols. '\$\{([^\}]+)\}' works good.
1780            env_var_regex = r"\$\{([^\}]+)\}"
1781            env_var_names_list = re.findall(env_var_regex, yaml_arg_str)
1782
1783            # If the list in empty [] nothing to update.
1784            if not len(env_var_names_list):
1785                return yaml_arg_str
1786            for var in env_var_names_list:
1787                env_var = os.environ.get(var)
1788                if env_var:
1789                    env_replace = "${" + var + "}"
1790                    yaml_arg_str = yaml_arg_str.replace(env_replace, env_var)
1791        except Exception as e:
1792            self.logger.error("\tERROR:yaml_env_vars_populate: %s" % e)
1793            pass
1794
1795        """
1796        Parse the string for plugin vars.
1797        Implement the logic to update environment variables based on the
1798        extracted variable names.
1799        """
1800        try:
1801            # Example, list of plugin vars env_var_names_list
1802            #    ['my_hostname', 'port_https']
1803            global_plugin_dict_keys = set(global_plugin_dict.keys())
1804            # Skip env var list already populated above code block list.
1805            plugin_var_name_list = [
1806                var
1807                for var in global_plugin_dict_keys
1808                if var not in env_var_names_list
1809            ]
1810
1811            for var in plugin_var_name_list:
1812                plugin_var_value = global_plugin_dict[var]
1813                if yaml_arg_str in global_plugin_dict:
1814                    """
1815                    If this plugin var exist but empty in dict, don't replace.
1816                    his is either a YAML plugin statement incorrectly used or
1817                    user added a plugin var which is not going to be populated.
1818                    """
1819                    if isinstance(plugin_var_value, (list, dict)):
1820                        """
1821                        List data type or dict can't be replaced, use
1822                        directly in plugin function call.
1823                        """
1824                        global_plugin_type_list.append(var)
1825                    else:
1826                        yaml_arg_str = yaml_arg_str.replace(
1827                            str(var), str(plugin_var_value)
1828                        )
1829        except (IndexError, ValueError) as e:
1830            self.logger.error("\tERROR: yaml_plugin_vars_populate: %s" % e)
1831            pass
1832
1833        # From ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
1834        # to populated values string as
1835        # Example:  xx.xx.xx.xx:443 and return the string
1836        return yaml_arg_str
1837
1838    def plugin_error_check(self, plugin_dict):
1839        r"""
1840        Process plugin error dictionary and return the corresponding error
1841        message.
1842
1843        This method checks if any dictionary in the plugin_dict list contains
1844        a "plugin_error" key. If such a dictionary is found, it retrieves the
1845        value associated with the "plugin_error" key and returns the
1846        corresponding error message from the global_plugin_error_dict
1847        attribute.
1848
1849        Parameters:
1850            plugin_dict (list of dict): A list of dictionaries containing
1851                                        plugin error information.
1852
1853        Returns:
1854           str: The error message corresponding to the "plugin_error" value,
1855                or None if no error is found.
1856        """
1857        if any("plugin_error" in d for d in plugin_dict):
1858            for d in plugin_dict:
1859                if "plugin_error" in d:
1860                    value = d["plugin_error"]
1861                    return global_plugin_error_dict.get(value, None)
1862        return None
1863
1864    def key_index_list_dict(self, key, list_dict):
1865        r"""
1866        Find the index of the first dictionary in the list that contains
1867        the specified key.
1868
1869        Parameters:
1870            key (str):                 The key to search for in the
1871                                       dictionaries.
1872            list_dict (list of dict):  A list of dictionaries to search
1873                                       through.
1874
1875        Returns:
1876            int: The index of the first dictionary containing the key, or -1
1877            if no match is found.
1878        """
1879        for i, d in enumerate(list_dict):
1880            if key in d:
1881                return i
1882        return -1
1883
1884    def plugin_expect_type(self, type, data):
1885        r"""
1886        Check if the provided data matches the expected type.
1887
1888        This method checks if the data argument matches the specified type.
1889        It supports the following types: "int", "float", "str", "list", "dict",
1890        and "tuple".
1891
1892        If the type is not recognized, it logs an info message and returns
1893        "INVALID".
1894
1895        Parameters:
1896            type (str): The expected data type.
1897            data:       The data to check against the expected type.
1898
1899        Returns:
1900            bool or str: True if the data matches the expected type, False if
1901                         not, or "INVALID" if the type is not recognized.
1902        """
1903        if type == "int":
1904            return isinstance(data, int)
1905        elif type == "float":
1906            return isinstance(data, float)
1907        elif type == "str":
1908            return isinstance(data, str)
1909        elif type == "list":
1910            return isinstance(data, list)
1911        elif type == "dict":
1912            return isinstance(data, dict)
1913        elif type == "tuple":
1914            return isinstance(data, tuple)
1915        else:
1916            self.logger.info("\tInvalid data type requested: %s" % type)
1917            return "INVALID"
1918