#!/usr/bin/env python3


import logging
import socket

# NOTE: telnetlib is deprecated since Python 3.11 and removed in 3.13
# (PEP 594); this module requires an interpreter that still ships it.
import telnetlib
import time
from collections import deque


class TelnetRemoteclient:
    r"""
    Class to create telnet connection to remote host for command execution.
    """

    def __init__(
        self, hostname, username, password, port=23, read_timeout=None
    ):
        r"""
        Description of argument(s):

        hostname        Name/IP of the remote (targeting) host
        username        User on the remote host with access to FFCD files
        password        Password for user on remote host
        port            TCP port of the telnet service (default 23)
        read_timeout    Timeout in seconds handed to the telnet reads
                        performed during login. None lets those reads
                        wait without a time limit.
        """

        self.hostname = hostname
        self.username = username
        self.password = password
        # Created by tn_remoteclient_login(); None until then.
        self.tnclient = None
        self.port = port
        self.read_timeout = read_timeout

    def tn_remoteclient_login(self):
        r"""
        Open the telnet connection and log in with the stored credentials.

        Returns True when the login handshake completed, False on any
        failure (unexpected prompt, rejected credentials, or any exception
        raised while connecting). On failure the connection is closed.
        """

        try:
            is_telnet = self._tn_login_handshake()
        except Exception as e:
            # Any kind of exception, skip telnet protocol
            logging.debug("Telnet login to %s failed: %r", self.hostname, e)
            is_telnet = False

        if not is_telnet:
            # Do not leave a half-open session behind after a failed login.
            self.tn_remoteclient_disconnect()

        return is_telnet

    def _tn_login_handshake(self):
        r"""
        Connect and walk through the login/password prompts.

        Returns True on success and False on a recognised failure;
        exceptions propagate to tn_remoteclient_login().
        """

        self.tnclient = telnetlib.Telnet(
            self.hostname, self.port, timeout=15
        )

        banner = self.tnclient.read_until(
            b"login:", timeout=self.read_timeout
        )
        if b"login:" not in banner:
            return False
        self.tnclient.write(self.username.encode("utf-8") + b"\n")

        reply = self.tnclient.read_until(
            b"Password:", timeout=self.read_timeout
        )
        if b"Password:" not in reply:
            # Anything else, telnet server is not running
            logging.error("\n\tERROR: Telnet Connection Refused.\n\n")
            return False
        self.tnclient.write(self.password.encode("utf-8") + b"\n")

        # Indexes 0/1 are rejection messages, 2/3 are shell prompts.
        n, _match, _pre_match = self.tnclient.expect(
            [
                b"Login incorrect",
                b"invalid login name or password.",
                rb"\#",
                rb"\$",
            ],
            timeout=self.read_timeout,
        )
        if n in (0, 1):
            logging.error(
                "\n\tERROR: Telnet Authentication Failed. Check"
                " userid and password.\n\n"
            )
            return False

        # login successful
        # NOTE(review): a timed-out expect() (n == -1, no pattern seen) also
        # lands here and is treated as success, exactly as before.
        self.fifo = deque()
        return True

    def __del__(self):
        r"""
        Close the telnet connection when the object is garbage collected.
        """
        self.tn_remoteclient_disconnect()

    def tn_remoteclient_disconnect(self):
        r"""
        Close the telnet connection, if any. Never raises.
        """
        # getattr: __del__ may run on an object whose __init__ did not finish.
        client = getattr(self, "tnclient", None)
        if client is None:
            # the telnet object might not exist yet, so ignore this one
            return
        try:
            client.close()
        except Exception:
            # Best-effort cleanup; also reached from __del__.
            pass

    def execute_command(self, cmd, i_timeout=120, poll_interval=3):
        r"""
        Executes commands on the remote host

        Description of argument(s):
        cmd             Command to run on remote host
        i_timeout       Seconds to wait after sending the command before
                        its output is read. Default is 120 seconds.
        poll_interval   Seconds to wait between successive reads while
                        output keeps arriving. Default is 3 seconds.

        Returns the collected output decoded as ASCII (undecodable bytes
        are dropped) with every "$ " replaced by a newline. If the
        connection fails, the error is logged, the connection is closed and
        whatever output was collected so far (possibly an empty string) is
        returned.
        """

        # Kept outside the try block so that a failure before the first read
        # still leaves something to return.
        chunks = []
        try:
            # Do at least one non-blocking read.
            # to flush whatever data is in the read buffer.
            while self.tnclient.read_very_eager():
                continue

            # Execute the command
            self.tnclient.write(cmd.encode("utf-8") + b"\n")
            # Wait time for command execution before reading the output.
            time.sleep(i_timeout)

            # Read the command output one block at a time.
            chunk = self.tnclient.read_very_eager()
            while chunk:
                chunks.append(chunk)
                time.sleep(poll_interval)  # let the buffer fill up a bit
                chunk = self.tnclient.read_very_eager()
        except (socket.error, EOFError) as e:
            self.tn_remoteclient_disconnect()

            reason = str(e)
            if "Connection reset by peer" in reason:
                msg = e
            elif "telnet connection closed" in reason:
                msg = "Telnet connection closed."
            else:
                msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e)

            logging.error("\t\t ERROR %s ", msg)

        # Return ASCII string data with ending PROMPT stripped
        local_buffer = b"".join(chunks)
        return local_buffer.decode("ascii", "ignore").replace("$ ", "\n")