Issue
I'm wondering how to implement a task-manager that restarts failed tasks in Python.
Here's what I've come up with after some thinking, but it seems like a hack. Is there a better way to achieve this "self-healing" task group pattern?
import asyncio, random

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)
        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))

async def main():
    taskmap: dict[int, asyncio.Task] = {}
    for i in range(10):
        taskmap[i] = asyncio.create_task(noreturn(i))
    while True:
        for arg, task in taskmap.items():
            if task.done():
                # Task died
                taskmap[arg] = asyncio.create_task(noreturn(arg))
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())
Thanks in advance for your help.
Solution
This will work as is, and if your core logic is simple enough for this to fit in, just use it as is.
The major problem with this approach is that it is "hardcoded", i.e. your checking loop has all the information needed about the task that died, and can recreate it by calling the same co-routine function with the same parameters again.
In a larger system, one would expect you to have several mixed tasks, and not always have their initial parameters at hand to recreate a task when needed.
So, a pattern that could work better for you is to have an intermediate layer which holds the initial parameters of a task (or, better yet, of its inner co-routine function) and re-creates the asyncio task whenever it fails.
This layer can be instrumented with observability (i.e. generating logs on failure and retry), retry attempts, retry interval, etc., as you need.
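As a rough illustration of what such a layer might track, here is a minimal sketch of a retrying wrapper with logging, a retry limit and a retry interval. The name run_with_retries, its parameters and the logger name are all hypothetical, chosen only for this example; it is one possible shape for the idea, not the only one.

```python
import asyncio
import logging

logger = logging.getLogger("supervisor")  # hypothetical logger name


async def run_with_retries(coroutine_function, *args,
                           retriable=(RuntimeError, ValueError),
                           max_retries=3, retry_interval=0.1, **kwargs):
    """Await coroutine_function(*args, **kwargs), restarting it on retriable errors.

    The function and its parameters are kept around, so a fresh coroutine
    can be created for every attempt.
    """
    attempt = 0
    while True:
        try:
            return await coroutine_function(*args, **kwargs)
        except retriable as exc:
            attempt += 1
            if attempt > max_retries:
                logger.error("giving up after %d retries: %r", max_retries, exc)
                raise
            logger.warning("attempt %d failed with %r, retrying", attempt, exc)
            await asyncio.sleep(retry_interval)
```

It could be scheduled with asyncio.create_task(run_with_retries(noreturn, 1)). Exceptions outside the retriable tuple, and cancellation, propagate to the caller unchanged.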
You can inherit from asyncio.Task, write the wrapping code, and set it as the task_factory in the running loop. Unfortunately, you won't be able to simply instantiate your class the usual way (asyncio.create_task), even with a customized task_factory, because that takes an already created co-routine, whereas you need to take note of the co-routine parameters so you can re-create the underlying co-routine in case of failure.
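To see why the parameters have to be stored separately, here is a small sketch showing that a co-routine object, once it has run, cannot be awaited again, while calling the co-routine function a second time works. The names work and demo are made up for this illustration.

```python
import asyncio


async def work(value):
    await asyncio.sleep(0)
    return value


async def demo():
    coro = work(1)       # co-routine object: the arguments are already bound
    first = await coro   # runs to completion
    try:
        await coro       # a finished co-routine object cannot be awaited again
    except RuntimeError as exc:
        error = str(exc)
    else:
        error = None
    # calling the co-routine function again yields a fresh, awaitable object
    second = await work(1)
    return first, error, second
```

A task factory only ever sees the object bound to coro here, so it has no supported way to start the same work over.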
The code could be written along the lines of the example below. If this is critical for production, there may be edge cases not covered, and I'd advise you to contact a specialist to get production-strength code for this. Nonetheless, this should just work:
import asyncio


class RetriableTask:  # (asyncio.Task): a plain wrapper, not a subclass
    # exception types that trigger a restart
    retriable = (RuntimeError, ValueError)  # ...

    def __init__(self, coroutine_function, args=(), kwargs=None, name=None, context=None):
        # retry strategies can be parametrized here
        self.coroutine_function = coroutine_function
        self.args = args
        self.kwargs = kwargs or {}
        self.context = context
        self.name = name
        self.start_task()

    def start_task(self):
        # build a fresh co-routine from the stored function and parameters;
        # the context argument of create_task needs Python 3.11 or later
        self.inner = asyncio.create_task(
            self.coroutine_function(*self.args, **self.kwargs),
            name=self.name,
            context=self.context,
        )

    def done(self):
        result = self.inner.done()
        if result:
            exception = self.inner.exception()  # may raise CancelledError: just let it bubble through
            if exception and isinstance(exception, self.retriable):
                # if needed log, and check retry policies
                self.start_task()
                return False
        return result

    # bridge other task methods to the inner task as needed:
    def result(self):
        return self.inner.result()

    def exception(self):
        return self.inner.exception()

    def cancel(self, msg=None):
        return self.inner.cancel(msg)

    def set_name(self, name):
        self.name = name  # remembered so that restarted tasks keep the name
        self.inner.set_name(name)

    def get_name(self):
        return self.inner.get_name()

    # repeat proxying of methods as needed for
    # the methods documented in https://docs.python.org/3/library/asyncio-task.html#task-object
And this is how it can be used:
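import asyncio, random

# assumes the RetriableTask class above is defined in the same module

async def noreturn(_arg):
    while True:
        await asyncio.sleep(1)
        if random.randint(0, 10) % 10 == 0:
            raise random.choice((RuntimeError, ValueError, TimeoutError))

async def main():
    tasks = []
    for i in range(10):
        tasks.append(RetriableTask(noreturn, args=(i,)))
    # build a list so that done() is called on every task in each pass;
    # with a bare generator, any() would stop at the first live task
    # and dead tasks after it would not be restarted
    while any([not task.done() for task in tasks]):
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())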
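Both loops above poll once per second. As a possible variation (a sketch of a different technique, not part of the answer's class), asyncio.wait with return_when=asyncio.FIRST_COMPLETED lets the supervisor wake up as soon as a task ends. The function supervise_all and its parameters are hypothetical names for this example, and it assumes every co-routine function takes a single hashable argument.

```python
import asyncio


async def supervise_all(coroutine_function, args_list,
                        retriable=(RuntimeError, ValueError)):
    """Run one task per argument and restart those failing with a retriable error."""
    # map each running task back to the argument needed to recreate it
    pending = {asyncio.create_task(coroutine_function(arg)): arg
               for arg in args_list}
    results = {}
    while pending:
        done, _ = await asyncio.wait(set(pending),
                                     return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            arg = pending.pop(task)
            exc = task.exception()
            if exc is None:
                results[arg] = task.result()
            elif isinstance(exc, retriable):
                # restart with the same argument
                pending[asyncio.create_task(coroutine_function(arg))] = arg
            else:
                # not retriable: record the exception and stop supervising
                results[arg] = exc
    return results
```

With the noreturn example it would return only once every task has died with a non-retriable TimeoutError. Retry limits and logging are left out to keep the sketch short.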
Answered By - jsbueno