Issue
My program is supposed to read data forever from provider classes stored in PROVIDERS, defined in the config. Every second, it should check whether the config has changed and, if so, stop all tasks, reload the config and create new tasks.
The code below raises CancelledError because I'm cancelling my tasks. Should I really try/except each of those to achieve my goals, or is there a better pattern?
async def main(config_file):
    load_config(config_file)
    tasks = []
    config_task = asyncio.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so
    tasks.append(config_task)
    for asset_name, provider in PROVIDERS.items():
        task = asyncio.create_task(provider.read_forever())
        tasks.append(task)
    try:
        await asyncio.gather(*tasks, return_exceptions=False)
    except ConfigChangedSignal:
        # Restarting
        for task in asyncio.tasks.all_tasks():
            task.cancel()  # raises CancelledError
        await main(config_file)
try:
    asyncio.run(main(config_file))
except KeyboardInterrupt:
    logger.debug("Ctrl-C pressed. Aborting")
Solution
If you are on Python 3.11, your pattern maps directly to asyncio.TaskGroup, the "successor" to asyncio.gather, which makes use of the new Exception Groups. By default, if any task in the group raises an exception, all tasks in the group are cancelled.
I played around with this snippet in the IPython console, running asyncio.run(main(False)) for the no-exception case and asyncio.run(main(True)) to induce an exception, just to check the results:
import asyncio

async def doit(i, n, cancel=False):
    await asyncio.sleep(n)
    if cancel:
        raise RuntimeError()
    print(i, "done")

async def main(cancel):
    try:
        async with asyncio.TaskGroup() as group:
            tasks = [group.create_task(doit(i, 2)) for i in range(10)]
            group.create_task(doit(42, 1, cancel=cancel))
            group.create_task(doit(11, .5))
    except Exception:
        pass
    await asyncio.sleep(3)
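For reference, with cancel=False all the tasks print their "done" line; with cancel=True the RuntimeError raised after one second cancels the ten two-second tasks before they can print, while the half-second task has already completed.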
Your code can accommodate that pattern directly.
Apart from the best practice for cancelling tasks, though, you are doing a recursive call to your main. Although that will work for most practical purposes, it can make seasoned developers go "sigh", it can break in edge cases (it will fail after roughly 1000 config-reload cycles, for example), and it can leak resources.
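To illustrate the point, here is a minimal sketch of my own (assuming CPython's default recursion limit of 1000; restart and demo are made-up names, not part of the original code): each recursive await keeps the caller's frame alive, so the stack eventually overflows.

import asyncio
import sys

async def restart(n=0):
    # each recursive await keeps the caller's frame alive, just like
    # "await main(config_file)" inside the except block does
    await restart(n + 1)

async def demo():
    try:
        await restart()
    except RecursionError:
        print("RecursionError after roughly", sys.getrecursionlimit(), "cycles")

asyncio.run(demo())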
The correct way to do that is to use a while loop, since Python function calls, even tail calls, won't clean up the resources in the calling scope:
import asyncio

...

async def main(config_file):
    while True:
        load_config(config_file)
        try:
            async with asyncio.TaskGroup() as tasks:
                tasks.create_task(watch_config(config_file))  # checks every 1s if config changed and raises ConfigChangedSignal if so
                for asset_name, provider in PROVIDERS.items():
                    tasks.create_task(provider.read_forever())
                # all tasks are awaited at the end of the with block
        except* ConfigChangedSignal:  # <- the new syntax in Python 3.11
            # Restarting is just a matter of re-doing the while-loop
            # ... log.info("config changed")
            pass
        # any other exception won't be caught and will error, allowing one
        # to review what went wrong

...
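For completeness, here is a tiny self-contained sketch (raiser and demo are made-up names, for illustration only) of why except* is needed: a TaskGroup wraps exceptions raised in its tasks in an ExceptionGroup, and except* matches exceptions inside that group by type.

import asyncio

class ConfigChangedSignal(Exception):
    pass

async def raiser():
    raise ConfigChangedSignal()

async def demo():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(raiser())
    except* ConfigChangedSignal:
        # the ConfigChangedSignal arrives wrapped in an ExceptionGroup;
        # "except*" unwraps and matches it by type
        print("caught the signal via except*")

asyncio.run(demo())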
For Python 3.10, looping over the tasks and cancelling each one is fine, but you should fix that recursive call. If you don't want a while loop inside your current main, refactor the code so that main itself is called from an outer while loop:
async def main(config_file):
    while True:
        await inner_main(config_file)

async def inner_main(config_file):
    load_config(config_file)
    # keep the existing body
    ...
    except ConfigChangedSignal:
        # Restarting
        for task in asyncio.tasks.all_tasks():
            task.cancel()  # raises CancelledError
        # await main call dropped from here
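As for the CancelledError you were seeing: you don't need a try/except around each task. After calling .cancel(), await the tasks once more with return_exceptions=True so the cancellations settle inside gather instead of propagating. A minimal sketch of that idea (the cancel_all helper is a name I made up for illustration, not part of the original code):

import asyncio

async def cancel_all(tasks):
    for task in tasks:
        task.cancel()
    # with return_exceptions=True, each task's CancelledError is collected
    # as a result instead of being re-raised here
    await asyncio.gather(*tasks, return_exceptions=True)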
Answered By - jsbueno