Issue
I'm trying to provide a synchronous shutdown function that can gracefully kill an asyncio application via a SIGTERM signal, a `KeyboardInterrupt`/`SystemExit` exception, or just by calling the function directly due to bad startup state. I have to shut down various tasks that each have their own way of shutting down:
- an aiohttp `AppRunner`, currently killed via the `shutdown` method, which returns a coroutine that needs to be awaited
- an asyncio `APScheduler`, currently killed via the `shutdown` method, which calls `call_soon_threadsafe` on the current event loop
- a simple async loop that runs forever, currently killed via a `cancel` signal on the task
- an aiohttp `ClientSession`, which is closed via the `close` method on the session
I want to kill the message processor (ignoring any new messages coming in) and the scheduler, but allow any currently running tasks that depend on the aiohttp `ClientSession` to complete.
Here's an abbreviated version of the current code, with some comments to clarify the logic:
# Schedule the long-running message loop on the event loop.
message_processor_future = loop.create_task(message_processor())


def sig_term_handler(_signum, _frame):
    """Handle SIGTERM by running the synchronous shutdown routine."""
    logging.info("SIGTERM received, shutting down server...")
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )


signal.signal(signal.SIGTERM, sig_term_handler)

try:
    # Blocks until the message processor task finishes.
    loop.run_until_complete(message_processor_future)
except (KeyboardInterrupt, SystemExit) as exc:
    # The exception has already unwound run_until_complete at this point.
    logging.info("{} received".format(exc.__class__.__name__))
    shutdown_server(
        http_runner=http_runner,
        scheduler=scheduler,
        message_processor_future=message_processor_future
    )
async def message_processor():
    """Process messages in an endless loop until the task is cancelled.

    Cancellation is logged and swallowed, so the coroutine returns
    normally instead of propagating ``CancelledError``.
    """
    try:
        while True:
            # code
            # (elided in the original post; presumably awaits and handles
            # one incoming message per iteration)
            ...
    except CancelledError:
        logging.info("Cancelling message processing...")
        return
def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    """Synchronously tear the application down, then exit the process.

    Each argument is optional; a component that was not passed is skipped.
    Every asynchronous step is driven with ``loop.run_until_complete``, so
    this only works when the event loop is NOT currently running. Calling
    it from a signal handler while the loop is running raises
    ``RuntimeError: this event loop is already running``.
    """
    loop = asyncio.get_event_loop()

    # Try to shut down the message processor as early as possible so we
    # don't get any new messages.
    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()

    # Shut down apscheduler early to make sure we don't schedule any new
    # tasks.
    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()

    # If the server is running then kill it (this doesn't really have any
    # requirements as it's fairly separate from the application).
    if http_runner:
        logging.info("Shutting down http server...")
        server_cleanup = http_runner.cleanup()
        loop.run_until_complete(server_cleanup)

    logging.info(
        f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
    )

    # Wait (at most 10 seconds) for any tasks spawned by apscheduler to
    # finish and for the message processor to die if it's still running.
    remaining_tasks = asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
    loop.run_until_complete(remaining_tasks)

    logging.info("Closing ingest api client...")
    from collector.tasks.ap_associations import api_client

    # Kill the client session only now: the tasks that use ClientSession
    # have completed (or timed out) above.
    session_close = api_client.session.close()
    loop.run_until_complete(session_close)

    logging.info("Shutting down process...")
    exit(0)
