xref: /openbmc/openbmc-test-automation/ffdc/ffdc_collector.py (revision 286e2f07a1261ad043902a6db05d39429a31a1d4)
1#!/usr/bin/env python3
2
3r"""
4See class prolog below for details.
5"""
6
7import importlib
8import json
9import logging
10import os
11import platform
12import re
13import subprocess
14import sys
15import time
16from errno import EACCES, EPERM
17from typing import Any
18
19import yaml
20
21sys.dont_write_bytecode = True
22
23
# Make the directory holding this script, and every directory beneath it,
# importable before the local helper modules are imported below.
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(script_dir)
for root, dirs, _files in os.walk(script_dir):
    # Named "subdir" rather than "dir" so the dir() builtin is not rebound
    # for the rest of this module.
    for subdir in dirs:
        sys.path.append(os.path.join(root, subdir))
30
31from ssh_utility import SSHRemoteclient  # NOQA
32from telnet_utility import TelnetRemoteclient  # NOQA
33
34r"""
35This is for plugin functions returning data or responses to the caller
36in YAML plugin setup.
37
38Example:
39
40    - plugin:
41      - plugin_name: plugin.ssh_execution
42      - plugin_function: version = ssh_execute_cmd
43      - plugin_args:
44        - ${hostname}
45        - ${username}
46        - ${password}
47        - "cat /etc/os-release | grep VERSION_ID | awk -F'=' '{print $2}'"
    - plugin:
      - plugin_name: plugin.print_vars
      - plugin_function: print_vars
      - plugin_args:
        - version

Here the "version" variable returned by the first plugin is used by another
plugin in the YAML block.
56
57"""
# Module-level state for plugin return values, plugin return variables,
# the log storage path and the plugin error flags. These names are assigned
# at module scope, so no "global" declaration is needed here.

# Hold the plugin return values in a dictionary and plugin return variables in
# a list. The dictionary is used for referencing and updating variables during
# parsing in the parser, while the list is used for storing current variables
# from the plugin block that need processing.
global_plugin_dict = {}
global_plugin_list = []

# Hold the plugin return named variables if the function returned values are
# lists or dictionaries. This list is used to reference the plugin dictionary
# for python function execute arguments.
# Example: ['version']
global_plugin_type_list = []

# Path where logs are to be stored or written.
global_log_store_path = ""

# Plugin error state defaults.
global_plugin_error_dict = {
    "exit_on_error": False,
    "continue_on_error": False,
}
87
88
def execute_python_function(module_name, function_name, *args, **kwargs):
    r"""
    Import a module by name and call one of its functions.

    The module is imported with importlib, the named attribute is looked up
    on it, and that attribute is called with the supplied arguments.

    If an ImportError or AttributeError is raised at any of these steps
    (including from inside the called function), the error is printed, the
    "exit_on_error" entry of global_plugin_error_dict is set to True and the
    string "PLUGIN_EXEC_ERROR" is returned. Any other exception propagates
    to the caller.

    Parameters:
        module_name (str):   The name of the module containing the function.
        function_name (str): The name of the function to execute.
        *args:               Positional arguments to pass to the function.
        **kwargs:            Keyword arguments to pass to the function.

    Returns:
        Any: The value returned by the called function, or the string
             "PLUGIN_EXEC_ERROR" if the import, lookup or call failed as
             described above.
    """
    try:
        target = getattr(importlib.import_module(module_name), function_name)
        outcome = target(*args, **kwargs)
    except (ImportError, AttributeError) as err:
        print(f"\tERROR: execute_python_function: {err}")
        # Record the failure in the shared plugin error state.
        global_plugin_error_dict["exit_on_error"] = True
        print("\treturn: PLUGIN_EXEC_ERROR")
        return "PLUGIN_EXEC_ERROR"
    else:
        return outcome
