Issue
I am trying to limit the number of simultaneous async functions running using a semaphore, but I cannot get it to work. My code boils down to this:
import asyncio
async def send(i):
print(f"starting {i}")
await asyncio.sleep(4)
print(f"ending {i}")
async def helper():
async with asyncio.Semaphore(value=5):
await asyncio.gather(*[
send(1),
send(2),
send(3),
send(4),
send(5),
send(6),
send(7),
send(8),
send(9),
send(10),
])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(helper())
loop.close()
The output is:
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
starting 9
starting 10
ending 1
ending 2
ending 3
ending 4
ending 5
ending 6
ending 7
ending 8
ending 9
ending 10
I hope and expect that only 5 will run at time, however all 10 start and stop at the same time. What am I doing wrong?
Solution
Please find the working example below, feel free to ask questions:
import asyncio
async def send(i: int, semaphore: asyncio.Semaphore):
# to demonstrate that all tasks start nearly together
print(f"Hello: {i}")
# only two tasks can run code inside the block below simultaneously
async with semaphore:
print(f"starting {i}")
await asyncio.sleep(4)
print(f"ending {i}")
async def async_main():
s = asyncio.Semaphore(value=2)
await asyncio.gather(*[send(i, semaphore=s) for i in range(1, 11)])
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main())
loop.close()
VERSION FROM 18.08.2023:
I see that many people are interested in how to use asyncio.Semaphore
and I decided to extend my answer.
The new version illustrates how to use procuder-consumers pattern with asyncio.Semaphore
. If you want something very simple, you are fine to use code from the original answer above. If you want more robust solution, which allows to limit number of asyncio.Tasks
to work with, you can use this more robust solution.
import asyncio
from typing import List
CONSUMERS_NUMBER = 10 # workers/consumer number
TASKS_NUMBER = 20 # number of tasks to do
async def producer(tasks_to_do: List[int], q: asyncio.Queue) -> None:
print(f"Producer started working!")
for task in tasks_to_do:
await q.put(task) # put tasks to Queue
# poison pill technique
for _ in range(CONSUMERS_NUMBER):
await q.put(None) # put poison pill to all worker/consumers
print("Producer finished working!")
async def consumer(
consumer_name: str,
q: asyncio.Queue,
semaphore: asyncio.Semaphore,
) -> None:
print(f"{consumer_name} started working!")
while True:
task = await q.get()
if task is None: # stop if poison pill was received
break
print(f"{consumer_name} took {task} from queue!")
# number of tasks which could be processed simultaneously
# is limited by semaphore
async with semaphore:
print(f"{consumer_name} started working with {task}!")
await asyncio.sleep(4)
print(f"{consumer_name} finished working with {task}!")
print(f"{consumer_name} finished working!")
async def async_main() -> None:
"""Main entrypoint of async app."""
tasks = [f"TheTask#{i + 1}" for i in range(TASKS_NUMBER)]
q = asyncio.Queue(maxsize=2)
s = asyncio.Semaphore(value=2)
consumers = [
consumer(
consumer_name=f"Consumer#{i + 1}",
q=q,
semaphore=s,
) for i in range(CONSUMERS_NUMBER)
]
await asyncio.gather(producer(tasks_to_do=tasks, q=q), *consumers)
if __name__ == "__main__":
asyncio.run(async_main())
Answered By - Artiom Kozyrev
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.