4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
8 # Copyright (C) 2009-2022 Red Hat Inc.
14 # This work is licensed under the terms of the GNU GPL, version 2. See
15 # the COPYING file in the top-level directory.
19 from types import TracebackType
31 from .error import QMPError
32 from .protocol import Runstate, SocketAddrT
33 from .qmp_client import QMPClient
36 #: QMPMessage is an entire QMP message of any kind.
37 QMPMessage = Dict[str, Any]
39 #: QMPReturnValue is the 'return' value of a command.
40 QMPReturnValue = object
42 #: QMPObject is any object in a QMP message.
43 QMPObject = Dict[str, object]
45 # QMPMessage can be outgoing commands or incoming events/returns.
46 # QMPReturnValue is usually a dict/json object, but due to QAPI's
47 # 'returns-whitelist', it can actually be anything.
49 # {'return': {}} is a QMPMessage,
50 # {} is the QMPReturnValue.
53 # pylint: disable=missing-docstring
56 class QMPBadPortError(QMPError):
58 Unable to parse socket address: Port was non-numerical.
62 class QEMUMonitorProtocol:
63 def __init__(self, address: SocketAddrT,
65 nickname: Optional[str] = None):
67 self._aqmp = QMPClient(nickname)
68 self._aloop = asyncio.get_event_loop()
69 self._address = address
70 self._timeout: Optional[float] = None
73 self._sync(self._aqmp.start_server(self._address))
78 self, future: Awaitable[_T], timeout: Optional[float] = None
80 return self._aloop.run_until_complete(
81 asyncio.wait_for(future, timeout=timeout)
84 def _get_greeting(self) -> Optional[QMPMessage]:
85 if self._aqmp.greeting is not None:
86 # pylint: disable=protected-access
87 return self._aqmp.greeting._asdict()
90 def __enter__(self: _T) -> _T:
91 # Implement context manager enter function.
95 # pylint: disable=duplicate-code
96 # see https://github.com/PyCQA/pylint/issues/3619
97 exc_type: Optional[Type[BaseException]],
98 exc_val: Optional[BaseException],
99 exc_tb: Optional[TracebackType]) -> None:
100 # Implement context manager exit function.
104 def parse_address(cls, address: str) -> SocketAddrT:
106 Parse a string into a QMP address.
108 Figure out if the argument is in the port:host form.
109 If it's not, it's probably a file path.
111 components = address.split(':')
112 if len(components) == 2:
114 port = int(components[1])
116 msg = f"Bad port: '{components[1]}' in '{address}'."
117 raise QMPBadPortError(msg) from None
118 return (components[0], port)
123 def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
124 self._aqmp.await_greeting = negotiate
125 self._aqmp.negotiate = negotiate
128 self._aqmp.connect(self._address)
130 return self._get_greeting()
132 def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
133 self._aqmp.await_greeting = True
134 self._aqmp.negotiate = True
136 self._sync(self._aqmp.accept(), timeout)
138 ret = self._get_greeting()
139 assert ret is not None
142 def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
145 # pylint: disable=protected-access
147 # _raw() isn't a public API, because turning off
148 # automatic ID assignment is discouraged. For
149 # compatibility with iotests *only*, do it anyway.
150 self._aqmp._raw(qmp_cmd, assign_id=False),
155 def cmd(self, name: str,
156 args: Optional[Dict[str, object]] = None,
157 cmd_id: Optional[object] = None) -> QMPMessage:
159 Build a QMP command and send it to the QMP Monitor.
161 @param name: command name (string)
162 @param args: command arguments (dict)
163 @param cmd_id: command id (dict, list, string or int)
165 qmp_cmd: QMPMessage = {'execute': name}
167 qmp_cmd['arguments'] = args
169 qmp_cmd['id'] = cmd_id
170 return self.cmd_obj(qmp_cmd)
172 def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
174 self._aqmp.execute(cmd, kwds),
179 wait: Union[bool, float] = False) -> Optional[QMPMessage]:
181 # wait is False/0: "do not wait, do not except."
182 if self._aqmp.events.empty():
185 # If wait is 'True', wait forever. If wait is False/0, the events
186 # queue must not be empty; but it still needs some real amount
187 # of time to complete.
189 if wait and isinstance(wait, float):
194 self._aqmp.events.get(),
199 def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
200 events = [dict(x) for x in self._aqmp.events.clear()]
204 event = self.pull_event(wait)
205 return [event] if event is not None else []
207 def clear_events(self) -> None:
208 self._aqmp.events.clear()
210 def close(self) -> None:
212 self._aqmp.disconnect()
215 def settimeout(self, timeout: Optional[float]) -> None:
216 self._timeout = timeout
218 def send_fd_scm(self, fd: int) -> None:
219 self._aqmp.send_fd_scm(fd)
221 def __del__(self) -> None:
222 if self._aqmp.runstate == Runstate.IDLE:
225 if not self._aloop.is_running():
228 # Garbage collection ran while the event loop was running.
229 # Nothing we can do about it now, but if we don't raise our
230 # own error, the user will be treated to a lot of traceback
231 # they might not understand.
233 "QEMUMonitorProtocol.close()"
234 " was not called before object was garbage collected"