4 qtest offers the QEMUQtestProtocol and QEMUQtestMachine classes, which
5 offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
6 subclass of QEMUMachine, respectively.
9 # Copyright (C) 2015 Red Hat Inc.
14 # This work is licensed under the terms of the GNU GPL, version 2. See
15 # the COPYING file in the top-level directory.
29 from qemu.qmp import SocketAddrT # pylint: disable=import-error
31 from .machine import QEMUMachine
34 class QEMUQtestProtocol:
36 QEMUQtestProtocol implements a connection to a qtest socket.
38 :param address: QEMU address, can be either a unix socket path (string)
39 or a tuple in the form ( address, port ) for a TCP
41 :param server: server mode, listens on the socket (bool)
42 :raise socket.error: on socket connection errors
45 No connection is established by __init__(), this is done
46 by the connect() or accept() methods.
48 def __init__(self, address: SocketAddrT,
49 server: bool = False):
50 self._address = address
51 self._sock = self._get_sock()
52 self._sockfile: Optional[TextIO] = None
54 self._sock.bind(self._address)
57 def _get_sock(self) -> socket.socket:
58 if isinstance(self._address, tuple):
59 family = socket.AF_INET
61 family = socket.AF_UNIX
62 return socket.socket(family, socket.SOCK_STREAM)
def connect(self) -> None:
    """
    Connect to the qtest socket.

    After the socket connects to the stored address, a text-mode
    reader created with makefile() is kept in self._sockfile.

    :raise socket.error: on socket connection errors
    """
    sock = self._sock
    sock.connect(self._address)
    self._sockfile = sock.makefile(mode='r')
def accept(self) -> None:
    """
    Await connection from QEMU.

    The accepted connection replaces self._sock, and a text-mode
    reader created with makefile() is kept in self._sockfile.

    :raise socket.error: on socket connection errors
    """
    listener = self._sock
    connection, _peer = listener.accept()
    self._sock = connection
    self._sockfile = connection.makefile(mode='r')
82 def cmd(self, qtest_cmd: str) -> str:
84 Send a qtest command on the wire.
86 :param qtest_cmd: qtest command text to be sent
88 assert self._sockfile is not None
89 self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
90 resp = self._sockfile.readline()
93 def close(self) -> None:
99 self._sockfile.close()
100 self._sockfile = None
def settimeout(self, timeout: Optional[float]) -> None:
    """
    Set a timeout, in seconds.

    :param timeout: value forwarded unchanged to the underlying
                    socket's settimeout()
    """
    sock = self._sock
    sock.settimeout(timeout)
107 class QEMUQtestMachine(QEMUMachine):
109 A QEMU VM, with a qtest socket available.
114 args: Sequence[str] = (),
115 wrapper: Sequence[str] = (),
116 name: Optional[str] = None,
117 base_temp_dir: str = "/var/tmp",
118 socket_scm_helper: Optional[str] = None,
119 sock_dir: Optional[str] = None,
120 qmp_timer: Optional[float] = None):
121 # pylint: disable=too-many-arguments
124 name = "qemu-%d" % os.getpid()
126 sock_dir = base_temp_dir
127 super().__init__(binary, args, wrapper=wrapper, name=name,
128 base_temp_dir=base_temp_dir,
129 socket_scm_helper=socket_scm_helper,
130 sock_dir=sock_dir, qmp_timer=qmp_timer)
131 self._qtest: Optional[QEMUQtestProtocol] = None
132 self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
135 def _base_args(self) -> List[str]:
136 args = super()._base_args
138 '-qtest', f"unix:path={self._qtest_path}",
def _pre_launch(self) -> None:
    """
    Run the base class pre-launch step, then create the qtest
    protocol object in server mode on the qtest socket path.
    """
    super()._pre_launch()
    qtest_path = self._qtest_path
    self._qtest = QEMUQtestProtocol(qtest_path, server=True)
147 def _post_launch(self) -> None:
148 assert self._qtest is not None
149 super()._post_launch()
def _post_shutdown(self) -> None:
    """
    Run the base class post-shutdown step, then remove the qtest
    socket path if it is still present.
    """
    super()._post_shutdown()
    qtest_path = self._qtest_path
    self._remove_if_exists(qtest_path)
def qtest(self, cmd: str) -> str:
    """
    Send a qtest command to the guest.

    :param cmd: qtest command to send
    :return: qtest server response
    :raise RuntimeError: if no qtest protocol object is set
                         (self._qtest is None)
    """
    protocol = self._qtest
    if protocol is not None:
        return protocol.cmd(cmd)
    raise RuntimeError("qtest socket not available")