Issue
I am working with a third-party library that calls a function I give it from another thread at some arbitrary time. It can be modelled as a delayed function call in another thread.
The function I want it to call is an async function. The library has no special interface for async functions, and I do not want the call to block the way asyncio.run does. To summarize:
I need a way to run an async function in the background from a non-async function that is called from another thread.
I asked the question in this post and came up with a nearly perfect solution, until I ran into the following problem.
If the async function tries to run another async function in the same way, it does not work. The cause is that call_soon_threadsafe never calls the callback function when it is invoked from inside an async function.
This code reproduces the problem:
import asyncio
import threading
import time
from typing import Coroutine, Any
import logging

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO, datefmt='%H:%M:%S')

coro_queue: asyncio.Queue[Coroutine[Any, Any, Any]] = asyncio.Queue()
task: asyncio.Task[Any] | None = None
task_ready = asyncio.Event()
loop: asyncio.AbstractEventLoop | None = None


async def start_coro_queue() -> None:
    global task, loop
    loop = asyncio.get_event_loop()
    while True:
        coro = await coro_queue.get()
        task = asyncio.create_task(coro)
        task_ready.set()


def put_coro_in_queue(coro: Coroutine[Any, Any, Any]) -> None:
    logging.info("put_coro_in_queue called.")
    coro_queue.put_nowait(coro)
    logging.info("put_coro_in_queue finished.")


def run_coro_in_background(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    logging.info("run_coro_in_background called")
    global task
    assert loop is not None
    task_ready.clear()
    future = asyncio.run_coroutine_threadsafe(task_ready.wait(), loop)
    loop.call_soon_threadsafe(put_coro_in_queue, coro)
    future.result()
    assert task is not None
    output = task
    task = None
    logging.info("run_coro_in_background finished")
    return output


async def async_func() -> None:
    logging.info("async_func called.")
    await asyncio.sleep(2)
    logging.info("async_func finished.")


def delayed_async_func() -> None:
    logging.info("delayed_async_func called")
    time.sleep(5)
    run_coro_in_background(async_func())
    logging.info("delayed_async_func finished.")


async def nested_async_func() -> None:
    logging.info("nested_async_func called")
    await asyncio.sleep(3)
    run_coro_in_background(async_func())
    logging.info("nested_async_func finished")


def delayed_nested_async_func() -> None:
    logging.info("delayed_nested_async_func called")
    time.sleep(4)
    run_coro_in_background(nested_async_func())
    logging.info("delayed_nested_async_func finished")


async def main() -> None:
    t = threading.Thread(target=delayed_nested_async_func)
    t.start()
    await start_coro_queue()


asyncio.run(main())
The result is:
19:25:51 delayed_nested_async_func called
19:25:55 run_coro_in_background called
19:25:55 put_coro_in_queue called.
19:25:55 put_coro_in_queue finished.
19:25:55 nested_async_func called
19:25:55 run_coro_in_background finished
19:25:55 delayed_nested_async_func finished
19:25:58 run_coro_in_background called
Then it hangs forever.
The expected output should contain:
19:25:58 put_coro_in_queue called
19:25:58 put_coro_in_queue finished
19:25:58 async_func called
19:26:00 async_func finished
The non-nested case can be tested by changing main to:
async def main() -> None:
    t = threading.Thread(target=delayed_async_func)
    t.start()
    await start_coro_queue()
Everything runs as expected.
19:34:00 delayed_async_func called
19:34:05 run_coro_in_background called
19:34:05 put_coro_in_queue called.
19:34:05 put_coro_in_queue finished.
19:34:05 async_func called.
19:34:05 run_coro_in_background finished
19:34:05 delayed_async_func finished.
19:34:07 async_func finished.
Solution
Your thread t runs run_coro_in_background, and that's fine, because it's running in a different thread than the event loop.
Then nested_async_func tries to run run_coro_in_background in the event loop. run_coro_in_background was not written to be safely run from inside the event loop. It calls future.result(), which deadlocks: it blocks the entire thread the event loop is running in while waiting for another task to finish, and that task can never finish while the event loop is blocked. call_soon_threadsafe does schedule the callback, but the blocked loop never gets a chance to run it.
Answered By - user2357112