Issue
I am trying to implement, using asyncio, a producer-consumer model that allows a producer to put `n` items in a queue, and then create `n` consumers that process the `n` items concurrently.
This is my attempt so far:
```python
import asyncio
import uuid
import time
import random


async def produce(q, interval=1):
    while True:
        n = random.randint(0, 20)
        for _ in range(n):
            await q.put((uuid.uuid4().hex, time.perf_counter()))
        await asyncio.sleep(interval)


async def consume(item, t):
    print(f"Managing {item}...")
    await asyncio.sleep(1)
    print(f"Item managed after {time.time() - t:.2f} s")


async def manage_events(q):
    while True:
        item, t = await q.get()
        print(f"Consuming {item}. In queue: {time.perf_counter() - t}")
        task = asyncio.create_task(consume(item, time.time()), name=str(item))
        print("After manage_item")


async def main():
    q = asyncio.Queue()
    await asyncio.gather(produce(q, 2), manage_events(q))


if __name__ == "__main__":
    s = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds")
```
Every `interval` seconds, 0 to 20 items are created and put in the queue `q`. In the function `manage_events`, every time an item is extracted from the queue, an async task is created to consume the item.
This works very well, but my problem is that, when looking at the concurrency diagram, the tasks that consumed the first items do not die after that consumption is complete. They keep running forever in a zombie state doing nothing, and I can't figure out why. The expected and desired behaviour is that a task dies once the function `consume` is finished.
---- EDIT ---
This is the code after applying the suggestions given by user4815162342:
```python
import asyncio
import uuid


async def consume(item):
    print(f"Consuming {item}")
    await asyncio.sleep(1)
    print(f"{item} consumed")


async def hello(n):
    print("Hello")
    await asyncio.sleep(2)
    print(f"Finishing hello {n}")


async def check_events(interval=1):
    while True:
        item = uuid.uuid4().hex
        print(f"Producing item")
        asyncio.create_task(consume(item))
        print('Tasks count: ', len(asyncio.all_tasks()))
        await asyncio.sleep(interval)


async def main():
    await check_events()


if __name__ == "__main__":
    asyncio.run(main())
```
It turns out that the tasks were in fact dying at the end of every `consume()` execution; the PyCharm graph visualizer simply does not reflect that behaviour.
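One way to confirm this without a visualizer is to count unfinished tasks with `asyncio.all_tasks()`, which returns only the tasks that are not yet done. The sketch below is a minimal check under simple assumptions (five short `consume()` calls and a fixed 0.2 s wait). The `background` set is an addition not present in the code above: it follows the asyncio documentation's advice to keep strong references to tasks created with `create_task()`, because the event loop itself only holds weak references to them.

```python
import asyncio


async def consume(item):
    await asyncio.sleep(0.05)  # simulated work


async def main():
    # Strong references so the tasks cannot be garbage-collected mid-run;
    # each task removes itself from the set when it finishes.
    background = set()
    for i in range(5):
        task = asyncio.create_task(consume(i))
        background.add(task)
        task.add_done_callback(background.discard)

    during = len(asyncio.all_tasks())  # main() plus the 5 consume() tasks
    await asyncio.sleep(0.2)           # long enough for every consume() to end
    after = len(asyncio.all_tasks())   # finished tasks are no longer listed
    return during, after


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the tasks really lingered after `consume()` returned, the second count would stay above one.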
Solution
Your queue architecture is non-standard and doesn't achieve the purpose of a queue. Normally a queue is associated with a fixed number of producers and a fixed number of consumers. Each consumer fetches queue items in a loop, which is either infinite or terminated by cancellation or a sentinel value. Consumers normally process items sequentially, meaning they use `await` rather than `create_task`. This ensures that the parallelism matches the number of workers, and that items are processed in FIFO order.
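A minimal sketch of that standard layout, assuming three workers, nine integer items and a short `asyncio.sleep()` standing in for real processing (the names `worker`, `num_workers` and `num_items` are illustrative, not from the question). Each worker awaits its processing step instead of spawning a task, so at most three items are in progress at once. `q.join()` waits until every item has been marked done with `task_done()`, and the workers, which loop forever, are then stopped by cancellation.

```python
import asyncio


async def worker(name, q, results):
    # A fixed, long-lived consumer: fetch one item, process it, repeat.
    while True:
        item = await q.get()
        try:
            await asyncio.sleep(0.01)  # simulated processing, awaited in place
            results.append((name, item))
        finally:
            q.task_done()  # lets q.join() know this item is finished


async def main(num_workers=3, num_items=9):
    q = asyncio.Queue()
    results = []
    workers = [
        asyncio.create_task(worker(f"w{i}", q, results))
        for i in range(num_workers)
    ]
    for item in range(num_items):
        await q.put(item)
    await q.join()  # returns once every item has been processed
    for w in workers:
        w.cancel()  # the workers loop forever, so stop them by cancellation
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    for name, item in asyncio.run(main()):
        print(f"{name} processed item {item}")
```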
Your queue has a single consumer which immediately spawns a processing async function (which you call `consume`) and doesn't wait for it. In this setup you don't need a queue in the first place; you could just call `asyncio.create_task(consume(...))` directly in the producer. Also, if you ever decide to use a bounded queue to ensure backpressure, it will be ineffective, because `create_task()` simply pushes the work to asyncio's internal queue, which is unbounded.
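The backpressure point can be illustrated with a small experiment. The sketch below, under assumed parameters (ten items, `asyncio.Queue(maxsize=1)`, 0.1 s of simulated work), uses the same spawn-and-forget dispatch as `manage_events`, plus a `None` sentinel so the program ends. `dispatch`, `stats` and the `peak` counter are illustrative names, not part of the original code. Although the queue never holds more than one item, all ten `consume()` calls end up running at the same time: each item leaves the bounded queue immediately and waits in the event loop instead.

```python
import asyncio


async def consume(item, stats):
    # Track how many consume() calls are running at the same time.
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0.1)  # simulated work
    stats["running"] -= 1


async def produce(q, n):
    for i in range(n):
        await q.put(i)  # would block only while the queue is full
    await q.put(None)   # sentinel: no more items


async def dispatch(q, stats):
    # Same pattern as manage_events: one task per item, not awaited here.
    tasks = set()
    while (item := await q.get()) is not None:
        task = asyncio.create_task(consume(item, stats))
        tasks.add(task)
        task.add_done_callback(tasks.discard)
    await asyncio.gather(*tasks)  # let the remaining work finish


async def main(n=10):
    q = asyncio.Queue(maxsize=1)  # bounded: at most one item waiting
    stats = {"running": 0, "peak": 0}
    await asyncio.gather(produce(q, n), dispatch(q, stats))
    return stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a fixed pool of workers that `await` their processing instead, the number of items in progress is capped by the number of workers, and a full bounded queue makes `q.put()` suspend the producer.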
> This works very well, but my problem is that, when seeing the concurrency diagram, the tasks that consumed the first of the items are not dying after that consumption is complete, so the tasks are running forever in a zombie state doing nothing,
Unlike POSIX processes, asyncio doesn't have the concept of a "zombie state" for its tasks, so you are probably referring to the `manage_events` coroutine, which fetches items from the queue in an infinite loop, much like a typical consumer should. To terminate it cleanly once all items are done, you can use a sentinel value, such as `None`. That means that at the end of `produce` you can add `await q.put((None, None))`, and in `manage_events` you can add `if item is None: break`.
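Applied to the code in the question, the sentinel approach might look like the sketch below. It assumes a producer that stops after a hypothetical `rounds` parameter (the original `produce` loops forever, so it would never reach the point of sending the sentinel) and uses shortened delays. It also gathers the spawned tasks before `manage_events` returns; that is an addition, so that no `consume()` call is still pending when `main()` finishes.

```python
import asyncio
import random
import time
import uuid


async def produce(q, rounds=3, interval=0.1):
    # Hypothetical `rounds` parameter: the original producer loops forever.
    produced = 0
    for _ in range(rounds):
        for _ in range(random.randint(0, 5)):
            await q.put((uuid.uuid4().hex, time.perf_counter()))
            produced += 1
        await asyncio.sleep(interval)
    await q.put((None, None))  # sentinel: tells manage_events to stop
    return produced


async def consume(item):
    await asyncio.sleep(0.05)  # simulated work
    return item


async def manage_events(q):
    tasks = []
    while True:
        item, t = await q.get()
        if item is None:  # sentinel reached, leave the loop
            break
        tasks.append(asyncio.create_task(consume(item)))
    # Added: wait for the spawned tasks so none outlives manage_events.
    return await asyncio.gather(*tasks)


async def main():
    q = asyncio.Queue()
    produced, consumed = await asyncio.gather(produce(q), manage_events(q))
    return produced, consumed


if __name__ == "__main__":
    produced, consumed = asyncio.run(main())
    print(f"produced {produced}, consumed {len(consumed)}")
```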
Answered By - user4815162342