2 Miscellaneous Utilities
4 This module provides asyncio utilities and compatibility wrappers for
5 Python 3.6 to provide some features that otherwise become available in
8 Various logging and debugging utilities are also provided, such as
9 `exception_summary()` and `pretty_traceback()`, used primarily for
10 adding information into the logging stream.
28 # --------------------------
29 # Section: Utility Functions
30 # --------------------------
33 async def flush(writer: asyncio.StreamWriter) -> None:
35 Utility function to ensure a StreamWriter is *fully* drained.
37 `asyncio.StreamWriter.drain` only promises we will return to below
38 the "high-water mark". This function ensures we flush the entire
39 buffer -- by setting the high water mark to 0 and then calling
40 drain. The flow control limits are restored after the call is
43 transport = cast(asyncio.WriteTransport, writer.transport)
45 # https://github.com/python/typeshed/issues/5779
46 low, high = transport.get_write_buffer_limits() # type: ignore
47 transport.set_write_buffer_limits(0, 0)
51 transport.set_write_buffer_limits(high, low)
54 def upper_half(func: T) -> T:
56 Do-nothing decorator that annotates a method as an "upper-half" method.
58 These methods must not call bottom-half functions directly, but can
64 def bottom_half(func: T) -> T:
66 Do-nothing decorator that annotates a method as a "bottom-half" method.
68 These methods must take great care to handle their own exceptions whenever
69 possible. If they go unhandled, they will cause termination of the loop.
71 These methods do not, in general, have the ability to directly
72 report information to a caller’s context and will usually be
73 collected as a Task result instead.
75 They must not call upper-half functions directly.
80 # -------------------------------
81 # Section: Compatibility Wrappers
82 # -------------------------------
85 def create_task(coro: Coroutine[Any, Any, T],
86 loop: Optional[asyncio.AbstractEventLoop] = None
87 ) -> 'asyncio.Future[T]':
89 Python 3.6-compatible `asyncio.create_task` wrapper.
91 :param coro: The coroutine to execute in a task.
92 :param loop: Optionally, the loop to create the task in.
94 :return: An `asyncio.Future` object.
96 if sys.version_info >= (3, 7):
98 return loop.create_task(coro)
99 return asyncio.create_task(coro) # pylint: disable=no-member
102 return asyncio.ensure_future(coro, loop=loop)
105 def is_closing(writer: asyncio.StreamWriter) -> bool:
107 Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
109 :param writer: The `asyncio.StreamWriter` object.
110 :return: `True` if the writer is closing, or closed.
112 if sys.version_info >= (3, 7):
113 return writer.is_closing()
116 transport = writer.transport
117 assert isinstance(transport, asyncio.WriteTransport)
118 return transport.is_closing()
121 async def wait_closed(writer: asyncio.StreamWriter) -> None:
123 Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
125 :param writer: The `asyncio.StreamWriter` to wait on.
127 if sys.version_info >= (3, 7):
128 await writer.wait_closed()
132 transport = writer.transport
133 assert isinstance(transport, asyncio.WriteTransport)
135 while not transport.is_closing():
136 await asyncio.sleep(0)
138 # This is an ugly workaround, but it's the best I can come up with.
139 sock = transport.get_extra_info('socket')
142 # Our transport doesn't have a socket? ...
143 # Nothing we can reasonably do.
146 while sock.fileno() != -1:
147 await asyncio.sleep(0)
150 def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
152 Python 3.6-compatible `asyncio.run` wrapper.
154 :param coro: A coroutine to execute now.
155 :return: The return value from the coroutine.
157 if sys.version_info >= (3, 7):
158 return asyncio.run(coro, debug=debug)
161 loop = asyncio.get_event_loop()
162 loop.set_debug(debug)
163 ret = loop.run_until_complete(coro)
169 # ----------------------------
170 # Section: Logging & Debugging
171 # ----------------------------
174 def exception_summary(exc: BaseException) -> str:
176 Return a summary string of an arbitrary exception.
178 It will be of the form "ExceptionType: Error Message", if the error
179 string is non-empty, and just "ExceptionType" otherwise.
181 name = type(exc).__qualname__
182 smod = type(exc).__module__
183 if smod not in ("__main__", "builtins"):
184 name = smod + '.' + name
188 return f"{name}: {error}"
192 def pretty_traceback(prefix: str = " | ") -> str:
194 Formats the current traceback, indented to provide visual distinction.
196 This is useful for printing a traceback within a traceback for
197 debugging purposes when encapsulating errors to deliver them up the
198 stack; when those errors are printed, this helps provide a nice
199 visual grouping to quickly identify the parts of the error that
200 belong to the inner exception.
202 :param prefix: The prefix to append to each line of the traceback.
203 :return: A string, formatted something like the following::
205 | Traceback (most recent call last):
206 | File "foobar.py", line 42, in arbitrary_example
208 | ArbitraryError: [Errno 42] Something bad happened!
210 output = "".join(traceback.format_exception(*sys.exc_info()))
213 for line in output.split('\n'):
214 exc_lines.append(prefix + line)
216 # The last line is always empty, omit it
217 return "\n".join(exc_lines[:-1])