2 Miscellaneous Utilities
4 This module provides asyncio utilities and compatibility wrappers for
5 Python 3.6 to provide some features that otherwise become available in
8 Various logging and debugging utilities are also provided, such as
9 `exception_summary()` and `pretty_traceback()`, used primarily for
10 adding information into the logging stream.
28 # --------------------------
29 # Section: Utility Functions
30 # --------------------------
33 async def flush(writer: asyncio.StreamWriter) -> None:
35 Utility function to ensure a StreamWriter is *fully* drained.
37 `asyncio.StreamWriter.drain` only promises we will return to below
38 the "high-water mark". This function ensures we flush the entire
39 buffer -- by setting the high water mark to 0 and then calling
40 drain. The flow control limits are restored after the call is
43 transport = cast( # type: ignore[redundant-cast]
44 asyncio.WriteTransport, writer.transport
47 # https://github.com/python/typeshed/issues/5779
48 low, high = transport.get_write_buffer_limits() # type: ignore
49 transport.set_write_buffer_limits(0, 0)
53 transport.set_write_buffer_limits(high, low)
56 def upper_half(func: T) -> T:
58 Do-nothing decorator that annotates a method as an "upper-half" method.
60 These methods must not call bottom-half functions directly, but can
66 def bottom_half(func: T) -> T:
68 Do-nothing decorator that annotates a method as a "bottom-half" method.
70 These methods must take great care to handle their own exceptions whenever
71 possible. If they go unhandled, they will cause termination of the loop.
73 These methods do not, in general, have the ability to directly
74 report information to a caller’s context and will usually be
75 collected as a Task result instead.
77 They must not call upper-half functions directly.
82 # -------------------------------
83 # Section: Compatibility Wrappers
84 # -------------------------------
87 def create_task(coro: Coroutine[Any, Any, T],
88 loop: Optional[asyncio.AbstractEventLoop] = None
89 ) -> 'asyncio.Future[T]':
91 Python 3.6-compatible `asyncio.create_task` wrapper.
93 :param coro: The coroutine to execute in a task.
94 :param loop: Optionally, the loop to create the task in.
96 :return: An `asyncio.Future` object.
98 if sys.version_info >= (3, 7):
100 return loop.create_task(coro)
101 return asyncio.create_task(coro) # pylint: disable=no-member
104 return asyncio.ensure_future(coro, loop=loop)
107 def is_closing(writer: asyncio.StreamWriter) -> bool:
109 Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
111 :param writer: The `asyncio.StreamWriter` object.
112 :return: `True` if the writer is closing, or closed.
114 if sys.version_info >= (3, 7):
115 return writer.is_closing()
118 transport = writer.transport
119 assert isinstance(transport, asyncio.WriteTransport)
120 return transport.is_closing()
123 async def wait_closed(writer: asyncio.StreamWriter) -> None:
125 Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
127 :param writer: The `asyncio.StreamWriter` to wait on.
129 if sys.version_info >= (3, 7):
130 await writer.wait_closed()
134 transport = writer.transport
135 assert isinstance(transport, asyncio.WriteTransport)
137 while not transport.is_closing():
138 await asyncio.sleep(0)
140 # This is an ugly workaround, but it's the best I can come up with.
141 sock = transport.get_extra_info('socket')
144 # Our transport doesn't have a socket? ...
145 # Nothing we can reasonably do.
148 while sock.fileno() != -1:
149 await asyncio.sleep(0)
152 def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
154 Python 3.6-compatible `asyncio.run` wrapper.
156 :param coro: A coroutine to execute now.
157 :return: The return value from the coroutine.
159 if sys.version_info >= (3, 7):
160 return asyncio.run(coro, debug=debug)
163 loop = asyncio.get_event_loop()
164 loop.set_debug(debug)
165 ret = loop.run_until_complete(coro)
171 # ----------------------------
172 # Section: Logging & Debugging
173 # ----------------------------
176 def exception_summary(exc: BaseException) -> str:
178 Return a summary string of an arbitrary exception.
180 It will be of the form "ExceptionType: Error Message", if the error
181 string is non-empty, and just "ExceptionType" otherwise.
183 name = type(exc).__qualname__
184 smod = type(exc).__module__
185 if smod not in ("__main__", "builtins"):
186 name = smod + '.' + name
190 return f"{name}: {error}"
194 def pretty_traceback(prefix: str = " | ") -> str:
196 Formats the current traceback, indented to provide visual distinction.
198 This is useful for printing a traceback within a traceback for
199 debugging purposes when encapsulating errors to deliver them up the
200 stack; when those errors are printed, this helps provide a nice
201 visual grouping to quickly identify the parts of the error that
202 belong to the inner exception.
204 :param prefix: The prefix to append to each line of the traceback.
205 :return: A string, formatted something like the following::
207 | Traceback (most recent call last):
208 | File "foobar.py", line 42, in arbitrary_example
210 | ArbitraryError: [Errno 42] Something bad happened!
212 output = "".join(traceback.format_exception(*sys.exc_info()))
215 for line in output.split('\n'):
216 exc_lines.append(prefix + line)
218 # The last line is always empty, omit it
219 return "\n".join(exc_lines[:-1])