"""
Miscellaneous Utilities

This module provides asyncio utilities and compatibility wrappers for
Python 3.6 to provide some features that otherwise become available in
Python 3.7+.

Various logging and debugging utilities are also provided, such as
`exception_summary()` and `pretty_traceback()`, used primarily for
adding information into the logging stream.
"""

import asyncio
import sys
import traceback
from typing import (
    Any,
    Coroutine,
    Optional,
    TypeVar,
    cast,
)


T = TypeVar('T')


# --------------------------
# Section: Utility Functions
# --------------------------


async def flush(writer: asyncio.StreamWriter) -> None:
    """
    Utility function to ensure a StreamWriter is *fully* drained.

    `asyncio.StreamWriter.drain` only promises we will return to below
    the "high-water mark". This function ensures we flush the entire
    buffer -- by setting the high water mark to 0 and then calling
    drain. The flow control limits are restored after the call is
    completed, even if draining raises.

    :param writer: The `asyncio.StreamWriter` to drain completely.
    """
    transport = cast(  # type: ignore[redundant-cast]
        asyncio.WriteTransport, writer.transport
    )

    # https://github.com/python/typeshed/issues/5779
    low, high = transport.get_write_buffer_limits()  # type: ignore
    transport.set_write_buffer_limits(0, 0)
    try:
        await writer.drain()
    finally:
        # NB: the getter yields (low, high), but the setter's positional
        # order is (high, low); the swap here is intentional.
        transport.set_write_buffer_limits(high, low)


def upper_half(func: T) -> T:
    """
    Do-nothing decorator that annotates a method as an "upper-half" method.

    These methods must not call bottom-half functions directly, but can
    schedule them to run.

    :param func: The callable to annotate.
    :return: ``func`` itself, unmodified.
    """
    return func


def bottom_half(func: T) -> T:
    """
    Do-nothing decorator that annotates a method as a "bottom-half" method.

    These methods must take great care to handle their own exceptions whenever
    possible. If they go unhandled, they will cause termination of the loop.

    These methods do not, in general, have the ability to directly
    report information to a caller's context and will usually be
    collected as a Task result instead.

    They must not call upper-half functions directly.

    :param func: The callable to annotate.
    :return: ``func`` itself, unmodified.
    """
    return func


# -------------------------------
# Section: Compatibility Wrappers
# -------------------------------


def create_task(coro: Coroutine[Any, Any, T],
                loop: Optional[asyncio.AbstractEventLoop] = None
                ) -> 'asyncio.Future[T]':
    """
    Python 3.6-compatible `asyncio.create_task` wrapper.

    :param coro: The coroutine to execute in a task.
    :param loop: Optionally, the loop to create the task in.

    :return: An `asyncio.Future` object.
    """
    if sys.version_info >= (3, 7):
        if loop is not None:
            return loop.create_task(coro)
        return asyncio.create_task(coro)  # pylint: disable=no-member

    # Python 3.6:
    return asyncio.ensure_future(coro, loop=loop)


def is_closing(writer: asyncio.StreamWriter) -> bool:
    """
    Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.

    :param writer: The `asyncio.StreamWriter` object.
    :return: `True` if the writer is closing, or closed.
    """
    if sys.version_info >= (3, 7):
        return writer.is_closing()

    # Python 3.6:
    transport = writer.transport
    # Type narrowing for the checker; not input validation.
    assert isinstance(transport, asyncio.WriteTransport)
    return transport.is_closing()


async def wait_closed(writer: asyncio.StreamWriter) -> None:
    """
    Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.

    On Python 3.6 there is no `wait_closed`, so this polls instead: it
    yields to the event loop until the transport reports that it is
    closing, and then until the underlying socket (if the transport
    exposes one) reports a file descriptor of -1.

    :param writer: The `asyncio.StreamWriter` to wait on.
    """
    if sys.version_info >= (3, 7):
        await writer.wait_closed()
        return

    # Python 3.6
    transport = writer.transport
    # Type narrowing for the checker; not input validation.
    assert isinstance(transport, asyncio.WriteTransport)

    while not transport.is_closing():
        await asyncio.sleep(0)

    # This is an ugly workaround, but it's the best I can come up with.
    sock = transport.get_extra_info('socket')

    if sock is None:
        # Our transport doesn't have a socket? ...
        # Nothing we can reasonably do.
        return

    while sock.fileno() != -1:
        await asyncio.sleep(0)


def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
    """
    Python 3.6-compatible `asyncio.run` wrapper.

    :param coro: A coroutine to execute now.
    :param debug: If `True`, run the event loop in debug mode.
    :return: The return value from the coroutine.
    """
    if sys.version_info >= (3, 7):
        return asyncio.run(coro, debug=debug)

    # Python 3.6
    loop = asyncio.get_event_loop()
    loop.set_debug(debug)
    try:
        return loop.run_until_complete(coro)
    finally:
        # Close the loop even when the coroutine raises, so that a
        # failed run does not leak the event loop.
        loop.close()


# ----------------------------
# Section: Logging & Debugging
# ----------------------------


def exception_summary(exc: BaseException) -> str:
    """
    Return a summary string of an arbitrary exception.

    It will be of the form "ExceptionType: Error Message", if the error
    string is non-empty, and just "ExceptionType" otherwise. The type
    name is qualified with its module name unless that module is
    ``__main__`` or ``builtins``.

    :param exc: The exception to summarize.
    :return: The one-line summary string.
    """
    name = type(exc).__qualname__
    smod = type(exc).__module__
    if smod not in ("__main__", "builtins"):
        name = smod + '.' + name

    error = str(exc)
    if error:
        return f"{name}: {error}"
    return name


def pretty_traceback(prefix: str = " | ") -> str:
    """
    Formats the current traceback, indented to provide visual distinction.

    This is useful for printing a traceback within a traceback for
    debugging purposes when encapsulating errors to deliver them up the
    stack; when those errors are printed, this helps provide a nice
    visual grouping to quickly identify the parts of the error that
    belong to the inner exception.

    :param prefix: The prefix to prepend to each line of the traceback.
    :return: A string, formatted something like the following::

       | Traceback (most recent call last):
       |   File "foobar.py", line 42, in arbitrary_example
       |     foo.baz()
       | ArbitraryError: [Errno 42] Something bad happened!
    """
    output = "".join(traceback.format_exception(*sys.exc_info()))

    # The formatted output ends with a newline, so the final element of
    # the split is always an empty string; omit it.
    exc_lines = [prefix + line for line in output.split('\n')[:-1]]
    return "\n".join(exc_lines)