When I cancel the application via `KeyboardInterrupt` or `SystemExit`, it cleans up without any issues. I believe this is because the loop has stopped running, so calls to `loop.run_until_complete` are safe and synchronous. But when `SIGTERM` is received the loop is still running, so I get this exception:
[2019-06-03 14:52:26,985] [ INFO] --- Shutting down http server...
[2019-06-03 14:52:26,985] [ ERROR] --- Exception in callback Loop._read_from_self
handle: <Handle Loop._read_from_self>
Traceback (most recent call last):
File "uvloop/cbhandles.pyx", line 67, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 324, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 329, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 304, in uvloop.loop.Loop._ceval_process_signals
File "/opt/collector/collector/__main__.py", line 144, in sig_term_handler
message_processor_future=message_processor_future
File "/opt/collector/collector/__main__.py", line 192, in shutdown_server
loop.run_until_complete(http_runner.cleanup())
File "uvloop/loop.pyx", line 1440, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1433, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1342, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 445, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
This makes sense, but I'm not exactly sure how to set up the shutdown method to handle this state. I tried using the `add_done_callback` method, but this also didn't seem to work, because the application gets stuck in the while loop waiting for all the tasks to complete or be cancelled.
def shutdown_server(
    http_runner: AppRunner = None,
    scheduler: AsyncIOScheduler = None,
    message_processor_future: asyncio.Task = None
):
    """Second shutdown attempt: adapt to a running or a stopped loop.

    On a running loop, coroutines are scheduled with ``loop.create_task``;
    on a stopped loop they are driven with ``loop.run_until_complete``.
    As written, this version hangs on a running loop (see the note on
    the busy-wait below).
    """
    loop = asyncio.get_event_loop()

    # Pick how coroutines get executed depending on the loop state.
    task_runner = (
        loop.create_task if loop.is_running() else loop.run_until_complete
    )

    if message_processor_future:
        logging.info("Cancelling message processor...")
        message_processor_future.cancel()

    if scheduler:
        logging.info("Shutting down scheduler...")
        scheduler.shutdown()

    if http_runner:
        logging.info("Shutting down http server...")
        task_runner(http_runner.shutdown())

    logging.info(
        f"Waiting for {len(asyncio.Task.all_tasks(loop))} to complete..."
    )

    # NOTE(review): add_done_callback invokes its callback with the
    # finished future as an argument, but this function accepts none;
    # confirm before reusing this code.
    def finish_shutdown():
        task_runner(http_runner.cleanup())
        logging.info("Closing ingest api client...")
        from collector.tasks.ap_associations import api_client
        task_runner(api_client.session.close())
        logging.info("Shutting down process...")
        exit(0)

    if loop.is_running():
        wait_for_all = asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
        all_tasks_complete = loop.create_task(wait_for_all)
        all_tasks_complete.add_done_callback(finish_shutdown)
        # NOTE: this busy-wait never yields to the event loop, so the task
        # it polls cannot make progress; this is where the application
        # gets stuck.
        while not (all_tasks_complete.done() or all_tasks_complete.cancelled()):
            pass
    else:
        wait_for_all = asyncio.wait(asyncio.Task.all_tasks(loop), timeout=10)
        loop.run_until_complete(wait_for_all)
        finish_shutdown()
Solution
I realized you can just call `sys.exit` in the signal handler: the loop will receive a `SystemExit` exception, and execution continues through the rest of the except clause with a stopped loop.
i.e.
# Turn SIGTERM into a SystemExit so it takes the same except/finally
# cleanup path as KeyboardInterrupt, with the event loop already stopped.
signal.signal(signal.SIGTERM, lambda _, __: sys.exit(0))
This allows me to refactor the code to be much cleaner, and I can also force the tasks to handle their own exceptions with this pattern:
try:
    # Run until a KeyboardInterrupt/SystemExit (including the one raised by
    # the SIGTERM handler) or an unexpected error unwinds the loop.
    loop.run_forever()
except (KeyboardInterrupt, SystemExit) as e:
    # Expected shutdown triggers: just record which one arrived.
    logging.info(f"{e.__class__.__name__} received")
except Exception as err:
    # Anything else is delegated to the application's exception manager.
    exception_manager.handle_exception(err)
finally:
    # Cleanup always runs, whichever path ended run_forever().
    shutdown(http_server_manager, scheduler)
Answered By - Cheyans
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.