#!/usr/bin/env python3 import logging import socket import telnetlib import time from collections import deque class TelnetRemoteclient: r""" Class to create telnet connection to remote host for command execution. """ def __init__( self, hostname, username, password, port=23, read_timeout=None ): r""" Initialize the TelnetRemoteClient object with the provided remote host details. This method initializes a TelnetRemoteClient object with the given attributes, which represent the details of the remote (targeting) host. The attributes include the hostname, username, password, and Telnet port. The method also accepts an optional read_timeout parameter, which specifies a new read timeout value to override the default one. Parameters: hostname (str): Name or IP address of the remote host. username (str): User on the remote host with access to files. password (str): Password for the user on the remote host. port (int, optional): Telnet port value. Defaults to 23. read_timeout (int, optional): New read timeout value to override the default one. Defaults to None. Returns: None """ self.hostname = hostname self.username = username self.password = password self.tnclient = None self.port = port self.read_timeout = read_timeout def tn_remoteclient_login(self): r""" Establish a Telnet connection to the remote host and log in. This method establishes a Telnet connection to the remote host using the provided hostname, username, and password. If the connection and login are successful, the method returns True. Otherwise, it returns False. Parameters: None Returns: bool: True if the Telnet connection and login are successful, False otherwise. """ is_telnet = True try: self.tnclient = telnetlib.Telnet( self.hostname, self.port, timeout=15 ) if b"login:" in self.tnclient.read_until( b"login:", timeout=self.read_timeout ): self.tnclient.write(self.username.encode("utf-8") + b"\n") if b"Password:" in self.tnclient.read_until( b"Password:", timeout=self.read_timeout ): self.tnclient.write(self.password.encode("utf-8") + b"\n") n, match, pre_match = self.tnclient.expect( [ b"Login incorrect", b"invalid login name or password.", rb"\#", rb"\$", ], timeout=self.read_timeout, ) if n == 0 or n == 1: logging.error( "\n\tERROR: Telnet Authentication Failed. Check" " userid and password.\n\n" ) is_telnet = False else: # login successful self.fifo = deque() else: # Anything else, telnet server is not running logging.error("\n\tERROR: Telnet Connection Refused.\n\n") is_telnet = False else: is_telnet = False except Exception: # Any kind of exception, skip telnet protocol is_telnet = False return is_telnet def __del__(self): r""" Disconnect from the remote host when the object is deleted. This method disconnects from the remote host when the TelnetRemoteClient object is deleted. """ self.tn_remoteclient_disconnect() def tn_remoteclient_disconnect(self): r""" Disconnect from the remote host using the Telnet client. This method disconnects from the remote host using the Telnet client established during the FFDC collection process. The method attempts to close the Telnet connection. If the Telnet client does not exist, the method ignores the exception. """ try: self.tnclient.close() except Exception: # the telnet object might not exist yet, so ignore this one pass def execute_command(self, cmd, i_timeout=120): r""" Executes a command on the remote host using Telnet and returns the output. This method executes a provided command on the remote host using Telnet. The method takes the cmd argument, which is expected to be a valid command to execute, and an optional i_timeout parameter, which specifies the timeout for the command output. The method returns the output of the executed command as a string. Parameters: cmd (str): The command to be executed on the remote host. i_timeout (int, optional): The timeout for the command output. Defaults to 120 seconds. Returns: str: The output of the executed command as a string. """ # Wait time for command execution before reading the output. # Use user input wait time for command execution if one exists. # Else use the default 120 sec, if i_timeout != 120: execution_time = i_timeout else: execution_time = 120 # Execute the command and read the command output. return_buffer = b"" try: # Do at least one non-blocking read. # to flush whatever data is in the read buffer. while self.tnclient.read_very_eager(): continue # Execute the command self.tnclient.write(cmd.encode("utf-8") + b"\n") time.sleep(execution_time) local_buffer = b"" # Read the command output one block at a time. return_buffer = self.tnclient.read_very_eager() while return_buffer: local_buffer = b"".join([local_buffer, return_buffer]) time.sleep(3) # let the buffer fill up a bit return_buffer = self.tnclient.read_very_eager() except (socket.error, EOFError) as e: self.tn_remoteclient_disconnect() if str(e).__contains__("Connection reset by peer"): msg = e elif str(e).__contains__("telnet connection closed"): msg = "Telnet connection closed." else: msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e) logging.error("\t\t ERROR %s " % msg) # Return ASCII string data with ending PROMPT stripped return local_buffer.decode("ascii", "ignore").replace("$ ", "\n")