Issue
I have a simple piece of code driving me crazy for a while. I have posted this question some days ago asking create_task
is not working with input
. Now I have figured out something related to this. I am running event loop in a separate thread and pushing tasks in it. Very straight forward code.
import asyncio
import threading
async def printer(message):
print(f'[printer] {message}')
def loop_runner(loop):
loop.run_forever()
if __name__ == '__main__':
event_loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_runner, args=(event_loop,))
t.start()
for m in ['hello', 'world', 'foo', 'bar']:
print(f'[loop running ?] {event_loop.is_running()}')
event_loop.create_task(printer(m))
Nothing gets printed but these log messages.
[loop running ?] True
[loop running ?] True
[loop running ?] True
[loop running ?] True
Now if I block in event loop thread and let it run after a pause like this.
def loop_runner(loop):
time.sleep(1 / 1000)
loop.run_forever()
Everything works and this gets printed
[loop running ?] False
[loop running ?] False
[loop running ?] False
[loop running ?] False
[printer] hello
[printer] world
[printer] foo
[printer] bar
From surface it looks like tasks created in running event loop do not get executed. But why is that?
I have not seen anything regarding this in documentation. In most of the examples I have seen on internet people are creating tasks in loop from other coroutines and awaiting for them. But I think it is legal to use create tasks outside coroutine if you do not want to await for them.
Solution
When creating task from outside the event loop thread, you need to use asyncio.run_coroutine_threadsafe
. That function will schedule the coroutine in a thread-safe manner and notify the event loop that there is new work to be done. It will also return a concurrent.futures.Future
object which you can use to block the current thread until the result is available.
From surface it looks like tasks created in running event loop do not get executed. But why is that?
Calling create_task
is insufficient because it doesn't contain code to "wake up" the event loop. This is a feature - such wakeup is normally not needed, and adding it would just slow down the regular single-threaded use. When create_task
is invoked from the event loop thread, it is inside an event loop callback, so the event loop can check its task queue as soon as it regains control, when it is done executing the callback. But when create_task
is invoked from a different thread, the event loop is asleep waiting for IO, so run_coroutine_threadsafe
is needed to wake it up.
To test this, you can create a "heartbeat" coroutine, which only contains an infinite loop that prints something and awaits asyncio.sleep(1)
. You'll see that the tasks created with create_task
get executed along with the heartbeat, which also happens to wake up the event loop. In busy asyncio applications this effect can give the impression that create_task
from another thread "works". However, this should never be relied on, as create_task
fails to implement proper locking and could corrupt the event loop internals.
I have not seen anything regarding this in documentation.
Take a look at the concurrency and multithreading section.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.