xref: /openbmc/openbmc-test-automation/ffdc/lib/telnet_utility.py (revision 42c84ea5d0dd320e1a1d57bcba34fcb788c7788c)
1#!/usr/bin/env python3
2
3
4import logging
5import socket
6import telnetlib
7import time
8from collections import deque
9
10
class TelnetRemoteclient:
    r"""
    Class to create telnet connection to remote host for command execution.

    NOTE: this class relies on the standard-library telnetlib module, which
    was deprecated in Python 3.11 and removed in Python 3.13 (PEP 594).
    """

    def __init__(
        self, hostname, username, password, port=23, read_timeout=None
    ):
        r"""
        Description of argument(s):

        hostname        Name/IP of the remote (targeting) host
        username        User on the remote host with access to FFDC files
        password        Password for user on remote host
        port            TCP port of the telnet service, default is 23
        read_timeout    Timeout handed to telnetlib for each prompt read
                        during login, default is None
        """

        self.hostname = hostname
        self.username = username
        self.password = password
        # Created by tn_remoteclient_login(); stays None until then.
        self.tnclient = None
        self.port = port
        self.read_timeout = read_timeout

    def tn_remoteclient_login(self):
        r"""
        Open the telnet connection and log in with the stored credentials.

        Returns True when the username and password prompts were seen and
        the server did not answer with an authentication failure message.
        Returns False in every other case, including any exception raised
        while connecting or talking to the server.
        """
        is_telnet = True
        try:
            self.tnclient = telnetlib.Telnet(
                self.hostname, self.port, timeout=15
            )
            if b"login:" in self.tnclient.read_until(
                b"login:", timeout=self.read_timeout
            ):
                self.tnclient.write(self.username.encode("utf-8") + b"\n")

                if b"Password:" in self.tnclient.read_until(
                    b"Password:", timeout=self.read_timeout
                ):
                    self.tnclient.write(self.password.encode("utf-8") + b"\n")

                    # Index 0/1 are the failure banners, 2/3 the shell
                    # prompts.
                    n, match, pre_match = self.tnclient.expect(
                        [
                            b"Login incorrect",
                            b"invalid login name or password.",
                            rb"\#",
                            rb"\$",
                        ],
                        timeout=self.read_timeout,
                    )
                    if n in (0, 1):
                        logging.error(
                            "\n\tERROR: Telnet Authentication Failed.  Check"
                            " userid and password.\n\n"
                        )
                        is_telnet = False
                    else:
                        # login successful
                        # NOTE(review): expect() reports -1 when nothing
                        # matched before the timeout; that case also lands
                        # here and is treated as a successful login.
                        self.fifo = deque()
                else:
                    # Anything else, telnet server is not running
                    logging.error("\n\tERROR: Telnet Connection Refused.\n\n")
                    is_telnet = False
            else:
                is_telnet = False
        except Exception as e:
            # Any kind of exception, skip telnet protocol.  Keep the reason
            # available at debug level instead of dropping it silently.
            logging.debug(
                "Telnet login to %s skipped: %s %s",
                self.hostname,
                e.__class__,
                e,
            )
            is_telnet = False

        return is_telnet

    def __del__(self):
        r"""
        Close the telnet connection when the object is garbage collected.
        """
        self.tn_remoteclient_disconnect()

    def tn_remoteclient_disconnect(self):
        r"""
        Close the telnet connection, ignoring any error while doing so.
        """
        try:
            self.tnclient.close()
        except Exception:
            # the telnet object might not exist yet, so ignore this one
            pass

    def execute_command(self, cmd, i_timeout=120, poll_interval=3):
        r"""
        Executes commands on the remote host

        Description of argument(s):
        cmd             Command to run on remote host
        i_timeout       Seconds to wait after sending the command before
                        reading its output, default is 120 seconds
        poll_interval   Seconds to wait between successive output reads so
                        the receive buffer can fill up a bit, default is
                        3 seconds

        Returns the collected output decoded as ASCII (undecodable bytes
        are dropped) with every "$ " replaced by a newline.  On a socket
        error or EOF the connection is closed, the error is logged and
        whatever output was collected so far (possibly an empty string) is
        returned.
        """

        # Collected before entering the try block so that the return
        # statement below is valid even when the connection fails before
        # any output could be read.
        chunks = []
        try:
            # Do at least one non-blocking read.
            #  to flush whatever data is in the read buffer.
            while self.tnclient.read_very_eager():
                continue

            # Execute the command and give it time to run.
            self.tnclient.write(cmd.encode("utf-8") + b"\n")
            time.sleep(i_timeout)

            # Read the command output one block at a time until a read
            # comes back empty.
            chunk = self.tnclient.read_very_eager()
            while chunk:
                chunks.append(chunk)
                time.sleep(poll_interval)  # let the buffer fill up a bit
                chunk = self.tnclient.read_very_eager()
        except (socket.error, EOFError) as e:
            self.tn_remoteclient_disconnect()

            reason = str(e)
            if "Connection reset by peer" in reason:
                msg = e
            elif "telnet connection closed" in reason:
                msg = "Telnet connection closed."
            else:
                msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e)

            logging.error("\t\t ERROR %s ", msg)

        # Return ASCII string data with ending PROMPT stripped
        local_buffer = b"".join(chunks)
        return local_buffer.decode("ascii", "ignore").replace("$ ", "\n")
142