2 QEMU Console Socket Module:
4 This python module implements a ConsoleSocket object,
5 which can drain a socket and optionally dump the bytes to file.
7 # Copyright 2020 Linaro
12 # This code is licensed under the GPL version 2 or later. See
13 # the COPYING file in the top-level directory.
16 from collections import deque
20 from typing import Deque, Optional
class ConsoleSocket(socket.socket):
    """
    ConsoleSocket represents a socket attached to a char device.

    Optionally (if drain==True), drains the socket and places the bytes
    into an in memory buffer for later processing.

    Optionally a file path can be passed in and we will also
    dump the characters to this file for debugging purposes.
    """

    # NOTE(review): `drain` defaulting to False is inferred from the class
    # docstring ("Optionally (if drain==True)") -- confirm against callers.
    def __init__(self, address: str, file: Optional[str] = None,
                 drain: bool = False):
        """Connect to the UNIX socket at *address*.

        :param address: Path of the UNIX stream socket to connect to.
        :param file: Optional path of a file that receives a binary copy
                     of every byte read by the drain thread.
        :param drain: When true, start a background thread that reads the
                      socket continuously into an in memory buffer.
        """
        self._recv_timeout_sec = 300.0
        self._sleep_time = 0.5
        self._buffer: Deque[int] = deque()
        socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
        # Set up all bookkeeping state before doing any I/O so that
        # __repr__() and close() work even if connect()/open() raises.
        self._logfile = None
        self._drain_thread: Optional[threading.Thread] = None
        self._open = True
        self.connect(address)
        if file:
            # pylint: disable=consider-using-with
            self._logfile = open(file, "bw")
        if drain:
            self._drain_thread = self._thread_start()

    def __repr__(self) -> str:
        """Return the base socket repr extended with our own state."""
        # Drop the closing '>' of the base repr so that the extra fields
        # end up inside the angle brackets.
        base = super().__repr__().rstrip(">")
        return "%s, logfile=%s, drain_thread=%s>" % (base, self._logfile,
                                                     self._drain_thread)

    def _drain_fn(self) -> None:
        """Drains the socket and runs while the socket is open."""
        while self._open:
            try:
                self._drain_socket()
            except socket.timeout:
                # The socket is expected to timeout since we set a
                # short timeout to allow the thread to exit when
                # self._open is set to False.
                time.sleep(self._sleep_time)

    def _thread_start(self) -> threading.Thread:
        """Kick off a thread to drain the socket."""
        # Configure socket to not block and timeout.
        # This allows our drain thread to not block
        # on receive and exit smoothly.
        socket.socket.setblocking(self, False)
        socket.socket.settimeout(self, 1)
        drain_thread = threading.Thread(target=self._drain_fn)
        drain_thread.daemon = True
        drain_thread.start()
        return drain_thread

    def close(self) -> None:
        """Close the base object and wait for the thread to terminate"""
        if not self._open:
            # Already closed; closing twice is harmless.
            return
        # Clearing the flag is what tells the drain thread to stop.
        self._open = False
        if self._drain_thread is not None:
            thread, self._drain_thread = self._drain_thread, None
            thread.join()
        socket.socket.close(self)
        if self._logfile:
            self._logfile.close()
            self._logfile = None

    def _drain_socket(self) -> None:
        """process arriving characters into in memory _buffer"""
        data = socket.socket.recv(self, 1)
        if not data:
            # An empty read means the peer closed the connection, and every
            # further recv() returns immediately. Back off instead of
            # letting the drain thread spin at full speed until close().
            time.sleep(self._sleep_time)
            return
        if self._logfile:
            self._logfile.write(data)
            # NOTE(review): flushing per read keeps the debug log current
            # if the process dies -- confirm this matches the intent.
            self._logfile.flush()
        self._buffer.extend(data)

    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
        """Return chars from in memory buffer.

        Maintains the same API as socket.socket.recv.

        When the socket is not being drained the call is passed straight
        through to the underlying socket. When it is being drained, this
        waits until *bufsize* bytes are buffered and returns exactly that
        many.

        :param bufsize: Number of bytes to return.
        :param flags: Must be 0 in drained mode.
        :raise socket.timeout: In drained mode, if *bufsize* bytes did not
                               arrive within the configured timeout.
        """
        if self._drain_thread is None:
            # Not buffering the socket, pass thru to socket.
            return socket.socket.recv(self, bufsize, flags)
        assert not flags, "Cannot pass flags to recv() in drained mode"
        # Use the monotonic clock: wall-clock adjustments must neither
        # trigger a spurious timeout nor postpone a real one.
        start_time = time.monotonic()
        while len(self._buffer) < bufsize:
            time.sleep(self._sleep_time)
            elapsed_sec = time.monotonic() - start_time
            if elapsed_sec > self._recv_timeout_sec:
                raise socket.timeout
        return bytes(self._buffer.popleft() for _ in range(bufsize))

    def setblocking(self, value: bool) -> None:
        """When not draining we pass thru to the socket,
        since when draining we control socket blocking.
        """
        if self._drain_thread is None:
            socket.socket.setblocking(self, value)

    def settimeout(self, value: Optional[float]) -> None:
        """When not draining we pass thru to the socket,
        since when draining we control the timeout.

        A non-None *value* also becomes the timeout used by recv() in
        drained mode; None leaves that timeout unchanged.
        """
        if value is not None:
            self._recv_timeout_sec = value
        if self._drain_thread is None:
            socket.socket.settimeout(self, value)