1#!/usr/bin/env python3 2 3 4import logging 5import socket 6import telnetlib 7import time 8from collections import deque 9 10 11class TelnetRemoteclient: 12 r""" 13 Class to create telnet connection to remote host for command execution. 14 """ 15 16 def __init__( 17 self, hostname, username, password, port=23, read_timeout=None 18 ): 19 r""" 20 Initialize the TelnetRemoteClient object with the provided remote host 21 details. 22 23 This method initializes a TelnetRemoteClient object with the given 24 attributes, which represent the details of the remote (targeting) host. 25 26 The attributes include the hostname, username, password, and Telnet 27 port. 28 29 The method also accepts an optional read_timeout parameter, which 30 specifies a new read timeout value to override the default one. 31 32 Parameters: 33 hostname (str): Name or IP address of the remote 34 host. 35 username (str): User on the remote host with access 36 to files. 37 password (str): Password for the user on the remote 38 host. 39 port (int, optional): Telnet port value. Defaults to 23. 40 read_timeout (int, optional): New read timeout value to override 41 the default one. Defaults to None. 42 43 Returns: 44 None 45 """ 46 self.hostname = hostname 47 self.username = username 48 self.password = password 49 self.tnclient = None 50 self.port = port 51 self.read_timeout = read_timeout 52 53 def tn_remoteclient_login(self): 54 r""" 55 Establish a Telnet connection to the remote host and log in. 56 57 This method establishes a Telnet connection to the remote host using 58 the provided hostname, username, and password. If the connection and 59 login are successful, the method returns True. Otherwise, it returns 60 False. 61 62 Parameters: 63 None 64 65 Returns: 66 bool: True if the Telnet connection and login are successful, 67 False otherwise. 68 """ 69 is_telnet = True 70 try: 71 self.tnclient = telnetlib.Telnet( 72 self.hostname, self.port, timeout=15 73 ) 74 if b"login:" in self.tnclient.read_until( 75 b"login:", timeout=self.read_timeout 76 ): 77 self.tnclient.write(self.username.encode("utf-8") + b"\n") 78 79 if b"Password:" in self.tnclient.read_until( 80 b"Password:", timeout=self.read_timeout 81 ): 82 self.tnclient.write(self.password.encode("utf-8") + b"\n") 83 84 n, match, pre_match = self.tnclient.expect( 85 [ 86 b"Login incorrect", 87 b"invalid login name or password.", 88 rb"\#", 89 rb"\$", 90 ], 91 timeout=self.read_timeout, 92 ) 93 if n == 0 or n == 1: 94 logging.error( 95 "\n\tERROR: Telnet Authentication Failed. Check" 96 " userid and password.\n\n" 97 ) 98 is_telnet = False 99 else: 100 # login successful 101 self.fifo = deque() 102 else: 103 # Anything else, telnet server is not running 104 logging.error("\n\tERROR: Telnet Connection Refused.\n\n") 105 is_telnet = False 106 else: 107 is_telnet = False 108 except Exception: 109 # Any kind of exception, skip telnet protocol 110 is_telnet = False 111 112 return is_telnet 113 114 def __del__(self): 115 r""" 116 Disconnect from the remote host when the object is deleted. 117 118 This method disconnects from the remote host when the 119 TelnetRemoteClient object is deleted. 120 """ 121 self.tn_remoteclient_disconnect() 122 123 def tn_remoteclient_disconnect(self): 124 r""" 125 Disconnect from the remote host using the Telnet client. 126 127 This method disconnects from the remote host using the Telnet client 128 established during the FFDC collection process. 129 130 The method attempts to close the Telnet connection. If the Telnet 131 client does not exist, the method ignores the exception. 132 """ 133 try: 134 self.tnclient.close() 135 except Exception: 136 # the telnet object might not exist yet, so ignore this one 137 pass 138 139 def execute_command(self, cmd, i_timeout=120): 140 r""" 141 Executes a command on the remote host using Telnet and returns the 142 output. 143 144 This method executes a provided command on the remote host using 145 Telnet. The method takes the cmd argument, which is expected to be a 146 valid command to execute, and an optional i_timeout parameter, which 147 specifies the timeout for the command output. 148 149 The method returns the output of the executed command as a string. 150 151 Parameters: 152 cmd (str): The command to be executed on the 153 remote host. 154 i_timeout (int, optional): The timeout for the command output. 155 Defaults to 120 seconds. 156 157 Returns: 158 str: The output of the executed command as a string. 159 """ 160 # Wait time for command execution before reading the output. 161 # Use user input wait time for command execution if one exists. 162 # Else use the default 120 sec, 163 if i_timeout != 120: 164 execution_time = i_timeout 165 else: 166 execution_time = 120 167 168 # Execute the command and read the command output. 169 return_buffer = b"" 170 try: 171 # Do at least one non-blocking read. 172 # to flush whatever data is in the read buffer. 173 while self.tnclient.read_very_eager(): 174 continue 175 176 # Execute the command 177 self.tnclient.write(cmd.encode("utf-8") + b"\n") 178 time.sleep(execution_time) 179 180 local_buffer = b"" 181 # Read the command output one block at a time. 182 return_buffer = self.tnclient.read_very_eager() 183 while return_buffer: 184 local_buffer = b"".join([local_buffer, return_buffer]) 185 time.sleep(3) # let the buffer fill up a bit 186 return_buffer = self.tnclient.read_very_eager() 187 except (socket.error, EOFError) as e: 188 self.tn_remoteclient_disconnect() 189 190 if str(e).__contains__("Connection reset by peer"): 191 msg = e 192 elif str(e).__contains__("telnet connection closed"): 193 msg = "Telnet connection closed." 194 else: 195 msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e) 196 197 logging.error("\t\t ERROR %s " % msg) 198 199 # Return ASCII string data with ending PROMPT stripped 200 return local_buffer.decode("ascii", "ignore").replace("$ ", "\n") 201