2 from contextlib import contextmanager
5 from tempfile import TemporaryDirectory
9 from qemu.aqmp import ConnectError, Runstate
10 from qemu.aqmp.protocol import AsyncProtocol, StateError
11 from qemu.aqmp.util import asyncio_run, create_task
14 class NullProtocol(AsyncProtocol[None]):
16 NullProtocol is a test mockup of an AsyncProtocol implementation.
18 It adds a fake_session instance variable that enables a code path
19 that bypasses the actual connection logic, but still allows the
20 reader/writers to start.
22 Because the message type is defined as None, an asyncio.Event named
23 'trigger_input' is created that prohibits the reader from
24 incessantly being able to yield None; this event can be poked to
25 simulate an incoming message.
27 For testing symmetry with do_recv, an interface is added to "send" a
30 For testing purposes, a "simulate_disconnection" method is also
31 added which allows us to trigger a bottom half disconnect without
32 injecting any real errors into the reader/writer loops; in essence
33 it performs exactly half of what disconnect() normally does.
35 def __init__(self, name=None):
36 self.fake_session = False
37 self.trigger_input: asyncio.Event
38 super().__init__(name)
40 async def _establish_session(self):
41 self.trigger_input = asyncio.Event()
42 await super()._establish_session()
44 async def _do_accept(self, address, ssl=None):
46 self._set_state(Runstate.CONNECTING)
47 await asyncio.sleep(0)
49 await super()._do_accept(address, ssl)
51 async def _do_connect(self, address, ssl=None):
53 self._set_state(Runstate.CONNECTING)
54 await asyncio.sleep(0)
56 await super()._do_connect(address, ssl)
58 async def _do_recv(self) -> None:
59 await self.trigger_input.wait()
60 self.trigger_input.clear()
62 def _do_send(self, msg: None) -> None:
65 async def send_msg(self) -> None:
66 await self._outgoing.put(None)
68 async def simulate_disconnect(self) -> None:
70 Simulates a bottom-half disconnect.
72 This method schedules a disconnection but does not wait for it
73 to complete. This is used to put the loop into the DISCONNECTING
74 state without fully quiescing it back to IDLE. This is normally
75 something you cannot coax AsyncProtocol to do on purpose, but it
76 will be similar to what happens with an unhandled Exception in
79 Under normal circumstances, the library design requires you to
80 await on disconnect(), which awaits the disconnect task and
81 returns bottom half errors as a pre-condition to allowing the
82 loop to return back to IDLE.
84 self._schedule_disconnect()
87 class LineProtocol(AsyncProtocol[str]):
88 def __init__(self, name=None):
89 super().__init__(name)
92 async def _do_recv(self) -> str:
93 raw = await self._readline()
95 self.rx_history.append(msg)
98 def _do_send(self, msg: str) -> None:
99 assert self._writer is not None
100 self._writer.write(msg.encode() + b'\n')
102 async def send_msg(self, msg: str) -> None:
103 await self._outgoing.put(msg)
106 def run_as_task(coro, allow_cancellation=False):
108 Run a given coroutine as a task.
110 Optionally, wrap it in a try..except block that allows this
111 coroutine to be canceled gracefully.
116 except asyncio.CancelledError:
117 if allow_cancellation:
120 return create_task(_runner())
126 Opens up a random unused TCP port on localhost, then jams it.
131 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
132 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
133 sock.bind(('127.0.0.1', 0))
135 address = sock.getsockname()
139 # I don't *fully* understand why, but it takes *two* un-accepted
140 # connections to start jamming the socket.
142 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
143 sock.connect(address)
153 class Smoke(avocado.Test):
156 self.proto = NullProtocol()
158 def test__repr__(self):
161 "<NullProtocol runstate=IDLE>"
164 def testRunstate(self):
170 def testDefaultName(self):
176 def testLogger(self):
178 self.proto.logger.name,
183 self.proto = NullProtocol('Steve')
191 self.proto.logger.name,
192 'qemu.aqmp.protocol.Steve'
197 "<NullProtocol name='Steve' runstate=IDLE>"
201 class TestBase(avocado.Test):
204 self.proto = NullProtocol(type(self).__name__)
205 self.assertEqual(self.proto.runstate, Runstate.IDLE)
206 self.runstate_watcher = None
209 self.assertEqual(self.proto.runstate, Runstate.IDLE)
211 async def _asyncSetUp(self):
214 async def _asyncTearDown(self):
215 if self.runstate_watcher:
216 await self.runstate_watcher
219 def async_test(async_test_method):
221 Decorator; adds SetUp and TearDown to async tests.
223 async def _wrapper(self, *args, **kwargs):
224 loop = asyncio.get_event_loop()
227 await self._asyncSetUp()
228 await async_test_method(self, *args, **kwargs)
229 await self._asyncTearDown()
235 # The states we expect a "bad" connect/accept attempt to transition through
236 BAD_CONNECTION_STATES = (
238 Runstate.DISCONNECTING,
242 # The states we expect a "good" session to transition through
243 GOOD_CONNECTION_STATES = (
246 Runstate.DISCONNECTING,
252 async def _watch_runstates(self, *states):
254 This launches a task alongside (most) tests below to confirm that
255 the sequence of runstate changes that occur is exactly as
258 async def _watcher():
260 new_state = await self.proto.runstate_changed()
264 msg=f"Expected state '{state.name}'",
267 self.runstate_watcher = create_task(_watcher())
268 # Kick the loop and force the task to block on the event.
269 await asyncio.sleep(0)
272 class State(TestBase):
275 async def testSuperfluousDisconnect(self):
277 Test calling disconnect() while already disconnected.
279 await self._watch_runstates(
280 Runstate.DISCONNECTING,
283 await self.proto.disconnect()
286 class Connect(TestBase):
288 Tests primarily related to calling Connect().
290 async def _bad_connection(self, family: str):
291 assert family in ('INET', 'UNIX')
294 await self.proto.connect(('127.0.0.1', 0))
295 elif family == 'UNIX':
296 await self.proto.connect('/dev/null')
298 async def _hanging_connection(self):
299 with jammed_socket() as addr:
300 await self.proto.connect(addr)
302 async def _bad_connection_test(self, family: str):
303 await self._watch_runstates(*self.BAD_CONNECTION_STATES)
305 with self.assertRaises(ConnectError) as context:
306 await self._bad_connection(family)
308 self.assertIsInstance(context.exception.exc, OSError)
310 context.exception.error_message,
311 "Failed to establish connection"
315 async def testBadINET(self):
317 Test an immediately rejected call to an IP target.
319 await self._bad_connection_test('INET')
322 async def testBadUNIX(self):
324 Test an immediately rejected call to a UNIX socket target.
326 await self._bad_connection_test('UNIX')
329 async def testCancellation(self):
331 Test what happens when a connection attempt is aborted.
333 # Note that accept() cannot be cancelled outright, as it isn't a task.
334 # However, we can wrap it in a task and cancel *that*.
335 await self._watch_runstates(*self.BAD_CONNECTION_STATES)
336 task = run_as_task(self._hanging_connection(), allow_cancellation=True)
338 state = await self.proto.runstate_changed()
339 self.assertEqual(state, Runstate.CONNECTING)
341 # This is insider baseball, but the connection attempt has
342 # yielded *just* before the actual connection attempt, so kick
343 # the loop to make sure it's truly wedged.
344 await asyncio.sleep(0)
350 async def testTimeout(self):
352 Test what happens when a connection attempt times out.
354 await self._watch_runstates(*self.BAD_CONNECTION_STATES)
355 task = run_as_task(self._hanging_connection())
357 # More insider baseball: to improve the speed of this test while
358 # guaranteeing that the connection even gets a chance to start,
359 # verify that the connection hangs *first*, then await the
360 # result of the task with a nearly-zero timeout.
362 state = await self.proto.runstate_changed()
363 self.assertEqual(state, Runstate.CONNECTING)
364 await asyncio.sleep(0)
366 with self.assertRaises(asyncio.TimeoutError):
367 await asyncio.wait_for(task, timeout=0)
370 async def testRequire(self):
372 Test what happens when a connection attempt is made while CONNECTING.
374 await self._watch_runstates(*self.BAD_CONNECTION_STATES)
375 task = run_as_task(self._hanging_connection(), allow_cancellation=True)
377 state = await self.proto.runstate_changed()
378 self.assertEqual(state, Runstate.CONNECTING)
380 with self.assertRaises(StateError) as context:
381 await self._bad_connection('UNIX')
384 context.exception.error_message,
385 "NullProtocol is currently connecting."
387 self.assertEqual(context.exception.state, Runstate.CONNECTING)
388 self.assertEqual(context.exception.required, Runstate.IDLE)
394 async def testImplicitRunstateInit(self):
396 Test what happens if we do not wait on the runstate event until
397 AFTER a connection is made, i.e., connect()/accept() themselves
398 initialize the runstate event. All of the above tests force the
399 initialization by waiting on the runstate *first*.
401 task = run_as_task(self._hanging_connection(), allow_cancellation=True)
403 # Kick the loop to coerce the state change
404 await asyncio.sleep(0)
405 assert self.proto.runstate == Runstate.CONNECTING
407 # We already missed the transition to CONNECTING
408 await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
414 class Accept(Connect):
416 All of the same tests as Connect, but using the accept() interface.
418 async def _bad_connection(self, family: str):
419 assert family in ('INET', 'UNIX')
422 await self.proto.start_server_and_accept(('example.com', 1))
423 elif family == 'UNIX':
424 await self.proto.start_server_and_accept('/dev/null')
426 async def _hanging_connection(self):
427 with TemporaryDirectory(suffix='.aqmp') as tmpdir:
428 sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
429 await self.proto.start_server_and_accept(sock)
432 class FakeSession(TestBase):
436 self.proto.fake_session = True
438 async def _asyncSetUp(self):
439 await super()._asyncSetUp()
440 await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
442 async def _asyncTearDown(self):
443 await self.proto.disconnect()
444 await super()._asyncTearDown()
449 async def testFakeConnect(self):
451 """Test the full state lifecycle (via connect) with a no-op session."""
452 await self.proto.connect('/not/a/real/path')
453 self.assertEqual(self.proto.runstate, Runstate.RUNNING)
456 async def testFakeAccept(self):
457 """Test the full state lifecycle (via accept) with a no-op session."""
458 await self.proto.start_server_and_accept('/not/a/real/path')
459 self.assertEqual(self.proto.runstate, Runstate.RUNNING)
462 async def testFakeRecv(self):
463 """Test receiving a fake/null message."""
464 await self.proto.start_server_and_accept('/not/a/real/path')
466 logname = self.proto.logger.name
467 with self.assertLogs(logname, level='DEBUG') as context:
468 self.proto.trigger_input.set()
469 self.proto.trigger_input.clear()
470 await asyncio.sleep(0) # Kick reader.
474 [f"DEBUG:{logname}:<-- None"],
478 async def testFakeSend(self):
479 """Test sending a fake/null message."""
480 await self.proto.start_server_and_accept('/not/a/real/path')
482 logname = self.proto.logger.name
483 with self.assertLogs(logname, level='DEBUG') as context:
484 # Cheat: Send a Null message to nobody.
485 await self.proto.send_msg()
486 # Kick writer; awaiting on a queue.put isn't sufficient to yield.
487 await asyncio.sleep(0)
491 [f"DEBUG:{logname}:--> None"],
494 async def _prod_session_api(
496 current_state: Runstate,
500 with self.assertRaises(StateError) as context:
502 await self.proto.start_server_and_accept('/not/a/real/path')
504 await self.proto.connect('/not/a/real/path')
506 self.assertEqual(context.exception.error_message, error_message)
507 self.assertEqual(context.exception.state, current_state)
508 self.assertEqual(context.exception.required, Runstate.IDLE)
511 async def testAcceptRequireRunning(self):
512 """Test that accept() cannot be called when Runstate=RUNNING"""
513 await self.proto.start_server_and_accept('/not/a/real/path')
515 await self._prod_session_api(
517 "NullProtocol is already connected and running.",
522 async def testConnectRequireRunning(self):
523 """Test that connect() cannot be called when Runstate=RUNNING"""
524 await self.proto.start_server_and_accept('/not/a/real/path')
526 await self._prod_session_api(
528 "NullProtocol is already connected and running.",
533 async def testAcceptRequireDisconnecting(self):
534 """Test that accept() cannot be called when Runstate=DISCONNECTING"""
535 await self.proto.start_server_and_accept('/not/a/real/path')
537 # Cheat: force a disconnect.
538 await self.proto.simulate_disconnect()
540 await self._prod_session_api(
541 Runstate.DISCONNECTING,
542 ("NullProtocol is disconnecting."
543 " Call disconnect() to return to IDLE state."),
548 async def testConnectRequireDisconnecting(self):
549 """Test that connect() cannot be called when Runstate=DISCONNECTING"""
550 await self.proto.start_server_and_accept('/not/a/real/path')
552 # Cheat: force a disconnect.
553 await self.proto.simulate_disconnect()
555 await self._prod_session_api(
556 Runstate.DISCONNECTING,
557 ("NullProtocol is disconnecting."
558 " Call disconnect() to return to IDLE state."),
563 class SimpleSession(TestBase):
567 self.server = LineProtocol(type(self).__name__ + '-server')
569 async def _asyncSetUp(self):
570 await super()._asyncSetUp()
571 await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
573 async def _asyncTearDown(self):
574 await self.proto.disconnect()
576 await self.server.disconnect()
579 await super()._asyncTearDown()
582 async def testSmoke(self):
583 with TemporaryDirectory(suffix='.aqmp') as tmpdir:
584 sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
585 server_task = create_task(self.server.start_server_and_accept(sock))
587 # give the server a chance to start listening [...]
588 await asyncio.sleep(0)
589 await self.proto.connect(sock)