Issue
I use the code from this answer, but I get asyncio.exceptions.CancelledError
when the queue is empty. In the real project, the consumers themselves add tasks to the queue, which is why I use a while True
loop.
I reduced the code to make it easier to debug:
import asyncio
import traceback


async def consumer(queue: asyncio.Queue):
    try:
        while True:
            number = await queue.get()  # here is exception
            queue.task_done()
            print(f'consumed {number}')
    except BaseException:
        traceback.print_exc()


async def main():
    queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
    await queue.join()
    for c in consumers:
        c.cancel()


asyncio.run(main())
The output and the error:
consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
  File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
    number = await queue.get()
  File "/usr/local/Cellar/python@3.9/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/queues.py", line 166, in get
    await getter
asyncio.exceptions.CancelledError
By the way, the documentation of queue.get() says: "If queue is empty, wait until an item is available."
What is the real cause of this error? Is there a better solution?
Solution
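The error occurs because you cancel the task. Once queue.join() returns, main() calls c.cancel() on each consumer, which is still waiting in queue.get(), and the except BaseException block in your consumer prints the resulting CancelledError. The documentation for Task.cancel() says: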
Request the Task to be cancelled.
This arranges for a CancelledError exception to be thrown into the wrapped coroutine on the next cycle of the event loop.
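To see this behaviour in isolation, here is a minimal sketch (the names waiter, demo and log are only illustrative, not from the question). A task blocks on an empty queue, gets cancelled, and the CancelledError surfaces at the pending await:

```python
import asyncio


async def waiter(log: list) -> None:
    """Block on an empty queue and record how the wait ends."""
    queue: asyncio.Queue = asyncio.Queue()
    try:
        await queue.get()  # never completes: nothing is ever put
    except asyncio.CancelledError:
        log.append("cancelled inside get()")
        raise  # re-raise so the task is still marked as cancelled


async def demo() -> list:
    log: list = []
    task = asyncio.create_task(waiter(log))
    await asyncio.sleep(0)  # let the task start and block on get()
    task.cancel()           # request cancellation
    try:
        await task          # the cancellation is delivered while we wait
    except asyncio.CancelledError:
        log.append("task finished as cancelled")
    return log


result = asyncio.run(demo())
print(result)
```

So queue.get() does wait as documented; it is the cancel() call that interrupts the wait.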
You have a few options to handle this:
1. Use asyncio.gather
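Await the cancelled tasks with return_exceptions=True, so that each CancelledError is returned as a result instead of being raised. From the documentation:
If return_exceptions is True, exceptions are treated the same as successful results, and aggregated in the result list.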
await queue.join()
for c in consumers:
    c.cancel()
await asyncio.gather(*consumers, return_exceptions=True)
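For completeness, a self-contained sketch of this option might look like the following. It assumes the consumer no longer catches BaseException itself, and it collects items in a list (the seen parameter is my own addition for illustration) instead of printing them:

```python
import asyncio


async def consumer(queue: asyncio.Queue, seen: list) -> None:
    while True:
        # Once the queue is drained, the consumer waits here until cancelled.
        number = await queue.get()
        seen.append(number)
        queue.task_done()


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        await queue.put(i)

    seen: list = []
    consumers = [asyncio.create_task(consumer(queue, seen)) for _ in range(2)]

    await queue.join()  # returns when every item has been marked done
    for c in consumers:
        c.cancel()
    # Wait for the cancellations to finish; errors come back as results.
    results = await asyncio.gather(*consumers, return_exceptions=True)
    return seen, results


seen, results = asyncio.run(main())
print(seen, results)
```

Each entry in results should be a CancelledError instance, which you can inspect or simply ignore.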
2. Catch the exception in the consumer
async def consumer(q):
    while True:
        try:
            # Cancellation is delivered here, while the consumer waits for an item.
            num = await q.get()
        except asyncio.CancelledError:
            print("Exiting...")
            break
        print(f"Working on: {num}")
        q.task_done()
3. Suppress the exception
from contextlib import suppress

async def consumer(q):
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            print(f"Working on: {num}")
            q.task_done()
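Since the question mentions that consumers add new tasks to the queue, here is a rough sketch of how the suppress variant could fit that case. The follow-up rule (an item n > 0 enqueues n - 1) and the seen list are purely hypothetical, chosen only to make the example runnable. The point is that new items are put before task_done() is called, so queue.join() cannot return while work is still being generated:

```python
import asyncio
from contextlib import suppress


async def consumer(q: asyncio.Queue, seen: list) -> None:
    with suppress(asyncio.CancelledError):
        while True:
            num = await q.get()
            seen.append(num)
            if num > 0:
                # Hypothetical follow-up work: enqueue it BEFORE task_done(),
                # so the unfinished-task count never reaches zero too early.
                q.put_nowait(num - 1)
            q.task_done()


async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()
    q.put_nowait(2)
    seen: list = []
    workers = [asyncio.create_task(consumer(q, seen)) for _ in range(2)]

    await q.join()
    for w in workers:
        w.cancel()
    # suppress() swallows the CancelledError, so the tasks end normally.
    await asyncio.gather(*workers)
    return seen


seen = asyncio.run(main())
print(seen)
```

Because the exception is suppressed inside the coroutine, the tasks finish normally rather than in the cancelled state, so gather does not need return_exceptions=True here.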
Answered By - HTF