xref: /openbmc/openbmc-test-automation/ffdc/lib/telnet_utility.py (revision 0c8100ffc8275a1442943aeb15f08e79d622da66)
1#!/usr/bin/env python3
2
3
4import time
5import socket
6import logging
7import telnetlib
8from collections import deque
9
10
11class TelnetRemoteclient:
12
13    r"""
14    Class to create telnet connection to remote host for command execution.
15    """
16
17    def __init__(self, hostname, username, password, port=23, read_timeout=None):
18
19        r"""
20        Description of argument(s):
21
22        hostname        Name/IP of the remote (targeting) host
23        username        User on the remote host with access to FFCD files
24        password        Password for user on remote host
25        read_timeout    New read timeout value to override default one
26        """
27
28        self.hostname = hostname
29        self.username = username
30        self.password = password
31        self.tnclient = None
32        self.port = port
33        self.read_timeout = read_timeout
34
35    def tn_remoteclient_login(self):
36
37        is_telnet = True
38        try:
39            self.tnclient = telnetlib.Telnet(self.hostname, self.port, timeout=15)
40            if b'login:' in self.tnclient.read_until(b'login:', timeout=self.read_timeout):
41                self.tnclient.write(self.username.encode('utf-8') + b"\n")
42
43                if b'Password:' in self.tnclient.read_until(b'Password:', timeout=self.read_timeout):
44                    self.tnclient.write(self.password.encode('utf-8') + b"\n")
45
46                    n, match, pre_match = \
47                        self.tnclient.expect(
48                            [b'Login incorrect', b'invalid login name or password.', br'\#', br'\$'],
49                            timeout=self.read_timeout)
50                    if n == 0 or n == 1:
51                        logging.error(
52                            "\n\tERROR: Telnet Authentication Failed.  Check userid and password.\n\n")
53                        is_telnet = False
54                    else:
55                        # login successful
56                        self.fifo = deque()
57                else:
58                    # Anything else, telnet server is not running
59                    logging.error("\n\tERROR: Telnet Connection Refused.\n\n")
60                    is_telnet = False
61            else:
62                is_telnet = False
63        except Exception:
64            # Any kind of exception, skip telnet protocol
65            is_telnet = False
66
67        return is_telnet
68
69    def __del__(self):
70        self.tn_remoteclient_disconnect()
71
72    def tn_remoteclient_disconnect(self):
73        try:
74            self.tnclient.close()
75        except Exception:
76            # the telnet object might not exist yet, so ignore this one
77            pass
78
79    def execute_command(self, cmd,
80                        i_timeout=120):
81
82        r'''
83            Executes commands on the remote host
84
85            Description of argument(s):
86            cmd             Command to run on remote host
87            i_timeout       Timeout for command output
88                            default is 120 seconds
89        '''
90
91        # Wait time for command execution before reading the output.
92        # Use user input wait time for command execution if one exists.
93        # Else use the default 120 sec,
94        if i_timeout != 120:
95            execution_time = i_timeout
96        else:
97            execution_time = 120
98
99        # Execute the command and read the command output.
100        return_buffer = b''
101        try:
102
103            # Do at least one non-blocking read.
104            #  to flush whatever data is in the read buffer.
105            while self.tnclient.read_very_eager():
106                continue
107
108            # Execute the command
109            self.tnclient.write(cmd.encode('utf-8') + b'\n')
110            time.sleep(execution_time)
111
112            local_buffer = b''
113            # Read the command output one block at a time.
114            return_buffer = self.tnclient.read_very_eager()
115            while return_buffer:
116                local_buffer = b''.join([local_buffer, return_buffer])
117                time.sleep(3)  # let the buffer fill up a bit
118                return_buffer = self.tnclient.read_very_eager()
119        except (socket.error, EOFError) as e:
120            self.tn_remoteclient_disconnect()
121
122            if str(e).__contains__("Connection reset by peer"):
123                msg = e
124            elif str(e).__contains__("telnet connection closed"):
125                msg = "Telnet connection closed."
126            else:
127                msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e)
128
129            logging.error("\t\t ERROR %s " % msg)
130
131        # Return ASCII string data with ending PROMPT stripped
132        return local_buffer.decode('ascii', 'ignore').replace('$ ', '\n')
133