Issue
I am using asyncio.Task.all_tasks() to figure out which tasks to cancel and which to wait for during shutdown. It looks like this:
web.run_app(web_app, port=PORT, handle_signals=True)
# Once the app has stopped, run the cleanup
loop.run_until_complete(wait_for_all_blocker_coroutines_to_finish())
This is the function that waits for tasks that need to be completed before shutting down:
import asyncio
import logging
from datetime import datetime

async def wait_for_all_blocker_coroutines_to_finish() -> None:
    started_time = datetime.now()
    all_tasks = asyncio.Task.all_tasks() - {asyncio.current_task()}
    # all_tasks doesn't contain any tasks that I created inside of the spawn() coroutine
    logging.debug(f"Total tasks unfinished: {len(all_tasks)}")
    loop = asyncio.get_event_loop()
    logging.debug(f"Checking in loop {loop._thread_id}")
    logging.debug(all_tasks)
    # Keep only the tasks whose name marks them as blockers
    coroutines = list(filter(filter_tasks_with_meta, all_tasks))
    logging.debug(coroutines)
    total = len(coroutines)
    logging.debug(f"Waiting for all blocker coroutines to finish ({total} total)")
    # return_exceptions=True: one failing task does not stop the wait for the others
    await asyncio.gather(*coroutines, return_exceptions=True)
    duration = datetime.now() - started_time
    seconds = duration.total_seconds()
    logging.debug(f"Coroutines unblocked after {seconds} seconds")
Somewhere inside spawn(coro) I do this:
import asyncio
import json
import logging
from typing import TypedDict

class TaskMeta(TypedDict):
    is_meta: bool
    blocker: bool

def name_for_task_with_meta(task_meta: TaskMeta) -> str:
    return json.dumps(task_meta)

def create_app_blocking_task(coro) -> asyncio.Task:
    # Tasks that must be waited for are marked with a TaskMeta, which is serialized to
    # a JSON string and stored as the task name (the shutdown code later filters by name)
    name = name_for_task_with_meta(TaskMeta(is_meta=True, blocker=True))
    loop = asyncio.get_running_loop()
    task = loop.create_task(coro, name=name)
    logging.debug(f"Creating blocking task with meta={name}, loop_id={loop._thread_id}")
    return task

# Inside the coroutine passed to spawn():
job = create_app_blocking_task(coro)
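The filter_tasks_with_meta predicate used by the shutdown function is not shown in the question. Given the JSON naming scheme above, one possible version is sketched below; its body is an assumption, not the author's code. It decodes each task's name with json.loads() and treats tasks with default names such as "Task-1" (which are not valid JSON) as non-blockers.

```python
import asyncio
import json
from typing import TypedDict


class TaskMeta(TypedDict):
    is_meta: bool
    blocker: bool


def name_for_task_with_meta(task_meta: TaskMeta) -> str:
    return json.dumps(task_meta)


def filter_tasks_with_meta(task: asyncio.Task) -> bool:
    """Hypothetical predicate: True if the task name encodes a TaskMeta with blocker=True."""
    try:
        meta = json.loads(task.get_name())
    except json.JSONDecodeError:
        # Default asyncio names like "Task-1" are not JSON, so they are not blockers.
        return False
    # Names that happen to be valid JSON but not a dict (e.g. "123") are rejected too.
    return isinstance(meta, dict) and meta.get("is_meta") is True and meta.get("blocker") is True
```

Because the predicate only reads task.get_name(), it works on any task regardless of which coroutine created it, as long as that task is on the loop being inspected.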
The tasks that I want to wait for are created inside aiojobs.spawn(). When I call asyncio.Task.all_tasks() inside the coroutine run by aiojobs.spawn(), it lists the tasks correctly.
However, my shutdown handler runs outside that coroutine, and there asyncio.Task.all_tasks() returns nothing: zero running tasks, even though they are actually running.
From what I understand, asyncio.Task.all_tasks() returns all running tasks in the current event loop. Could it be that spawn() creates a separate loop, so its tasks don't appear in the main loop? If so, how can I prevent this? Or can I get all loops and then all tasks from each loop? Or should I do a graceful shutdown for spawn() separately?
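On the loop question: asyncio tracks tasks per event loop, and asyncio.all_tasks() (the replacement for asyncio.Task.all_tasks(), which was removed in Python 3.9) returns only the tasks of the running loop that are not done yet. A task created from inside another task on the same loop is therefore visible from anywhere on that loop, while a task that has already been cancelled or has finished drops out of the result. The sketch below checks both cases; spawner() is a hypothetical stand-in for the coroutine run by aiojobs.spawn(), not aiojobs itself.

```python
import asyncio


async def blocker_job() -> None:
    # Stand-in for real work that should still be running at shutdown.
    await asyncio.sleep(0.05)


async def spawner() -> asyncio.Task:
    # Hypothetical stand-in for the spawned coroutine: it runs as its own task
    # and creates another task on the same (running) loop.
    return asyncio.create_task(blocker_job(), name="nested-blocker")


async def main() -> tuple:
    outer = asyncio.create_task(spawner())
    inner = await outer
    # The nested task lives on the same loop, so all_tasks() sees it from here.
    seen_while_pending = inner in asyncio.all_tasks()
    inner.cancel()
    try:
        await inner
    except asyncio.CancelledError:
        pass
    # Once cancelled (done), the task is no longer reported.
    seen_after_cancel = inner in asyncio.all_tasks()
    return seen_while_pending, seen_after_cancel


print(asyncio.run(main()))  # (True, False)
```

An empty result from all_tasks() at shutdown is therefore consistent with the tasks having already been cancelled, rather than with them living on another loop.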
EDIT: I figured out that these tasks get cancelled when I stop the application. My question now is: how do I prevent that?
Solution
The problem was that aiohttp cancelled the tasks before my shutdown code (which ran after web.run_app() returned) could process them. The solution I used was to wait for the tasks inside an on_shutdown handler, which aiohttp runs as part of its own shutdown sequence:
from aiohttp import web

web_app = web.Application(client_max_size=1024 * 1024 * 40)  # 40 MiB max request body
web_app.on_shutdown.append(on_shutdown)
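The answer does not show the handler itself. Below is a minimal sketch, assuming the blocker tasks are named with the JSON TaskMeta scheme from the question; is_blocker_task is a hypothetical helper, not part of aiohttp or aiojobs. aiohttp awaits each on_shutdown handler with the Application as its only argument while the event loop is still running, so gathering the tasks there lets them finish before run_app() tears the loop down.

```python
import asyncio
import json
import logging


def is_blocker_task(task: asyncio.Task) -> bool:
    # Hypothetical predicate matching names produced by name_for_task_with_meta().
    try:
        meta = json.loads(task.get_name())
    except json.JSONDecodeError:
        return False
    return isinstance(meta, dict) and meta.get("blocker") is True


async def on_shutdown(app) -> None:
    """Signal handler for web_app.on_shutdown: wait for blocker tasks to finish."""
    current = asyncio.current_task()
    blockers = [t for t in asyncio.all_tasks() if t is not current and is_blocker_task(t)]
    logging.debug("Waiting for %d blocker tasks", len(blockers))
    # return_exceptions=True so one failing job does not abort waiting for the rest.
    await asyncio.gather(*blockers, return_exceptions=True)
```

Note that this waits without a limit; if a blocker task can hang, wrapping the gather() in asyncio.wait_for() with a timeout of your choosing would bound how long shutdown takes.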
Answered By - comonadd