Issue
I have written code for async pool below. in __aexit__
i'm cancelling the _worker tasks after the tasks get finished. But when i run the code, the worker tasks are not getting cancelled and the code is running forever. This what the task looks like: <Task pending coro=<AsyncPool._worker() running at \async_pool.py:17> wait_for=<Future cancelled>>
. The asyncio.wait_for
is getting cancelled but not the worker tasks.
class AsyncPool:
def __init__(self,coroutine,no_of_workers,timeout):
self._loop = asyncio.get_event_loop()
self._queue = asyncio.Queue()
self._no_of_workers = no_of_workers
self._coroutine = coroutine
self._timeout = timeout
self._workers = None
async def _worker(self):
while True:
try:
ret = False
queue_item = await self._queue.get()
ret = True
result = await asyncio.wait_for(self._coroutine(queue_item), timeout = self._timeout,loop= self._loop)
except Exception as e:
print(e)
finally:
if ret:
self._queue.task_done()
async def push_to_queue(self,item):
self._queue.put_nowait(item)
async def __aenter__(self):
assert self._workers == None
self._workers = [asyncio.create_task(self._worker()) for _ in range(self._no_of_workers)]
return self
async def __aexit__(self,type,value,traceback):
await self._queue.join()
for worker in self._workers:
worker.cancel()
await asyncio.gather(*self._workers, loop=self._loop, return_exceptions =True)
To use the Asyncpool:
async def something(item):
print("got", item)
await asyncio.sleep(item)
async def main():
async with AsyncPool(something, 5, 2) as pool:
for i in range(10):
await pool.push_to_queue(i)
asyncio.run(main())
Solution
The problem is that your except Exception
exception clause also catches cancellation, and ignores it. To add to the confusion, print(e)
just prints an empty line in case of a CancelledError
, which is where the empty lines in the output come from. (Changing it to print(type(e))
shows what's going on.)
To correct the issue, change except Exception
to something more specific, like except asyncio.TimeoutError
. This change is not needed in Python 3.8 where asyncio.CancelledError
no longer derives from Exception
, but from BaseException
, so except Exception
doesn't catch it.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.