128
129
130class ffdc_collector:
131    r"""
132    Execute commands from a configuration file to collect log files and store
133    the generated files at the specified location.
134
135    This class is designed to execute commands specified in a configuration
136    YAML file to collect log files from a remote host.
137
138    The class establishes connections using SSH, Telnet, or other protocols
139    based on the configuration. It fetches and stores the generated files at
140    the specified location. The class provides methods for initializing the
141    collector, executing commands, and handling errors.
142    """
143
144    def __init__(
145        self,
146        hostname,
147        username,
148        password,
149        port_ssh,
150        port_https,
151        port_ipmi,
152        ffdc_config,
153        location,
154        remote_type,
155        remote_protocol,
156        env_vars,
157        econfig,
158        log_level,
159    ):
160        r"""
161        Initialize the FFDCCollector object with the provided parameters.
162
163        This method initializes an FFDCCollector object with the given
164        attributes. The attributes represent the configuration for connecting
165        to a remote system, collecting log data, and storing the collected
166        data.
167
168        Parameters:
169            hostname (str):             Name or IP address of the targeted
170                                        (remote) system.
171            username (str):             User on the targeted system with access
172                                        to log files.
173            password (str):             Password for the user on the targeted
174                                        system.
175            port_ssh (int, optional):   SSH port value. Defaults to 22.
176            port_https (int, optional): HTTPS port value. Defaults to 443.
177            port_ipmi (int, optional):  IPMI port value. Defaults to 623.
178            ffdc_config (str):          Configuration file listing commands
179                                        and files for FFDC.
180            location (str):             Where to store collected log data.
181            remote_type (str):          Block YAML type name of the remote
182                                        host.
183            remote_protocol (str):      Protocol to use to collect data.
184            env_vars (dict, optional):  User-defined CLI environment variables.
185                                        Defaults to None.
186            econfig (str, optional):    User-defined environment variables
187                                        YAML file. Defaults to None.
188            log_level (str, optional):  Log level for the collector.
189                                        Defaults to "INFO".
190        """
191
192        self.hostname = hostname
193        self.username = username
194        self.password = password
195        self.port_ssh = str(port_ssh)
196        self.port_https = str(port_https)
197        self.port_ipmi = str(port_ipmi)
198        self.ffdc_config = ffdc_config
199        self.location = location + "/" + remote_type.upper()
200        self.ssh_remoteclient = None
201        self.telnet_remoteclient = None
202        self.ffdc_dir_path = ""
203        self.ffdc_prefix = ""
204        self.target_type = remote_type.upper()
205        self.remote_protocol = remote_protocol.upper()
206        self.env_vars = env_vars if env_vars else {}
207        self.econfig = econfig if econfig else {}
208        self.start_time = 0
209        self.elapsed_time = ""
210        self.env_dict = {}
211        self.logger = None
212
213        """
214        Set prefix values for SCP files and directories.
215        Since the time stamp is at second granularity, these values are set
216        here to be sure that all files for this run will have the same
217        timestamps and be saved in the same directory.
218        self.location == local system for now
219        """
220        self.set_ffdc_default_store_path()
221
222        # Logger for this run.  Need to be after set_ffdc_default_store_path()
223        self.script_logging(getattr(logging, log_level.upper()))
224
225        # Verify top level directory exists for storage
226        self.validate_local_store(self.location)
227
228        if self.verify_script_env():
229            try:
230                with open(self.ffdc_config, "r") as file:
231                    self.ffdc_actions = yaml.safe_load(file)
232            except yaml.YAMLError as e:
233                self.logger.error(e)
234                sys.exit(-1)
235
236            if self.target_type not in self.ffdc_actions:
237                self.logger.error(
238                    "\n\tERROR: %s is not listed in %s.\n\n"
239                    % (self.target_type, self.ffdc_config)
240                )
241                sys.exit(-1)
242
243            self.logger.info("\n\tENV: User define input YAML variables")
244            self.load_env()
245        else:
246            sys.exit(-1)
247
248    def verify_script_env(self):
249        r"""
250        Verify that all required environment variables are set.
251
252        This method checks if all required environment variables are set.
253        If any required variable is missing, the method returns False.
254        Otherwise, it returns True.
255
256        Returns:
257            bool: True if all required environment variables are set,
258            False otherwise.
259        """
260        # Import to log version
261        import click
262        import paramiko
263
264        run_env_ok = True
265
266        try:
267            redfishtool_version = (
268                self.run_tool_cmd("redfishtool -V").split(" ")[2].strip("\n")
269            )
270        except Exception as e:
271            self.logger.error("\tEXCEPTION redfishtool: %s", e)
272            redfishtool_version = "Not Installed (optional)"
273
274        try:
275            ipmitool_version = self.run_tool_cmd("ipmitool -V").split(" ")[2]
276        except Exception as e:
277            self.logger.error("\tEXCEPTION ipmitool: %s", e)
278            ipmitool_version = "Not Installed (optional)"
279
280        self.logger.info("\n\t---- Script host environment ----")
281        self.logger.info(
282            "\t{:<10}  {:<10}".format("Script hostname", os.uname()[1])
283        )
284        self.logger.info(
285            "\t{:<10}  {:<10}".format("Script host os", platform.platform())
286        )
287        self.logger.info(
288            "\t{:<10}  {:>10}".format("Python", platform.python_version())
289        )
290        self.logger.info("\t{:<10}  {:>10}".format("PyYAML", yaml.__version__))
291        self.logger.info("\t{:<10}  {:>10}".format("click", click.__version__))
292        self.logger.info(
293            "\t{:<10}  {:>10}".format("paramiko", paramiko.__version__)
294        )
295        self.logger.info(
296            "\t{:<10}  {:>9}".format("redfishtool", redfishtool_version)
297        )
298        self.logger.info(
299            "\t{:<10}  {:>12}".format("ipmitool", ipmitool_version)
300        )
301
302        if eval(yaml.__version__.replace(".", ",")) < (5, 3, 0):
303            self.logger.error(
304                "\n\tERROR: Python or python packages do not meet minimum"
305                " version requirement."
306            )
307            self.logger.error(
308                "\tERROR: PyYAML version 5.3.0 or higher is needed.\n"
309            )
310            run_env_ok = False
311
312        self.logger.info("\t---- End script host environment ----")
313        return run_env_ok
314
315    def script_logging(self, log_level_attr):
316        """
317        Create a logger for the script with the specified log level.
318
319        This method creates a logger for the script with the specified
320        log level. The logger is configured to write log messages to a file
321        and the console.
322
323        self.logger = logging.getLogger(__name__)
324
325        Setting logger with __name__ will add the trace
326        Example:
327
328            INFO:ffdc_collector:    System Type: OPENBMC
329
330        Currently, set to empty purposely to log as
331            System Type: OPENBMC
332
333        Parameters:
334            log_level_attr (str):   The log level for the logger
335                                    (e.g., "DEBUG", "INFO", "WARNING",
336                                    "ERROR", "CRITICAL").
337
338        Returns:
339            None
340        """
341        self.logger = logging.getLogger()
342        self.logger.setLevel(log_level_attr)
343
344        log_file_handler = logging.FileHandler(
345            self.ffdc_dir_path + "collector.log"
346        )
347        stdout_handler = logging.StreamHandler(sys.stdout)
348
349        self.logger.addHandler(log_file_handler)
350        self.logger.addHandler(stdout_handler)
351
352        # Turn off paramiko INFO logging
353        logging.getLogger("paramiko").setLevel(logging.WARNING)
354
355    def target_is_pingable(self):
356        r"""
357        Check if the target system is ping-able.
358
359        This method checks if the target system is reachable by sending an
360        ICMP echo request (ping). If the target system responds to the ping,
361        the method returns True. Otherwise, it returns False.
362
363        Returns:
364            bool: True if the target system is ping-able, False otherwise.
365        """
366        response = os.system("ping -c 2 %s  2>&1 >/dev/null" % self.hostname)
367        if response == 0:
368            self.logger.info(
369                "\n\t[Check] %s is ping-able.\t\t [OK]" % self.hostname
370            )
371            return True
372        else:
373            self.logger.error(
374                "\n\tERROR: %s is not ping-able. FFDC collection aborted.\n"
375                % self.hostname
376            )
377            sys.exit(-1)
378        return False
379
380    def collect_ffdc(self):
381        r"""
382        Initiate FFDC collection based on the requested protocol.
383
384        This method initiates FFDC (First Failure Data Capture) collection
385        based on the requested protocol (SSH,SCP, TELNET, REDFISH, IPMI).
386        The method establishes a connection to the target system using the
387        specified protocol and collects the required FFDC data.
388
389        Returns:
390            None
391        """
392        self.logger.info(
393            "\n\t---- Start communicating with %s ----" % self.hostname
394        )
395        self.start_time = time.time()
396
397        # Find the list of target and protocol supported.
398        check_protocol_list = []
399        config_dict = self.ffdc_actions
400
401        for target_type in config_dict.keys():
402            if self.target_type != target_type:
403                continue
404
405            for k, v in config_dict[target_type].items():
406                if v["PROTOCOL"][0] not in check_protocol_list:
407                    check_protocol_list.append(v["PROTOCOL"][0])
408
409        self.logger.info(
410            "\n\t %s protocol type: %s"
411            % (self.target_type, check_protocol_list)
412        )
413
414        verified_working_protocol = self.verify_protocol(check_protocol_list)
415
416        if verified_working_protocol:
417            self.logger.info(
418                "\n\t---- Completed protocol pre-requisite check ----\n"
419            )
420
421        # Verify top level directory exists for storage
422        self.validate_local_store(self.location)
423
424        if (self.remote_protocol not in verified_working_protocol) and (
425            self.remote_protocol != "ALL"
426        ):
427            self.logger.info(
428                "\n\tWorking protocol list: %s" % verified_working_protocol
429            )
430            self.logger.error(
431                "\tERROR: Requested protocol %s is not in working protocol"
432                " list.\n" % self.remote_protocol
433            )
434            sys.exit(-1)
435        else:
436            self.generate_ffdc(verified_working_protocol)
437
438    def ssh_to_target_system(self):
439        r"""
440        Establish an SSH connection to the target system.
441
442        This method establishes an SSH connection to the target system using
443        the provided hostname, username, password, and SSH port. If the
444        connection is successful, the method returns True. Otherwise, it logs
445        an error message and returns False.
446
447        Returns:
448            bool: True if the connection is successful, False otherwise.
449        """
450
451        self.ssh_remoteclient = SSHRemoteclient(
452            self.hostname, self.username, self.password, self.port_ssh
453        )
454
455        if self.ssh_remoteclient.ssh_remoteclient_login():
456            self.logger.info(
457                "\n\t[Check] %s SSH connection established.\t [OK]"
458                % self.hostname
459            )
460
461            # Check scp connection.
462            # If scp connection fails,
463            # continue with FFDC generation but skip scp files to local host.
464            self.ssh_remoteclient.scp_connection()
465            return True
466        else:
467            self.logger.info(
468                "\n\t[Check] %s SSH connection.\t [NOT AVAILABLE]"
469                % self.hostname
470            )
471            return False
472
473    def telnet_to_target_system(self):
474        r"""
475        Establish a Telnet connection to the target system.
476
477        This method establishes a Telnet connection to the target system using
478        the provided hostname, username, and Telnet port. If the connection is
479        successful, the method returns True. Otherwise, it logs an error
480        message and returns False.
481
482        Returns:
483            bool: True if the connection is successful, False otherwise.
484        """
485        self.telnet_remoteclient = TelnetRemoteclient(
486            self.hostname, self.username, self.password
487        )
488        if self.telnet_remoteclient.tn_remoteclient_login():
489            self.logger.info(
490                "\n\t[Check] %s Telnet connection established.\t [OK]"
491                % self.hostname
492            )
493            return True
494        else:
495            self.logger.info(
496                "\n\t[Check] %s Telnet connection.\t [NOT AVAILABLE]"
497                % self.hostname
498            )
499            return False
500
501    def generate_ffdc(self, working_protocol_list):
502        r"""
503        Generate FFDC (First Failure Data Capture) based on the remote host
504        type and working protocols.
505
506        This method determines the actions to be performed for generating FFDC
507        based on the remote host type and the list of confirmed working
508        protocols. The method iterates through the available actions for the
509        remote host type and checks if any of the working protocols are
510        supported. If a supported protocol is found, the method executes the
511        corresponding FFDC generation action.
512
513        Parameters:
514            working_protocol_list (list):  A list of confirmed working
515                                           protocols to connect to the
516                                           remote host.
517
518        Returns:
519            None
520        """
521        self.logger.info(
522            "\n\t---- Executing commands on " + self.hostname + " ----"
523        )
524        self.logger.info(
525            "\n\tWorking protocol list: %s" % working_protocol_list
526        )
527
528        config_dict = self.ffdc_actions
529        for target_type in config_dict.keys():
530            if self.target_type != target_type:
531                continue
532
533            self.logger.info("\n\tFFDC Path: %s " % self.ffdc_dir_path)
534            global_plugin_dict["global_log_store_path"] = self.ffdc_dir_path
535            self.logger.info("\tSystem Type: %s" % target_type)
536            for k, v in config_dict[target_type].items():
537                protocol = v["PROTOCOL"][0]
538
539                if (
540                    self.remote_protocol not in working_protocol_list
541                    and self.remote_protocol != "ALL"
542                ) or protocol not in working_protocol_list:
543                    continue
544
545                if protocol in working_protocol_list:
546                    if protocol in ["SSH", "SCP"]:
547                        self.protocol_ssh(protocol, target_type, k)
548                    elif protocol == "TELNET":
549                        self.protocol_telnet(target_type, k)
550                    elif protocol in ["REDFISH", "IPMI", "SHELL"]:
551                        self.protocol_service_execute(protocol, target_type, k)
552                else:
553                    self.logger.error(
554                        "\n\tERROR: %s is not available for %s."
555                        % (protocol, self.hostname)
556                    )
557
558        # Close network connection after collecting all files
559        self.elapsed_time = time.strftime(
560            "%H:%M:%S", time.gmtime(time.time() - self.start_time)
561        )
562        self.logger.info("\n\tTotal time taken: %s" % self.elapsed_time)
563        if self.ssh_remoteclient:
564            self.ssh_remoteclient.ssh_remoteclient_disconnect()
565        if self.telnet_remoteclient:
566            self.telnet_remoteclient.tn_remoteclient_disconnect()
567
568    def protocol_ssh(self, protocol, target_type, sub_type):
569        r"""
570        Perform actions using SSH and SCP protocols.
571
572        This method executes a set of commands using the SSH protocol to
573        connect to the target system and collect FFDC data. The method takes
574        the protocol, target type, and sub-type as arguments and performs the
575        corresponding actions based on the provided parameters.
576
577        Parameters:
578            protocol (str):    The protocol to execute (SSH or SCP).
579            target_type (str): The type group of the remote host.
580            sub_type (str):    The group type of commands to execute.
581
582        Returns:
583            None
584        """
585        if protocol == "SCP":
586            self.group_copy(self.ffdc_actions[target_type][sub_type])
587        else:
588            self.collect_and_copy_ffdc(
589                self.ffdc_actions[target_type][sub_type]
590            )
591
592    def protocol_telnet(self, target_type, sub_type):
593        r"""
594        Perform actions using the Telnet protocol.
595
596        This method executes a set of commands using the Telnet protocol to
597        connect to the target system and collect FFDC data. The method takes
598        the target type and sub-type as arguments and performs the
599        corresponding actions based on the provided parameters.
600
601        Parameters:
602            target_type (str): The type group of the remote host.
603            sub_type (str):    The group type of commands to execute.
604
605        Returns:
606            None
607        """
608        self.logger.info(
609            "\n\t[Run] Executing commands on %s using %s"
610            % (self.hostname, "TELNET")
611        )
612        telnet_files_saved = []
613        progress_counter = 0
614        list_of_commands = self.ffdc_actions[target_type][sub_type]["COMMANDS"]
615        for index, each_cmd in enumerate(list_of_commands, start=0):
616            command_txt, command_timeout = self.unpack_command(each_cmd)
617            result = self.telnet_remoteclient.execute_command(
618                command_txt, command_timeout
619            )
620            if result:
621                try:
622                    targ_file = self.ffdc_actions[target_type][sub_type][
623                        "FILES"
624                    ][index]
625                except IndexError:
626                    targ_file = command_txt
627                    self.logger.warning(
628                        "\n\t[WARN] Missing filename to store data from"
629                        " telnet %s." % each_cmd
630                    )
631                    self.logger.warning(
632                        "\t[WARN] Data will be stored in %s." % targ_file
633                    )
634                targ_file_with_path = (
635                    self.ffdc_dir_path + self.ffdc_prefix + targ_file
636                )
637                # Creates a new file
638                with open(targ_file_with_path, "w") as fp:
639                    fp.write(result)
640                    fp.close
641                    telnet_files_saved.append(targ_file)
642            progress_counter += 1
643            self.print_progress(progress_counter)
644        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
645        for file in telnet_files_saved:
646            self.logger.info("\n\t\tSuccessfully save file " + file + ".")
647
648    def protocol_service_execute(self, protocol, target_type, sub_type):
649        r"""
650        Perform actions for a given protocol.
651
652        This method executes a set of commands using the specified protocol to
653        connect to the target system and collect FFDC data. The method takes
654        the protocol, target type, and sub-type as arguments and performs the
655        corresponding actions based on the provided parameters.
656
657        Parameters:
658            protocol (str):    The protocol to execute
659                               (REDFISH, IPMI, or SHELL).
660            target_type (str): The type group of the remote host.
661            sub_type (str):    The group type of commands to execute.
662
663        Returns:
664            None
665        """
666        self.logger.info(
667            "\n\t[Run] Executing commands to %s using %s"
668            % (self.hostname, protocol)
669        )
670        executed_files_saved = []
671        progress_counter = 0
672        list_of_cmd = self.get_command_list(
673            self.ffdc_actions[target_type][sub_type]
674        )
675        for index, each_cmd in enumerate(list_of_cmd, start=0):
676            plugin_call = False
677            if isinstance(each_cmd, dict):
678                if "plugin" in each_cmd:
679                    # If the error is set and plugin explicitly
680                    # requested to skip execution on error..
681                    if global_plugin_error_dict[
682                        "exit_on_error"
683                    ] and self.plugin_error_check(each_cmd["plugin"]):
684                        self.logger.info(
685                            "\n\t[PLUGIN-ERROR] exit_on_error: %s"
686                            % global_plugin_error_dict["exit_on_error"]
687                        )
688                        self.logger.info(
689                            "\t[PLUGIN-SKIP] %s" % each_cmd["plugin"][0]
690                        )
691                        continue
692                    plugin_call = True
693                    # call the plugin
694                    self.logger.info("\n\t[PLUGIN-START]")
695                    result = self.execute_plugin_block(each_cmd["plugin"])
696                    self.logger.info("\t[PLUGIN-END]\n")
697            else:
698                each_cmd = self.yaml_env_and_plugin_vars_populate(each_cmd)
699
700            if not plugin_call:
701                result = self.run_tool_cmd(each_cmd)
702            if result:
703                try:
704                    file_name = self.get_file_list(
705                        self.ffdc_actions[target_type][sub_type]
706                    )[index]
707                    # If file is specified as None.
708                    if file_name == "None":
709                        continue
710                    targ_file = self.yaml_env_and_plugin_vars_populate(
711                        file_name
712                    )
713                except IndexError:
714                    targ_file = each_cmd.split("/")[-1]
715                    self.logger.warning(
716                        "\n\t[WARN] Missing filename to store data from %s."
717                        % each_cmd
718                    )
719                    self.logger.warning(
720                        "\t[WARN] Data will be stored in %s." % targ_file
721                    )
722
723                targ_file_with_path = (
724                    self.ffdc_dir_path + self.ffdc_prefix + targ_file
725                )
726
727                # Creates a new file
728                with open(targ_file_with_path, "w") as fp:
729                    if isinstance(result, dict):
730                        fp.write(json.dumps(result))
731                    else:
732                        fp.write(result)
733                    fp.close
734                    executed_files_saved.append(targ_file)
735
736            progress_counter += 1
737            self.print_progress(progress_counter)
738
739        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
740
741        for file in executed_files_saved:
742            self.logger.info("\n\t\tSuccessfully save file " + file + ".")
743
744    def collect_and_copy_ffdc(
745        self, ffdc_actions_for_target_type, form_filename=False
746    ):
747        r"""
748        Send commands and collect FFDC data from the targeted system.
749
750        This method sends a set of commands and collects FFDC data from the
751        targeted system based on the provided ffdc_actions_for_target_type
752        dictionary. The method also has an optional form_filename parameter,
753        which, if set to True, prepends the target type to the output file
754        name.
755
756        Parameters:
757            ffdc_actions_for_target_type (dict): A dictionary containing
758                                                 commands and files for the
759                                                 selected remote host type.
760            form_filename (bool, optional):      If True, prepends the target
761                                                 type to the output file name.
762                                                 Defaults to False.
763
764        Returns:
765            None
766        """
767        # Executing commands, if any
768        self.ssh_execute_ffdc_commands(
769            ffdc_actions_for_target_type, form_filename
770        )
771
772        # Copying files
773        if self.ssh_remoteclient.scpclient:
774            self.logger.info(
775                "\n\n\tCopying FFDC files from remote system %s.\n"
776                % self.hostname
777            )
778
779            # Retrieving files from target system
780            list_of_files = self.get_file_list(ffdc_actions_for_target_type)
781            self.scp_ffdc(
782                self.ffdc_dir_path,
783                self.ffdc_prefix,
784                form_filename,
785                list_of_files,
786            )
787        else:
788            self.logger.info(
789                "\n\n\tSkip copying FFDC files from remote system %s.\n"
790                % self.hostname
791            )
792
793    def get_command_list(self, ffdc_actions_for_target_type):
794        r"""
795        Fetch a list of commands from the configuration file.
796
797        This method retrieves a list of commands from the
798        ffdc_actions_for_target_type dictionary, which contains commands and
799        files for the selected remote host type. The method returns the list
800        of commands.
801
802        Parameters:
803            ffdc_actions_for_target_type (dict): A dictionary containing
804                                                 commands and files for the
805                                                 selected remote host type.
806
807        Returns:
808            list: A list of commands.
809        """
810        try:
811            list_of_commands = ffdc_actions_for_target_type["COMMANDS"]
812        except KeyError:
813            list_of_commands = []
814        return list_of_commands
815
816    def get_file_list(self, ffdc_actions_for_target_type):
817        r"""
818        Fetch a list of files from the configuration file.
819
820        This method retrieves a list of files from the
821        ffdc_actions_for_target_type dictionary, which contains commands and
822        files for the selected remote host type. The method returns the list
823        of files.
824
825        Parameters:
826            ffdc_actions_for_target_type (dict): A dictionary containing
827                                                 commands and files for the
828                                                 selected remote host type.
829
830        Returns:
831            list: A list of files.
832        """
833        try:
834            list_of_files = ffdc_actions_for_target_type["FILES"]
835        except KeyError:
836            list_of_files = []
837        return list_of_files
838
839    def unpack_command(self, command):
840        r"""
841        Unpack a command from the configuration file, handling both dictionary
842        and string inputs.
843
844        This method takes a command from the configuration file, which can be
845        either a dictionary or a string. If the input is a dictionary, the
846        method extracts the command text and timeout from the dictionary.
847        If the input is a string, the method assumes a default timeout of
848        60 seconds.
849        The method returns a tuple containing the command text and timeout.
850
851        Parameters:
852            command (dict or str): A command from the configuration file,
853                                   which can be either a dictionary or a
854                                   string.
855
856        Returns:
857            tuple: A tuple containing the command text and timeout.
858        """
859        if isinstance(command, dict):
860            command_txt = next(iter(command))
861            command_timeout = next(iter(command.values()))
862        elif isinstance(command, str):
863            command_txt = command
864            # Default command timeout 60 seconds
865            command_timeout = 60
866
867        return command_txt, command_timeout
868
869    def ssh_execute_ffdc_commands(
870        self, ffdc_actions_for_target_type, form_filename=False
871    ):
872        r"""
873        Send commands in the ffdc_config file to the targeted system using SSH.
874
875        This method sends a set of commands and collects FFDC data from the
876        targeted system using the SSH protocol. The method takes the
877        ffdc_actions_for_target_type dictionary and an optional
878        form_filename parameter as arguments.
879
880        If form_filename is set to True, the method prepends the target type
881        to the output file name. The method returns the output of the executed
882        commands.
883
884        It also prints the progress counter string + on the console.
885
886        Parameters:
887            ffdc_actions_for_target_type (dict): A dictionary containing
888                                                 commands and files for the
889                                                 selected remote host type.
890            form_filename (bool, optional):      If True, prepends the target
891                                                 type to the output file name.
892                                                 Defaults to False.
893
894        Returns:
895            None
896        """
897        self.logger.info(
898            "\n\t[Run] Executing commands on %s using %s"
899            % (self.hostname, ffdc_actions_for_target_type["PROTOCOL"][0])
900        )
901
902        list_of_commands = self.get_command_list(ffdc_actions_for_target_type)
903        # If command list is empty, returns
904        if not list_of_commands:
905            return
906
907        progress_counter = 0
908        for command in list_of_commands:
909            command_txt, command_timeout = self.unpack_command(command)
910
911            if form_filename:
912                command_txt = str(command_txt % self.target_type)
913
914            (
915                cmd_exit_code,
916                err,
917                response,
918            ) = self.ssh_remoteclient.execute_command(
919                command_txt, command_timeout
920            )
921
922            if cmd_exit_code:
923                self.logger.warning(
924                    "\n\t\t[WARN] %s exits with code %s."
925                    % (command_txt, str(cmd_exit_code))
926                )
927                self.logger.warning("\t\t[WARN] %s " % err)
928
929            progress_counter += 1
930            self.print_progress(progress_counter)
931
932        self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]")
933
934    def group_copy(self, ffdc_actions_for_target_type):
935        r"""
936        SCP a group of files (wildcard) from the remote host.
937
938        This method copies a group of files from the remote host using the SCP
939        protocol. The method takes the fdc_actions_for_target_type dictionary
940        as an argument, which contains commands and files for the selected
941        remote host type.
942
943        Parameters:
944            fdc_actions_for_target_type (dict): A dictionary containing
945                                                commands and files for the
946                                                selected remote host type.
947
948        Returns:
949            None
950        """
951        if self.ssh_remoteclient.scpclient:
952            self.logger.info(
953                "\n\tCopying files from remote system %s via SCP.\n"
954                % self.hostname
955            )
956
957            list_of_commands = self.get_command_list(
958                ffdc_actions_for_target_type
959            )
960            # If command list is empty, returns
961            if not list_of_commands:
962                return
963
964            for command in list_of_commands:
965                try:
966                    command = self.yaml_env_and_plugin_vars_populate(command)
967                except IndexError:
968                    self.logger.error("\t\tInvalid command %s" % command)
969                    continue
970
971                (
972                    cmd_exit_code,
973                    err,
974                    response,
975                ) = self.ssh_remoteclient.execute_command(command)
976
977                # If file does not exist, code take no action.
978                # cmd_exit_code is ignored for this scenario.
979                if response:
980                    scp_result = self.ssh_remoteclient.scp_file_from_remote(
981                        response.split("\n"), self.ffdc_dir_path
982                    )
983                    if scp_result:
984                        self.logger.info(
985                            "\t\tSuccessfully copied from "
986                            + self.hostname
987                            + ":"
988                            + command
989                        )
990                else:
991                    self.logger.info("\t\t%s has no result" % command)
992
993        else:
994            self.logger.info(
995                "\n\n\tSkip copying files from remote system %s.\n"
996                % self.hostname
997            )
998
999    def scp_ffdc(
1000        self,
1001        targ_dir_path,
1002        targ_file_prefix,
1003        form_filename,
1004        file_list=None,
1005        quiet=None,
1006    ):
1007        r"""
1008        SCP all files in the file_dict to the indicated directory on the local
1009        system.
1010
1011        This method copies all files specified in the file_dict dictionary
1012        from the targeted system to the local system using the SCP protocol.
1013        The method takes the target directory path, target file prefix, and a
1014        boolean flag form_filename as required arguments.
1015
1016        The file_dict argument is optional and contains the files to be copied.
1017        The quiet argument is also optional and, if set to True, suppresses
1018        the output of the SCP operation.
1019
1020        Parameters:
1021            targ_dir_path (str):        The path of the directory to receive
1022                                        the files on the local system.
1023            targ_file_prefix (str):     Prefix which will be prepended to each
1024                                        target file's name.
1025            form_filename (bool):       If True, prepends the target type to
1026                                        the file names.
1027            file_dict (dict, optional): A dictionary containing the files to
1028                                        be copied. Defaults to None.
1029            quiet (bool, optional):     If True, suppresses the output of the
1030                                        SCP operation. Defaults to None.
1031
1032        Returns:
1033            None
1034        """
1035        progress_counter = 0
1036        for filename in file_list:
1037            if form_filename:
1038                filename = str(filename % self.target_type)
1039            source_file_path = filename
1040            targ_file_path = (
1041                targ_dir_path + targ_file_prefix + filename.split("/")[-1]
1042            )
1043
1044            # If source file name contains wild card, copy filename as is.
1045            if "*" in source_file_path:
1046                scp_result = self.ssh_remoteclient.scp_file_from_remote(
1047                    source_file_path, self.ffdc_dir_path
1048                )
1049            else:
1050                scp_result = self.ssh_remoteclient.scp_file_from_remote(
1051                    source_file_path, targ_file_path
1052                )
1053
1054            if not quiet:
1055                if scp_result:
1056                    self.logger.info(
1057                        "\t\tSuccessfully copied from "
1058                        + self.hostname
1059                        + ":"
1060                        + source_file_path
1061                        + ".\n"
1062                    )
1063                else:
1064                    self.logger.info(
1065                        "\t\tFail to copy from "
1066                        + self.hostname
1067                        + ":"
1068                        + source_file_path
1069                        + ".\n"
1070                    )
1071            else:
1072                progress_counter += 1
1073                self.print_progress(progress_counter)
1074
1075    def set_ffdc_default_store_path(self):
1076        r"""
1077        Set default values for self.ffdc_dir_path and self.ffdc_prefix.
1078
1079        This method sets default values for the self.ffdc_dir_path and
1080        self.ffdc_prefix class variables.
1081
1082        The collected FFDC files will be stored in the directory
1083        /self.location/hostname_timestr/, with individual files having the
1084        format timestr_filename where timestr is in %Y%m%d-%H%M%S.
1085
1086        Returns:
1087            None
1088        """
1089        timestr = time.strftime("%Y%m%d-%H%M%S")
1090        self.ffdc_dir_path = (
1091            self.location + "/" + self.hostname + "_" + timestr + "/"
1092        )
1093        self.ffdc_prefix = timestr + "_"
1094        self.validate_local_store(self.ffdc_dir_path)
1095
1096    # Need to verify local store path exists prior to instantiate this class.
1097    # This class method to validate log path before referencing this class.
1098    @classmethod
1099    def validate_local_store(cls, dir_path):
1100        r"""
1101        Ensure the specified directory exists to store FFDC files locally.
1102
1103        This method checks if the provided dir_path exists. If the directory
1104        does not exist, the method creates it. The method does not return any
1105        value.
1106
1107        Parameters:
1108            dir_path (str): The directory path where collected FFDC data files
1109                            will be stored.
1110
1111        Returns:
1112            None
1113        """
1114        if not os.path.exists(dir_path):
1115            try:
1116                os.makedirs(dir_path, 0o755)
1117            except (IOError, OSError) as e:
1118                # PermissionError
1119                if e.errno == EPERM or e.errno == EACCES:
1120                    print(
1121                        "\tERROR: os.makedirs %s failed with"
1122                        " PermissionError.\n" % dir_path
1123                    )
1124                else:
1125                    print(
1126                        "\tERROR: os.makedirs %s failed with %s.\n"
1127                        % (dir_path, e.strerror)
1128                    )
1129                sys.exit(-1)
1130
1131    def print_progress(self, progress):
1132        r"""
1133        Print the current activity progress.
1134
1135        This method prints the current activity progress using the provided
1136        progress counter. The method does not return any value.
1137
1138        Parameters:
1139            progress (int): The current activity progress counter.
1140
1141        Returns:
1142            None
1143        """
1144        sys.stdout.write("\r\t" + "+" * progress)
1145        sys.stdout.flush()
1146        time.sleep(0.1)
1147
1148    def verify_redfish(self):
1149        r"""
1150        Verify if the remote host has the Redfish service active.
1151
1152        This method checks if the remote host has the Redfish service active
1153        by sending a GET request to the Redfish base URL /redfish/v1/.
1154        If the request is successful (status code 200), the method returns
1155        stdout output of the run else error message.
1156
1157        Returns:
1158            str: Redfish service executed output.
1159        """
1160        redfish_parm = (
1161            "redfishtool -r "
1162            + self.hostname
1163            + ":"
1164            + self.port_https
1165            + " -S Always raw GET /redfish/v1/"
1166        )
1167        return self.run_tool_cmd(redfish_parm, True)
1168
1169    def verify_ipmi(self):
1170        r"""
1171        Verify if the remote host has the IPMI LAN service active.
1172
1173        This method checks if the remote host has the IPMI LAN service active
1174        by sending an IPMI "power status" command.
1175
1176        If the command is successful (returns a non-empty response),
1177        else error message.
1178
1179        Returns:
1180            str: IPMI LAN service executed output.
1181        """
1182        if self.target_type == "OPENBMC":
1183            ipmi_parm = (
1184                "ipmitool -I lanplus -C 17  -U "
1185                + self.username
1186                + " -P "
1187                + self.password
1188                + " -H "
1189                + self.hostname
1190                + " -p "
1191                + str(self.port_ipmi)
1192                + " power status"
1193            )
1194        else:
1195            ipmi_parm = (
1196                "ipmitool -I lanplus  -P "
1197                + self.password
1198                + " -H "
1199                + self.hostname
1200                + " -p "
1201                + str(self.port_ipmi)
1202                + " power status"
1203            )
1204
1205        return self.run_tool_cmd(ipmi_parm, True)
1206
1207    def run_tool_cmd(self, parms_string, quiet=False):
1208        r"""
1209        Run a CLI standard tool or script with the provided command options.
1210
1211        This method runs a CLI standard tool or script with the provided
1212        parms_string command options. If the quiet parameter is set to True,
1213        the method suppresses the output of the command.
1214        The method returns the output of the command as a string.
1215
1216        Parameters:
1217            parms_string (str):     The command options for the CLI tool or
1218                                    script.
1219            quiet (bool, optional): If True, suppresses the output of the
1220                                    command. Defaults to False.
1221
1222        Returns:
1223            str: The output of the command as a string.
1224        """
1225
1226        result = subprocess.run(
1227            [parms_string],
1228            stdout=subprocess.PIPE,
1229            stderr=subprocess.PIPE,
1230            shell=True,
1231            universal_newlines=True,
1232        )
1233
1234        if result.stderr and not quiet:
1235            if self.password in parms_string:
1236                parms_string = parms_string.replace(self.password, "********")
1237            self.logger.error("\n\t\tERROR with %s " % parms_string)
1238            self.logger.error("\t\t" + result.stderr)
1239
1240        return result.stdout
1241
1242    def verify_protocol(self, protocol_list):
1243        r"""
1244        Perform a working check for the provided list of protocols.
1245
1246        This method checks if the specified protocols are available on the
1247        remote host. The method iterates through the protocol_list and
1248        attempts to establish a connection using each protocol.
1249
1250        If a connection is successfully established, the method append to the
1251        list and if any protocol fails to connect, the method ignores it.
1252
1253        Parameters:
1254            protocol_list (list):   A list of protocols to check.
1255
1256        Returns:
1257            list: All protocols are available list.
1258        """
1259
1260        tmp_list = []
1261        if self.target_is_pingable():
1262            tmp_list.append("SHELL")
1263
1264        for protocol in protocol_list:
1265            if self.remote_protocol != "ALL":
1266                if self.remote_protocol != protocol:
1267                    continue
1268
1269            # Only check SSH/SCP once for both protocols
1270            if (
1271                protocol == "SSH"
1272                or protocol == "SCP"
1273                and protocol not in tmp_list
1274            ):
1275                if self.ssh_to_target_system():
1276                    # Add only what user asked.
1277                    if self.remote_protocol != "ALL":
1278                        tmp_list.append(self.remote_protocol)
1279                    else:
1280                        tmp_list.append("SSH")
1281                        tmp_list.append("SCP")
1282
1283            if protocol == "TELNET":
1284                if self.telnet_to_target_system():
1285                    tmp_list.append(protocol)
1286
1287            if protocol == "REDFISH":
1288                if self.verify_redfish():
1289                    tmp_list.append(protocol)
1290                    self.logger.info(
1291                        "\n\t[Check] %s Redfish Service.\t\t [OK]"
1292                        % self.hostname
1293                    )
1294                else:
1295                    self.logger.info(
1296                        "\n\t[Check] %s Redfish Service.\t\t [NOT AVAILABLE]"
1297                        % self.hostname
1298                    )
1299
1300            if protocol == "IPMI":
1301                if self.verify_ipmi():
1302                    tmp_list.append(protocol)
1303                    self.logger.info(
1304                        "\n\t[Check] %s IPMI LAN Service.\t\t [OK]"
1305                        % self.hostname
1306                    )
1307                else:
1308                    self.logger.info(
1309                        "\n\t[Check] %s IPMI LAN Service.\t\t [NOT AVAILABLE]"
1310                        % self.hostname
1311                    )
1312
1313        return tmp_list
1314
1315    def load_env(self):
1316        r"""
1317        Load the user environment variables from a YAML file.
1318
1319        This method reads the environment variables from a YAML file specified
1320        in the ENV_FILE environment variable. If the file is not found or
1321        there is an error reading the file, an exception is raised.
1322
1323        The YAML file should have the following format:
1324
1325        .. code-block:: yaml
1326
1327            VAR_NAME: VAR_VALUE
1328
1329        Where VAR_NAME is the name of the environment variable, and
1330        VAR_VALUE is its value.
1331
1332        After loading the environment variables, they are stored in the
1333        self.env attribute for later use.
1334        """
1335
1336        tmp_env_vars = {
1337            "hostname": self.hostname,
1338            "username": self.username,
1339            "password": self.password,
1340            "port_ssh": self.port_ssh,
1341            "port_https": self.port_https,
1342            "port_ipmi": self.port_ipmi,
1343        }
1344
1345        # Updatae default Env and Dict var for both so that it can be
1346        # verified when referencing it throughout the code.
1347        for key, value in tmp_env_vars.items():
1348            os.environ[key] = value
1349            self.env_dict[key] = value
1350
1351        try:
1352            tmp_env_dict = {}
1353            if self.env_vars:
1354                tmp_env_dict = json.loads(self.env_vars)
1355                # Export ENV vars default.
1356                for key, value in tmp_env_dict.items():
1357                    os.environ[key] = value
1358                    self.env_dict[key] = str(value)
1359
1360            # Load user specified ENV config YAML.
1361            if self.econfig:
1362                with open(self.econfig, "r") as file:
1363                    try:
1364                        tmp_env_dict = yaml.load(file, Loader=yaml.SafeLoader)
1365                    except yaml.YAMLError as e:
1366                        self.logger.error(e)
1367                        sys.exit(-1)
1368                # Export ENV vars.
1369                for key, value in tmp_env_dict["env_params"].items():
1370                    os.environ[key] = str(value)
1371                    self.env_dict[key] = str(value)
1372        except json.decoder.JSONDecodeError as e:
1373            self.logger.error("\n\tERROR: %s " % e)
1374            sys.exit(-1)
1375        except FileNotFoundError as e:
1376            self.logger.error("\n\tERROR: %s " % e)
1377            sys.exit(-1)
1378
1379        # This to mask the password from displaying on the console.
1380        mask_dict = self.env_dict.copy()
1381        for k, v in mask_dict.items():
1382            if k.lower().find("password") != -1:
1383                hidden_text = []
1384                hidden_text.append(v)
1385                password_regex = (
1386                    "(" + "|".join([re.escape(x) for x in hidden_text]) + ")"
1387                )
1388                mask_dict[k] = re.sub(password_regex, "********", v)
1389
1390        self.logger.info(json.dumps(mask_dict, indent=8, sort_keys=False))
1391
    def execute_plugin_block(self, plugin_cmd_list):
        r"""
        Execute one plugin block read from the YAML file and return the
        plugin response.

        This method processes the plugin_cmd_list argument, which is expected
        to be a list of single-key dictionaries describing one plugin call.
        It reads the 'plugin_name' and 'plugin_function' entries, registers
        any return variable names declared in front of " = " in
        'plugin_function' in global_plugin_list and global_plugin_dict,
        resolves the optional 'plugin_args' entry, substitutes values already
        stored in global_plugin_dict into the string arguments and then calls
        execute_python_function with the module name, function name and
        arguments.

        Any exception raised during these steps is logged, sets
        global_plugin_error_dict["exit_on_error"] and is not re-raised.

        If the block declares 'plugin_expects_return', the response is
        checked with self.plugin_expect_type; a mismatch or an empty response
        also sets global_plugin_error_dict["exit_on_error"].

        Parameters:
            plugin_cmd_list (list): A list of plugin commands containing
                                    plugin names and arguments.
                                    Plugin block read from YAML
                                    [
                                     {'plugin_name':'plugin.foo_func'},
                                     {'plugin_function':'my_func'},
                                     {'plugin_args':[10]},
                                    ]

            Example:
                Execute and no return response
                - plugin:
                  - plugin_name: plugin.foo_func
                  - plugin_function: my_func
                  - plugin_args:
                      - arg1
                      - arg2

                Execute and return a response
                - plugin:
                    - plugin_name: plugin.foo_func
                    - plugin_function: result = my_func
                    - plugin_args:
                        - arg1
                        - arg2

                Execute and return multiple values response
                - plugin:
                    - plugin_name: plugin.foo_func
                    - plugin_function: result1,result2 = my_func
                    - plugin_args:
                        - arg1
                        - arg2

        Returns:
            Any: The value returned by execute_python_function, which may be
                 the string "PLUGIN_EXEC_ERROR". An empty string is returned
                 when an exception occurred before the plugin function
                 returned.

        """

        # Declare a variable plugin resp that can accept any data type.
        # It stays "" if the plugin function is never reached.
        resp: Any = ""
        args_string = ""

        try:
            idx = self.key_index_list_dict("plugin_name", plugin_cmd_list)
            # Get plugin module name
            plugin_name = plugin_cmd_list[idx]["plugin_name"]

            # Get plugin function name
            idx = self.key_index_list_dict("plugin_function", plugin_cmd_list)
            plugin_function = plugin_cmd_list[idx]["plugin_function"]

            # Equal separator means plugin function returns result.
            if " = " in plugin_function:
                # Ex. ['result', 'my_func']
                plugin_function_args = plugin_function.split(" = ")
                # The last element is the function name, everything in front
                # of it names the variables receiving the returned data.
                for arg in plugin_function_args:
                    if arg == plugin_function_args[-1]:
                        plugin_function = arg
                    else:
                        plugin_resp = arg.split(",")
                        # ['result1','result2']
                        for x in plugin_resp:
                            # Register the variable with an empty value; it
                            # is presumably filled in by
                            # process_response_args_data below (not visible
                            # here, verify).
                            global_plugin_list.append(x)
                            global_plugin_dict[x] = ""

            # Walk the plugin args ['arg1,'arg2']
            # An empty argument list is used if the YAML plugin statement
            # 'plugin_args' is not declared.
            plugin_args = []
            if any("plugin_args" in d for d in plugin_cmd_list):
                idx = self.key_index_list_dict("plugin_args", plugin_cmd_list)
                if idx is not None:
                    plugin_args = plugin_cmd_list[idx].get("plugin_args", [])
                    plugin_args = self.yaml_args_populate(plugin_args)
                else:
                    plugin_args = self.yaml_args_populate([])

            # Replace keys in the string with their corresponding
            # values from the dictionary.
            for key, value in global_plugin_dict.items():
                # Iterate through the list and check if each element matched
                # exact or in the string. If matches update the plugin element
                # in the list.
                for index, element in enumerate(plugin_args):
                    try:
                        if isinstance(element, str):
                            # If the key is not in the list element string,
                            # then continue for the next element in the list.
                            if str(key) not in str(element):
                                continue
                            if isinstance(value, str):
                                # String values are substituted in place.
                                plugin_args[index] = element.replace(
                                    key, value
                                )
                            else:
                                # Non-string values replace the whole
                                # argument; this lookup raises KeyError
                                # unless the element is exactly a stored
                                # variable name.
                                plugin_args[index] = global_plugin_dict[
                                    element
                                ]
                            # break
                    except KeyError as e:
                        # The argument is left unchanged in this case.
                        print(f"Exception {e}")
                        pass

            """
            Example of plugin_func:
                plugin.redfish.enumerate_request(
                         "xx.xx.xx.xx:443",
                         "root",
                         "********",
                         "/redfish/v1/",
                         "json")
            """
            # For logging purpose to mask password.
            # List should be string element to join else gives TypeError
            args_string = self.print_plugin_args_string(plugin_args)

            # If user wants to debug plugins.
            self.logger.debug(
                f"\tDebug Plugin function: \n\t\t{plugin_name}."
                f"{plugin_function}{args_string}"
            )

            # For generic logging plugin info, without the arguments.
            self.logger.info(
                f"\tPlugin function: \n\t\t{plugin_name}."
                f"{plugin_function}()"
            )

            # Execute the plugins function with args.
            resp = execute_python_function(
                plugin_name, plugin_function, *plugin_args
            )
            self.logger.info(f"\tPlugin response = {resp}")
            # Update plugin vars dict if there is any.
            if resp != "PLUGIN_EXEC_ERROR":
                self.process_response_args_data(resp)
        except Exception as e:
            # Set the plugin error state. The exception is logged and
            # swallowed so that the checks below still run.
            global_plugin_error_dict["exit_on_error"] = True
            self.logger.error("\tERROR: execute_plugin_block: %s" % e)
            pass

        # There is a real error executing the plugin function.
        if resp == "PLUGIN_EXEC_ERROR":
            return resp

        # Check if plugin_expects_return (int, string, list,dict etc)
        if any("plugin_expects_return" in d for d in plugin_cmd_list):
            idx = self.key_index_list_dict(
                "plugin_expects_return", plugin_cmd_list
            )
            plugin_expects = plugin_cmd_list[idx]["plugin_expects_return"]
            if plugin_expects:
                if resp:
                    # "INVALID" from plugin_expect_type skips the check; any
                    # other falsy result is treated as a type mismatch.
                    if (
                        self.plugin_expect_type(plugin_expects, resp)
                        == "INVALID"
                    ):
                        self.logger.error("\tWARN: Plugin error check skipped")
                    elif not self.plugin_expect_type(plugin_expects, resp):
                        self.logger.error(
                            "\tERROR: Plugin expects return data: %s"
                            % plugin_expects
                        )
                        global_plugin_error_dict["exit_on_error"] = True
                elif not resp:
                    # A return value was expected but the response is empty.
                    self.logger.error(
                        "\tERROR: Plugin func failed to return data"
                    )
                    global_plugin_error_dict["exit_on_error"] = True

        return resp
