Issue
I am writing code that has some long-running coroutines that interact with each other. These coroutines can be blocked on await
until something external happens. I want to be able to drive these coroutines in a unittest. The regular way of doing await
on the coroutine doesn't work, because I want to be able to intercept something in the middle of their operation. I would also prefer not to mess with the coroutine internals either, unless there is something generic/reusable that can be done.
Ideally I would want to run an event loop until all tasks are currently blocked. This should be fairly easy to tell in an event loop implementation. Once everything is blocked, the event loop yields back control, where I can assert some state about the coroutines, and poke them externally. Then I can resume the loop until it gets blocked again. This would allow for deterministic simulation of tasks in an event loop.
Minimal example of the desired API:
import asyncio
from asyncio import Event
# Imagine this is a complicated "main" with many coroutines.
# But event is some external "mockable" event
# that can be used to drive in unit tests
async def wait_on_event(event: Event):
print("Waiting on event")
await event.wait()
print("Done waiting on event")
def test_deterministic():
loop = asyncio.get_event_loop()
event = Event()
task = loop.create_task(wait_on_event(event))
run_until_blocked_or_complete(loop) # define this magic function
# Should print "Waiting on event"
# can make some test assertions here
event.set()
run_until_blocked_or_complete(loop)
# Should print "Done waiting on event"
Anything like that possible? Or would this require writing a custom event loop just for tests?
Additionally, I am currently on Python 3.9 (AWS runtime limitation). If it's not possible to do this in 3.9, what version would support this?
Solution
This question has puzzled me since I first read it, because it's almost do-able with standard asyncio functions. The key is Alexander's "magic" is_not_blocked
method, which I give verbatim below (except for moving it to the outer indentation level). I also use his wait_on_event
method, and his test_deterministic_loop
function. I added some extra tests to show how to start and stop other tasks, and how to drive the event loop step-by-step until all tasks are finished.
Instead of his DeterministicLoop class, I use a function run_until_blocked
that makes only standard asyncio function calls. The two lines of code:
loop.call_soon(loop.stop)
loop.run_forever()
are a convenient means of advancing the loop by exactly one cycle. And asyncio already provides a method for obtaining all the tasks that run within a given event loop, so there is no need to store them independently.
A comment on the Alexander's "magic" method: if you look at the comments in the asyncio.Task code, the "private" variable _fut_waiter
is described as an important invariant. That's very unlikely to change in future versions. So I think it's quite safe in practice.
import asyncio
from typing import Optional, cast
def _is_not_blocked(task: asyncio.Task):
# pylint: disable-next=protected-access
wait_for = cast(Optional[asyncio.Future], task._fut_waiter) # type: ignore
if wait_for is None:
return True
return wait_for.done()
def run_until_blocked():
"""Runs steps of the event loop until all tasks are blocked."""
loop = asyncio.get_event_loop()
# Always run one step.
loop.call_soon(loop.stop)
loop.run_forever()
# Continue running until all tasks are blocked
while any(_is_not_blocked(task) for task in asyncio.all_tasks(loop)):
loop.call_soon(loop.stop)
loop.run_forever()
# This coroutine could spawn many others. Keeping it simple here
async def wait_on_event(event: asyncio.Event) -> int:
print("Waiting")
await event.wait()
print("Done")
return 42
def test_deterministic_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
event = asyncio.Event()
task = loop.create_task(wait_on_event(event))
assert not task.done()
run_until_blocked()
print("Task done", task.done())
assert not task.done()
print("Tasks running", asyncio.all_tasks(loop))
assert asyncio.all_tasks(loop)
event.set()
# You can start and stop tasks
loop.run_until_complete(asyncio.sleep(2.0))
run_until_blocked()
print("Task done", task.done())
assert task.done()
print("Tasks running", asyncio.all_tasks(loop))
assert task.result() == 42
assert not asyncio.all_tasks(loop)
# If you create a task you must loop run_until_blocked until
# the task is done.
task2 = loop.create_task(asyncio.sleep(2.0))
assert not task2.done()
while not task2.done():
assert asyncio.all_tasks(loop)
run_until_blocked()
assert task2.done()
assert not asyncio.all_tasks(loop)
test_deterministic_loop()
Answered By - Paul Cornelius
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.