TypeVar,
)
-from qemu.qmp import ( # pylint: disable=import-error
+from qemu.aqmp import SocketAddrT
+from qemu.aqmp.legacy import (
+ QEMUMonitorProtocol,
QMPMessage,
QMPReturnValue,
- SocketAddrT,
)
from . import console_socket
-if os.environ.get('QEMU_PYTHON_LEGACY_QMP'):
- from qemu.qmp import QEMUMonitorProtocol
-else:
- from qemu.aqmp.legacy import QEMUMonitorProtocol
-
-
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
:param timeout: Optional timeout, in seconds.
See QEMUMonitorProtocol.pull_event.
- :raise QMPTimeoutError: If timeout was non-zero and no matching events
- were found.
+ :raise asyncio.TimeoutError:
+ If timeout was non-zero and no matching events were found.
+
:return: A QMP event matching the filter criteria.
If timeout was 0 and no event matched, None.
"""
event = self._qmp.pull_event(wait=timeout)
if event is None:
# NB: None is only returned when timeout is false-ish.
- # Timeouts raise QMPTimeoutError instead!
+ # Timeouts raise asyncio.TimeoutError instead!
break
if _match(event):
return event