Issue
I need to get some values from an async generator concurrently, and I also need to handle KeyboardInterrupt
correctly so that whenever the application is quit I have a chance to write to disk whatever was retrieved at the moment of the exception. I came up with the following code:
import asyncio

async def some_task():
    try:
        yield 1
        yield 2
        yield 3
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        pass

async def run_tasks():
    async def _task(gen):
        return [value async for value in gen]

    try:
        result = await asyncio.gather(_task(some_task()))
        return result
    except asyncio.CancelledError:
        # Do something here to get the result from the gathered tasks
        ...

async def amain():
    value = await run_tasks()
    print("Returned", value)

if __name__ == '__main__':
    asyncio.run(amain())
I tried using asyncio.shield, but asyncio.gather still throws an exception instead of returning the value from _task.
Solution
An exception, be it KeyboardInterrupt or any other, interrupts a function at the point where it occurs, so the return statement is never reached. The same applies to list comprehensions and other expressions: the inner for loop is cancelled, and the partially built list is discarded.
However, just by unfolding the comprehension into a multi-line for statement and appending elements one at a time, you get full control of the flow with try/except blocks, with no other changes needed.
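To make the difference concrete, here is a minimal sketch that cancels the same generator twice, once consumed by a comprehension and once by an explicit loop. The names numbers, collect_with_comprehension, collect_with_loop and demo are made up for this illustration, and the task is cancelled by hand after a short delay rather than by a real Ctrl+C:

```python
import asyncio

async def numbers():
    # Hypothetical generator: yields three values, then stalls for a long time.
    yield 1
    yield 2
    yield 3
    await asyncio.sleep(3600)

async def collect_with_comprehension():
    # The list only exists once the comprehension finishes; if cancellation
    # arrives while the generator is stalled, the half-built list is lost.
    return [value async for value in numbers()]

async def collect_with_loop(results):
    # The caller owns "results", so every appended item survives cancellation.
    async for value in numbers():
        results.append(value)

async def demo():
    # Case 1: comprehension. Cancelling leaves nothing to recover.
    task = asyncio.create_task(collect_with_comprehension())
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
        lost = False
    except asyncio.CancelledError:
        lost = True

    # Case 2: explicit loop. The partial results are still in the list.
    results = []
    task = asyncio.create_task(collect_with_loop(results))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return lost, results

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the second case the list is created by the caller, which is what makes the partial values reachable after the task has been cancelled.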
Here, I write an alternative path with except BaseException: (a plain except Exception won't catch KeyboardInterrupt, nor asyncio.CancelledError on Python 3.8 and later), but you can even have a single code path for both exceptions and normal flow by using the finally clause instead.
(Also, when handling an exception you don't care about, but just want to perform some action when it occurs, it is important to re-raise it, so code further up the call stack can know what took place.)
...
import asyncio

async def some_task():
    try:
        yield 1
        yield 2
        yield 3
        await asyncio.sleep(10000)
    except asyncio.CancelledError:
        pass

async def run_tasks(results):
    async def _task(gen, results):
        # A nested function could just use "results" from the outer scope; the
        # parameter is included so the same code works in non-nested functions.
        async for item in gen:
            results.append(item)

    try:
        await asyncio.gather(_task(some_task(), results))
        return results
    except BaseException:
        # Do this here to get the results from the gathered tasks
        print(f"Something stopped the task. Partial results: {results}")
        raise

async def amain():
    # The list is created here so the "finally" block can still see the
    # partial results when run_tasks() re-raises.
    results = []
    try:
        await run_tasks(results)
    finally:
        print("Returned", results)

if __name__ == '__main__':
    asyncio.run(amain())
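The finally variant mentioned above could look roughly like the sketch below. It is only an illustration: the on_exit callback is a hypothetical stand-in for "write to disk", and demo simulates the interruption with asyncio.wait_for and a short timeout instead of a real KeyboardInterrupt.

```python
import asyncio

async def some_task():
    yield 1
    yield 2
    yield 3
    await asyncio.sleep(10000)

async def run_tasks(results, on_exit):
    # "on_exit" is a hypothetical callback standing in for the disk write.
    async def _task(gen):
        async for item in gen:
            results.append(item)

    try:
        await asyncio.gather(_task(some_task()))
    finally:
        # Single code path: runs on normal completion and on any exception,
        # and the exception keeps propagating without an explicit "raise".
        on_exit(list(results))
    return results

async def demo():
    saved = []
    try:
        # The timeout cancels run_tasks(), which triggers its "finally" block.
        await asyncio.wait_for(run_tasks([], saved.append), timeout=0.05)
    except asyncio.TimeoutError:
        pass
    return saved

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With finally there is nothing to re-raise by hand, which makes it harder to accidentally swallow the KeyboardInterrupt or CancelledError.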
Answered By - jsbueno