2 (Legacy) Sync QMP Wrapper
4 This module provides the `QEMUMonitorProtocol` class, which is a
5 synchronous wrapper around `QMPClient`.
7 Its design closely resembles that of the original QEMUMonitorProtocol
8 class, originally written by Luiz Capitulino. It is provided here for
9 compatibility with scripts inside the QEMU source tree that expect the
14 # Copyright (C) 2009-2022 Red Hat Inc.
20 # This work is licensed under the terms of the GNU GPL, version 2. See
21 # the COPYING file in the top-level directory.
25 from types import TracebackType
37 from .error import QMPError
38 from .protocol import Runstate, SocketAddrT
39 from .qmp_client import QMPClient
42 #: QMPMessage is an entire QMP message of any kind.
43 QMPMessage = Dict[str, Any]
45 #: QMPReturnValue is the 'return' value of a command.
46 QMPReturnValue = object
48 #: QMPObject is any object in a QMP message.
49 QMPObject = Dict[str, object]
51 # QMPMessage can be outgoing commands or incoming events/returns.
52 # QMPReturnValue is usually a dict/json object, but due to QAPI's
53 # 'returns-whitelist', it can actually be anything.
55 # {'return': {}} is a QMPMessage,
56 # {} is the QMPReturnValue.
59 class QMPBadPortError(QMPError):
61 Unable to parse socket address: Port was non-numerical.
65 class QEMUMonitorProtocol:
67 Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP)
68 and then allow to handle commands and events.
70 :param address: QEMU address, can be either a unix socket path (string)
71 or a tuple in the form ( address, port ) for a TCP
73 :param server: Act as the socket server. (See 'accept')
74 :param nickname: Optional nickname used for logging.
77 def __init__(self, address: SocketAddrT,
79 nickname: Optional[str] = None):
81 self._qmp = QMPClient(nickname)
82 self._aloop = asyncio.get_event_loop()
83 self._address = address
84 self._timeout: Optional[float] = None
87 self._sync(self._qmp.start_server(self._address))
92 self, future: Awaitable[_T], timeout: Optional[float] = None
94 return self._aloop.run_until_complete(
95 asyncio.wait_for(future, timeout=timeout)
98 def _get_greeting(self) -> Optional[QMPMessage]:
99 if self._qmp.greeting is not None:
100 # pylint: disable=protected-access
101 return self._qmp.greeting._asdict()
104 def __enter__(self: _T) -> _T:
105 # Implement context manager enter function.
109 # pylint: disable=duplicate-code
110 # see https://github.com/PyCQA/pylint/issues/3619
111 exc_type: Optional[Type[BaseException]],
112 exc_val: Optional[BaseException],
113 exc_tb: Optional[TracebackType]) -> None:
114 # Implement context manager exit function.
118 def parse_address(cls, address: str) -> SocketAddrT:
120 Parse a string into a QMP address.
122 Figure out if the argument is in the port:host form.
123 If it's not, it's probably a file path.
125 components = address.split(':')
126 if len(components) == 2:
128 port = int(components[1])
130 msg = f"Bad port: '{components[1]}' in '{address}'."
131 raise QMPBadPortError(msg) from None
132 return (components[0], port)
137 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
139 Connect to the QMP Monitor and perform capabilities negotiation.
141 :return: QMP greeting dict, or None if negotiate is false
142 :raise ConnectError: on connection errors
144 self._qmp.await_greeting = negotiate
145 self._qmp.negotiate = negotiate
148 self._qmp.connect(self._address)
150 return self._get_greeting()
152 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
154 Await connection from QMP Monitor and perform capabilities negotiation.
157 timeout in seconds (nonnegative float number, or None).
158 If None, there is no timeout, and this may block forever.
160 :return: QMP greeting dict
161 :raise ConnectError: on connection errors
163 self._qmp.await_greeting = True
164 self._qmp.negotiate = True
166 self._sync(self._qmp.accept(), timeout)
168 ret = self._get_greeting()
169 assert ret is not None
172 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
174 Send a QMP command to the QMP Monitor.
176 :param qmp_cmd: QMP command to be sent as a Python dict
177 :return: QMP response as a Python dict
181 # pylint: disable=protected-access
183 # _raw() isn't a public API, because turning off
184 # automatic ID assignment is discouraged. For
185 # compatibility with iotests *only*, do it anyway.
186 self._qmp._raw(qmp_cmd, assign_id=False),
191 def cmd(self, name: str,
192 args: Optional[Dict[str, object]] = None,
193 cmd_id: Optional[object] = None) -> QMPMessage:
195 Build a QMP command and send it to the QMP Monitor.
197 :param name: command name (string)
198 :param args: command arguments (dict)
199 :param cmd_id: command id (dict, list, string or int)
201 qmp_cmd: QMPMessage = {'execute': name}
203 qmp_cmd['arguments'] = args
205 qmp_cmd['id'] = cmd_id
206 return self.cmd_obj(qmp_cmd)
208 def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
210 Build and send a QMP command to the monitor, report errors if any
213 self._qmp.execute(cmd, kwds),
218 wait: Union[bool, float] = False) -> Optional[QMPMessage]:
220 Pulls a single event.
223 If False or 0, do not wait. Return None if no events ready.
224 If True, wait forever until the next event.
225 Otherwise, wait for the specified number of seconds.
227 :raise asyncio.TimeoutError:
228 When a timeout is requested and the timeout period elapses.
230 :return: The first available QMP event, or None.
233 # wait is False/0: "do not wait, do not except."
234 if self._qmp.events.empty():
237 # If wait is 'True', wait forever. If wait is False/0, the events
238 # queue must not be empty; but it still needs some real amount
239 # of time to complete.
241 if wait and isinstance(wait, float):
246 self._qmp.events.get(),
251 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
253 Get a list of QMP events and clear all pending events.
256 If False or 0, do not wait. Return None if no events ready.
257 If True, wait until we have at least one event.
258 Otherwise, wait for up to the specified number of seconds for at
261 :raise asyncio.TimeoutError:
262 When a timeout is requested and the timeout period elapses.
264 :return: A list of QMP events.
266 events = [dict(x) for x in self._qmp.events.clear()]
270 event = self.pull_event(wait)
271 return [event] if event is not None else []
273 def clear_events(self) -> None:
274 """Clear current list of pending events."""
275 self._qmp.events.clear()
277 def close(self) -> None:
278 """Close the connection."""
280 self._qmp.disconnect()
283 def settimeout(self, timeout: Optional[float]) -> None:
285 Set the timeout for QMP RPC execution.
287 This timeout affects the `cmd`, `cmd_obj`, and `command` methods.
288 The `accept`, `pull_event` and `get_event` methods have their
289 own configurable timeouts.
292 timeout in seconds, or None.
293 None will wait indefinitely.
295 self._timeout = timeout
297 def send_fd_scm(self, fd: int) -> None:
299 Send a file descriptor to the remote via SCM_RIGHTS.
301 self._qmp.send_fd_scm(fd)
303 def __del__(self) -> None:
304 if self._qmp.runstate == Runstate.IDLE:
307 if not self._aloop.is_running():
310 # Garbage collection ran while the event loop was running.
311 # Nothing we can do about it now, but if we don't raise our
312 # own error, the user will be treated to a lot of traceback
313 # they might not understand.
315 "QEMUMonitorProtocol.close()"
316 " was not called before object was garbage collected"