Issue
I'm writing a Python context manager that runs asynchronous tasks. I want the manager to terminate if any of its tasks raises an exception. Here is sample code:
import asyncio
import traceback


class MyClass:
    def __init__(self):
        if asyncio.get_event_loop().is_closed():
            asyncio.set_event_loop(asyncio.new_event_loop())
        self.loop = asyncio.get_event_loop()

    def __enter__(self):
        return self

    def __exit__(self, excType, excValue, tb):
        try:
            self.loop.run_until_complete(self._exit_loop())
        finally:
            self.loop.close()
        if excType is not None:
            print(excType.__name__, ':', excValue)
            traceback.print_tb(tb)

    async def _exit_loop(self):
        # Cancel every task except the one running this coroutine.
        tasks = [task for task in asyncio.all_tasks(self.loop) if
                 task is not asyncio.current_task(self.loop)]
        list(map(lambda task: task.cancel(), tasks))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        self.loop.stop()

    async def func1(self):
        while True:
            print('func1')
            await asyncio.sleep(1)

    async def func2(self):
        i = 5
        while i > 0:
            print('func2')
            await asyncio.sleep(1)
            i -= 1
        raise Exception

    async def _async_start(self):
        self.loop.create_task(self.func1())
        self.loop.create_task(self.func2())

    def start(self):
        self.loop.run_until_complete(self._async_start())


with MyClass() as myClass:
    myClass.start()
    myClass.loop.run_forever()
This is the output of this script:
func1
func2
func1
func2
func1
func2
func1
func2
func1
func2
Task exception was never retrieved
func1
future: <Task finished coro=<MyClass.func2() done, defined at /home/framal/Programy/schnapps/schnapps/bottle/client.py:381> exception=Exception()>
Traceback (most recent call last):
File "/home/framal/Programy/schnapps/schnapps/bottle/client.py", line 387, in func2
raise Exception
Exception
func1
func1
func1
.
.
.
I tried using custom exception handlers, but none of them worked: they only started running after I terminated the process by force.
How can I pass the exception to the loop so that it cancels all the other tasks?
Solution
Update: Python 3.11 introduced asyncio.TaskGroup, which has the desired functionality.
I do not understand why you are using a context manager (CM) this way.
Anyway, if a CM is given and you put loop.run_forever() inside the with block, the only way I know to exit the loop in this situation, so that control passes to the CM's exit function, is loop.stop().
Here is a small decorator that handles every exception except cancellation by calling loop.stop().
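import asyncio
import functools


def watchdog(afunc):
    @functools.wraps(afunc)
    async def run(*args, **kwargs):
        try:
            await afunc(*args, **kwargs)
        except asyncio.CancelledError:
            # Cancellation is expected during shutdown; exit quietly.
            return
        except Exception as err:
            print(f"exception {err}")
            # Stop the loop so that run_forever() returns.
            asyncio.get_event_loop().stop()
    return run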
If you decorate all the coroutines that the CM starts as tasks (func1 and func2), e.g.:
@watchdog
async def func2(self):
then the loop will stop after the first exception.
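To show how the pieces fit together, here is a self-contained sketch of the decorator driving run_forever(). It is a simplified variant rather than the original class: func1 and func2 are plain functions, the log argument and the run_demo helper are hypothetical additions for illustration, the sleeps are shortened, and asyncio.get_running_loop() is used in place of get_event_loop().

```python
import asyncio
import functools


def watchdog(afunc):
    @functools.wraps(afunc)
    async def run(*args, **kwargs):
        try:
            await afunc(*args, **kwargs)
        except asyncio.CancelledError:
            return
        except Exception as err:
            print(f"exception {err!r}")
            asyncio.get_running_loop().stop()
    return run


@watchdog
async def func1(log):
    while True:
        log.append("func1")
        await asyncio.sleep(0.01)


@watchdog
async def func2(log):
    for _ in range(3):
        log.append("func2")
        await asyncio.sleep(0.01)
    raise RuntimeError("func2 failed")


def run_demo():
    log = []
    loop = asyncio.new_event_loop()
    try:
        tasks = [loop.create_task(func1(log)), loop.create_task(func2(log))]
        loop.run_forever()  # returns once watchdog calls loop.stop()
        # Cancel whatever is still running and let it finish cleanly.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    finally:
        loop.close()
    return log, [task.done() for task in tasks]


if __name__ == "__main__":
    print(run_demo())
```

Note that loop.stop() only makes run_forever() return; the other tasks are still pending at that point, so the cleanup step (here the cancel-and-gather after run_forever(), in the question the CM's exit function) is still needed.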
Answered By - VPfB