Issue
If an asyncio task task_parent creates subtasks task_child but task_parent is canceled due to an exception which is thrown after task_child was created, does task_child gets also automatically cancelled (if it's not protected with asyncio.shield)?
For example in the code below:
async def f():
t1 = asyncio.create_task(coroutine1())
t2 = asyncio.create_task(coroutine2())
r1, = await asyncio.gather(t1)
r3 = await process_result(r1) # process_result throws an exception
r2, = await asyncio.gather(t2)
return await process_results(r2, r3)
if process_result(r1) throws an exception, does t2 get automatically cancelled (and subsequently garbage collected)?
And what if instead of using asyncio.gather, I await on a task directly:
async def f():
t1 = asyncio.create_task(coroutine1())
t2 = asyncio.create_task(coroutine2())
r1, = await t1
r3 = await process_result(r1) # process_result throws an exception
r2, = await t2
return await process_results(r2, r3)
if process_result(r1) throws an exception, does t2 get automatically cancelled in this case too?
Solution
Managed to find an answer to my own question. Task cancellation can be achieved via structured concurrency which in the current version of Python (Python 3.10) is not supported, though there has been a proposal to introduce TaskGroups following PEP 654.
Fortunately there is AnyIO library which implements trio-like structured concurrency on top of asyncio. The example in my question can be rewritten in AnyIO to have cancellable tasks:
import asyncio
from anyio import create_memory_object_stream, TASK_STATUS_IGNORED, create_task_group
from contextlib import AsyncExitStack
async def coroutine1(send_stream):
async with send_stream:
await send_stream.send(1)
async def coroutine2(send_stream):
async with send_stream:
await asyncio.sleep(1)
await send_stream.send(2)
async def process_result(receive_stream, send_stream):
async with AsyncExitStack() as stack:
rs = await stack.enter_async_context(receive_stream)
ss = await stack.enter_async_context(send_stream)
res_rs = await rs.receive()
raise Exception
await ss.send(res_rs + 1)
async def process_results(receive_stream_2, receive_stream_3, *, task_status=TASK_STATUS_IGNORED):
task_status.started()
async with AsyncExitStack() as stack:
rs_2 = await stack.enter_async_context(receive_stream_2)
rs_3 = await stack.enter_async_context(receive_stream_3)
res_rs_2 = await rs_2.receive()
res_rs_3 = await rs_3.receive()
return res_rs_2 + res_rs_3
async def f():
async with create_task_group() as tg:
send_stream_1, receive_stream_1 = create_memory_object_stream(1)
tg.start_soon(coroutine1, send_stream_1)
send_stream_2, receive_stream_2 = create_memory_object_stream(1)
tg.start_soon(coroutine2, send_stream_2)
send_stream_3, receive_stream_3 = create_memory_object_stream(1)
tg.start_soon(process_result, receive_stream_1, send_stream_3)
# process_result will raise an Exception which will cancel all tasks in tg group
result = await process_results(receive_stream_2, receive_stream_3)
print(result)
asyncio.run(f())
Answered By - bushkov
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.