Issue
I have an asyncio.Queue producer and consumer running as 2 infinite loops. The producer periodically adds jobs to the queue and the consumer waits until a job is available, then processes it, then waits for the next job.
However, for some reason, my consumer is not getting called. I think this is because the producer task never yields to the consumer?
Any ideas on how to fix it so that both workers run in the background as described?
import asyncio
import concurrent.futures
import time
class Consumer:
    """Consumer from the question: takes detection timestamps off a queue.

    Shown as originally posted, including the blocking sleep that the
    question is about.
    """

    def __init__(self, queue: asyncio.Queue):
        # An event is skipped when it was detected less than this many
        # milliseconds after the last processed event finished.
        self._duration_before_restart_ms = 3000
        self._queue = queue
        # Time (ms since the epoch) at which the last event finished
        # processing; stays 0 until the first event has been processed.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Wait for timestamps on the queue forever; skip or process each one."""
        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            if (
                detected_time_ms - self._duration_before_restart_ms
                < self._last_triggered_time_ms
            ):
                print("Consumer skipping event: ", detected_time_ms)
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an io bound operation) with sleep.
            # NOTE: time.sleep() is a blocking call, not an await, so no other
            # task can run on the event loop during these 5 seconds.
            time.sleep(5)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )
class Producer:
    """Producer from the question: periodically puts timestamps on a queue.

    Shown as originally posted, including the blocking sleep that the
    question is about.
    """

    def __init__(self, queue: asyncio.Queue):
        # Minimum gap, in milliseconds, between two timestamps put on the queue.
        self._detection_time_period_ms = 3000
        # Timestamp (ms since the epoch) of the last item put on the queue.
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        """Loop forever, enqueueing the current time when both conditions hold."""
        counter = 0
        while True:
            # Iterates at 2fps
            # NOTE: time.sleep() blocks the whole event loop; the loop cannot
            # switch to the consumer task while this call is sleeping.
            time.sleep(0.5)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)
            # Enqueue only on iterations 6-9 of every ten, and only when more
            # than _detection_time_period_ms has passed since the last enqueue.
            if (counter % 10 > 5) and (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            ):
                print("Producer adding to queue: ", current_time_ms)
                # NOTE: main() creates the queue without a maxsize, so put()
                # has no reason to wait for a free slot; this await is not
                # expected to hand control to the consumer.
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1
async def main():
    """Create the shared queue and schedule the producer and consumer tasks."""
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())
    # NOTE: neither task is awaited here, so main() returns as soon as both
    # tasks have been scheduled.
# Script entry point: run main() on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())
Solution
The code works for me if I use `await asyncio.sleep()` instead of `time.sleep()`.
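In asyncio, tasks don't run at the same time; the event loop can switch to another task only when the running task reaches an `await` that actually suspends it. `time.sleep()` blocks the whole event loop, whereas `await asyncio.sleep()` suspends the current task and gives the loop a chance to switch from the producer to the consumer, and later to switch back from the consumer to the producer.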
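You also have `await` in `put()` and `get()`, but these do not always switch tasks: `put()` on a queue created without `maxsize` never has to wait for a free slot, so it returns immediately without suspending the producer. That is why it looks as if the tasks never switch: the producer keeps running and the consumer never gets a turn to read the data that was put into the queue.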
import asyncio
import concurrent.futures
import time
class Consumer:
    """Consume detection timestamps from a queue without blocking the event loop.

    Each timestamp taken from the queue is either skipped (it was detected too
    soon after the previous processed event) or processed, which is simulated
    with ``asyncio.sleep`` so other tasks can run meanwhile.

    Args:
        queue: Queue of detection timestamps in milliseconds since the epoch.
        duration_before_restart_ms: An event is skipped when it was detected
            less than this many milliseconds after the last processed event
            finished.
        processing_delay_s: Length, in seconds, of the simulated processing.
    """

    def __init__(
        self,
        queue: asyncio.Queue,
        *,
        duration_before_restart_ms: int = 3000,
        processing_delay_s: float = 5,
    ):
        self._duration_before_restart_ms = duration_before_restart_ms
        self._processing_delay_s = processing_delay_s
        self._queue = queue
        # Time (ms since the epoch) at which the last event finished
        # processing; 0 until the first event has been processed.
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        """Wait for timestamps forever, skipping or processing each one."""
        print('start consumer')
        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            earliest_accepted_ms = (
                self._last_triggered_time_ms + self._duration_before_restart_ms
            )
            if detected_time_ms < earliest_accepted_ms:
                # Drop events detected before the restart window has elapsed
                # since the last processed event.
                print("Consumer skipping event: ", detected_time_ms)
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an I/O-bound operation). Awaiting
            # asyncio.sleep() suspends only this task, so the producer keeps
            # running in the meantime.
            await asyncio.sleep(self._processing_delay_s)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )
class Producer:
    """Periodically put the current time on a queue without blocking the loop.

    Args:
        queue: Queue that receives detection timestamps in milliseconds since
            the epoch.
        detection_time_period_ms: Minimum gap, in milliseconds, between two
            timestamps put on the queue.
        frame_interval_s: Pause, in seconds, between loop iterations (the
            default of 0.5 gives two iterations per second).
    """

    def __init__(
        self,
        queue: asyncio.Queue,
        *,
        detection_time_period_ms: int = 3000,
        frame_interval_s: float = 0.5,
    ):
        self._detection_time_period_ms = detection_time_period_ms
        self._frame_interval_s = frame_interval_s
        # Timestamp (ms since the epoch) of the last item put on the queue.
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        """Loop forever, enqueueing the current time when both conditions hold."""
        print('start producer')
        counter = 0
        while True:
            # Awaiting asyncio.sleep() suspends only this task, which is what
            # gives the consumer a chance to run between iterations.
            await asyncio.sleep(self._frame_interval_s)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)
            # A detection is simulated on iterations 6-9 of every ten.
            detected = counter % 10 > 5
            period_elapsed = (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            )
            if detected and period_elapsed:
                print("Producer adding to queue: ", current_time_ms)
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1
async def main():
    """Run the producer and consumer concurrently on one shared queue.

    Both loops are infinite, so this coroutine only finishes if one of the
    tasks raises or it is cancelled.
    """
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())
    # Wait on both tasks: this keeps main() (and the event loop) alive, and an
    # exception raised in either loop is propagated instead of going unnoticed.
    await asyncio.gather(producer_task, consumer_task)
# Script entry point: run main() on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())
Answered By - furas
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.