1571
1572    def print_plugin_args_string(self, plugin_args):
1573        r"""
1574        Generate a string representation of plugin arguments, replacing the
1575        password if necessary.
1576
1577        This method generates a string representation of the provided plugin
1578        arguments, joining them with commas. If the password is present in the
1579        arguments, it is replaced with "********".
1580        The method returns the generated string. If an exception occurs during
1581        the process, the method logs a debug log and returns "(None)".
1582
1583        Parameters:
1584            plugin_args (list): A list of plugin arguments.
1585
1586        Returns:
1587            str: The generated string representation of the plugin arguments.
1588        """
1589        try:
1590            plugin_args_str = "(" + ", ".join(map(str, plugin_args)) + ")"
1591            if self.password in plugin_args_str:
1592                args_string = plugin_args_str.replace(
1593                    self.password, "********"
1594                )
1595            else:
1596                args_string = plugin_args_str
1597        except Exception as e:
1598            self.logger.debug("\tWARN:Print args string : %s" % e)
1599            return "(None)"
1600
1601        return args_string
1602
1603    def process_response_args_data(self, plugin_resp):
1604        r"""
1605        Parse the plugin function response and update plugin return variables.
1606
1607        This method parses the response data from a plugin function and
1608        updates the plugin return variables accordingly. The method takes the
1609        plugin_resp argument, which is expected to be the response data from a
1610        plugin function.
1611
1612        The method handles various data types (string, bytes,
1613        tuple, list, int, float) and updates the global global_plugin_dict
1614        dictionary with the parsed response data. If there is an error during
1615        the process, the method logs a warning and continues with the next
1616        plugin block execution.
1617
1618        Parameters:
1619           plugin_resp (Any): The response data from the plugin function.
1620
1621        Returns:
1622            None
1623        """
1624        resp_list = []
1625        resp_data = ""
1626
1627        # There is nothing to update the plugin response.
1628        if len(global_plugin_list) == 0 or plugin_resp == "None":
1629            return
1630
1631        if isinstance(plugin_resp, str):
1632            resp_data = plugin_resp.strip("\r\n\t")
1633            resp_list.append(resp_data)
1634        elif isinstance(plugin_resp, bytes):
1635            resp_data = str(plugin_resp, "UTF-8").strip("\r\n\t")
1636            resp_list.append(resp_data)
1637        elif isinstance(plugin_resp, tuple):
1638            if len(global_plugin_list) == 1:
1639                resp_list.append(list(plugin_resp))
1640            else:
1641                resp_list = list(plugin_resp)
1642                resp_list = [x for x in resp_list]
1643        elif isinstance(plugin_resp, list):
1644            if len(global_plugin_list) == 1:
1645                resp_list.append([x.strip("\r\n\t") for x in plugin_resp])
1646            else:
1647                resp_list = [x.strip("\r\n\t") for x in plugin_resp]
1648        elif isinstance(plugin_resp, int) or isinstance(plugin_resp, float):
1649            resp_list.append(plugin_resp)
1650
1651        # Iterate if there is a list of plugin return vars to update.
1652        for idx, item in enumerate(resp_list, start=0):
1653            # Exit loop, done required loop.
1654            if idx >= len(global_plugin_list):
1655                break
1656            # Find the index of the return func in the list and
1657            # update the global func return dictionary.
1658            try:
1659                dict_idx = global_plugin_list[idx]
1660                global_plugin_dict[dict_idx] = item
1661            except (IndexError, ValueError) as e:
1662                self.logger.warn("\tWARN: process_response_args_data: %s" % e)
1663                pass
1664
1665        # Done updating plugin dict irrespective of pass or failed,
1666        # clear all the list element for next plugin block execute.
1667        global_plugin_list.clear()
1668
1669    def yaml_args_populate(self, yaml_arg_list):
1670        r"""
1671        Decode environment and plugin variables and populate the argument list.
1672
1673        This method processes the yaml_arg_list argument, which is expected to
1674        contain a list of arguments read from a YAML file. The method iterates
1675        through the list, decodes environment and plugin variables, and
1676        returns a populated list of arguments.
1677
1678        .. code-block:: yaml
1679
1680          - plugin_args:
1681            - arg1
1682            - arg2
1683
1684        ['${hostname}:${port_https}', '${username}', '/redfish/v1/', 'json']
1685
1686        Returns the populated plugin list
1687            ['xx.xx.xx.xx:443', 'root', '/redfish/v1/', 'json']
1688
1689        Parameters:
1690            yaml_arg_list (list):   A list of arguments containing environment
1691                                    and plugin variables.
1692
1693        Returns:
1694            list:   A populated list of arguments with decoded environment and
1695                    plugin variables.
1696        """
1697        if isinstance(yaml_arg_list, list):
1698            populated_list = []
1699            for arg in yaml_arg_list:
1700                if isinstance(arg, (int, float)):
1701                    populated_list.append(arg)
1702                elif isinstance(arg, str):
1703                    arg_str = self.yaml_env_and_plugin_vars_populate(arg)
1704                    populated_list.append(arg_str)
1705                else:
1706                    populated_list.append(arg)
1707
1708            return populated_list
1709
1710    def yaml_env_and_plugin_vars_populate(self, yaml_arg_str):
1711        r"""
1712        Update environment variables and plugin variables based on the
1713        provided YAML argument string.
1714
1715        This method processes the yaml_arg_str argument, which is expected
1716        to contain a string representing environment variables and plugin
1717        variables in the format:
1718
1719        .. code-block:: yaml
1720
1721            - cat ${MY_VAR}
1722            - ls -AX my_plugin_var
1723
1724        The method parses the string, extracts the variable names, and updates
1725        the corresponding environment variables and plugin variables.
1726
1727        Parameters:
1728            yaml_arg_str (str):   A string containing environment and plugin
1729                                  variable definitions in YAML format.
1730
1731        Returns:
1732            str:   The updated YAML argument string with plugin variables
1733                   replaced.
1734        """
1735
1736        # Parse and convert the Plugin YAML vars string to python vars
1737        # Example:
1738        #   ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
1739        try:
1740            # Example, list of matching
1741            # env vars ['username', 'password', 'hostname']
1742            # Extra escape \ for special symbols. '\$\{([^\}]+)\}' works good.
1743            env_var_regex = r"\$\{([^\}]+)\}"
1744            env_var_names_list = re.findall(env_var_regex, yaml_arg_str)
1745
1746            # If the list in empty [] nothing to update.
1747            if not len(env_var_names_list):
1748                return yaml_arg_str
1749            for var in env_var_names_list:
1750                env_var = os.environ.get(var)
1751                if env_var:
1752                    env_replace = "${" + var + "}"
1753                    yaml_arg_str = yaml_arg_str.replace(env_replace, env_var)
1754        except Exception as e:
1755            self.logger.error("\tERROR:yaml_env_vars_populate: %s" % e)
1756            pass
1757
1758        """
1759        Parse the string for plugin vars.
1760        Implement the logic to update environment variables based on the
1761        extracted variable names.
1762        """
1763        try:
1764            # Example, list of plugin vars env_var_names_list
1765            #    ['my_hostname', 'port_https']
1766            global_plugin_dict_keys = set(global_plugin_dict.keys())
1767            # Skip env var list already populated above code block list.
1768            plugin_var_name_list = [
1769                var
1770                for var in global_plugin_dict_keys
1771                if var not in env_var_names_list
1772            ]
1773
1774            for var in plugin_var_name_list:
1775                plugin_var_value = global_plugin_dict[var]
1776                if yaml_arg_str in global_plugin_dict:
1777                    """
1778                    If this plugin var exist but empty in dict, don't replace.
1779                    his is either a YAML plugin statement incorrectly used or
1780                    user added a plugin var which is not going to be populated.
1781                    """
1782                    if isinstance(plugin_var_value, (list, dict)):
1783                        """
1784                        List data type or dict can't be replaced, use
1785                        directly in plugin function call.
1786                        """
1787                        global_plugin_type_list.append(var)
1788                    else:
1789                        yaml_arg_str = yaml_arg_str.replace(
1790                            str(var), str(plugin_var_value)
1791                        )
1792        except (IndexError, ValueError) as e:
1793            self.logger.error("\tERROR: yaml_plugin_vars_populate: %s" % e)
1794            pass
1795
1796        # From ${my_hostname}:${port_https} -> ['my_hostname', 'port_https']
1797        # to populated values string as
1798        # Example:  xx.xx.xx.xx:443 and return the string
1799        return yaml_arg_str
1800
1801    def plugin_error_check(self, plugin_dict):
1802        r"""
1803        Process plugin error dictionary and return the corresponding error
1804        message.
1805
1806        This method checks if any dictionary in the plugin_dict list contains
1807        a "plugin_error" key. If such a dictionary is found, it retrieves the
1808        value associated with the "plugin_error" key and returns the
1809        corresponding error message from the global_plugin_error_dict
1810        attribute.
1811
1812        Parameters:
1813            plugin_dict (list of dict): A list of dictionaries containing
1814                                        plugin error information.
1815
1816        Returns:
1817           str: The error message corresponding to the "plugin_error" value,
1818                or None if no error is found.
1819        """
1820        if any("plugin_error" in d for d in plugin_dict):
1821            for d in plugin_dict:
1822                if "plugin_error" in d:
1823                    value = d["plugin_error"]
1824                    return global_plugin_error_dict.get(value, None)
1825        return None
1826
1827    def key_index_list_dict(self, key, list_dict):
1828        r"""
1829        Find the index of the first dictionary in the list that contains
1830        the specified key.
1831
1832        Parameters:
1833            key (str):                 The key to search for in the
1834                                       dictionaries.
1835            list_dict (list of dict):  A list of dictionaries to search
1836                                       through.
1837
1838        Returns:
1839            int: The index of the first dictionary containing the key, or -1
1840            if no match is found.
1841        """
1842        for i, d in enumerate(list_dict):
1843            if key in d:
1844                return i
1845        return -1
1846
1847    def plugin_expect_type(self, type, data):
1848        r"""
1849        Check if the provided data matches the expected type.
1850
1851        This method checks if the data argument matches the specified type.
1852        It supports the following types: "int", "float", "str", "list", "dict",
1853        and "tuple".
1854
1855        If the type is not recognized, it logs an info message and returns
1856        "INVALID".
1857
1858        Parameters:
1859            type (str): The expected data type.
1860            data:       The data to check against the expected type.
1861
1862        Returns:
1863            bool or str: True if the data matches the expected type, False if
1864                         not, or "INVALID" if the type is not recognized.
1865        """
1866        if type == "int":
1867            return isinstance(data, int)
1868        elif type == "float":
1869            return isinstance(data, float)
1870        elif type == "str":
1871            return isinstance(data, str)
1872        elif type == "list":
1873            return isinstance(data, list)
1874        elif type == "dict":
1875            return isinstance(data, dict)
1876        elif type == "tuple":
1877            return isinstance(data, tuple)
1878        else:
1879            self.logger.info("\tInvalid data type requested: %s" % type)
1880            return "INVALID"
1881