1""" 2QEMU Console Socket Module: 3 4This python module implements a ConsoleSocket object, 5which can drain a socket and optionally dump the bytes to file. 6""" 7# Copyright 2020 Linaro 8# 9# Authors: 10# Robert Foley <robert.foley@linaro.org> 11# 12# This code is licensed under the GPL version 2 or later. See 13# the COPYING file in the top-level directory. 14# 15 16from collections import deque 17import socket 18import threading 19import time 20from typing import Deque, Optional 21 22 23class ConsoleSocket(socket.socket): 24 """ 25 ConsoleSocket represents a socket attached to a char device. 26 27 :param address: An AF_UNIX path or address. 28 :param sock_fd: Optionally, an existing socket file descriptor. 29 One of address or sock_fd must be specified. 30 :param file: Optionally, a filename to log to. 31 :param drain: Optionally, drains the socket and places the bytes 32 into an in memory buffer for later processing. 33 """ 34 def __init__(self, 35 address: Optional[str] = None, 36 sock_fd: Optional[int] = None, 37 file: Optional[str] = None, 38 drain: bool = False): 39 if address is None and sock_fd is None: 40 raise ValueError("one of 'address' or 'sock_fd' must be specified") 41 if address is not None and sock_fd is not None: 42 raise ValueError("can't specify both 'address' and 'sock_fd'") 43 44 self._recv_timeout_sec = 300.0 45 self._sleep_time = 0.5 46 self._buffer: Deque[int] = deque() 47 if address is not None: 48 socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM) 49 self.connect(address) 50 else: 51 assert sock_fd is not None 52 socket.socket.__init__(self, fileno=sock_fd) 53 self._logfile = None 54 if file: 55 # pylint: disable=consider-using-with 56 self._logfile = open(file, "bw") 57 self._open = True 58 self._drain_thread = None 59 if drain: 60 self._drain_thread = self._thread_start() 61 62 def __repr__(self) -> str: 63 tmp = super().__repr__() 64 tmp = tmp.rstrip(">") 65 tmp = "%s, logfile=%s, drain_thread=%s>" % (tmp, self._logfile, 66 self._drain_thread) 67 return tmp 68 69 def _drain_fn(self) -> None: 70 """Drains the socket and runs while the socket is open.""" 71 while self._open: 72 try: 73 self._drain_socket() 74 except socket.timeout: 75 # The socket is expected to timeout since we set a 76 # short timeout to allow the thread to exit when 77 # self._open is set to False. 78 time.sleep(self._sleep_time) 79 80 def _thread_start(self) -> threading.Thread: 81 """Kick off a thread to drain the socket.""" 82 # Configure socket to not block and timeout. 83 # This allows our drain thread to not block 84 # on receive and exit smoothly. 85 socket.socket.setblocking(self, False) 86 socket.socket.settimeout(self, 1) 87 drain_thread = threading.Thread(target=self._drain_fn) 88 drain_thread.daemon = True 89 drain_thread.start() 90 return drain_thread 91 92 def close(self) -> None: 93 """Close the base object and wait for the thread to terminate""" 94 if self._open: 95 self._open = False 96 if self._drain_thread is not None: 97 thread, self._drain_thread = self._drain_thread, None 98 thread.join() 99 socket.socket.close(self) 100 if self._logfile: 101 self._logfile.close() 102 self._logfile = None 103 104 def _drain_socket(self) -> None: 105 """process arriving characters into in memory _buffer""" 106 data = socket.socket.recv(self, 1) 107 if self._logfile: 108 self._logfile.write(data) 109 self._logfile.flush() 110 self._buffer.extend(data) 111 112 def recv(self, bufsize: int = 1, flags: int = 0) -> bytes: 113 """Return chars from in memory buffer. 114 Maintains the same API as socket.socket.recv. 115 """ 116 if self._drain_thread is None: 117 # Not buffering the socket, pass thru to socket. 118 return socket.socket.recv(self, bufsize, flags) 119 assert not flags, "Cannot pass flags to recv() in drained mode" 120 start_time = time.time() 121 while len(self._buffer) < bufsize: 122 time.sleep(self._sleep_time) 123 elapsed_sec = time.time() - start_time 124 if elapsed_sec > self._recv_timeout_sec: 125 raise socket.timeout 126 return bytes((self._buffer.popleft() for i in range(bufsize))) 127 128 def setblocking(self, value: bool) -> None: 129 """When not draining we pass thru to the socket, 130 since when draining we control socket blocking. 131 """ 132 if self._drain_thread is None: 133 socket.socket.setblocking(self, value) 134 135 def settimeout(self, value: Optional[float]) -> None: 136 """When not draining we pass thru to the socket, 137 since when draining we control the timeout. 138 """ 139 if value is not None: 140 self._recv_timeout_sec = value 141 if self._drain_thread is None: 142 socket.socket.settimeout(self, value) 143