]> Git Repo - qemu.git/blob - python/qemu/machine/console_socket.py
Merge remote-tracking branch 'remotes/pmaydell/tags/pull-target-arm-20210921' into...
[qemu.git] / python / qemu / machine / console_socket.py
1 """
2 QEMU Console Socket Module:
3
4 This python module implements a ConsoleSocket object,
5 which can drain a socket and optionally dump the bytes to file.
6 """
7 # Copyright 2020 Linaro
8 #
9 # Authors:
10 #  Robert Foley <[email protected]>
11 #
12 # This code is licensed under the GPL version 2 or later.  See
13 # the COPYING file in the top-level directory.
14 #
15
16 from collections import deque
17 import socket
18 import threading
19 import time
20 from typing import Deque, Optional
21
22
class ConsoleSocket(socket.socket):
    """
    ConsoleSocket represents a socket attached to a char device.

    Optionally (if drain==True), drains the socket and places the bytes
    into an in memory buffer for later processing.

    Optionally a file path can be passed in and we will also
    dump the characters to this file for debugging purposes.

    :param address: Path of the UNIX stream socket to connect to.
    :param file: Optional path of a file that receives a copy of the
                 drained bytes (opened in binary write mode).  Bytes are
                 only logged while draining.
    :param drain: When True, start a daemon thread that continuously
                  reads the socket into an in memory buffer.

    :raise OSError: If connecting to ``address`` or opening ``file`` fails.
                    The underlying socket is closed before re-raising.
    """

    # Upper bound on the number of bytes the drain thread reads from the
    # socket per recv() call.
    _DRAIN_CHUNK_SIZE = 4096

    def __init__(self, address: str, file: Optional[str] = None,
                 drain: bool = False):
        self._recv_timeout_sec = 300.0
        self._sleep_time = 0.5
        self._buffer: Deque[int] = deque()
        # Initialise everything close() and __repr__() look at *before*
        # any step that can fail, so a partially constructed object is
        # still safe to close or print.
        self._logfile = None
        self._open = False
        self._drain_thread: Optional[threading.Thread] = None
        socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            self.connect(address)
            if file:
                # pylint: disable=consider-using-with
                self._logfile = open(file, "bw")
        except BaseException:
            # Do not leak the socket if connecting or opening the
            # log file fails; the caller never gets a usable object.
            socket.socket.close(self)
            raise
        self._open = True
        if drain:
            self._drain_thread = self._thread_start()

    def __repr__(self) -> str:
        """Return the base socket repr extended with logfile/thread info."""
        tmp = super().__repr__()
        tmp = tmp.rstrip(">")
        tmp = "%s,  logfile=%s, drain_thread=%s>" % (tmp, self._logfile,
                                                     self._drain_thread)
        return tmp

    def _drain_fn(self) -> None:
        """
        Drains the socket and runs while the socket is open.

        The loop ends when close() clears self._open or when the peer
        closes the connection (end of stream).
        """
        while self._open:
            try:
                if not self._drain_socket():
                    # End of stream: no more data will ever arrive, so
                    # stop instead of spinning on an always-ready recv().
                    break
            except socket.timeout:
                # The socket is expected to timeout since we set a
                # short timeout to allow the thread to exit when
                # self._open is set to False.
                time.sleep(self._sleep_time)

    def _thread_start(self) -> threading.Thread:
        """Kick off a thread to drain the socket."""
        # Configure socket to not block and timeout.
        # This allows our drain thread to not block
        # on receive and exit smoothly.
        socket.socket.setblocking(self, False)
        socket.socket.settimeout(self, 1)
        drain_thread = threading.Thread(target=self._drain_fn)
        drain_thread.daemon = True
        drain_thread.start()
        return drain_thread

    def close(self) -> None:
        """Close the base object and wait for the thread to terminate"""
        if self._open:
            # Clearing the flag first lets the drain thread leave its loop.
            self._open = False
            if self._drain_thread is not None:
                thread, self._drain_thread = self._drain_thread, None
                thread.join()
            socket.socket.close(self)
            if self._logfile:
                self._logfile.close()
                self._logfile = None

    def _drain_socket(self) -> bool:
        """
        Process arriving characters into the in memory _buffer.

        Reads up to _DRAIN_CHUNK_SIZE bytes, mirrors them to the log file
        (if any) and appends them to the buffer.

        :return: True if data was received, False on end of stream
                 (the peer closed the connection).
        :raise socket.timeout: If nothing arrived within the socket timeout.
        """
        data = socket.socket.recv(self, self._DRAIN_CHUNK_SIZE)
        if not data:
            return False
        if self._logfile:
            self._logfile.write(data)
            self._logfile.flush()
        self._buffer.extend(data)
        return True

    def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
        """Return chars from in memory buffer.
           Maintains the same API as socket.socket.recv.

           When not draining this is a plain pass thru to the socket.
           When draining it waits, polling every _sleep_time seconds,
           until bufsize bytes are buffered and returns exactly that many.

           :raise socket.timeout: In drained mode, if bufsize bytes did
                                  not become available within the
                                  configured receive timeout.
        """
        if self._drain_thread is None:
            # Not buffering the socket, pass thru to socket.
            return socket.socket.recv(self, bufsize, flags)
        assert not flags, "Cannot pass flags to recv() in drained mode"
        # Monotonic clock: the timeout must not be affected by
        # adjustments of the system wall clock.
        start_time = time.monotonic()
        while len(self._buffer) < bufsize:
            time.sleep(self._sleep_time)
            elapsed_sec = time.monotonic() - start_time
            if elapsed_sec > self._recv_timeout_sec:
                raise socket.timeout
        return bytes(self._buffer.popleft() for _ in range(bufsize))

    def setblocking(self, value: bool) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control socket blocking.
        """
        if self._drain_thread is None:
            socket.socket.setblocking(self, value)

    def settimeout(self, value: Optional[float]) -> None:
        """When not draining we pass thru to the socket,
           since when draining we control the timeout.

           A non-None value is also remembered as the timeout used by
           recv() in drained mode; None leaves that timeout unchanged.
        """
        if value is not None:
            self._recv_timeout_sec = value
        if self._drain_thread is None:
            socket.socket.settimeout(self, value)
This page took 0.030642 seconds and 4 git commands to generate.