Issue
I have the following script, but it works incorrectly. I want the first function to repeat forever, and the second function should start whenever the first function returns result1 and result2. I want to add one more function, which should start 40 seconds after the second function has finished its job with result1 from the first function. The third function should do something with result2. I want this to work asynchronously using the asyncio library, because I need the first function not to block for 40 seconds while the third function sleeps. I tried ChatGPT, but it recommended the wrong things. How can I do that?
import asyncio
import logging


async def first_function(queue1, queue2):
    while True:
        # do something
        result1 = "some result"
        result2 = {"key": "value"}
        await queue1.put(result1)
        await queue2.put(result2)
        await asyncio.sleep(1)  # wait for 1 second


async def second_function(queue1, queue2, queue3):
    while True:
        result1 = await queue1.get()
        result2 = await queue2.get()
        # do something with the results
        logging.info(f"Second function received: {result1}, {result2}")
        await queue3.put(result1)
        await asyncio.sleep(1)  # wait for 1 second


async def third_function(queue):
    while True:
        result = await queue.get()
        await asyncio.sleep(40)  # wait for 40 seconds
        # do something with the result
        logging.info(f"Third function received: {result}")


async def main():
    queue1 = asyncio.Queue()
    queue2 = asyncio.Queue()
    queue3 = asyncio.Queue()
    task1 = asyncio.create_task(first_function(queue1, queue2))
    task2 = asyncio.create_task(second_function(queue1, queue2, queue3))
    task3 = asyncio.create_task(third_function(queue3))
    await asyncio.gather(task1, task2, task3)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)  # otherwise logging.info() output is not shown
    asyncio.run(main())
I tried everything I found on Google, and it always blocks my code for 40 seconds when it reaches the third function.
Solution
There are multiple issues and anti-patterns:
- there is no need to use asyncio.create_task(), since asyncio.gather() does that automatically for you when its arguments are coroutines,
- you use unbounded queues, which means that the memory consumption of your program could grow without limit,
- third_function takes 40 seconds per element while first_function produces a new element about every second, which means that the former won't be able to process elements fast enough: elements pile up in queue3 and each one is handled 40 seconds after the previous one,
- if result1 and result2 should be produced at the same time and consumed at the same time, use a single queue to put both elements.
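To illustrate the first point, here is a minimal sketch (the coroutine name produce and the delays are made up for the example): plain coroutine objects are passed straight to asyncio.gather(), which wraps each one in a task, runs them concurrently and returns the results in argument order.

```python
import asyncio


async def produce(name, delay):
    # Simulated work; asyncio.sleep() suspends only this coroutine,
    # it does not block the event loop.
    await asyncio.sleep(delay)
    return f"{name} done"


async def main():
    # Coroutine objects go straight into gather(); it wraps each one in a
    # Task itself, so no explicit asyncio.create_task() call is needed.
    return await asyncio.gather(produce("a", 0.2), produce("b", 0.1))


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['a done', 'b done'] (argument order, not finish order)
```

You still need asyncio.create_task() when you want a handle to a task that runs independently of gather(), for example to cancel it later.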
You could use something like this:
import asyncio


async def first_function(queue_in: asyncio.Queue):
    while True:
        result1 = "some result"
        result2 = {"key": "value"}
        # Both results travel together as one tuple through a single queue.
        await queue_in.put((result1, result2))


async def second_function(queue_in: asyncio.Queue, queue_out: asyncio.Queue):
    while True:
        result1, result2 = await queue_in.get()
        print(f"Second function received: {result1}, {result2}")
        await queue_out.put(result1)


async def third_function(queue_out: asyncio.Queue):
    while True:
        result = await queue_out.get()
        await asyncio.sleep(4)  # shorter than the question's 40 s delay
        print(f"Third function received: {result}")


async def main():
    # maxsize=1: put() waits until the consumer has taken the previous element.
    queue_in = asyncio.Queue(maxsize=1)
    queue_out = asyncio.Queue(maxsize=1)
    task1 = first_function(queue_in)
    task2 = second_function(queue_in, queue_out)
    task3 = third_function(queue_out)
    await asyncio.gather(task1, task2, task3)


if __name__ == "__main__":
    asyncio.run(main())
By specifying maxsize=1 you limit the production of first_function, because now third_function defines the consumption rate: first_function (and second_function too) is only allowed to put a new element once there is room in its output queue, which in practice means roughly when third_function has finished processing the previous one.
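A minimal sketch of that throttling effect, reduced to a single queue with short made-up delays instead of the question's 40 seconds (producer, consumer and max_lead are hypothetical names for this example). It logs every produce/consume event and reports how far the producer ever got ahead of the consumer.

```python
import asyncio


async def producer(queue, n, log):
    for i in range(n):
        # put() suspends while the bounded queue is full.
        await queue.put(i)
        log.append(("produced", i))


async def consumer(queue, n, log):
    for _ in range(n):
        item = await queue.get()
        await asyncio.sleep(0.01)  # slow work, standing in for the 40 s sleep
        log.append(("consumed", item))


def max_lead(log):
    # Largest number of elements produced but not yet fully consumed.
    produced = consumed = lead = 0
    for event, _ in log:
        if event == "produced":
            produced += 1
        else:
            consumed += 1
        lead = max(lead, produced - consumed)
    return lead


async def main(maxsize, n=5):
    queue = asyncio.Queue(maxsize=maxsize)  # maxsize=0 means unbounded
    log = []
    await asyncio.gather(producer(queue, n, log), consumer(queue, n, log))
    return log


if __name__ == "__main__":
    print(max_lead(asyncio.run(main(maxsize=1))))  # 2
    print(max_lead(asyncio.run(main(maxsize=0))))  # 5
```

With maxsize=1 the producer is never more than two elements ahead (one waiting in the queue, one being processed), while with an unbounded queue it enqueues all five elements before the consumer even starts.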
If you cannot control the rate at which first_function produces elements, you should implement a back-pressure strategy, for instance dropping elements when you cannot process them fast enough. You can use a higher value for maxsize if you want to buffer a few elements before they start being dropped.
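A minimal sketch of the "drop when full" strategy, assuming that losing the newest element is acceptable (offer, producer and consumer are hypothetical names; the delays are made up). It relies on Queue.put_nowait(), which raises asyncio.QueueFull instead of waiting. Here asyncio.create_task() is used on purpose, because the consumer loops forever and we need a handle to cancel it.

```python
import asyncio
import contextlib


def offer(queue, item):
    """Enqueue item without waiting; drop it if the queue is full.

    Returns True if the item was queued, False if it was dropped.
    """
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False


async def producer(queue, n, dropped):
    for i in range(n):
        if not offer(queue, i):
            dropped.append(i)
        await asyncio.sleep(0.01)  # the producer keeps its own pace


async def consumer(queue, processed):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.05)  # slower than the producer
        processed.append(item)
        queue.task_done()


async def main(n=10, maxsize=2):
    queue = asyncio.Queue(maxsize=maxsize)
    dropped, processed = [], []
    consumer_task = asyncio.create_task(consumer(queue, processed))
    await producer(queue, n, dropped)
    await queue.join()  # wait until every accepted item has been processed
    consumer_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer_task
    return processed, dropped


if __name__ == "__main__":
    processed, dropped = asyncio.run(main())
    print("processed:", processed)
    print("dropped:", dropped)
```

If the newest data matters more than the oldest, a variant could call queue.get_nowait() to discard the oldest element before retrying put_nowait() (and call task_done() for the discarded element if you rely on join()).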
Answered By - Louis Lac