]> Git Repo - qemu.git/blob - python/qemu/aqmp/legacy.py
python/aqmp: fully separate from qmp.QEMUMonitorProtocol
[qemu.git] / python / qemu / aqmp / legacy.py
1 """
2 Sync QMP Wrapper
3
4 This class pretends to be qemu.qmp.QEMUMonitorProtocol.
5 """
6
7 #
8 # Copyright (C) 2009-2022 Red Hat Inc.
9 #
10 # Authors:
11 #  Luiz Capitulino <[email protected]>
12 #  John Snow <[email protected]>
13 #
14 # This work is licensed under the terms of the GNU GPL, version 2.  See
15 # the COPYING file in the top-level directory.
16 #
17
18 import asyncio
19 from types import TracebackType
20 from typing import (
21     Any,
22     Awaitable,
23     Dict,
24     List,
25     Optional,
26     Type,
27     TypeVar,
28     Union,
29 )
30
31 from .error import QMPError
32 from .protocol import Runstate, SocketAddrT
33 from .qmp_client import QMPClient
34
35
36 #: QMPMessage is an entire QMP message of any kind.
37 QMPMessage = Dict[str, Any]
38
39 #: QMPReturnValue is the 'return' value of a command.
40 QMPReturnValue = object
41
42 #: QMPObject is any object in a QMP message.
43 QMPObject = Dict[str, object]
44
45 # QMPMessage can be outgoing commands or incoming events/returns.
46 # QMPReturnValue is usually a dict/json object, but due to QAPI's
47 # 'returns-whitelist', it can actually be anything.
48 #
49 # {'return': {}} is a QMPMessage,
50 # {} is the QMPReturnValue.
51
52
53 # pylint: disable=missing-docstring
54
55
56 class QMPBadPortError(QMPError):
57     """
58     Unable to parse socket address: Port was non-numerical.
59     """
60
61
62 class QEMUMonitorProtocol:
63     def __init__(self, address: SocketAddrT,
64                  server: bool = False,
65                  nickname: Optional[str] = None):
66
67         self._aqmp = QMPClient(nickname)
68         self._aloop = asyncio.get_event_loop()
69         self._address = address
70         self._timeout: Optional[float] = None
71
72         if server:
73             self._sync(self._aqmp.start_server(self._address))
74
75     _T = TypeVar('_T')
76
77     def _sync(
78             self, future: Awaitable[_T], timeout: Optional[float] = None
79     ) -> _T:
80         return self._aloop.run_until_complete(
81             asyncio.wait_for(future, timeout=timeout)
82         )
83
84     def _get_greeting(self) -> Optional[QMPMessage]:
85         if self._aqmp.greeting is not None:
86             # pylint: disable=protected-access
87             return self._aqmp.greeting._asdict()
88         return None
89
90     def __enter__(self: _T) -> _T:
91         # Implement context manager enter function.
92         return self
93
94     def __exit__(self,
95                  # pylint: disable=duplicate-code
96                  # see https://github.com/PyCQA/pylint/issues/3619
97                  exc_type: Optional[Type[BaseException]],
98                  exc_val: Optional[BaseException],
99                  exc_tb: Optional[TracebackType]) -> None:
100         # Implement context manager exit function.
101         self.close()
102
103     @classmethod
104     def parse_address(cls, address: str) -> SocketAddrT:
105         """
106         Parse a string into a QMP address.
107
108         Figure out if the argument is in the port:host form.
109         If it's not, it's probably a file path.
110         """
111         components = address.split(':')
112         if len(components) == 2:
113             try:
114                 port = int(components[1])
115             except ValueError:
116                 msg = f"Bad port: '{components[1]}' in '{address}'."
117                 raise QMPBadPortError(msg) from None
118             return (components[0], port)
119
120         # Treat as filepath.
121         return address
122
123     def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
124         self._aqmp.await_greeting = negotiate
125         self._aqmp.negotiate = negotiate
126
127         self._sync(
128             self._aqmp.connect(self._address)
129         )
130         return self._get_greeting()
131
132     def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
133         self._aqmp.await_greeting = True
134         self._aqmp.negotiate = True
135
136         self._sync(self._aqmp.accept(), timeout)
137
138         ret = self._get_greeting()
139         assert ret is not None
140         return ret
141
142     def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
143         return dict(
144             self._sync(
145                 # pylint: disable=protected-access
146
147                 # _raw() isn't a public API, because turning off
148                 # automatic ID assignment is discouraged. For
149                 # compatibility with iotests *only*, do it anyway.
150                 self._aqmp._raw(qmp_cmd, assign_id=False),
151                 self._timeout
152             )
153         )
154
155     def cmd(self, name: str,
156             args: Optional[Dict[str, object]] = None,
157             cmd_id: Optional[object] = None) -> QMPMessage:
158         """
159         Build a QMP command and send it to the QMP Monitor.
160
161         @param name: command name (string)
162         @param args: command arguments (dict)
163         @param cmd_id: command id (dict, list, string or int)
164         """
165         qmp_cmd: QMPMessage = {'execute': name}
166         if args:
167             qmp_cmd['arguments'] = args
168         if cmd_id:
169             qmp_cmd['id'] = cmd_id
170         return self.cmd_obj(qmp_cmd)
171
172     def command(self, cmd: str, **kwds: object) -> QMPReturnValue:
173         return self._sync(
174             self._aqmp.execute(cmd, kwds),
175             self._timeout
176         )
177
178     def pull_event(self,
179                    wait: Union[bool, float] = False) -> Optional[QMPMessage]:
180         if not wait:
181             # wait is False/0: "do not wait, do not except."
182             if self._aqmp.events.empty():
183                 return None
184
185         # If wait is 'True', wait forever. If wait is False/0, the events
186         # queue must not be empty; but it still needs some real amount
187         # of time to complete.
188         timeout = None
189         if wait and isinstance(wait, float):
190             timeout = wait
191
192         return dict(
193             self._sync(
194                 self._aqmp.events.get(),
195                 timeout
196             )
197         )
198
199     def get_events(self, wait: Union[bool, float] = False) -> List[QMPMessage]:
200         events = [dict(x) for x in self._aqmp.events.clear()]
201         if events:
202             return events
203
204         event = self.pull_event(wait)
205         return [event] if event is not None else []
206
207     def clear_events(self) -> None:
208         self._aqmp.events.clear()
209
210     def close(self) -> None:
211         self._sync(
212             self._aqmp.disconnect()
213         )
214
215     def settimeout(self, timeout: Optional[float]) -> None:
216         self._timeout = timeout
217
218     def send_fd_scm(self, fd: int) -> None:
219         self._aqmp.send_fd_scm(fd)
220
221     def __del__(self) -> None:
222         if self._aqmp.runstate == Runstate.IDLE:
223             return
224
225         if not self._aloop.is_running():
226             self.close()
227         else:
228             # Garbage collection ran while the event loop was running.
229             # Nothing we can do about it now, but if we don't raise our
230             # own error, the user will be treated to a lot of traceback
231             # they might not understand.
232             raise QMPError(
233                 "QEMUMonitorProtocol.close()"
234                 " was not called before object was garbage collected"
235             )
This page took 0.040762 seconds and 4 git commands to generate.