4 qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
5 offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
6 subclass of QEMUMachine, respectively.
9 # Copyright (C) 2015 Red Hat Inc.
14 # This work is licensed under the terms of the GNU GPL, version 2. See
15 # the COPYING file in the top-level directory.
29 from qemu.aqmp import SocketAddrT
31 from .machine import QEMUMachine
34 class QEMUQtestProtocol:
36 QEMUQtestProtocol implements a connection to a qtest socket.
38 :param address: QEMU address, can be either a unix socket path (string)
39 or a tuple in the form ( address, port ) for a TCP
41 :param server: server mode, listens on the socket (bool)
42 :raise socket.error: on socket connection errors
45 No conection is estabalished by __init__(), this is done
46 by the connect() or accept() methods.
48 def __init__(self, address: SocketAddrT,
49 server: bool = False):
50 self._address = address
51 self._sock = self._get_sock()
52 self._sockfile: Optional[TextIO] = None
54 self._sock.bind(self._address)
57 def _get_sock(self) -> socket.socket:
58 if isinstance(self._address, tuple):
59 family = socket.AF_INET
61 family = socket.AF_UNIX
62 return socket.socket(family, socket.SOCK_STREAM)
64 def connect(self) -> None:
66 Connect to the qtest socket.
68 @raise socket.error on socket connection errors
70 self._sock.connect(self._address)
71 self._sockfile = self._sock.makefile(mode='r')
73 def accept(self) -> None:
75 Await connection from QEMU.
77 @raise socket.error on socket connection errors
79 self._sock, _ = self._sock.accept()
80 self._sockfile = self._sock.makefile(mode='r')
82 def cmd(self, qtest_cmd: str) -> str:
84 Send a qtest command on the wire.
86 @param qtest_cmd: qtest command text to be sent
88 assert self._sockfile is not None
89 self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
90 resp = self._sockfile.readline()
93 def close(self) -> None:
99 self._sockfile.close()
100 self._sockfile = None
102 def settimeout(self, timeout: Optional[float]) -> None:
103 """Set a timeout, in seconds."""
104 self._sock.settimeout(timeout)
107 class QEMUQtestMachine(QEMUMachine):
109 A QEMU VM, with a qtest socket available.
114 args: Sequence[str] = (),
115 wrapper: Sequence[str] = (),
116 name: Optional[str] = None,
117 base_temp_dir: str = "/var/tmp",
118 sock_dir: Optional[str] = None,
119 qmp_timer: Optional[float] = None):
120 # pylint: disable=too-many-arguments
123 name = "qemu-%d" % os.getpid()
125 sock_dir = base_temp_dir
126 super().__init__(binary, args, wrapper=wrapper, name=name,
127 base_temp_dir=base_temp_dir,
128 sock_dir=sock_dir, qmp_timer=qmp_timer)
129 self._qtest: Optional[QEMUQtestProtocol] = None
130 self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
133 def _base_args(self) -> List[str]:
134 args = super()._base_args
136 '-qtest', f"unix:path={self._qtest_path}",
141 def _pre_launch(self) -> None:
142 super()._pre_launch()
143 self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
145 def _post_launch(self) -> None:
146 assert self._qtest is not None
147 super()._post_launch()
150 def _post_shutdown(self) -> None:
151 super()._post_shutdown()
152 self._remove_if_exists(self._qtest_path)
154 def qtest(self, cmd: str) -> str:
156 Send a qtest command to the guest.
158 :param cmd: qtest command to send
159 :return: qtest server response
161 if self._qtest is None:
162 raise RuntimeError("qtest socket not available")
163 return self._qtest.cmd(cmd)