Issue
A mock producer-consumer flow based on asyncio.Queue:
import asyncio
async def produce(q: asyncio.Queue, task) -> None:
    """Put ``task`` on ``q`` and report it on stdout.

    Args:
        q: Queue shared with the consumers.
        task: Item to enqueue.
    """
    # Await the put directly instead of spawning a detached task: nothing
    # held a reference to that task, so it could be lost before running,
    # and "Produced" was printed before the item was actually queued.
    await q.put(task)
    print(f'Produced {task}')
async def consume(q: asyncio.Queue) -> None:
    """Consume items from ``q`` in an endless loop.

    Items less than or equal to 2 are reported as consumed and marked done.

    Args:
        q: Queue to read items from.

    Raises:
        ValueError: If an item greater than 2 is received. ``task_done()``
            is not called for that item, so it stays counted as unfinished
            by ``q.join()``.
    """
    while True:
        task = await q.get()
        if task > 2:
            print(f'Cannot consume {task}')
            raise ValueError(f'{task} too big')
        print(f'Consumed {task}')
        q.task_done()
async def main() -> None:
    """Start two consumers, produce ten items, then wait for the first exit.

    Each item is produced by its own task, awaited before the next one is
    started. The function returns as soon as either the queue has been
    fully processed or any consumer task has finished. Consumer exceptions
    are not retrieved here.
    """
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consume(queue)) for _ in range(2)]
    for i in range(10):
        await asyncio.create_task(produce(queue, i))
    # asyncio.wait() accepts only tasks/futures: passing a bare coroutine
    # was deprecated in Python 3.8 and raises TypeError since 3.11.
    joined = asyncio.create_task(queue.join())
    await asyncio.wait([joined, *consumers],
                       return_when=asyncio.FIRST_COMPLETED)
# Entry point: run the demo coroutine to completion.
asyncio.run(main())
The output is:
Produced 0
Consumed 0
Produced 1
Consumed 1
Produced 2
Consumed 2
Produced 3
Cannot consume 3
Produced 4
Cannot consume 4
Produced 5
Produced 6
Produced 7
Produced 8
Produced 9
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<consume() done, defined at test.py:9> exception=ValueError('3 too big')>
Traceback (most recent call last):
File "test.py", line 14, in consume
raise ValueError(f'{task} too big')
ValueError: 3 too big
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<consume() done, defined at test.py:9> exception=ValueError('4 too big')>
Traceback (most recent call last):
File "test.py", line 14, in consume
raise ValueError(f'{task} too big')
ValueError: 4 too big
Is there a way to notify the producer to stop producing after an exception is raised by one of the consumers?
The code above uses multiple producers. It is also acceptable if the "notification" mechanism only works in single-producer mode.
Solution
Inspired by user4815162342's suggestion of setting a global Boolean variable when a consumer raises an exception, and checking it in the producer:
import asyncio
# Shared flag: set to True by consume() when it receives an item it cannot
# process; checked by single_produce() and main() before producing more.
stop = False
async def single_produce(q: asyncio.Queue):
global stop
for task in range(10):
await asyncio.sleep(0.001)
if stop:
break
await q.put(task)
print(f'Produced {task}')
async def multi_produce(q: asyncio.Queue, task) -> None:
    """Produce a single ``task`` on ``q``; one call per item.

    This coroutine does not look at the ``stop`` flag itself; the caller
    decides whether to start another producer.

    Args:
        q: Queue shared with the consumers.
        task: Item to enqueue.
    """
    # Yield to the event loop before queueing so the consumers can run.
    await asyncio.sleep(0.001)
    await q.put(task)
    print(f'Produced {task}')
async def consume(q: asyncio.Queue) -> None:
    """Consume items from ``q`` in an endless loop, flagging failures.

    Items less than or equal to 2 are reported as consumed and marked done.

    Args:
        q: Queue to read items from.

    Raises:
        ValueError: If an item greater than 2 is received. Before raising,
            the module-level ``stop`` flag is set to True so producers can
            stop. ``task_done()`` is not called for the failing item.
    """
    global stop
    while True:
        task = await q.get()
        if task > 2:
            # Signal the producer side before this consumer dies.
            stop = True
            print(f'Cannot consume {task}')
            raise ValueError(f'{task} too big')
        print(f'Consumed {task}')
        q.task_done()
async def main(mode) -> None:
    """Run the demo with either one producer or one producer task per item.

    Two consumer tasks are started on a queue of capacity 1. After
    production ends, the function waits until either the queue has been
    fully processed or any consumer task has finished.

    Args:
        mode: ``'single'`` runs ``single_produce`` once; ``'multiple'``
            starts one ``multi_produce`` task per item, stopping as soon
            as the module-level ``stop`` flag is set. Any other value
            produces nothing.
    """
    global stop
    queue = asyncio.Queue(1)
    consumers = [asyncio.create_task(consume(queue)) for _ in range(2)]
    if mode == 'single':
        print('single producer')
        await asyncio.create_task(single_produce(queue))
    elif mode == 'multiple':
        print('multiple producers')
        for i in range(10):
            if stop:
                break
            await asyncio.create_task(multi_produce(queue, i))
    # asyncio.wait() accepts only tasks/futures: passing a bare coroutine
    # was deprecated in Python 3.8 and raises TypeError since 3.11.
    joined = asyncio.create_task(queue.join())
    await asyncio.wait([joined, *consumers],
                       return_when=asyncio.FIRST_COMPLETED)
# Run the single-producer variant; swap which of the two lines below is
# commented out to try the multiple-producer variant instead.
asyncio.run(main('single'))
# asyncio.run(main('multiple'))
Answered By - funkid
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.