Issue
If I create a series of asyncio tasks in a top-level class, all of which should essentially run forever, like so:
asyncio.create_task(...)
asyncio.create_task(...)
asyncio.create_task(...)
...
self.event_loop.run_forever()
# Once we fall out of the event loop, collect all remaining tasks,
# cancel them, and terminate the asyncio event loop
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
self.event_loop.run_until_complete(group)
self.event_loop.close()
The above code doesn't handle the following situation, which I'm finding I need more and more, and I haven't seen an example for in Googling or in the asyncio docs: if one of the tasks fails with an exception, the exception doesn't get handled - all of the other tasks proceed, but that one task simply halts silently (other than the exception output).
So, how can I:
- Set up for the exception to be handled, so that the failure isn't silent any more?
- Most importantly, restart the failed task, effectively running asyncio.create_task(...) again, just for that task? This would seem to require finding the task that received the exception in the event loop, removing it, and adding a new one - how to do that is not clear to me.
- Allow the tasks that didn't have issues to continue uninterrupted? I want to avoid any side effects of handling the task that received the exception.
Solution
Uncaught exceptions are attached to the task object, and can be retrieved from it via the Task.exception() method. The asyncio.create_task(...) call returns the task object, so you'd want to collect those to inspect for exceptions.
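For illustration, here is a minimal sketch of that idea; the worker coroutines and their behaviour are invented and not part of the original question. It keeps the task objects around, attaches a done callback, and uses Task.exception() to report failures instead of letting them pass silently:

import asyncio
import traceback

async def worker_a():
    # hypothetical worker that fails
    await asyncio.sleep(0.1)
    raise RuntimeError('worker_a failed')

async def worker_b():
    # hypothetical worker that finishes cleanly
    await asyncio.sleep(0.2)

def report_exception(task):
    # Task.exception() raises CancelledError for cancelled tasks, so check first
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        print(f'Task {task.get_name()} raised:')
        traceback.print_exception(type(exc), exc, exc.__traceback__)

async def main():
    tasks = [asyncio.create_task(worker_a()), asyncio.create_task(worker_b())]
    for task in tasks:
        task.add_done_callback(report_exception)
    # return_exceptions=True keeps the failure from propagating out of gather()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())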
If you want to reschedule a task whenever an exception has occurred, then you'd want to do that in a new task (because you'd want it to run in the event loop), or use a wrapper co-routine that catches exceptions and just re-runs the given coroutine again.
The latter could look something like:
import asyncio
import traceback

async def rerun_on_exception(coro, *args, **kwargs):
    while True:
        try:
            await coro(*args, **kwargs)
        except asyncio.CancelledError:
            # don't interfere with cancellations
            raise
        except Exception:
            print("Caught exception")
            traceback.print_exc()
then wrap your coroutines with the above coroutine when scheduling them as a task:
asyncio.create_task(rerun_on_exception(coroutine_uncalled, arg1value, ..., kwarg1=value, ...))

i.e. passing in the arguments needed to create the coroutine again each time there is an exception.
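For example, a sketch of how this might be wired up with the rerun_on_exception() wrapper defined above; the poll_sensor coroutine, its arguments, and the failure rate are made up for illustration:

import asyncio
import random

async def poll_sensor(name, interval=1.0):
    # hypothetical worker; sometimes raises so the wrapper has something to restart
    while True:
        if random.random() < 0.3:
            raise RuntimeError(f'lost contact with {name}')
        print(f'polling {name}')
        await asyncio.sleep(interval)

async def main():
    # rerun_on_exception() re-runs poll_sensor with the same arguments after any failure
    task = asyncio.create_task(
        rerun_on_exception(poll_sensor, 'sensor-1', interval=0.5)
    )
    await asyncio.sleep(3)   # let it run (and recover) for a bit
    task.cancel()            # CancelledError passes straight through the wrapper
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())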
The other option is to use asyncio.wait() in a separate task, so you can monitor for exceptions as the loop runs and make decisions on how to handle exceptions there and then:
async def exception_aware_scheduler(*task_definitions):
    # map each task to the (coroutine, args, kwargs) needed to re-create it
    tasks = {
        asyncio.create_task(coro(*args, **kwargs)): (coro, args, kwargs)
        for coro, args, kwargs in task_definitions
    }
    while tasks:
        done, pending = await asyncio.wait(
            tasks.keys(), return_when=asyncio.FIRST_EXCEPTION
        )
        for task in done:
            if task.cancelled():
                # cancelled tasks are simply dropped
                del tasks[task]
            elif task.exception() is not None:
                print('Task exited with exception:')
                task.print_stack()
                print('Rescheduling the task\n')
                coro, args, kwargs = tasks.pop(task)
                tasks[asyncio.create_task(coro(*args, **kwargs))] = coro, args, kwargs
            else:
                # the task completed normally; stop tracking it so asyncio.wait()
                # isn't called again with an already-finished task
                del tasks[task]
The asyncio.wait() call is given control again by the event loop when any one of the scheduled tasks exits due to an exception, but until that happens, tasks could also have been cancelled or simply completed their work (which is why the version above drops those from the tracking dictionary). When a task did exit because of an exception, you need a way to create the same coroutine again (with the same arguments), hence the *args, **kwargs setup above.
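As a standalone illustration of the return_when=asyncio.FIRST_EXCEPTION behaviour (the coroutines here are hypothetical):

import asyncio

async def quick_failure():
    await asyncio.sleep(0.1)
    raise ValueError('boom')

async def slow_success():
    await asyncio.sleep(10)

async def demo():
    failing = asyncio.create_task(quick_failure())
    ongoing = asyncio.create_task(slow_success())
    done, pending = await asyncio.wait(
        {failing, ongoing}, return_when=asyncio.FIRST_EXCEPTION
    )
    # wait() returns as soon as quick_failure() raises; slow_success() is still pending
    assert failing in done and ongoing in pending
    print('first exception:', failing.exception())
    ongoing.cancel()

asyncio.run(demo())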
You'd schedule just the exception_aware_scheduler() task, passing in the task definitions you want it to run:
task_definitions = (
(coro1, (), {}), # no arguments
(coro2, ('arg1', 'arg2'), {}),
# ...
)
asyncio.create_task(exception_aware_scheduler(*task_definitions))
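Put together, a minimal runnable sketch using the exception_aware_scheduler() defined above might look like the following; the heartbeat and fetcher coroutines and their arguments are invented for illustration, and the scheduler keeps rescheduling fetcher each time it fails until the program is interrupted:

import asyncio

async def heartbeat():
    # hypothetical task with no arguments
    while True:
        print('heartbeat')
        await asyncio.sleep(1)

async def fetcher(url, retries=3):
    # hypothetical task that takes positional and keyword arguments
    await asyncio.sleep(0.5)
    raise ConnectionError(f'could not reach {url} after {retries} retries')

async def main():
    task_definitions = (
        (heartbeat, (), {}),                               # no arguments
        (fetcher, ('https://example.com',), {'retries': 5}),
    )
    await exception_aware_scheduler(*task_definitions)

asyncio.run(main())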
Answered By - Martijn Pieters