1#!/usr/bin/env python3 2 3r""" 4See class prolog below for details. 5""" 6 7import json 8import logging 9import os 10import platform 11import re 12import subprocess 13import sys 14import time 15from errno import EACCES, EPERM 16 17import yaml 18 19sys.dont_write_bytecode = True 20 21 22script_dir = os.path.dirname(os.path.abspath(__file__)) 23sys.path.append(script_dir) 24# Walk path and append to sys.path 25for root, dirs, files in os.walk(script_dir): 26 for dir in dirs: 27 sys.path.append(os.path.join(root, dir)) 28 29from ssh_utility import SSHRemoteclient # NOQA 30from telnet_utility import TelnetRemoteclient # NOQA 31 32r""" 33User define plugins python functions. 34 35It will imports files from directory plugins 36 37plugins 38├── file1.py 39└── file2.py 40 41Example how to define in YAML: 42 - plugin: 43 - plugin_name: plugin.foo_func.foo_func_yaml 44 - plugin_args: 45 - arg1 46 - arg2 47""" 48plugin_dir = os.path.join(os.path.dirname(__file__), "plugins") 49sys.path.append(plugin_dir) 50 51for module in os.listdir(plugin_dir): 52 if module == "__init__.py" or not module.endswith(".py"): 53 continue 54 55 plugin_module = f"plugins.{module[:-3]}" 56 try: 57 plugin = __import__(plugin_module, globals(), locals(), [], 0) 58 except Exception as e: 59 print(f"PLUGIN: Exception: {e}") 60 print(f"PLUGIN: Module import failed: {module}") 61 continue 62 63r""" 64This is for plugin functions returning data or responses to the caller 65in YAML plugin setup. 66 67Example: 68 69 - plugin: 70 - plugin_name: version = plugin.ssh_execution.ssh_execute_cmd 71 - plugin_args: 72 - ${hostname} 73 - ${username} 74 - ${password} 75 - "cat /etc/os-release | grep VERSION_ID | awk -F'=' '{print $2}'" 76 - plugin: 77 - plugin_name: plugin.print_vars.print_vars 78 - plugin_args: 79 - version 80 81where first plugin "version" var is used by another plugin in the YAML 82block or plugin 83 84""" 85global global_log_store_path 86global global_plugin_dict 87global global_plugin_list 88 89# Hold the plugin return values in dict and plugin return vars in list. 90# Dict is to reference and update vars processing in parser where as 91# list is for current vars from the plugin block which needs processing. 92global_plugin_dict = {} 93global_plugin_list = [] 94 95# Hold the plugin return named declared if function returned values are 96# list,dict. 97# Refer this name list to look up the plugin dict for eval() args function 98# Example ['version'] 99global_plugin_type_list = [] 100 101# Path where logs are to be stored or written. 102global_log_store_path = "" 103 104# Plugin error state defaults. 105plugin_error_dict = { 106 "exit_on_error": False, 107 "continue_on_error": False, 108} 109 110 111class ffdc_collector: 112 r""" 113 Execute commands from configuration file to collect log files. 114 Fetch and store generated files at the specified location. 115 116 """ 117 118 def __init__( 119 self, 120 hostname, 121 username, 122 password, 123 port_ssh, 124 port_https, 125 port_ipmi, 126 ffdc_config, 127 location, 128 remote_type, 129 remote_protocol, 130 env_vars, 131 econfig, 132 log_level, 133 ): 134 r""" 135 Description of argument(s): 136 137 hostname Name/ip of the targeted (remote) system 138 username User on the targeted system with access to 139 FFDC files 140 password Password for user on targeted system 141 port_ssh SSH port value. By default 22 142 port_https HTTPS port value. By default 443 143 port_ipmi IPMI port value. By default 623 144 ffdc_config Configuration file listing commands and files 145 for FFDC 146 location Where to store collected FFDC 147 remote_type OS type of the remote host 148 remote_protocol Protocol to use to collect data 149 env_vars User define CLI env vars '{"key : "value"}' 150 econfig User define env vars YAML file 151 152 """ 153 154 self.hostname = hostname 155 self.username = username 156 self.password = password 157 self.port_ssh = str(port_ssh) 158 self.port_https = str(port_https) 159 self.port_ipmi = str(port_ipmi) 160 self.ffdc_config = ffdc_config 161 self.location = location + "/" + remote_type.upper() 162 self.ssh_remoteclient = None 163 self.telnet_remoteclient = None 164 self.ffdc_dir_path = "" 165 self.ffdc_prefix = "" 166 self.target_type = remote_type.upper() 167 self.remote_protocol = remote_protocol.upper() 168 self.env_vars = env_vars 169 self.econfig = econfig 170 self.start_time = 0 171 self.elapsed_time = "" 172 self.logger = None 173 174 # Set prefix values for scp files and directory. 175 # Since the time stamp is at second granularity, these values are set 176 # here to be sure that all files for this run will have same timestamps 177 # and they will be saved in the same directory. 178 # self.location == local system for now 179 self.set_ffdc_default_store_path() 180 181 # Logger for this run. Need to be after set_ffdc_default_store_path() 182 self.script_logging(getattr(logging, log_level.upper())) 183 184 # Verify top level directory exists for storage 185 self.validate_local_store(self.location) 186 187 if self.verify_script_env(): 188 # Load default or user define YAML configuration file. 189 with open(self.ffdc_config, "r") as file: 190 try: 191 self.ffdc_actions = yaml.load(file, Loader=yaml.SafeLoader) 192 except yaml.YAMLError as e: 193 self.logger.error(e) 194 sys.exit(-1) 195 196 if self.target_type not in self.ffdc_actions.keys(): 197 self.logger.error( 198 "\n\tERROR: %s is not listed in %s.\n\n" 199 % (self.target_type, self.ffdc_config) 200 ) 201 sys.exit(-1) 202 else: 203 sys.exit(-1) 204 205 # Load ENV vars from user. 206 self.logger.info("\n\tENV: User define input YAML variables") 207 self.env_dict = {} 208 self.load_env() 209 210 def verify_script_env(self): 211 # Import to log version 212 import click 213 import paramiko 214 215 run_env_ok = True 216 217 try: 218 redfishtool_version = ( 219 self.run_tool_cmd("redfishtool -V").split(" ")[2].strip("\n") 220 ) 221 except Exception as e: 222 self.logger.error("\tEXCEPTION redfishtool: %s", e) 223 redfishtool_version = "Not Installed (optional)" 224 225 try: 226 ipmitool_version = self.run_tool_cmd("ipmitool -V").split(" ")[2] 227 except Exception as e: 228 self.logger.error("\tEXCEPTION ipmitool: %s", e) 229 ipmitool_version = "Not Installed (optional)" 230 231 self.logger.info("\n\t---- Script host environment ----") 232 self.logger.info( 233 "\t{:<10} {:<10}".format("Script hostname", os.uname()[1]) 234 ) 235 self.logger.info( 236 "\t{:<10} {:<10}".format("Script host os", platform.platform()) 237 ) 238 self.logger.info( 239 "\t{:<10} {:>10}".format("Python", platform.python_version()) 240 ) 241 self.logger.info("\t{:<10} {:>10}".format("PyYAML", yaml.__version__)) 242 self.logger.info("\t{:<10} {:>10}".format("click", click.__version__)) 243 self.logger.info( 244 "\t{:<10} {:>10}".format("paramiko", paramiko.__version__) 245 ) 246 self.logger.info( 247 "\t{:<10} {:>9}".format("redfishtool", redfishtool_version) 248 ) 249 self.logger.info( 250 "\t{:<10} {:>12}".format("ipmitool", ipmitool_version) 251 ) 252 253 if eval(yaml.__version__.replace(".", ",")) < (5, 3, 0): 254 self.logger.error( 255 "\n\tERROR: Python or python packages do not meet minimum" 256 " version requirement." 257 ) 258 self.logger.error( 259 "\tERROR: PyYAML version 5.3.0 or higher is needed.\n" 260 ) 261 run_env_ok = False 262 263 self.logger.info("\t---- End script host environment ----") 264 return run_env_ok 265 266 def script_logging(self, log_level_attr): 267 r""" 268 Create logger 269 270 """ 271 self.logger = logging.getLogger() 272 self.logger.setLevel(log_level_attr) 273 log_file_handler = logging.FileHandler( 274 self.ffdc_dir_path + "collector.log" 275 ) 276 277 stdout_handler = logging.StreamHandler(sys.stdout) 278 self.logger.addHandler(log_file_handler) 279 self.logger.addHandler(stdout_handler) 280 281 # Turn off paramiko INFO logging 282 logging.getLogger("paramiko").setLevel(logging.WARNING) 283 284 def target_is_pingable(self): 285 r""" 286 Check if target system is ping-able. 287 288 """ 289 response = os.system("ping -c 1 %s 2>&1 >/dev/null" % self.hostname) 290 if response == 0: 291 self.logger.info( 292 "\n\t[Check] %s is ping-able.\t\t [OK]" % self.hostname 293 ) 294 return True 295 else: 296 self.logger.error( 297 "\n\tERROR: %s is not ping-able. FFDC collection aborted.\n" 298 % self.hostname 299 ) 300 sys.exit(-1) 301 302 def collect_ffdc(self): 303 r""" 304 Initiate FFDC Collection depending on requested protocol. 305 306 """ 307 308 self.logger.info( 309 "\n\t---- Start communicating with %s ----" % self.hostname 310 ) 311 self.start_time = time.time() 312 313 # Find the list of target and protocol supported. 314 check_protocol_list = [] 315 config_dict = self.ffdc_actions 316 317 for target_type in config_dict.keys(): 318 if self.target_type != target_type: 319 continue 320 321 for k, v in config_dict[target_type].items(): 322 if ( 323 config_dict[target_type][k]["PROTOCOL"][0] 324 not in check_protocol_list 325 ): 326 check_protocol_list.append( 327 config_dict[target_type][k]["PROTOCOL"][0] 328 ) 329 330 self.logger.info( 331 "\n\t %s protocol type: %s" 332 % (self.target_type, check_protocol_list) 333 ) 334 335 verified_working_protocol = self.verify_protocol(check_protocol_list) 336 337 if verified_working_protocol: 338 self.logger.info( 339 "\n\t---- Completed protocol pre-requisite check ----\n" 340 ) 341 342 # Verify top level directory exists for storage 343 self.validate_local_store(self.location) 344 345 if (self.remote_protocol not in verified_working_protocol) and ( 346 self.remote_protocol != "ALL" 347 ): 348 self.logger.info( 349 "\n\tWorking protocol list: %s" % verified_working_protocol 350 ) 351 self.logger.error( 352 "\tERROR: Requested protocol %s is not in working protocol" 353 " list.\n" % self.remote_protocol 354 ) 355 sys.exit(-1) 356 else: 357 self.generate_ffdc(verified_working_protocol) 358 359 def ssh_to_target_system(self): 360 r""" 361 Open a ssh connection to targeted system. 362 363 """ 364 365 self.ssh_remoteclient = SSHRemoteclient( 366 self.hostname, self.username, self.password, self.port_ssh 367 ) 368 369 if self.ssh_remoteclient.ssh_remoteclient_login(): 370 self.logger.info( 371 "\n\t[Check] %s SSH connection established.\t [OK]" 372 % self.hostname 373 ) 374 375 # Check scp connection. 376 # If scp connection fails, 377 # continue with FFDC generation but skip scp files to local host. 378 self.ssh_remoteclient.scp_connection() 379 return True 380 else: 381 self.logger.info( 382 "\n\t[Check] %s SSH connection.\t [NOT AVAILABLE]" 383 % self.hostname 384 ) 385 return False 386 387 def telnet_to_target_system(self): 388 r""" 389 Open a telnet connection to targeted system. 390 """ 391 self.telnet_remoteclient = TelnetRemoteclient( 392 self.hostname, self.username, self.password 393 ) 394 if self.telnet_remoteclient.tn_remoteclient_login(): 395 self.logger.info( 396 "\n\t[Check] %s Telnet connection established.\t [OK]" 397 % self.hostname 398 ) 399 return True 400 else: 401 self.logger.info( 402 "\n\t[Check] %s Telnet connection.\t [NOT AVAILABLE]" 403 % self.hostname 404 ) 405 return False 406 407 def generate_ffdc(self, working_protocol_list): 408 r""" 409 Determine actions based on remote host type 410 411 Description of argument(s): 412 working_protocol_list List of confirmed working protocols to 413 connect to remote host. 414 """ 415 416 self.logger.info( 417 "\n\t---- Executing commands on " + self.hostname + " ----" 418 ) 419 self.logger.info( 420 "\n\tWorking protocol list: %s" % working_protocol_list 421 ) 422 423 config_dict = self.ffdc_actions 424 for target_type in config_dict.keys(): 425 if self.target_type != target_type: 426 continue 427 428 self.logger.info("\n\tFFDC Path: %s " % self.ffdc_dir_path) 429 global_plugin_dict["global_log_store_path"] = self.ffdc_dir_path 430 self.logger.info("\tSystem Type: %s" % target_type) 431 for k, v in config_dict[target_type].items(): 432 if ( 433 self.remote_protocol not in working_protocol_list 434 and self.remote_protocol != "ALL" 435 ): 436 continue 437 438 protocol = config_dict[target_type][k]["PROTOCOL"][0] 439 440 if protocol not in working_protocol_list: 441 continue 442 443 if protocol in working_protocol_list: 444 if protocol == "SSH" or protocol == "SCP": 445 self.protocol_ssh(protocol, target_type, k) 446 elif protocol == "TELNET": 447 self.protocol_telnet(target_type, k) 448 elif ( 449 protocol == "REDFISH" 450 or protocol == "IPMI" 451 or protocol == "SHELL" 452 ): 453 self.protocol_execute(protocol, target_type, k) 454 else: 455 self.logger.error( 456 "\n\tERROR: %s is not available for %s." 457 % (protocol, self.hostname) 458 ) 459 460 # Close network connection after collecting all files 461 self.elapsed_time = time.strftime( 462 "%H:%M:%S", time.gmtime(time.time() - self.start_time) 463 ) 464 self.logger.info("\n\tTotal time taken: %s" % self.elapsed_time) 465 if self.ssh_remoteclient: 466 self.ssh_remoteclient.ssh_remoteclient_disconnect() 467 if self.telnet_remoteclient: 468 self.telnet_remoteclient.tn_remoteclient_disconnect() 469 470 def protocol_ssh(self, protocol, target_type, sub_type): 471 r""" 472 Perform actions using SSH and SCP protocols. 473 474 Description of argument(s): 475 protocol Protocol to execute. 476 target_type OS Type of remote host. 477 sub_type Group type of commands. 478 """ 479 480 if protocol == "SCP": 481 self.group_copy(self.ffdc_actions[target_type][sub_type]) 482 else: 483 self.collect_and_copy_ffdc( 484 self.ffdc_actions[target_type][sub_type] 485 ) 486 487 def protocol_telnet(self, target_type, sub_type): 488 r""" 489 Perform actions using telnet protocol. 490 Description of argument(s): 491 target_type OS Type of remote host. 492 """ 493 self.logger.info( 494 "\n\t[Run] Executing commands on %s using %s" 495 % (self.hostname, "TELNET") 496 ) 497 telnet_files_saved = [] 498 progress_counter = 0 499 list_of_commands = self.ffdc_actions[target_type][sub_type]["COMMANDS"] 500 for index, each_cmd in enumerate(list_of_commands, start=0): 501 command_txt, command_timeout = self.unpack_command(each_cmd) 502 result = self.telnet_remoteclient.execute_command( 503 command_txt, command_timeout 504 ) 505 if result: 506 try: 507 targ_file = self.ffdc_actions[target_type][sub_type][ 508 "FILES" 509 ][index] 510 except IndexError: 511 targ_file = command_txt 512 self.logger.warning( 513 "\n\t[WARN] Missing filename to store data from" 514 " telnet %s." % each_cmd 515 ) 516 self.logger.warning( 517 "\t[WARN] Data will be stored in %s." % targ_file 518 ) 519 targ_file_with_path = ( 520 self.ffdc_dir_path + self.ffdc_prefix + targ_file 521 ) 522 # Creates a new file 523 with open(targ_file_with_path, "w") as fp: 524 fp.write(result) 525 fp.close 526 telnet_files_saved.append(targ_file) 527 progress_counter += 1 528 self.print_progress(progress_counter) 529 self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") 530 for file in telnet_files_saved: 531 self.logger.info("\n\t\tSuccessfully save file " + file + ".") 532 533 def protocol_execute(self, protocol, target_type, sub_type): 534 r""" 535 Perform actions for a given protocol. 536 537 Description of argument(s): 538 protocol Protocol to execute. 539 target_type OS Type of remote host. 540 sub_type Group type of commands. 541 """ 542 543 self.logger.info( 544 "\n\t[Run] Executing commands to %s using %s" 545 % (self.hostname, protocol) 546 ) 547 executed_files_saved = [] 548 progress_counter = 0 549 list_of_cmd = self.get_command_list( 550 self.ffdc_actions[target_type][sub_type] 551 ) 552 for index, each_cmd in enumerate(list_of_cmd, start=0): 553 plugin_call = False 554 if isinstance(each_cmd, dict): 555 if "plugin" in each_cmd: 556 # If the error is set and plugin explicitly 557 # requested to skip execution on error.. 558 if plugin_error_dict[ 559 "exit_on_error" 560 ] and self.plugin_error_check(each_cmd["plugin"]): 561 self.logger.info( 562 "\n\t[PLUGIN-ERROR] exit_on_error: %s" 563 % plugin_error_dict["exit_on_error"] 564 ) 565 self.logger.info( 566 "\t[PLUGIN-SKIP] %s" % each_cmd["plugin"][0] 567 ) 568 continue 569 plugin_call = True 570 # call the plugin 571 self.logger.info("\n\t[PLUGIN-START]") 572 result = self.execute_plugin_block(each_cmd["plugin"]) 573 self.logger.info("\t[PLUGIN-END]\n") 574 else: 575 each_cmd = self.yaml_env_and_plugin_vars_populate(each_cmd) 576 577 if not plugin_call: 578 result = self.run_tool_cmd(each_cmd) 579 if result: 580 try: 581 file_name = self.get_file_list( 582 self.ffdc_actions[target_type][sub_type] 583 )[index] 584 # If file is specified as None. 585 if file_name == "None": 586 continue 587 targ_file = self.yaml_env_and_plugin_vars_populate( 588 file_name 589 ) 590 except IndexError: 591 targ_file = each_cmd.split("/")[-1] 592 self.logger.warning( 593 "\n\t[WARN] Missing filename to store data from %s." 594 % each_cmd 595 ) 596 self.logger.warning( 597 "\t[WARN] Data will be stored in %s." % targ_file 598 ) 599 600 targ_file_with_path = ( 601 self.ffdc_dir_path + self.ffdc_prefix + targ_file 602 ) 603 604 # Creates a new file 605 with open(targ_file_with_path, "w") as fp: 606 if isinstance(result, dict): 607 fp.write(json.dumps(result)) 608 else: 609 fp.write(result) 610 fp.close 611 executed_files_saved.append(targ_file) 612 613 progress_counter += 1 614 self.print_progress(progress_counter) 615 616 self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") 617 618 for file in executed_files_saved: 619 self.logger.info("\n\t\tSuccessfully save file " + file + ".") 620 621 def collect_and_copy_ffdc( 622 self, ffdc_actions_for_target_type, form_filename=False 623 ): 624 r""" 625 Send commands in ffdc_config file to targeted system. 626 627 Description of argument(s): 628 ffdc_actions_for_target_type Commands and files for the selected 629 remote host type. 630 form_filename If true, pre-pend self.target_type to 631 filename 632 """ 633 634 # Executing commands, if any 635 self.ssh_execute_ffdc_commands( 636 ffdc_actions_for_target_type, form_filename 637 ) 638 639 # Copying files 640 if self.ssh_remoteclient.scpclient: 641 self.logger.info( 642 "\n\n\tCopying FFDC files from remote system %s.\n" 643 % self.hostname 644 ) 645 646 # Retrieving files from target system 647 list_of_files = self.get_file_list(ffdc_actions_for_target_type) 648 self.scp_ffdc( 649 self.ffdc_dir_path, 650 self.ffdc_prefix, 651 form_filename, 652 list_of_files, 653 ) 654 else: 655 self.logger.info( 656 "\n\n\tSkip copying FFDC files from remote system %s.\n" 657 % self.hostname 658 ) 659 660 def get_command_list(self, ffdc_actions_for_target_type): 661 r""" 662 Fetch list of commands from configuration file 663 664 Description of argument(s): 665 ffdc_actions_for_target_type Commands and files for the selected 666 remote host type. 667 """ 668 try: 669 list_of_commands = ffdc_actions_for_target_type["COMMANDS"] 670 except KeyError: 671 list_of_commands = [] 672 return list_of_commands 673 674 def get_file_list(self, ffdc_actions_for_target_type): 675 r""" 676 Fetch list of commands from configuration file 677 678 Description of argument(s): 679 ffdc_actions_for_target_type Commands and files for the selected 680 remote host type. 681 """ 682 try: 683 list_of_files = ffdc_actions_for_target_type["FILES"] 684 except KeyError: 685 list_of_files = [] 686 return list_of_files 687 688 def unpack_command(self, command): 689 r""" 690 Unpack command from config file 691 692 Description of argument(s): 693 command Command from config file. 694 """ 695 if isinstance(command, dict): 696 command_txt = next(iter(command)) 697 command_timeout = next(iter(command.values())) 698 elif isinstance(command, str): 699 command_txt = command 700 # Default command timeout 60 seconds 701 command_timeout = 60 702 703 return command_txt, command_timeout 704 705 def ssh_execute_ffdc_commands( 706 self, ffdc_actions_for_target_type, form_filename=False 707 ): 708 r""" 709 Send commands in ffdc_config file to targeted system. 710 711 Description of argument(s): 712 ffdc_actions_for_target_type Commands and files for the selected 713 remote host type. 714 form_filename If true, pre-pend self.target_type to 715 filename 716 """ 717 self.logger.info( 718 "\n\t[Run] Executing commands on %s using %s" 719 % (self.hostname, ffdc_actions_for_target_type["PROTOCOL"][0]) 720 ) 721 722 list_of_commands = self.get_command_list(ffdc_actions_for_target_type) 723 # If command list is empty, returns 724 if not list_of_commands: 725 return 726 727 progress_counter = 0 728 for command in list_of_commands: 729 command_txt, command_timeout = self.unpack_command(command) 730 731 if form_filename: 732 command_txt = str(command_txt % self.target_type) 733 734 ( 735 cmd_exit_code, 736 err, 737 response, 738 ) = self.ssh_remoteclient.execute_command( 739 command_txt, command_timeout 740 ) 741 742 if cmd_exit_code: 743 self.logger.warning( 744 "\n\t\t[WARN] %s exits with code %s." 745 % (command_txt, str(cmd_exit_code)) 746 ) 747 self.logger.warning("\t\t[WARN] %s " % err) 748 749 progress_counter += 1 750 self.print_progress(progress_counter) 751 752 self.logger.info("\n\t[Run] Commands execution completed.\t\t [OK]") 753 754 def group_copy(self, ffdc_actions_for_target_type): 755 r""" 756 scp group of files (wild card) from remote host. 757 758 Description of argument(s): 759 fdc_actions_for_target_type Commands and files for the selected 760 remote host type. 761 """ 762 763 if self.ssh_remoteclient.scpclient: 764 self.logger.info( 765 "\n\tCopying files from remote system %s via SCP.\n" 766 % self.hostname 767 ) 768 769 list_of_commands = self.get_command_list( 770 ffdc_actions_for_target_type 771 ) 772 # If command list is empty, returns 773 if not list_of_commands: 774 return 775 776 for command in list_of_commands: 777 try: 778 command = self.yaml_env_and_plugin_vars_populate(command) 779 except IndexError: 780 self.logger.error("\t\tInvalid command %s" % command) 781 continue 782 783 ( 784 cmd_exit_code, 785 err, 786 response, 787 ) = self.ssh_remoteclient.execute_command(command) 788 789 # If file does not exist, code take no action. 790 # cmd_exit_code is ignored for this scenario. 791 if response: 792 scp_result = self.ssh_remoteclient.scp_file_from_remote( 793 response.split("\n"), self.ffdc_dir_path 794 ) 795 if scp_result: 796 self.logger.info( 797 "\t\tSuccessfully copied from " 798 + self.hostname 799 + ":" 800 + command 801 ) 802 else: 803 self.logger.info("\t\t%s has no result" % command) 804 805 else: 806 self.logger.info( 807 "\n\n\tSkip copying files from remote system %s.\n" 808 % self.hostname 809 ) 810 811 def scp_ffdc( 812 self, 813 targ_dir_path, 814 targ_file_prefix, 815 form_filename, 816 file_list=None, 817 quiet=None, 818 ): 819 r""" 820 SCP all files in file_dict to the indicated directory on the local 821 system. 822 823 Description of argument(s): 824 targ_dir_path The path of the directory to receive 825 the files. 826 targ_file_prefix Prefix which will be prepended to each 827 target file's name. 828 file_dict A dictionary of files to scp from 829 targeted system to this system 830 831 """ 832 833 progress_counter = 0 834 for filename in file_list: 835 if form_filename: 836 filename = str(filename % self.target_type) 837 source_file_path = filename 838 targ_file_path = ( 839 targ_dir_path + targ_file_prefix + filename.split("/")[-1] 840 ) 841 842 # If source file name contains wild card, copy filename as is. 843 if "*" in source_file_path: 844 scp_result = self.ssh_remoteclient.scp_file_from_remote( 845 source_file_path, self.ffdc_dir_path 846 ) 847 else: 848 scp_result = self.ssh_remoteclient.scp_file_from_remote( 849 source_file_path, targ_file_path 850 ) 851 852 if not quiet: 853 if scp_result: 854 self.logger.info( 855 "\t\tSuccessfully copied from " 856 + self.hostname 857 + ":" 858 + source_file_path 859 + ".\n" 860 ) 861 else: 862 self.logger.info( 863 "\t\tFail to copy from " 864 + self.hostname 865 + ":" 866 + source_file_path 867 + ".\n" 868 ) 869 else: 870 progress_counter += 1 871 self.print_progress(progress_counter) 872 873 def set_ffdc_default_store_path(self): 874 r""" 875 Set a default value for self.ffdc_dir_path and self.ffdc_prefix. 876 Collected ffdc file will be stored in dir 877 /self.location/hostname_timestr/. 878 Individual ffdc file will have timestr_filename. 879 880 Description of class variables: 881 self.ffdc_dir_path The dir path where collected ffdc data files 882 should be put. 883 884 self.ffdc_prefix The prefix to be given to each ffdc file name. 885 886 """ 887 888 timestr = time.strftime("%Y%m%d-%H%M%S") 889 self.ffdc_dir_path = ( 890 self.location + "/" + self.hostname + "_" + timestr + "/" 891 ) 892 self.ffdc_prefix = timestr + "_" 893 self.validate_local_store(self.ffdc_dir_path) 894 895 # Need to verify local store path exists prior to instantiate this class. 896 # This class method is used to share the same code between CLI input parm 897 # and Robot Framework "${EXECDIR}/logs" before referencing this class. 898 @classmethod 899 def validate_local_store(cls, dir_path): 900 r""" 901 Ensure path exists to store FFDC files locally. 902 903 Description of variable: 904 dir_path The dir path where collected ffdc data files will be stored. 905 906 """ 907 908 if not os.path.exists(dir_path): 909 try: 910 os.makedirs(dir_path, 0o755) 911 except (IOError, OSError) as e: 912 # PermissionError 913 if e.errno == EPERM or e.errno == EACCES: 914 print( 915 "\tERROR: os.makedirs %s failed with" 916 " PermissionError.\n" % dir_path 917 ) 918 else: 919 print( 920 "\tERROR: os.makedirs %s failed with %s.\n" 921 % (dir_path, e.strerror) 922 ) 923 sys.exit(-1) 924 925 def print_progress(self, progress): 926 r""" 927 Print activity progress + 928 929 Description of variable: 930 progress Progress counter. 931 932 """ 933 934 sys.stdout.write("\r\t" + "+" * progress) 935 sys.stdout.flush() 936 time.sleep(0.1) 937 938 def verify_redfish(self): 939 r""" 940 Verify remote host has redfish service active 941 942 """ 943 redfish_parm = ( 944 "redfishtool -r " 945 + self.hostname 946 + ":" 947 + self.port_https 948 + " -S Always raw GET /redfish/v1/" 949 ) 950 return self.run_tool_cmd(redfish_parm, True) 951 952 def verify_ipmi(self): 953 r""" 954 Verify remote host has IPMI LAN service active 955 956 """ 957 if self.target_type == "OPENBMC": 958 ipmi_parm = ( 959 "ipmitool -I lanplus -C 17 -U " 960 + self.username 961 + " -P " 962 + self.password 963 + " -H " 964 + self.hostname 965 + " -p " 966 + str(self.port_ipmi) 967 + " power status" 968 ) 969 else: 970 ipmi_parm = ( 971 "ipmitool -I lanplus -P " 972 + self.password 973 + " -H " 974 + self.hostname 975 + " -p " 976 + str(self.port_ipmi) 977 + " power status" 978 ) 979 980 return self.run_tool_cmd(ipmi_parm, True) 981 982 def run_tool_cmd(self, parms_string, quiet=False): 983 r""" 984 Run CLI standard tool or scripts. 985 986 Description of variable: 987 parms_string tool command options. 988 quiet do not print tool error message if True 989 """ 990 991 result = subprocess.run( 992 [parms_string], 993 stdout=subprocess.PIPE, 994 stderr=subprocess.PIPE, 995 shell=True, 996 universal_newlines=True, 997 ) 998 999 if result.stderr and not quiet: 1000 if self.password in parms_string: 1001 parms_string = parms_string.replace(self.password, "********") 1002 self.logger.error("\n\t\tERROR with %s " % parms_string) 1003 self.logger.error("\t\t" + result.stderr) 1004 1005 return result.stdout 1006 1007 def verify_protocol(self, protocol_list): 1008 r""" 1009 Perform protocol working check. 1010 1011 Description of argument(s): 1012 protocol_list List of protocol. 1013 """ 1014 1015 tmp_list = [] 1016 if self.target_is_pingable(): 1017 tmp_list.append("SHELL") 1018 1019 for protocol in protocol_list: 1020 if self.remote_protocol != "ALL": 1021 if self.remote_protocol != protocol: 1022 continue 1023 1024 # Only check SSH/SCP once for both protocols 1025 if ( 1026 protocol == "SSH" 1027 or protocol == "SCP" 1028 and protocol not in tmp_list 1029 ): 1030 if self.ssh_to_target_system(): 1031 # Add only what user asked. 1032 if self.remote_protocol != "ALL": 1033 tmp_list.append(self.remote_protocol) 1034 else: 1035 tmp_list.append("SSH") 1036 tmp_list.append("SCP") 1037 1038 if protocol == "TELNET": 1039 if self.telnet_to_target_system(): 1040 tmp_list.append(protocol) 1041 1042 if protocol == "REDFISH": 1043 if self.verify_redfish(): 1044 tmp_list.append(protocol) 1045 self.logger.info( 1046 "\n\t[Check] %s Redfish Service.\t\t [OK]" 1047 % self.hostname 1048 ) 1049 else: 1050 self.logger.info( 1051 "\n\t[Check] %s Redfish Service.\t\t [NOT AVAILABLE]" 1052 % self.hostname 1053 ) 1054 1055 if protocol == "IPMI": 1056 if self.verify_ipmi(): 1057 tmp_list.append(protocol) 1058 self.logger.info( 1059 "\n\t[Check] %s IPMI LAN Service.\t\t [OK]" 1060 % self.hostname 1061 ) 1062 else: 1063 self.logger.info( 1064 "\n\t[Check] %s IPMI LAN Service.\t\t [NOT AVAILABLE]" 1065 % self.hostname 1066 ) 1067 1068 return tmp_list 1069 1070 def load_env(self): 1071 r""" 1072 Load the user environment variables from a YAML file. 1073 1074 This method reads the environment variables from a YAML file specified 1075 in the ENV_FILE environment variable. If the file is not found or 1076 there is an error reading the file, an exception is raised. 1077 1078 The YAML file should have the following format: 1079 1080 .. code-block:: yaml 1081 1082 VAR_NAME: VAR_VALUE 1083 1084 Where VAR_NAME is the name of the environment variable, and 1085 VAR_VALUE is its value. 1086 1087 After loading the environment variables, they are stored in the 1088 self.env attribute for later use. 1089 """ 1090 1091 os.environ["hostname"] = self.hostname 1092 os.environ["username"] = self.username 1093 os.environ["password"] = self.password 1094 os.environ["port_ssh"] = self.port_ssh 1095 os.environ["port_https"] = self.port_https 1096 os.environ["port_ipmi"] = self.port_ipmi 1097 1098 # Append default Env. 1099 self.env_dict["hostname"] = self.hostname 1100 self.env_dict["username"] = self.username 1101 self.env_dict["password"] = self.password 1102 self.env_dict["port_ssh"] = self.port_ssh 1103 self.env_dict["port_https"] = self.port_https 1104 self.env_dict["port_ipmi"] = self.port_ipmi 1105 1106 try: 1107 tmp_env_dict = {} 1108 if self.env_vars: 1109 tmp_env_dict = json.loads(self.env_vars) 1110 # Export ENV vars default. 1111 for key, value in tmp_env_dict.items(): 1112 os.environ[key] = value 1113 self.env_dict[key] = str(value) 1114 1115 # Load user specified ENV config YAML. 1116 if self.econfig: 1117 with open(self.econfig, "r") as file: 1118 try: 1119 tmp_env_dict = yaml.load(file, Loader=yaml.SafeLoader) 1120 except yaml.YAMLError as e: 1121 self.logger.error(e) 1122 sys.exit(-1) 1123 # Export ENV vars. 1124 for key, value in tmp_env_dict["env_params"].items(): 1125 os.environ[key] = str(value) 1126 self.env_dict[key] = str(value) 1127 except json.decoder.JSONDecodeError as e: 1128 self.logger.error("\n\tERROR: %s " % e) 1129 sys.exit(-1) 1130 except FileNotFoundError as e: 1131 self.logger.error("\n\tERROR: %s " % e) 1132 sys.exit(-1) 1133 1134 # This to mask the password from displaying on the console. 1135 mask_dict = self.env_dict.copy() 1136 for k, v in mask_dict.items(): 1137 if k.lower().find("password") != -1: 1138 hidden_text = [] 1139 hidden_text.append(v) 1140 password_regex = ( 1141 "(" + "|".join([re.escape(x) for x in hidden_text]) + ")" 1142 ) 1143 mask_dict[k] = re.sub(password_regex, "********", v) 1144 1145 self.logger.info(json.dumps(mask_dict, indent=8, sort_keys=False)) 1146 1147 def execute_python_eval(self, eval_string): 1148 r""" 1149 Execute qualified python function string using eval. 1150 1151 Description of argument(s): 1152 eval_string Execute the python object. 1153 1154 Example: 1155 eval(plugin.foo_func.foo_func(10)) 1156 """ 1157 try: 1158 self.logger.info("\tExecuting plugin func()") 1159 self.logger.debug("\tCall func: %s" % eval_string) 1160 result = eval(eval_string) 1161 self.logger.info("\treturn: %s" % str(result)) 1162 except ( 1163 ValueError, 1164 SyntaxError, 1165 NameError, 1166 AttributeError, 1167 TypeError, 1168 ) as e: 1169 self.logger.error("\tERROR: execute_python_eval: %s" % e) 1170 # Set the plugin error state. 1171 plugin_error_dict["exit_on_error"] = True 1172 self.logger.info("\treturn: PLUGIN_EVAL_ERROR") 1173 return "PLUGIN_EVAL_ERROR" 1174 1175 return result 1176 1177 def execute_plugin_block(self, plugin_cmd_list): 1178 r""" 1179 Pack the plugin command to qualifed python string object. 1180 1181 Description of argument(s): 1182 plugin_list_dict Plugin block read from YAML 1183 [{'plugin_name': 'plugin.foo_func.my_func'}, 1184 {'plugin_args': [10]}] 1185 1186 Example: 1187 - plugin: 1188 - plugin_name: plugin.foo_func.my_func 1189 - plugin_args: 1190 - arg1 1191 - arg2 1192 1193 - plugin: 1194 - plugin_name: result = plugin.foo_func.my_func 1195 - plugin_args: 1196 - arg1 1197 - arg2 1198 1199 - plugin: 1200 - plugin_name: result1,result2 = plugin.foo_func.my_func 1201 - plugin_args: 1202 - arg1 1203 - arg2 1204 """ 1205 try: 1206 idx = self.key_index_list_dict("plugin_name", plugin_cmd_list) 1207 plugin_name = plugin_cmd_list[idx]["plugin_name"] 1208 # Equal separator means plugin function returns result. 1209 if " = " in plugin_name: 1210 # Ex. ['result', 'plugin.foo_func.my_func'] 1211 plugin_name_args = plugin_name.split(" = ") 1212 # plugin func return data. 1213 for arg in plugin_name_args: 1214 if arg == plugin_name_args[-1]: 1215 plugin_name = arg 1216 else: 1217 plugin_resp = arg.split(",") 1218 # ['result1','result2'] 1219 for x in plugin_resp: 1220 global_plugin_list.append(x) 1221 global_plugin_dict[x] = "" 1222 1223 # Walk the plugin args ['arg1,'arg2'] 1224 # If the YAML plugin statement 'plugin_args' is not declared. 1225 plugin_args = [] 1226 if any("plugin_args" in d for d in plugin_cmd_list): 1227 idx = self.key_index_list_dict("plugin_args", plugin_cmd_list) 1228 if idx is not None: 1229 plugin_args = plugin_cmd_list[idx].get("plugin_args", []) 1230 plugin_args = self.yaml_args_populate(plugin_args) 1231 else: 1232 plugin_args = self.yaml_args_populate([]) 1233 1234 # Pack the args arg1, arg2, .... argn into 1235 # "arg1","arg2","argn" string as params for function. 1236 parm_args_str = self.yaml_args_string(plugin_args) 1237 if parm_args_str: 1238 plugin_func = plugin_name + "(" + parm_args_str + ")" 1239 else: 1240 plugin_func = plugin_name + "()" 1241 1242 # Execute plugin function. 1243 if global_plugin_dict: 1244 resp = self.execute_python_eval(plugin_func) 1245 # Update plugin vars dict if there is any. 1246 if resp != "PLUGIN_EVAL_ERROR": 1247 self.response_args_data(resp) 1248 else: 1249 resp = self.execute_python_eval(plugin_func) 1250 except Exception as e: 1251 # Set the plugin error state. 1252 plugin_error_dict["exit_on_error"] = True 1253 self.logger.error("\tERROR: execute_plugin_block: %s" % e) 1254 pass 1255 1256 # There is a real error executing the plugin function. 1257 if resp == "PLUGIN_EVAL_ERROR": 1258 return resp 1259 1260 # Check if plugin_expects_return (int, string, list,dict etc) 1261 if any("plugin_expects_return" in d for d in plugin_cmd_list): 1262 idx = self.key_index_list_dict( 1263 "plugin_expects_return", plugin_cmd_list 1264 ) 1265 plugin_expects = plugin_cmd_list[idx]["plugin_expects_return"] 1266 if plugin_expects: 1267 if resp: 1268 if ( 1269 self.plugin_expect_type(plugin_expects, resp) 1270 == "INVALID" 1271 ): 1272 self.logger.error("\tWARN: Plugin error check skipped") 1273 elif not self.plugin_expect_type(plugin_expects, resp): 1274 self.logger.error( 1275 "\tERROR: Plugin expects return data: %s" 1276 % plugin_expects 1277 ) 1278 plugin_error_dict["exit_on_error"] = True 1279 elif not resp: 1280 self.logger.error( 1281 "\tERROR: Plugin func failed to return data" 1282 ) 1283 plugin_error_dict["exit_on_error"] = True 1284 1285 return resp 1286 1287 def response_args_data(self, plugin_resp): 1288 r""" 1289 Parse the plugin function response and update plugin return variable. 1290 1291 plugin_resp Response data from plugin function. 1292 """ 1293 resp_list = [] 1294 resp_data = "" 1295 1296 # There is nothing to update the plugin response. 1297 if len(global_plugin_list) == 0 or plugin_resp == "None": 1298 return 1299 1300 if isinstance(plugin_resp, str): 1301 resp_data = plugin_resp.strip("\r\n\t") 1302 resp_list.append(resp_data) 1303 elif isinstance(plugin_resp, bytes): 1304 resp_data = str(plugin_resp, "UTF-8").strip("\r\n\t") 1305 resp_list.append(resp_data) 1306 elif isinstance(plugin_resp, tuple): 1307 if len(global_plugin_list) == 1: 1308 resp_list.append(plugin_resp) 1309 else: 1310 resp_list = list(plugin_resp) 1311 resp_list = [x.strip("\r\n\t") for x in resp_list] 1312 elif isinstance(plugin_resp, list): 1313 if len(global_plugin_list) == 1: 1314 resp_list.append([x.strip("\r\n\t") for x in plugin_resp]) 1315 else: 1316 resp_list = [x.strip("\r\n\t") for x in plugin_resp] 1317 elif isinstance(plugin_resp, int) or isinstance(plugin_resp, float): 1318 resp_list.append(plugin_resp) 1319 1320 # Iterate if there is a list of plugin return vars to update. 1321 for idx, item in enumerate(resp_list, start=0): 1322 # Exit loop, done required loop. 1323 if idx >= len(global_plugin_list): 1324 break 1325 # Find the index of the return func in the list and 1326 # update the global func return dictionary. 1327 try: 1328 dict_idx = global_plugin_list[idx] 1329 global_plugin_dict[dict_idx] = item 1330 except (IndexError, ValueError) as e: 1331 self.logger.warn("\tWARN: response_args_data: %s" % e) 1332 pass 1333 1334 # Done updating plugin dict irrespective of pass or failed, 1335 # clear all the list element for next plugin block execute. 1336 global_plugin_list.clear() 1337 1338 def yaml_args_string(self, plugin_args): 1339 r""" 1340 Pack the args into string. 1341 1342 plugin_args arg list ['arg1','arg2,'argn'] 1343 """ 1344 args_str = "" 1345 for args in plugin_args: 1346 if args: 1347 if isinstance(args, (int, float)): 1348 args_str += str(args) 1349 elif args in global_plugin_type_list: 1350 args_str += str(global_plugin_dict[args]) 1351 else: 1352 args_str += '"' + str(args.strip("\r\n\t")) + '"' 1353 # Skip last list element. 1354 if args != plugin_args[-1]: 1355 args_str += "," 1356 return args_str 1357 1358 def yaml_args_populate(self, yaml_arg_list): 1359 r""" 1360 Decode environment and plugin variables and populate the argument list. 1361 1362 This method processes the yaml_arg_list argument, which is expected to 1363 contain a list of arguments read from a YAML file. The method iterates 1364 through the list, decodes environment and plugin variables, and 1365 returns a populated list of arguments. 1366 1367 .. code-block:: yaml 1368 1369 - plugin_args: 1370 - arg1 1371 - arg2 1372 1373 ['${hostname}:${port_https}', '${username}', '/redfish/v1/', 'json'] 1374 1375 Returns the populated plugin list 1376 ['xx.xx.xx.xx:443', 'root', '/redfish/v1/', 'json'] 1377 1378 Parameters: 1379 yaml_arg_list (list): A list of arguments containing environment 1380 and plugin variables. 1381 1382 Returns: 1383 list: A populated list of arguments with decoded environment and 1384 plugin variables. 1385 """ 1386 if isinstance(yaml_arg_list, list): 1387 populated_list = [] 1388 for arg in yaml_arg_list: 1389 if isinstance(arg, (int, float)): 1390 populated_list.append(arg) 1391 elif isinstance(arg, str): 1392 arg_str = self.yaml_env_and_plugin_vars_populate(str(arg)) 1393 populated_list.append(arg_str) 1394 else: 1395 populated_list.append(arg) 1396 1397 return populated_list 1398 1399 def yaml_env_and_plugin_vars_populate(self, yaml_arg_str): 1400 r""" 1401 Update environment variables and plugin variables based on the 1402 provided YAML argument string. 1403 1404 This method processes the yaml_arg_str argument, which is expected 1405 to contain a string representing environment variables and plugin 1406 variables in the format: 1407 1408 .. code-block:: yaml 1409 1410 - cat ${MY_VAR} 1411 - ls -AX my_plugin_var 1412 1413 The method parses the string, extracts the variable names, and updates 1414 the corresponding environment variables and plugin variables. 1415 1416 Parameters: 1417 yaml_arg_str (str): A string containing environment and plugin 1418 variable definitions in YAML format. 1419 1420 Returns: 1421 str: The updated YAML argument string with plugin variables 1422 replaced. 1423 """ 1424 1425 # Parse and convert the Plugin YAML vars string to python vars 1426 # Example: 1427 # ${my_hostname}:${port_https} -> ['my_hostname', 'port_https'] 1428 try: 1429 # Example, list of matching 1430 # env vars ['username', 'password', 'hostname'] 1431 # Extra escape \ for special symbols. '\$\{([^\}]+)\}' works good. 1432 env_var_regex = r"\$\{([^\}]+)\}" 1433 env_var_names_list = re.findall(env_var_regex, yaml_arg_str) 1434 1435 for var in env_var_names_list: 1436 env_var = os.environ.get(var) 1437 if env_var: 1438 env_replace = "${" + var + "}" 1439 yaml_arg_str = yaml_arg_str.replace(env_replace, env_var) 1440 except Exception as e: 1441 self.logger.error("\tERROR:yaml_env_vars_populate: %s" % e) 1442 pass 1443 1444 """ 1445 Parse the string for plugin vars. 1446 Implement the logic to update environment variables based on the 1447 extracted variable names. 1448 """ 1449 try: 1450 # Example, list of plugin vars env_var_names_list 1451 # ['my_hostname', 'port_https'] 1452 global_plugin_dict_keys = set(global_plugin_dict.keys()) 1453 # Skip env var list already populated above code block list. 1454 plugin_var_name_list = [ 1455 var 1456 for var in global_plugin_dict_keys 1457 if var not in env_var_names_list 1458 ] 1459 1460 for var in plugin_var_name_list: 1461 plugin_var_value = global_plugin_dict[var] 1462 if yaml_arg_str in global_plugin_dict: 1463 """ 1464 If this plugin var exist but empty in dict, don't replace. 1465 his is either a YAML plugin statement incorrectly used or 1466 user added a plugin var which is not going to be populated. 1467 """ 1468 if isinstance(plugin_var_value, (list, dict)): 1469 """ 1470 List data type or dict can't be replaced, use 1471 directly in eval function call. 1472 """ 1473 global_plugin_type_list.append(var) 1474 else: 1475 yaml_arg_str = yaml_arg_str.replace( 1476 str(var), str(plugin_var_value) 1477 ) 1478 except (IndexError, ValueError) as e: 1479 self.logger.error("\tERROR: yaml_plugin_vars_populate: %s" % e) 1480 pass 1481 1482 # From ${my_hostname}:${port_https} -> ['my_hostname', 'port_https'] 1483 # to populated values string as 1484 # Example: xx.xx.xx.xx:443 and return the string 1485 return yaml_arg_str 1486 1487 def plugin_error_check(self, plugin_dict): 1488 r""" 1489 Process plugin error dictionary and return the corresponding error 1490 message. 1491 1492 This method checks if any dictionary in the plugin_dict list contains 1493 a "plugin_error" key. If such a dictionary is found, it retrieves the 1494 value associated with the "plugin_error" key and returns the 1495 corresponding error message from the plugin_error_dict attribute. 1496 1497 Parameters: 1498 plugin_dict (list of dict): A list of dictionaries containing 1499 plugin error information. 1500 1501 Returns: 1502 str: The error message corresponding to the "plugin_error" value, 1503 or None if no error is found. 1504 """ 1505 if any("plugin_error" in d for d in plugin_dict): 1506 for d in plugin_dict: 1507 if "plugin_error" in d: 1508 value = d["plugin_error"] 1509 return self.plugin_error_dict.get(value, None) 1510 return None 1511 1512 def key_index_list_dict(self, key, list_dict): 1513 r""" 1514 Find the index of the first dictionary in the list that contains 1515 the specified key. 1516 1517 Parameters: 1518 key (str): The key to search for in the 1519 dictionaries. 1520 list_dict (list of dict): A list of dictionaries to search 1521 through. 1522 1523 Returns: 1524 int: The index of the first dictionary containing the key, or -1 1525 if no match is found. 1526 """ 1527 for i, d in enumerate(list_dict): 1528 if key in d: 1529 return i 1530 return -1 1531 1532 def plugin_expect_type(self, type, data): 1533 r""" 1534 Check if the provided data matches the expected type. 1535 1536 This method checks if the data argument matches the specified type. 1537 It supports the following types: "int", "float", "str", "list", "dict", 1538 and "tuple". 1539 1540 If the type is not recognized, it logs an info message and returns 1541 "INVALID". 1542 1543 Parameters: 1544 type (str): The expected data type. 1545 data: The data to check against the expected type. 1546 1547 Returns: 1548 bool or str: True if the data matches the expected type, False if 1549 not, or "INVALID" if the type is not recognized. 1550 """ 1551 if type == "int": 1552 return isinstance(data, int) 1553 elif type == "float": 1554 return isinstance(data, float) 1555 elif type == "str": 1556 return isinstance(data, str) 1557 elif type == "list": 1558 return isinstance(data, list) 1559 elif type == "dict": 1560 return isinstance(data, dict) 1561 elif type == "tuple": 1562 return isinstance(data, tuple) 1563 else: 1564 self.logger.info("\tInvalid data type requested: %s" % type) 1565 return "INVALID" 1566