]> Git Repo - qemu.git/blob - python/qemu/aqmp/protocol.py
python/aqmp: remove _new_session and _establish_connection
[qemu.git] / python / qemu / aqmp / protocol.py
1 """
2 Generic Asynchronous Message-based Protocol Support
3
4 This module provides a generic framework for sending and receiving
5 messages over an asyncio stream. `AsyncProtocol` is an abstract class
6 that implements the core mechanisms of a simple send/receive protocol,
7 and is designed to be extended.
8
9 In this package, it is used as the implementation for the `QMPClient`
10 class.
11 """
12
13 import asyncio
14 from asyncio import StreamReader, StreamWriter
15 from enum import Enum
16 from functools import wraps
17 import logging
18 import socket
19 from ssl import SSLContext
20 from typing import (
21     Any,
22     Awaitable,
23     Callable,
24     Generic,
25     List,
26     Optional,
27     Tuple,
28     TypeVar,
29     Union,
30     cast,
31 )
32
33 from .error import QMPError
34 from .util import (
35     bottom_half,
36     create_task,
37     exception_summary,
38     flush,
39     is_closing,
40     pretty_traceback,
41     upper_half,
42     wait_closed,
43 )
44
45
46 T = TypeVar('T')
47 _U = TypeVar('_U')
48 _TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
49
50 InternetAddrT = Tuple[str, int]
51 UnixAddrT = str
52 SocketAddrT = Union[UnixAddrT, InternetAddrT]
53
54
55 class Runstate(Enum):
56     """Protocol session runstate."""
57
58     #: Fully quiesced and disconnected.
59     IDLE = 0
60     #: In the process of connecting or establishing a session.
61     CONNECTING = 1
62     #: Fully connected and active session.
63     RUNNING = 2
64     #: In the process of disconnecting.
65     #: Runstate may be returned to `IDLE` by calling `disconnect()`.
66     DISCONNECTING = 3
67
68
69 class ConnectError(QMPError):
70     """
71     Raised when the initial connection process has failed.
72
73     This Exception always wraps a "root cause" exception that can be
74     interrogated for additional information.
75
76     :param error_message: Human-readable string describing the error.
77     :param exc: The root-cause exception.
78     """
79     def __init__(self, error_message: str, exc: Exception):
80         super().__init__(error_message)
81         #: Human-readable error string
82         self.error_message: str = error_message
83         #: Wrapped root cause exception
84         self.exc: Exception = exc
85
86     def __str__(self) -> str:
87         cause = str(self.exc)
88         if not cause:
89             # If there's no error string, use the exception name.
90             cause = exception_summary(self.exc)
91         return f"{self.error_message}: {cause}"
92
93
94 class StateError(QMPError):
95     """
96     An API command (connect, execute, etc) was issued at an inappropriate time.
97
98     This error is raised when a command like
99     :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
100     time.
101
102     :param error_message: Human-readable string describing the state violation.
103     :param state: The actual `Runstate` seen at the time of the violation.
104     :param required: The `Runstate` required to process this command.
105     """
106     def __init__(self, error_message: str,
107                  state: Runstate, required: Runstate):
108         super().__init__(error_message)
109         self.error_message = error_message
110         self.state = state
111         self.required = required
112
113
114 F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
115
116
117 # Don't Panic.
118 def require(required_state: Runstate) -> Callable[[F], F]:
119     """
120     Decorator: protect a method so it can only be run in a certain `Runstate`.
121
122     :param required_state: The `Runstate` required to invoke this method.
123     :raise StateError: When the required `Runstate` is not met.
124     """
125     def _decorator(func: F) -> F:
126         # _decorator is the decorator that is built by calling the
127         # require() decorator factory; e.g.:
128         #
129         # @require(Runstate.IDLE) def foo(): ...
130         # will replace 'foo' with the result of '_decorator(foo)'.
131
132         @wraps(func)
133         def _wrapper(proto: 'AsyncProtocol[Any]',
134                      *args: Any, **kwargs: Any) -> Any:
135             # _wrapper is the function that gets executed prior to the
136             # decorated method.
137
138             name = type(proto).__name__
139
140             if proto.runstate != required_state:
141                 if proto.runstate == Runstate.CONNECTING:
142                     emsg = f"{name} is currently connecting."
143                 elif proto.runstate == Runstate.DISCONNECTING:
144                     emsg = (f"{name} is disconnecting."
145                             " Call disconnect() to return to IDLE state.")
146                 elif proto.runstate == Runstate.RUNNING:
147                     emsg = f"{name} is already connected and running."
148                 elif proto.runstate == Runstate.IDLE:
149                     emsg = f"{name} is disconnected and idle."
150                 else:
151                     assert False
152                 raise StateError(emsg, proto.runstate, required_state)
153             # No StateError, so call the wrapped method.
154             return func(proto, *args, **kwargs)
155
156         # Return the decorated method;
157         # Transforming Func to Decorated[Func].
158         return cast(F, _wrapper)
159
160     # Return the decorator instance from the decorator factory. Phew!
161     return _decorator
162
163
164 class AsyncProtocol(Generic[T]):
165     """
166     AsyncProtocol implements a generic async message-based protocol.
167
168     This protocol assumes the basic unit of information transfer between
169     client and server is a "message", the details of which are left up
170     to the implementation. It assumes the sending and receiving of these
171     messages is full-duplex and not necessarily correlated; i.e. it
172     supports asynchronous inbound messages.
173
174     It is designed to be extended by a specific protocol which provides
175     the implementations for how to read and send messages. These must be
176     defined in `_do_recv()` and `_do_send()`, respectively.
177
178     Other callbacks have a default implementation, but are intended to be
179     either extended or overridden:
180
181      - `_establish_session`:
182          The base implementation starts the reader/writer tasks.
183          A protocol implementation can override this call, inserting
184          actions to be taken prior to starting the reader/writer tasks
185          before the super() call; actions needing to occur afterwards
186          can be written after the super() call.
187      - `_on_message`:
188          Actions to be performed when a message is received.
189      - `_cb_outbound`:
190          Logging/Filtering hook for all outbound messages.
191      - `_cb_inbound`:
192          Logging/Filtering hook for all inbound messages.
193          This hook runs *before* `_on_message()`.
194
195     :param name:
196         Name used for logging messages, if any. By default, messages
197         will log to 'qemu.aqmp.protocol', but each individual connection
198         can be given its own logger by giving it a name; messages will
199         then log to 'qemu.aqmp.protocol.${name}'.
200     """
201     # pylint: disable=too-many-instance-attributes
202
203     #: Logger object for debugging messages from this connection.
204     logger = logging.getLogger(__name__)
205
206     # Maximum allowable size of read buffer
207     _limit = (64 * 1024)
208
209     # -------------------------
210     # Section: Public interface
211     # -------------------------
212
213     def __init__(self, name: Optional[str] = None) -> None:
214         #: The nickname for this connection, if any.
215         self.name: Optional[str] = name
216         if self.name is not None:
217             self.logger = self.logger.getChild(self.name)
218
219         # stream I/O
220         self._reader: Optional[StreamReader] = None
221         self._writer: Optional[StreamWriter] = None
222
223         # Outbound Message queue
224         self._outgoing: asyncio.Queue[T]
225
226         # Special, long-running tasks:
227         self._reader_task: Optional[asyncio.Future[None]] = None
228         self._writer_task: Optional[asyncio.Future[None]] = None
229
230         # Aggregate of the above two tasks, used for Exception management.
231         self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
232
233         #: Disconnect task. The disconnect implementation runs in a task
234         #: so that asynchronous disconnects (initiated by the
235         #: reader/writer) are allowed to wait for the reader/writers to
236         #: exit.
237         self._dc_task: Optional[asyncio.Future[None]] = None
238
239         self._runstate = Runstate.IDLE
240         self._runstate_changed: Optional[asyncio.Event] = None
241
242         # Workaround for bind()
243         self._sock: Optional[socket.socket] = None
244
245     def __repr__(self) -> str:
246         cls_name = type(self).__name__
247         tokens = []
248         if self.name is not None:
249             tokens.append(f"name={self.name!r}")
250         tokens.append(f"runstate={self.runstate.name}")
251         return f"<{cls_name} {' '.join(tokens)}>"
252
253     @property  # @upper_half
254     def runstate(self) -> Runstate:
255         """The current `Runstate` of the connection."""
256         return self._runstate
257
258     @upper_half
259     async def runstate_changed(self) -> Runstate:
260         """
261         Wait for the `runstate` to change, then return that runstate.
262         """
263         await self._runstate_event.wait()
264         return self.runstate
265
266     @upper_half
267     @require(Runstate.IDLE)
268     async def start_server_and_accept(
269             self, address: SocketAddrT,
270             ssl: Optional[SSLContext] = None
271     ) -> None:
272         """
273         Accept a connection and begin processing message queues.
274
275         If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
276
277         :param address:
278             Address to listen on; UNIX socket path or TCP address/port.
279         :param ssl: SSL context to use, if any.
280
281         :raise StateError: When the `Runstate` is not `IDLE`.
282         :raise ConnectError:
283             When a connection or session cannot be established.
284
285             This exception will wrap a more concrete one. In most cases,
286             the wrapped exception will be `OSError` or `EOFError`. If a
287             protocol-level failure occurs while establishing a new
288             session, the wrapped error may also be an `QMPError`.
289         """
290         await self._session_guard(
291             self._do_accept(address, ssl),
292             'Failed to establish connection')
293         await self._session_guard(
294             self._establish_session(),
295             'Failed to establish session')
296         assert self.runstate == Runstate.RUNNING
297
298     @upper_half
299     @require(Runstate.IDLE)
300     async def connect(self, address: SocketAddrT,
301                       ssl: Optional[SSLContext] = None) -> None:
302         """
303         Connect to the server and begin processing message queues.
304
305         If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
306
307         :param address:
308             Address to connect to; UNIX socket path or TCP address/port.
309         :param ssl: SSL context to use, if any.
310
311         :raise StateError: When the `Runstate` is not `IDLE`.
312         :raise ConnectError:
313             When a connection or session cannot be established.
314
315             This exception will wrap a more concrete one. In most cases,
316             the wrapped exception will be `OSError` or `EOFError`. If a
317             protocol-level failure occurs while establishing a new
318             session, the wrapped error may also be an `QMPError`.
319         """
320         await self._session_guard(
321             self._do_connect(address, ssl),
322             'Failed to establish connection')
323         await self._session_guard(
324             self._establish_session(),
325             'Failed to establish session')
326         assert self.runstate == Runstate.RUNNING
327
328     @upper_half
329     async def disconnect(self) -> None:
330         """
331         Disconnect and wait for all tasks to fully stop.
332
333         If there was an exception that caused the reader/writers to
334         terminate prematurely, it will be raised here.
335
336         :raise Exception: When the reader or writer terminate unexpectedly.
337         """
338         self.logger.debug("disconnect() called.")
339         self._schedule_disconnect()
340         await self._wait_disconnect()
341
342     # --------------------------
343     # Section: Session machinery
344     # --------------------------
345
346     async def _session_guard(self, coro: Awaitable[None], emsg: str) -> None:
347         """
348         Async guard function used to roll back to `IDLE` on any error.
349
350         On any Exception, the state machine will be reset back to
351         `IDLE`. Most Exceptions will be wrapped with `ConnectError`, but
352         `BaseException` events will be left alone (This includes
353         asyncio.CancelledError, even prior to Python 3.8).
354
355         :param error_message:
356             Human-readable string describing what connection phase failed.
357
358         :raise BaseException:
359             When `BaseException` occurs in the guarded block.
360         :raise ConnectError:
361             When any other error is encountered in the guarded block.
362         """
363         # Note: After Python 3.6 support is removed, this should be an
364         # @asynccontextmanager instead of accepting a callback.
365         try:
366             await coro
367         except BaseException as err:
368             self.logger.error("%s: %s", emsg, exception_summary(err))
369             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
370             try:
371                 # Reset the runstate back to IDLE.
372                 await self.disconnect()
373             except:
374                 # We don't expect any Exceptions from the disconnect function
375                 # here, because we failed to connect in the first place.
376                 # The disconnect() function is intended to perform
377                 # only cannot-fail cleanup here, but you never know.
378                 emsg = (
379                     "Unexpected bottom half exception. "
380                     "This is a bug in the QMP library. "
381                     "Please report it to <[email protected]> and "
382                     "CC: John Snow <[email protected]>."
383                 )
384                 self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
385                 raise
386
387             # CancelledError is an Exception with special semantic meaning;
388             # We do NOT want to wrap it up under ConnectError.
389             # NB: CancelledError is not a BaseException before Python 3.8
390             if isinstance(err, asyncio.CancelledError):
391                 raise
392
393             # Any other kind of error can be treated as some kind of connection
394             # failure broadly. Inspect the 'exc' field to explore the root
395             # cause in greater detail.
396             if isinstance(err, Exception):
397                 raise ConnectError(emsg, err) from err
398
399             # Raise BaseExceptions un-wrapped, they're more important.
400             raise
401
402     @property
403     def _runstate_event(self) -> asyncio.Event:
404         # asyncio.Event() objects should not be created prior to entrance into
405         # an event loop, so we can ensure we create it in the correct context.
406         # Create it on-demand *only* at the behest of an 'async def' method.
407         if not self._runstate_changed:
408             self._runstate_changed = asyncio.Event()
409         return self._runstate_changed
410
411     @upper_half
412     @bottom_half
413     def _set_state(self, state: Runstate) -> None:
414         """
415         Change the `Runstate` of the protocol connection.
416
417         Signals the `runstate_changed` event.
418         """
419         if state == self._runstate:
420             return
421
422         self.logger.debug("Transitioning from '%s' to '%s'.",
423                           str(self._runstate), str(state))
424         self._runstate = state
425         self._runstate_event.set()
426         self._runstate_event.clear()
427
428     def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
429         """
430         Used to create a socket in advance of accept().
431
432         This is a workaround to ensure that we can guarantee timing of
433         precisely when a socket exists to avoid a connection attempt
434         bouncing off of nothing.
435
436         Python 3.7+ adds a feature to separate the server creation and
437         listening phases instead, and should be used instead of this
438         hack.
439         """
440         if isinstance(address, tuple):
441             family = socket.AF_INET
442         else:
443             family = socket.AF_UNIX
444
445         sock = socket.socket(family, socket.SOCK_STREAM)
446         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
447
448         try:
449             sock.bind(address)
450         except:
451             sock.close()
452             raise
453
454         self._sock = sock
455
456     @upper_half
457     async def _do_accept(self, address: SocketAddrT,
458                          ssl: Optional[SSLContext] = None) -> None:
459         """
460         Acting as the transport server, accept a single connection.
461
462         :param address:
463             Address to listen on; UNIX socket path or TCP address/port.
464         :param ssl: SSL context to use, if any.
465
466         :raise OSError: For stream-related errors.
467         """
468         assert self.runstate == Runstate.IDLE
469         self._set_state(Runstate.CONNECTING)
470
471         self.logger.debug("Awaiting connection on %s ...", address)
472         connected = asyncio.Event()
473         server: Optional[asyncio.AbstractServer] = None
474
475         async def _client_connected_cb(reader: asyncio.StreamReader,
476                                        writer: asyncio.StreamWriter) -> None:
477             """Used to accept a single incoming connection, see below."""
478             nonlocal server
479             nonlocal connected
480
481             # A connection has been accepted; stop listening for new ones.
482             assert server is not None
483             server.close()
484             await server.wait_closed()
485             server = None
486
487             # Register this client as being connected
488             self._reader, self._writer = (reader, writer)
489
490             # Signal back: We've accepted a client!
491             connected.set()
492
493         if isinstance(address, tuple):
494             coro = asyncio.start_server(
495                 _client_connected_cb,
496                 host=None if self._sock else address[0],
497                 port=None if self._sock else address[1],
498                 ssl=ssl,
499                 backlog=1,
500                 limit=self._limit,
501                 sock=self._sock,
502             )
503         else:
504             coro = asyncio.start_unix_server(
505                 _client_connected_cb,
506                 path=None if self._sock else address,
507                 ssl=ssl,
508                 backlog=1,
509                 limit=self._limit,
510                 sock=self._sock,
511             )
512
513         # Allow runstate watchers to witness 'CONNECTING' state; some
514         # failures in the streaming layer are synchronous and will not
515         # otherwise yield.
516         await asyncio.sleep(0)
517
518         server = await coro     # Starts listening
519         await connected.wait()  # Waits for the callback to fire (and finish)
520         assert server is None
521         self._sock = None
522
523         self.logger.debug("Connection accepted.")
524
525     @upper_half
526     async def _do_connect(self, address: SocketAddrT,
527                           ssl: Optional[SSLContext] = None) -> None:
528         """
529         Acting as the transport client, initiate a connection to a server.
530
531         :param address:
532             Address to connect to; UNIX socket path or TCP address/port.
533         :param ssl: SSL context to use, if any.
534
535         :raise OSError: For stream-related errors.
536         """
537         assert self.runstate == Runstate.IDLE
538         self._set_state(Runstate.CONNECTING)
539
540         # Allow runstate watchers to witness 'CONNECTING' state; some
541         # failures in the streaming layer are synchronous and will not
542         # otherwise yield.
543         await asyncio.sleep(0)
544
545         self.logger.debug("Connecting to %s ...", address)
546
547         if isinstance(address, tuple):
548             connect = asyncio.open_connection(
549                 address[0],
550                 address[1],
551                 ssl=ssl,
552                 limit=self._limit,
553             )
554         else:
555             connect = asyncio.open_unix_connection(
556                 path=address,
557                 ssl=ssl,
558                 limit=self._limit,
559             )
560         self._reader, self._writer = await connect
561
562         self.logger.debug("Connected.")
563
564     @upper_half
565     async def _establish_session(self) -> None:
566         """
567         Establish a new session.
568
569         Starts the readers/writer tasks; subclasses may perform their
570         own negotiations here. The Runstate will be RUNNING upon
571         successful conclusion.
572         """
573         assert self.runstate == Runstate.CONNECTING
574
575         self._outgoing = asyncio.Queue()
576
577         reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
578         writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
579
580         self._reader_task = create_task(reader_coro)
581         self._writer_task = create_task(writer_coro)
582
583         self._bh_tasks = asyncio.gather(
584             self._reader_task,
585             self._writer_task,
586         )
587
588         self._set_state(Runstate.RUNNING)
589         await asyncio.sleep(0)  # Allow runstate_event to process
590
591     @upper_half
592     @bottom_half
593     def _schedule_disconnect(self) -> None:
594         """
595         Initiate a disconnect; idempotent.
596
597         This method is used both in the upper-half as a direct
598         consequence of `disconnect()`, and in the bottom-half in the
599         case of unhandled exceptions in the reader/writer tasks.
600
601         It can be invoked no matter what the `runstate` is.
602         """
603         if not self._dc_task:
604             self._set_state(Runstate.DISCONNECTING)
605             self.logger.debug("Scheduling disconnect.")
606             self._dc_task = create_task(self._bh_disconnect())
607
608     @upper_half
609     async def _wait_disconnect(self) -> None:
610         """
611         Waits for a previously scheduled disconnect to finish.
612
613         This method will gather any bottom half exceptions and re-raise
614         the one that occurred first; presuming it to be the root cause
615         of any subsequent Exceptions. It is intended to be used in the
616         upper half of the call chain.
617
618         :raise Exception:
619             Arbitrary exception re-raised on behalf of the reader/writer.
620         """
621         assert self.runstate == Runstate.DISCONNECTING
622         assert self._dc_task
623
624         aws: List[Awaitable[object]] = [self._dc_task]
625         if self._bh_tasks:
626             aws.insert(0, self._bh_tasks)
627         all_defined_tasks = asyncio.gather(*aws)
628
629         # Ensure disconnect is done; Exception (if any) is not raised here:
630         await asyncio.wait((self._dc_task,))
631
632         try:
633             await all_defined_tasks  # Raise Exceptions from the bottom half.
634         finally:
635             self._cleanup()
636             self._set_state(Runstate.IDLE)
637
638     @upper_half
639     def _cleanup(self) -> None:
640         """
641         Fully reset this object to a clean state and return to `IDLE`.
642         """
643         def _paranoid_task_erase(task: Optional['asyncio.Future[_U]']
644                                  ) -> Optional['asyncio.Future[_U]']:
645             # Help to erase a task, ENSURING it is fully quiesced first.
646             assert (task is None) or task.done()
647             return None if (task and task.done()) else task
648
649         assert self.runstate == Runstate.DISCONNECTING
650         self._dc_task = _paranoid_task_erase(self._dc_task)
651         self._reader_task = _paranoid_task_erase(self._reader_task)
652         self._writer_task = _paranoid_task_erase(self._writer_task)
653         self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
654
655         self._reader = None
656         self._writer = None
657
658         # NB: _runstate_changed cannot be cleared because we still need it to
659         # send the final runstate changed event ...!
660
661     # ----------------------------
662     # Section: Bottom Half methods
663     # ----------------------------
664
665     @bottom_half
666     async def _bh_disconnect(self) -> None:
667         """
668         Disconnect and cancel all outstanding tasks.
669
670         It is designed to be called from its task context,
671         :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
672         it is free to wait on any pending actions that may still need to
673         occur in either the reader or writer tasks.
674         """
675         assert self.runstate == Runstate.DISCONNECTING
676
677         def _done(task: Optional['asyncio.Future[Any]']) -> bool:
678             return task is not None and task.done()
679
680         # Are we already in an error pathway? If either of the tasks are
681         # already done, or if we have no tasks but a reader/writer; we
682         # must be.
683         #
684         # NB: We can't use _bh_tasks to check for premature task
685         # completion, because it may not yet have had a chance to run
686         # and gather itself.
687         tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
688         error_pathway = _done(self._reader_task) or _done(self._writer_task)
689         if not tasks:
690             error_pathway |= bool(self._reader) or bool(self._writer)
691
692         try:
693             # Try to flush the writer, if possible.
694             # This *may* cause an error and force us over into the error path.
695             if not error_pathway:
696                 await self._bh_flush_writer()
697         except BaseException as err:
698             error_pathway = True
699             emsg = "Failed to flush the writer"
700             self.logger.error("%s: %s", emsg, exception_summary(err))
701             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
702             raise
703         finally:
704             # Cancel any still-running tasks (Won't raise):
705             if self._writer_task is not None and not self._writer_task.done():
706                 self.logger.debug("Cancelling writer task.")
707                 self._writer_task.cancel()
708             if self._reader_task is not None and not self._reader_task.done():
709                 self.logger.debug("Cancelling reader task.")
710                 self._reader_task.cancel()
711
712             # Close out the tasks entirely (Won't raise):
713             if tasks:
714                 self.logger.debug("Waiting for tasks to complete ...")
715                 await asyncio.wait(tasks)
716
717             # Lastly, close the stream itself. (*May raise*!):
718             await self._bh_close_stream(error_pathway)
719             self.logger.debug("Disconnected.")
720
721     @bottom_half
722     async def _bh_flush_writer(self) -> None:
723         if not self._writer_task:
724             return
725
726         self.logger.debug("Draining the outbound queue ...")
727         await self._outgoing.join()
728         if self._writer is not None:
729             self.logger.debug("Flushing the StreamWriter ...")
730             await flush(self._writer)
731
732     @bottom_half
733     async def _bh_close_stream(self, error_pathway: bool = False) -> None:
734         # NB: Closing the writer also implcitly closes the reader.
735         if not self._writer:
736             return
737
738         if not is_closing(self._writer):
739             self.logger.debug("Closing StreamWriter.")
740             self._writer.close()
741
742         self.logger.debug("Waiting for StreamWriter to close ...")
743         try:
744             await wait_closed(self._writer)
745         except Exception:  # pylint: disable=broad-except
746             # It's hard to tell if the Stream is already closed or
747             # not. Even if one of the tasks has failed, it may have
748             # failed for a higher-layered protocol reason. The
749             # stream could still be open and perfectly fine.
750             # I don't know how to discern its health here.
751
752             if error_pathway:
753                 # We already know that *something* went wrong. Let's
754                 # just trust that the Exception we already have is the
755                 # better one to present to the user, even if we don't
756                 # genuinely *know* the relationship between the two.
757                 self.logger.debug(
758                     "Discarding Exception from wait_closed:\n%s\n",
759                     pretty_traceback(),
760                 )
761             else:
762                 # Oops, this is a brand-new error!
763                 raise
764         finally:
765             self.logger.debug("StreamWriter closed.")
766
767     @bottom_half
768     async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
769         """
770         Run one of the bottom-half methods in a loop forever.
771
772         If the bottom half ever raises any exception, schedule a
773         disconnect that will terminate the entire loop.
774
775         :param async_fn: The bottom-half method to run in a loop.
776         :param name: The name of this task, used for logging.
777         """
778         try:
779             while True:
780                 await async_fn()
781         except asyncio.CancelledError:
782             # We have been cancelled by _bh_disconnect, exit gracefully.
783             self.logger.debug("Task.%s: cancelled.", name)
784             return
785         except BaseException as err:
786             self.logger.log(
787                 logging.INFO if isinstance(err, EOFError) else logging.ERROR,
788                 "Task.%s: %s",
789                 name, exception_summary(err)
790             )
791             self.logger.debug("Task.%s: failure:\n%s\n",
792                               name, pretty_traceback())
793             self._schedule_disconnect()
794             raise
795         finally:
796             self.logger.debug("Task.%s: exiting.", name)
797
798     @bottom_half
799     async def _bh_send_message(self) -> None:
800         """
801         Wait for an outgoing message, then send it.
802
803         Designed to be run in `_bh_loop_forever()`.
804         """
805         msg = await self._outgoing.get()
806         try:
807             await self._send(msg)
808         finally:
809             self._outgoing.task_done()
810
811     @bottom_half
812     async def _bh_recv_message(self) -> None:
813         """
814         Wait for an incoming message and call `_on_message` to route it.
815
816         Designed to be run in `_bh_loop_forever()`.
817         """
818         msg = await self._recv()
819         await self._on_message(msg)
820
821     # --------------------
822     # Section: Message I/O
823     # --------------------
824
825     @upper_half
826     @bottom_half
827     def _cb_outbound(self, msg: T) -> T:
828         """
829         Callback: outbound message hook.
830
831         This is intended for subclasses to be able to add arbitrary
832         hooks to filter or manipulate outgoing messages. The base
833         implementation does nothing but log the message without any
834         manipulation of the message.
835
836         :param msg: raw outbound message
837         :return: final outbound message
838         """
839         self.logger.debug("--> %s", str(msg))
840         return msg
841
842     @upper_half
843     @bottom_half
844     def _cb_inbound(self, msg: T) -> T:
845         """
846         Callback: inbound message hook.
847
848         This is intended for subclasses to be able to add arbitrary
849         hooks to filter or manipulate incoming messages. The base
850         implementation does nothing but log the message without any
851         manipulation of the message.
852
853         This method does not "handle" incoming messages; it is a filter.
854         The actual "endpoint" for incoming messages is `_on_message()`.
855
856         :param msg: raw inbound message
857         :return: processed inbound message
858         """
859         self.logger.debug("<-- %s", str(msg))
860         return msg
861
862     @upper_half
863     @bottom_half
864     async def _readline(self) -> bytes:
865         """
866         Wait for a newline from the incoming reader.
867
868         This method is provided as a convenience for upper-layer
869         protocols, as many are line-based.
870
871         This method *may* return a sequence of bytes without a trailing
872         newline if EOF occurs, but *some* bytes were received. In this
873         case, the next call will raise `EOFError`. It is assumed that
874         the layer 5 protocol will decide if there is anything meaningful
875         to be done with a partial message.
876
877         :raise OSError: For stream-related errors.
878         :raise EOFError:
879             If the reader stream is at EOF and there are no bytes to return.
880         :return: bytes, including the newline.
881         """
882         assert self._reader is not None
883         msg_bytes = await self._reader.readline()
884
885         if not msg_bytes:
886             if self._reader.at_eof():
887                 raise EOFError
888
889         return msg_bytes
890
891     @upper_half
892     @bottom_half
893     async def _do_recv(self) -> T:
894         """
895         Abstract: Read from the stream and return a message.
896
897         Very low-level; intended to only be called by `_recv()`.
898         """
899         raise NotImplementedError
900
901     @upper_half
902     @bottom_half
903     async def _recv(self) -> T:
904         """
905         Read an arbitrary protocol message.
906
907         .. warning::
908             This method is intended primarily for `_bh_recv_message()`
909             to use in an asynchronous task loop. Using it outside of
910             this loop will "steal" messages from the normal routing
911             mechanism. It is safe to use prior to `_establish_session()`,
912             but should not be used otherwise.
913
914         This method uses `_do_recv()` to retrieve the raw message, and
915         then transforms it using `_cb_inbound()`.
916
917         :return: A single (filtered, processed) protocol message.
918         """
919         message = await self._do_recv()
920         return self._cb_inbound(message)
921
922     @upper_half
923     @bottom_half
924     def _do_send(self, msg: T) -> None:
925         """
926         Abstract: Write a message to the stream.
927
928         Very low-level; intended to only be called by `_send()`.
929         """
930         raise NotImplementedError
931
932     @upper_half
933     @bottom_half
934     async def _send(self, msg: T) -> None:
935         """
936         Send an arbitrary protocol message.
937
938         This method will transform any outgoing messages according to
939         `_cb_outbound()`.
940
941         .. warning::
942             Like `_recv()`, this method is intended to be called by
943             the writer task loop that processes outgoing
944             messages. Calling it directly may circumvent logic
945             implemented by the caller meant to correlate outgoing and
946             incoming messages.
947
948         :raise OSError: For problems with the underlying stream.
949         """
950         msg = self._cb_outbound(msg)
951         self._do_send(msg)
952
953     @bottom_half
954     async def _on_message(self, msg: T) -> None:
955         """
956         Called to handle the receipt of a new message.
957
958         .. caution::
959             This is executed from within the reader loop, so be advised
960             that waiting on either the reader or writer task will lead
961             to deadlock. Additionally, any unhandled exceptions will
962             directly cause the loop to halt, so logic may be best-kept
963             to a minimum if at all possible.
964
965         :param msg: The incoming message, already logged/filtered.
966         """
967         # Nothing to do in the abstract case.
This page took 0.083507 seconds and 4 git commands to generate.