]> Git Repo - qemu.git/blob - python/qemu/aqmp/protocol.py
c4fbe35a0e41c589059ec4fa37a816f4b76a3bd8
[qemu.git] / python / qemu / aqmp / protocol.py
1 """
2 Generic Asynchronous Message-based Protocol Support
3
4 This module provides a generic framework for sending and receiving
5 messages over an asyncio stream. `AsyncProtocol` is an abstract class
6 that implements the core mechanisms of a simple send/receive protocol,
7 and is designed to be extended.
8
9 In this package, it is used as the implementation for the `QMPClient`
10 class.
11 """
12
13 import asyncio
14 from asyncio import StreamReader, StreamWriter
15 from enum import Enum
16 from functools import wraps
17 import logging
18 from ssl import SSLContext
19 from typing import (
20     Any,
21     Awaitable,
22     Callable,
23     Generic,
24     List,
25     Optional,
26     Tuple,
27     TypeVar,
28     Union,
29     cast,
30 )
31
32 from .error import AQMPError
33 from .util import (
34     bottom_half,
35     create_task,
36     exception_summary,
37     flush,
38     is_closing,
39     pretty_traceback,
40     upper_half,
41     wait_closed,
42 )
43
44
45 T = TypeVar('T')
46 _U = TypeVar('_U')
47 _TaskFN = Callable[[], Awaitable[None]]  # aka ``async def func() -> None``
48
49
50 class Runstate(Enum):
51     """Protocol session runstate."""
52
53     #: Fully quiesced and disconnected.
54     IDLE = 0
55     #: In the process of connecting or establishing a session.
56     CONNECTING = 1
57     #: Fully connected and active session.
58     RUNNING = 2
59     #: In the process of disconnecting.
60     #: Runstate may be returned to `IDLE` by calling `disconnect()`.
61     DISCONNECTING = 3
62
63
64 class ConnectError(AQMPError):
65     """
66     Raised when the initial connection process has failed.
67
68     This Exception always wraps a "root cause" exception that can be
69     interrogated for additional information.
70
71     :param error_message: Human-readable string describing the error.
72     :param exc: The root-cause exception.
73     """
74     def __init__(self, error_message: str, exc: Exception):
75         super().__init__(error_message)
76         #: Human-readable error string
77         self.error_message: str = error_message
78         #: Wrapped root cause exception
79         self.exc: Exception = exc
80
81     def __str__(self) -> str:
82         cause = str(self.exc)
83         if not cause:
84             # If there's no error string, use the exception name.
85             cause = exception_summary(self.exc)
86         return f"{self.error_message}: {cause}"
87
88
89 class StateError(AQMPError):
90     """
91     An API command (connect, execute, etc) was issued at an inappropriate time.
92
93     This error is raised when a command like
94     :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
95     time.
96
97     :param error_message: Human-readable string describing the state violation.
98     :param state: The actual `Runstate` seen at the time of the violation.
99     :param required: The `Runstate` required to process this command.
100     """
101     def __init__(self, error_message: str,
102                  state: Runstate, required: Runstate):
103         super().__init__(error_message)
104         self.error_message = error_message
105         self.state = state
106         self.required = required
107
108
109 F = TypeVar('F', bound=Callable[..., Any])  # pylint: disable=invalid-name
110
111
112 # Don't Panic.
113 def require(required_state: Runstate) -> Callable[[F], F]:
114     """
115     Decorator: protect a method so it can only be run in a certain `Runstate`.
116
117     :param required_state: The `Runstate` required to invoke this method.
118     :raise StateError: When the required `Runstate` is not met.
119     """
120     def _decorator(func: F) -> F:
121         # _decorator is the decorator that is built by calling the
122         # require() decorator factory; e.g.:
123         #
124         # @require(Runstate.IDLE) def foo(): ...
125         # will replace 'foo' with the result of '_decorator(foo)'.
126
127         @wraps(func)
128         def _wrapper(proto: 'AsyncProtocol[Any]',
129                      *args: Any, **kwargs: Any) -> Any:
130             # _wrapper is the function that gets executed prior to the
131             # decorated method.
132
133             name = type(proto).__name__
134
135             if proto.runstate != required_state:
136                 if proto.runstate == Runstate.CONNECTING:
137                     emsg = f"{name} is currently connecting."
138                 elif proto.runstate == Runstate.DISCONNECTING:
139                     emsg = (f"{name} is disconnecting."
140                             " Call disconnect() to return to IDLE state.")
141                 elif proto.runstate == Runstate.RUNNING:
142                     emsg = f"{name} is already connected and running."
143                 elif proto.runstate == Runstate.IDLE:
144                     emsg = f"{name} is disconnected and idle."
145                 else:
146                     assert False
147                 raise StateError(emsg, proto.runstate, required_state)
148             # No StateError, so call the wrapped method.
149             return func(proto, *args, **kwargs)
150
151         # Return the decorated method;
152         # Transforming Func to Decorated[Func].
153         return cast(F, _wrapper)
154
155     # Return the decorator instance from the decorator factory. Phew!
156     return _decorator
157
158
159 class AsyncProtocol(Generic[T]):
160     """
161     AsyncProtocol implements a generic async message-based protocol.
162
163     This protocol assumes the basic unit of information transfer between
164     client and server is a "message", the details of which are left up
165     to the implementation. It assumes the sending and receiving of these
166     messages is full-duplex and not necessarily correlated; i.e. it
167     supports asynchronous inbound messages.
168
169     It is designed to be extended by a specific protocol which provides
170     the implementations for how to read and send messages. These must be
171     defined in `_do_recv()` and `_do_send()`, respectively.
172
173     Other callbacks have a default implementation, but are intended to be
174     either extended or overridden:
175
176      - `_establish_session`:
177          The base implementation starts the reader/writer tasks.
178          A protocol implementation can override this call, inserting
179          actions to be taken prior to starting the reader/writer tasks
180          before the super() call; actions needing to occur afterwards
181          can be written after the super() call.
182      - `_on_message`:
183          Actions to be performed when a message is received.
184      - `_cb_outbound`:
185          Logging/Filtering hook for all outbound messages.
186      - `_cb_inbound`:
187          Logging/Filtering hook for all inbound messages.
188          This hook runs *before* `_on_message()`.
189
190     :param name:
191         Name used for logging messages, if any. By default, messages
192         will log to 'qemu.aqmp.protocol', but each individual connection
193         can be given its own logger by giving it a name; messages will
194         then log to 'qemu.aqmp.protocol.${name}'.
195     """
196     # pylint: disable=too-many-instance-attributes
197
198     #: Logger object for debugging messages from this connection.
199     logger = logging.getLogger(__name__)
200
201     # Maximum allowable size of read buffer
202     _limit = (64 * 1024)
203
204     # -------------------------
205     # Section: Public interface
206     # -------------------------
207
208     def __init__(self, name: Optional[str] = None) -> None:
209         #: The nickname for this connection, if any.
210         self.name: Optional[str] = name
211         if self.name is not None:
212             self.logger = self.logger.getChild(self.name)
213
214         # stream I/O
215         self._reader: Optional[StreamReader] = None
216         self._writer: Optional[StreamWriter] = None
217
218         # Outbound Message queue
219         self._outgoing: asyncio.Queue[T]
220
221         # Special, long-running tasks:
222         self._reader_task: Optional[asyncio.Future[None]] = None
223         self._writer_task: Optional[asyncio.Future[None]] = None
224
225         # Aggregate of the above two tasks, used for Exception management.
226         self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
227
228         #: Disconnect task. The disconnect implementation runs in a task
229         #: so that asynchronous disconnects (initiated by the
230         #: reader/writer) are allowed to wait for the reader/writers to
231         #: exit.
232         self._dc_task: Optional[asyncio.Future[None]] = None
233
234         self._runstate = Runstate.IDLE
235         self._runstate_changed: Optional[asyncio.Event] = None
236
237     def __repr__(self) -> str:
238         cls_name = type(self).__name__
239         tokens = []
240         if self.name is not None:
241             tokens.append(f"name={self.name!r}")
242         tokens.append(f"runstate={self.runstate.name}")
243         return f"<{cls_name} {' '.join(tokens)}>"
244
245     @property  # @upper_half
246     def runstate(self) -> Runstate:
247         """The current `Runstate` of the connection."""
248         return self._runstate
249
250     @upper_half
251     async def runstate_changed(self) -> Runstate:
252         """
253         Wait for the `runstate` to change, then return that runstate.
254         """
255         await self._runstate_event.wait()
256         return self.runstate
257
258     @upper_half
259     @require(Runstate.IDLE)
260     async def accept(self, address: Union[str, Tuple[str, int]],
261                      ssl: Optional[SSLContext] = None) -> None:
262         """
263         Accept a connection and begin processing message queues.
264
265         If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
266
267         :param address:
268             Address to listen to; UNIX socket path or TCP address/port.
269         :param ssl: SSL context to use, if any.
270
271         :raise StateError: When the `Runstate` is not `IDLE`.
272         :raise ConnectError: If a connection could not be accepted.
273         """
274         await self._new_session(address, ssl, accept=True)
275
276     @upper_half
277     @require(Runstate.IDLE)
278     async def connect(self, address: Union[str, Tuple[str, int]],
279                       ssl: Optional[SSLContext] = None) -> None:
280         """
281         Connect to the server and begin processing message queues.
282
283         If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
284
285         :param address:
286             Address to connect to; UNIX socket path or TCP address/port.
287         :param ssl: SSL context to use, if any.
288
289         :raise StateError: When the `Runstate` is not `IDLE`.
290         :raise ConnectError: If a connection cannot be made to the server.
291         """
292         await self._new_session(address, ssl)
293
294     @upper_half
295     async def disconnect(self) -> None:
296         """
297         Disconnect and wait for all tasks to fully stop.
298
299         If there was an exception that caused the reader/writers to
300         terminate prematurely, it will be raised here.
301
302         :raise Exception: When the reader or writer terminate unexpectedly.
303         """
304         self.logger.debug("disconnect() called.")
305         self._schedule_disconnect()
306         await self._wait_disconnect()
307
308     # --------------------------
309     # Section: Session machinery
310     # --------------------------
311
312     @property
313     def _runstate_event(self) -> asyncio.Event:
314         # asyncio.Event() objects should not be created prior to entrance into
315         # an event loop, so we can ensure we create it in the correct context.
316         # Create it on-demand *only* at the behest of an 'async def' method.
317         if not self._runstate_changed:
318             self._runstate_changed = asyncio.Event()
319         return self._runstate_changed
320
321     @upper_half
322     @bottom_half
323     def _set_state(self, state: Runstate) -> None:
324         """
325         Change the `Runstate` of the protocol connection.
326
327         Signals the `runstate_changed` event.
328         """
329         if state == self._runstate:
330             return
331
332         self.logger.debug("Transitioning from '%s' to '%s'.",
333                           str(self._runstate), str(state))
334         self._runstate = state
335         self._runstate_event.set()
336         self._runstate_event.clear()
337
338     @upper_half
339     async def _new_session(self,
340                            address: Union[str, Tuple[str, int]],
341                            ssl: Optional[SSLContext] = None,
342                            accept: bool = False) -> None:
343         """
344         Establish a new connection and initialize the session.
345
346         Connect or accept a new connection, then begin the protocol
347         session machinery. If this call fails, `runstate` is guaranteed
348         to be set back to `IDLE`.
349
350         :param address:
351             Address to connect to/listen on;
352             UNIX socket path or TCP address/port.
353         :param ssl: SSL context to use, if any.
354         :param accept: Accept a connection instead of connecting when `True`.
355
356         :raise ConnectError:
357             When a connection or session cannot be established.
358
359             This exception will wrap a more concrete one. In most cases,
360             the wrapped exception will be `OSError` or `EOFError`. If a
361             protocol-level failure occurs while establishing a new
362             session, the wrapped error may also be an `AQMPError`.
363         """
364         assert self.runstate == Runstate.IDLE
365
366         try:
367             phase = "connection"
368             await self._establish_connection(address, ssl, accept)
369
370             phase = "session"
371             await self._establish_session()
372
373         except BaseException as err:
374             emsg = f"Failed to establish {phase}"
375             self.logger.error("%s: %s", emsg, exception_summary(err))
376             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
377             try:
378                 # Reset from CONNECTING back to IDLE.
379                 await self.disconnect()
380             except:
381                 emsg = "Unexpected bottom half exception"
382                 self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
383                 raise
384
385             # NB: CancelledError is not a BaseException before Python 3.8
386             if isinstance(err, asyncio.CancelledError):
387                 raise
388
389             if isinstance(err, Exception):
390                 raise ConnectError(emsg, err) from err
391
392             # Raise BaseExceptions un-wrapped, they're more important.
393             raise
394
395         assert self.runstate == Runstate.RUNNING
396
397     @upper_half
398     async def _establish_connection(
399             self,
400             address: Union[str, Tuple[str, int]],
401             ssl: Optional[SSLContext] = None,
402             accept: bool = False
403     ) -> None:
404         """
405         Establish a new connection.
406
407         :param address:
408             Address to connect to/listen on;
409             UNIX socket path or TCP address/port.
410         :param ssl: SSL context to use, if any.
411         :param accept: Accept a connection instead of connecting when `True`.
412         """
413         assert self.runstate == Runstate.IDLE
414         self._set_state(Runstate.CONNECTING)
415
416         # Allow runstate watchers to witness 'CONNECTING' state; some
417         # failures in the streaming layer are synchronous and will not
418         # otherwise yield.
419         await asyncio.sleep(0)
420
421         if accept:
422             await self._do_accept(address, ssl)
423         else:
424             await self._do_connect(address, ssl)
425
426     @upper_half
427     async def _do_accept(self, address: Union[str, Tuple[str, int]],
428                          ssl: Optional[SSLContext] = None) -> None:
429         """
430         Acting as the transport server, accept a single connection.
431
432         :param address:
433             Address to listen on; UNIX socket path or TCP address/port.
434         :param ssl: SSL context to use, if any.
435
436         :raise OSError: For stream-related errors.
437         """
438         self.logger.debug("Awaiting connection on %s ...", address)
439         connected = asyncio.Event()
440         server: Optional[asyncio.AbstractServer] = None
441
442         async def _client_connected_cb(reader: asyncio.StreamReader,
443                                        writer: asyncio.StreamWriter) -> None:
444             """Used to accept a single incoming connection, see below."""
445             nonlocal server
446             nonlocal connected
447
448             # A connection has been accepted; stop listening for new ones.
449             assert server is not None
450             server.close()
451             await server.wait_closed()
452             server = None
453
454             # Register this client as being connected
455             self._reader, self._writer = (reader, writer)
456
457             # Signal back: We've accepted a client!
458             connected.set()
459
460         if isinstance(address, tuple):
461             coro = asyncio.start_server(
462                 _client_connected_cb,
463                 host=address[0],
464                 port=address[1],
465                 ssl=ssl,
466                 backlog=1,
467                 limit=self._limit,
468             )
469         else:
470             coro = asyncio.start_unix_server(
471                 _client_connected_cb,
472                 path=address,
473                 ssl=ssl,
474                 backlog=1,
475                 limit=self._limit,
476             )
477
478         server = await coro     # Starts listening
479         await connected.wait()  # Waits for the callback to fire (and finish)
480         assert server is None
481
482         self.logger.debug("Connection accepted.")
483
484     @upper_half
485     async def _do_connect(self, address: Union[str, Tuple[str, int]],
486                           ssl: Optional[SSLContext] = None) -> None:
487         """
488         Acting as the transport client, initiate a connection to a server.
489
490         :param address:
491             Address to connect to; UNIX socket path or TCP address/port.
492         :param ssl: SSL context to use, if any.
493
494         :raise OSError: For stream-related errors.
495         """
496         self.logger.debug("Connecting to %s ...", address)
497
498         if isinstance(address, tuple):
499             connect = asyncio.open_connection(
500                 address[0],
501                 address[1],
502                 ssl=ssl,
503                 limit=self._limit,
504             )
505         else:
506             connect = asyncio.open_unix_connection(
507                 path=address,
508                 ssl=ssl,
509                 limit=self._limit,
510             )
511         self._reader, self._writer = await connect
512
513         self.logger.debug("Connected.")
514
515     @upper_half
516     async def _establish_session(self) -> None:
517         """
518         Establish a new session.
519
520         Starts the readers/writer tasks; subclasses may perform their
521         own negotiations here. The Runstate will be RUNNING upon
522         successful conclusion.
523         """
524         assert self.runstate == Runstate.CONNECTING
525
526         self._outgoing = asyncio.Queue()
527
528         reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
529         writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
530
531         self._reader_task = create_task(reader_coro)
532         self._writer_task = create_task(writer_coro)
533
534         self._bh_tasks = asyncio.gather(
535             self._reader_task,
536             self._writer_task,
537         )
538
539         self._set_state(Runstate.RUNNING)
540         await asyncio.sleep(0)  # Allow runstate_event to process
541
542     @upper_half
543     @bottom_half
544     def _schedule_disconnect(self) -> None:
545         """
546         Initiate a disconnect; idempotent.
547
548         This method is used both in the upper-half as a direct
549         consequence of `disconnect()`, and in the bottom-half in the
550         case of unhandled exceptions in the reader/writer tasks.
551
552         It can be invoked no matter what the `runstate` is.
553         """
554         if not self._dc_task:
555             self._set_state(Runstate.DISCONNECTING)
556             self.logger.debug("Scheduling disconnect.")
557             self._dc_task = create_task(self._bh_disconnect())
558
559     @upper_half
560     async def _wait_disconnect(self) -> None:
561         """
562         Waits for a previously scheduled disconnect to finish.
563
564         This method will gather any bottom half exceptions and re-raise
565         the one that occurred first; presuming it to be the root cause
566         of any subsequent Exceptions. It is intended to be used in the
567         upper half of the call chain.
568
569         :raise Exception:
570             Arbitrary exception re-raised on behalf of the reader/writer.
571         """
572         assert self.runstate == Runstate.DISCONNECTING
573         assert self._dc_task
574
575         aws: List[Awaitable[object]] = [self._dc_task]
576         if self._bh_tasks:
577             aws.insert(0, self._bh_tasks)
578         all_defined_tasks = asyncio.gather(*aws)
579
580         # Ensure disconnect is done; Exception (if any) is not raised here:
581         await asyncio.wait((self._dc_task,))
582
583         try:
584             await all_defined_tasks  # Raise Exceptions from the bottom half.
585         finally:
586             self._cleanup()
587             self._set_state(Runstate.IDLE)
588
589     @upper_half
590     def _cleanup(self) -> None:
591         """
592         Fully reset this object to a clean state and return to `IDLE`.
593         """
594         def _paranoid_task_erase(task: Optional['asyncio.Future[_U]']
595                                  ) -> Optional['asyncio.Future[_U]']:
596             # Help to erase a task, ENSURING it is fully quiesced first.
597             assert (task is None) or task.done()
598             return None if (task and task.done()) else task
599
600         assert self.runstate == Runstate.DISCONNECTING
601         self._dc_task = _paranoid_task_erase(self._dc_task)
602         self._reader_task = _paranoid_task_erase(self._reader_task)
603         self._writer_task = _paranoid_task_erase(self._writer_task)
604         self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
605
606         self._reader = None
607         self._writer = None
608
609         # NB: _runstate_changed cannot be cleared because we still need it to
610         # send the final runstate changed event ...!
611
612     # ----------------------------
613     # Section: Bottom Half methods
614     # ----------------------------
615
616     @bottom_half
617     async def _bh_disconnect(self) -> None:
618         """
619         Disconnect and cancel all outstanding tasks.
620
621         It is designed to be called from its task context,
622         :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
623         it is free to wait on any pending actions that may still need to
624         occur in either the reader or writer tasks.
625         """
626         assert self.runstate == Runstate.DISCONNECTING
627
628         def _done(task: Optional['asyncio.Future[Any]']) -> bool:
629             return task is not None and task.done()
630
631         # Are we already in an error pathway? If either of the tasks are
632         # already done, or if we have no tasks but a reader/writer; we
633         # must be.
634         #
635         # NB: We can't use _bh_tasks to check for premature task
636         # completion, because it may not yet have had a chance to run
637         # and gather itself.
638         tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
639         error_pathway = _done(self._reader_task) or _done(self._writer_task)
640         if not tasks:
641             error_pathway |= bool(self._reader) or bool(self._writer)
642
643         try:
644             # Try to flush the writer, if possible.
645             # This *may* cause an error and force us over into the error path.
646             if not error_pathway:
647                 await self._bh_flush_writer()
648         except BaseException as err:
649             error_pathway = True
650             emsg = "Failed to flush the writer"
651             self.logger.error("%s: %s", emsg, exception_summary(err))
652             self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
653             raise
654         finally:
655             # Cancel any still-running tasks (Won't raise):
656             if self._writer_task is not None and not self._writer_task.done():
657                 self.logger.debug("Cancelling writer task.")
658                 self._writer_task.cancel()
659             if self._reader_task is not None and not self._reader_task.done():
660                 self.logger.debug("Cancelling reader task.")
661                 self._reader_task.cancel()
662
663             # Close out the tasks entirely (Won't raise):
664             if tasks:
665                 self.logger.debug("Waiting for tasks to complete ...")
666                 await asyncio.wait(tasks)
667
668             # Lastly, close the stream itself. (*May raise*!):
669             await self._bh_close_stream(error_pathway)
670             self.logger.debug("Disconnected.")
671
672     @bottom_half
673     async def _bh_flush_writer(self) -> None:
674         if not self._writer_task:
675             return
676
677         self.logger.debug("Draining the outbound queue ...")
678         await self._outgoing.join()
679         if self._writer is not None:
680             self.logger.debug("Flushing the StreamWriter ...")
681             await flush(self._writer)
682
683     @bottom_half
684     async def _bh_close_stream(self, error_pathway: bool = False) -> None:
685         # NB: Closing the writer also implcitly closes the reader.
686         if not self._writer:
687             return
688
689         if not is_closing(self._writer):
690             self.logger.debug("Closing StreamWriter.")
691             self._writer.close()
692
693         self.logger.debug("Waiting for StreamWriter to close ...")
694         try:
695             await wait_closed(self._writer)
696         except Exception:  # pylint: disable=broad-except
697             # It's hard to tell if the Stream is already closed or
698             # not. Even if one of the tasks has failed, it may have
699             # failed for a higher-layered protocol reason. The
700             # stream could still be open and perfectly fine.
701             # I don't know how to discern its health here.
702
703             if error_pathway:
704                 # We already know that *something* went wrong. Let's
705                 # just trust that the Exception we already have is the
706                 # better one to present to the user, even if we don't
707                 # genuinely *know* the relationship between the two.
708                 self.logger.debug(
709                     "Discarding Exception from wait_closed:\n%s\n",
710                     pretty_traceback(),
711                 )
712             else:
713                 # Oops, this is a brand-new error!
714                 raise
715         finally:
716             self.logger.debug("StreamWriter closed.")
717
718     @bottom_half
719     async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
720         """
721         Run one of the bottom-half methods in a loop forever.
722
723         If the bottom half ever raises any exception, schedule a
724         disconnect that will terminate the entire loop.
725
726         :param async_fn: The bottom-half method to run in a loop.
727         :param name: The name of this task, used for logging.
728         """
729         try:
730             while True:
731                 await async_fn()
732         except asyncio.CancelledError:
733             # We have been cancelled by _bh_disconnect, exit gracefully.
734             self.logger.debug("Task.%s: cancelled.", name)
735             return
736         except BaseException as err:
737             self.logger.log(
738                 logging.INFO if isinstance(err, EOFError) else logging.ERROR,
739                 "Task.%s: %s",
740                 name, exception_summary(err)
741             )
742             self.logger.debug("Task.%s: failure:\n%s\n",
743                               name, pretty_traceback())
744             self._schedule_disconnect()
745             raise
746         finally:
747             self.logger.debug("Task.%s: exiting.", name)
748
749     @bottom_half
750     async def _bh_send_message(self) -> None:
751         """
752         Wait for an outgoing message, then send it.
753
754         Designed to be run in `_bh_loop_forever()`.
755         """
756         msg = await self._outgoing.get()
757         try:
758             await self._send(msg)
759         finally:
760             self._outgoing.task_done()
761
762     @bottom_half
763     async def _bh_recv_message(self) -> None:
764         """
765         Wait for an incoming message and call `_on_message` to route it.
766
767         Designed to be run in `_bh_loop_forever()`.
768         """
769         msg = await self._recv()
770         await self._on_message(msg)
771
772     # --------------------
773     # Section: Message I/O
774     # --------------------
775
776     @upper_half
777     @bottom_half
778     def _cb_outbound(self, msg: T) -> T:
779         """
780         Callback: outbound message hook.
781
782         This is intended for subclasses to be able to add arbitrary
783         hooks to filter or manipulate outgoing messages. The base
784         implementation does nothing but log the message without any
785         manipulation of the message.
786
787         :param msg: raw outbound message
788         :return: final outbound message
789         """
790         self.logger.debug("--> %s", str(msg))
791         return msg
792
793     @upper_half
794     @bottom_half
795     def _cb_inbound(self, msg: T) -> T:
796         """
797         Callback: inbound message hook.
798
799         This is intended for subclasses to be able to add arbitrary
800         hooks to filter or manipulate incoming messages. The base
801         implementation does nothing but log the message without any
802         manipulation of the message.
803
804         This method does not "handle" incoming messages; it is a filter.
805         The actual "endpoint" for incoming messages is `_on_message()`.
806
807         :param msg: raw inbound message
808         :return: processed inbound message
809         """
810         self.logger.debug("<-- %s", str(msg))
811         return msg
812
813     @upper_half
814     @bottom_half
815     async def _readline(self) -> bytes:
816         """
817         Wait for a newline from the incoming reader.
818
819         This method is provided as a convenience for upper-layer
820         protocols, as many are line-based.
821
822         This method *may* return a sequence of bytes without a trailing
823         newline if EOF occurs, but *some* bytes were received. In this
824         case, the next call will raise `EOFError`. It is assumed that
825         the layer 5 protocol will decide if there is anything meaningful
826         to be done with a partial message.
827
828         :raise OSError: For stream-related errors.
829         :raise EOFError:
830             If the reader stream is at EOF and there are no bytes to return.
831         :return: bytes, including the newline.
832         """
833         assert self._reader is not None
834         msg_bytes = await self._reader.readline()
835
836         if not msg_bytes:
837             if self._reader.at_eof():
838                 raise EOFError
839
840         return msg_bytes
841
842     @upper_half
843     @bottom_half
844     async def _do_recv(self) -> T:
845         """
846         Abstract: Read from the stream and return a message.
847
848         Very low-level; intended to only be called by `_recv()`.
849         """
850         raise NotImplementedError
851
852     @upper_half
853     @bottom_half
854     async def _recv(self) -> T:
855         """
856         Read an arbitrary protocol message.
857
858         .. warning::
859             This method is intended primarily for `_bh_recv_message()`
860             to use in an asynchronous task loop. Using it outside of
861             this loop will "steal" messages from the normal routing
862             mechanism. It is safe to use prior to `_establish_session()`,
863             but should not be used otherwise.
864
865         This method uses `_do_recv()` to retrieve the raw message, and
866         then transforms it using `_cb_inbound()`.
867
868         :return: A single (filtered, processed) protocol message.
869         """
870         message = await self._do_recv()
871         return self._cb_inbound(message)
872
873     @upper_half
874     @bottom_half
875     def _do_send(self, msg: T) -> None:
876         """
877         Abstract: Write a message to the stream.
878
879         Very low-level; intended to only be called by `_send()`.
880         """
881         raise NotImplementedError
882
883     @upper_half
884     @bottom_half
885     async def _send(self, msg: T) -> None:
886         """
887         Send an arbitrary protocol message.
888
889         This method will transform any outgoing messages according to
890         `_cb_outbound()`.
891
892         .. warning::
893             Like `_recv()`, this method is intended to be called by
894             the writer task loop that processes outgoing
895             messages. Calling it directly may circumvent logic
896             implemented by the caller meant to correlate outgoing and
897             incoming messages.
898
899         :raise OSError: For problems with the underlying stream.
900         """
901         msg = self._cb_outbound(msg)
902         self._do_send(msg)
903
904     @bottom_half
905     async def _on_message(self, msg: T) -> None:
906         """
907         Called to handle the receipt of a new message.
908
909         .. caution::
910             This is executed from within the reader loop, so be advised
911             that waiting on either the reader or writer task will lead
912             to deadlock. Additionally, any unhandled exceptions will
913             directly cause the loop to halt, so logic may be best-kept
914             to a minimum if at all possible.
915
916         :param msg: The incoming message, already logged/filtered.
917         """
918         # Nothing to do in the abstract case.
This page took 0.103569 seconds and 2 git commands to generate.