Issue
Here is a simplified version of what my code is doing. I hope it is self-explanatory enough.
def config_changed(new_config: Any):
    apply_config(new_config)
    start()
    asyncio.create_task(upload_to_db({"running": True}))

async def upload_to_db(data: Any):
    await some_db_code(data)

def start():
    asyncio.create_task(run())

async def run():
    while True:
        do_something_every_second()
        await asyncio.sleep(1)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
some_db_client.listen_to_change(path_to_config, callback=config_changed)
loop.run_forever()
The code above fails with "RuntimeError: no running event loop" at the line asyncio.create_task(run()). So I changed every asyncio.create_task call into:
try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
loop.create_task(...)
Now the task never starts running. I confirmed that start has been called.
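One way to see why the task stays idle: a task only makes progress while the loop it belongs to is running. If the callback fires in a thread other than the one running the main loop (which the solution below says is the case here), the except branch above creates a second loop that nothing ever runs. A minimal sketch, where callback_in_worker_thread and observed are names made up for illustration:

```python
import asyncio
import threading

observed = {}


async def run():
    observed["ran"] = True


def callback_in_worker_thread():
    # A fresh loop private to this thread, like the except branch above.
    loop = asyncio.new_event_loop()
    task = loop.create_task(run())
    # Nothing is running this loop yet, so the task has not started.
    observed["loop_running"] = loop.is_running()
    observed["done_before_run"] = task.done()
    observed["ran_before_run"] = "ran" in observed
    # Only driving the loop lets the task execute.
    loop.run_until_complete(task)
    observed["done_after_run"] = task.done()
    loop.close()


worker = threading.Thread(target=callback_in_worker_thread)
worker.start()
worker.join()
```

Creating the task succeeds either way; it is the missing run_forever or run_until_complete on that particular loop that keeps it from starting.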
I changed loop.create_task(...) into:
task = loop.create_task(...)
loop.run_until_complete(task)
Now it runs, but upload_to_db is never called. It seems the first run_until_complete blocks the code below it.
What is the correct way of doing this?
I am aware that threading could probably solve this, but I think it should be solvable without threads. After all, asyncio is meant to spare us from using threading in this kind of situation, right?
Edit: add more context.
The reason start needs to be non-async is that it is a method of an abstract class. In fact, all of these functions are members of that abstract class, but only start is abstract. Some child classes may use it to run an async function, as in my example above; others may use it to run a function that will call back in the future but is not itself async, for example:
def start():
    some_db_client.listen_to_change(path, callback=my_callback)
So start has to be non-async and use asyncio.create_task when the implementation requires async, in order to be compatible with both cases.
If not for that limitation, I could just call
asyncio.gather(start(), upload_to_db({"running": True}))
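The constraint described in this edit can be sketched roughly as follows. The class names Component, PollingComponent and ListeningComponent, and the plain list standing in for the database listener, are hypothetical; only the idea of a synchronous abstract start with two kinds of implementation comes from the text above.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Callable, List, Optional


class Component(ABC):  # hypothetical base class
    @abstractmethod
    def start(self) -> None:
        """Begin work without blocking the caller."""


class PollingComponent(Component):
    """Implementation that needs a coroutine running in the background."""

    def __init__(self) -> None:
        self.ticks = 0
        self.task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # Requires a running loop in the calling thread.
        self.task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)


class ListeningComponent(Component):
    """Implementation that only registers a plain callback."""

    def __init__(self, listeners: List[Callable[[str], None]]) -> None:
        self.listeners = listeners
        self.events: List[str] = []

    def start(self) -> None:
        # No coroutine involved: just hand over a callback.
        self.listeners.append(self.events.append)


async def main():
    listeners: List[Callable[[str], None]] = []
    poller = PollingComponent()
    listener = ListeningComponent(listeners)
    for component in (poller, listener):
        component.start()  # same synchronous call for both kinds
    await asyncio.sleep(0.05)
    for notify in listeners:
        notify("config changed")
    poller.task.cancel()
    return poller.ticks, listener.events


ticks, events = asyncio.run(main())
```

This works here only because start is called on the thread that runs the loop; when it is called from another thread, get_running_loop raises the same RuntimeError as in the question.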
Solution
Found a way to do it perfectly.
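import asyncio
from typing import Any, Coroutine

# Coroutines waiting to be scheduled on the event loop.
__coro_queue: asyncio.Queue[Coroutine[Any, Any, Any]] = asyncio.Queue()
__loop: asyncio.AbstractEventLoop | None = None
__task: asyncio.Task[Any] | None = None
# Set once the most recently queued coroutine has been wrapped in a task.
__task_ready = asyncio.Event()

async def start_coro_queue() -> None:
    global __loop, __task
    # Remember the loop this coroutine runs on so other threads can reach it.
    __loop = asyncio.get_running_loop()
    while True:
        coro = await __coro_queue.get()
        __task = asyncio.create_task(coro)
        __task_ready.set()

def run_coroutine_in_background(
        coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
    assert __loop is not None
    __task_ready.clear()
    # asyncio.Queue is not thread-safe, so enqueue on the loop's own thread.
    __loop.call_soon_threadsafe(__coro_queue.put_nowait, coro)
    # Block this (non-loop) thread until the task has been created.
    future = asyncio.run_coroutine_threadsafe(__task_ready.wait(), __loop)
    future.result()
    assert __task is not None
    return __task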
Run this line before running any coroutine.
await start_coro_queue()
start_coro_queue() will never end, so you need to put other code above it, or use gather. In my case:
some_db_client.listen_to_change(path_to_config, callback=config_changed)
await start_coro_queue()
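Because start_coro_queue() never returns, anything placed after a bare await on it is unreachable. Besides gather, one option is to wrap the never-ending coroutine in a task so that the surrounding code keeps going. A minimal sketch, where consume_forever is a hypothetical stand-in for start_coro_queue() and other_work stands for whatever else needs to run:

```python
import asyncio


async def consume_forever(queue: asyncio.Queue, seen: list) -> None:
    # Stand-in for start_coro_queue(): loops forever, never returns.
    while True:
        seen.append(await queue.get())


async def other_work(queue: asyncio.Queue) -> None:
    for i in range(3):
        queue.put_nowait(i)
        await asyncio.sleep(0)  # yield so the consumer can run


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen: list = []
    # Schedule the endless consumer instead of awaiting it directly.
    consumer = asyncio.create_task(consume_forever(queue, seen))
    await other_work(queue)
    await asyncio.sleep(0.01)  # give the consumer time to drain the queue
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return seen


seen = asyncio.run(main())
```

Keeping a reference to the task also gives a handle for cancelling the consumer on shutdown, which a bare await does not.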
Call this whenever you need to run a coroutine in the background and get a task object that you can cancel later:
task = run_coroutine_in_background(your_coroutine)
Basically, use it to replace asyncio.create_task in my original code.
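run_coroutine_in_background can be called from a non-async function, and even from a different thread. The one restriction is that the caller must not be running on the event loop's own thread: future.result() blocks the calling thread, so on the loop thread it would stall the loop and the wait would never finish. In my case:
some_db_client.listen_to_change(path_to_config, callback=config_changed)
calls the callback in a different thread.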
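For comparison, when the only requirement is to schedule a coroutine from another thread, the standard library's asyncio.run_coroutine_threadsafe can be called directly with a reference to the running loop. This is a swapped-in alternative, not the approach above: it returns a concurrent.futures.Future rather than an asyncio.Task. In the sketch below, the listener thread, the log list, and the extra loop and log parameters are assumptions made for illustration:

```python
import asyncio
import threading


async def upload_to_db(data, log):
    await asyncio.sleep(0)  # placeholder for real database I/O
    log.append(data)


def config_changed(loop, log):
    # Runs in the listener thread, not on the event loop thread.
    future = asyncio.run_coroutine_threadsafe(
        upload_to_db({"running": True}, log), loop
    )
    # Optional: block this thread until the coroutine has finished.
    future.result(timeout=5)


async def main():
    loop = asyncio.get_running_loop()
    log = []
    listener = threading.Thread(target=config_changed, args=(loop, log))
    listener.start()
    # Wait for the thread without blocking the event loop.
    await loop.run_in_executor(None, listener.join)
    return log


log = asyncio.run(main())
```

The returned future also has a cancel() method, so a long-running coroutine started this way can still be stopped later from the thread that scheduled it.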
Answered By - Jeffrey Chen