Issue
I need to make a worker queue for aiohttp.
Right now I'm using asyncio.gather, but it doesn't work the way I want:
This is what I want instead:
The first behavior can be implemented with the following code:
import asyncio

async def do_stuff(item):
    pass

async def main(data):
    # gather schedules every coroutine at once, with no concurrency limit
    await asyncio.gather(*(do_stuff(i) for i in data))

asyncio.run(main(data))
I need an example of
Solution
As I understand it, you want to keep exactly 5 tasks running at a time: when one of those tasks finishes, a new one should start immediately. asyncio.gather doesn't work for this on its own, since it starts every awaitable it is given at once and returns only after all of them have finished.
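To see why gather by itself gives no cap, here is a minimal sketch that counts how many coroutines are in flight at once. The names `peak_with_gather` and `job` are hypothetical and exist only for this illustration.

```python
import asyncio


async def peak_with_gather(job_count=20):
    """Return the highest number of jobs that were in flight at the same time."""
    running = 0
    peak = 0

    async def job():
        # hypothetical stand-in for real work such as an HTTP request
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.05)
        running -= 1

    # gather wraps every coroutine in a task and schedules them all right away
    await asyncio.gather(*(job() for _ in range(job_count)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_with_gather()))
```

Under these assumptions the peak equals the number of jobs, so limiting concurrency needs something extra on top of gather.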
I suggest something along these lines:
from collections import deque
import random
import asyncio

class RunSome:
    def __init__(self, task_count=5):
        self.task_count = task_count
        self.running = set()
        self.waiting = deque()

    @property
    def running_task_count(self):
        return len(self.running)

    def add_task(self, coro):
        # start immediately if a slot is free, otherwise queue the coroutine
        if len(self.running) >= self.task_count:
            self.waiting.append(coro)
        else:
            self._start_task(coro)

    def _start_task(self, coro):
        self.running.add(coro)
        asyncio.create_task(self._task(coro))

    async def _task(self, coro):
        try:
            return await coro
        finally:
            # free the slot and hand it to the next waiting coroutine, if any
            self.running.remove(coro)
            if self.waiting:
                coro2 = self.waiting.popleft()
                self._start_task(coro2)

async def main():
    runner = RunSome()

    async def rand_delay():
        rnd = random.random() + 0.5
        print("Task started", asyncio.current_task().get_name(),
              runner.running_task_count)
        await asyncio.sleep(rnd)
        print("Task ended", asyncio.current_task().get_name(),
              runner.running_task_count)

    for _ in range(50):
        runner.add_task(rand_delay())
    # keep the program alive until all the tasks are done
    while runner.running_task_count > 0:
        await asyncio.sleep(0.1)

if __name__ == "__main__":
    asyncio.run(main())
Output:
Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1
Coroutines are first class objects in Python. As such they can be put into lists and sets.
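As a small illustration of that point, the sketch below stores coroutine objects in a deque and awaits them later. The `square` and `drain` names are hypothetical. Calling a coroutine function only creates the object; nothing runs until it is awaited or wrapped in a task.

```python
import asyncio
from collections import deque


async def square(n):
    # hypothetical stand-in for real work
    await asyncio.sleep(0)
    return n * n


async def drain():
    # building the deque runs none of the coroutine bodies yet
    pending = deque(square(n) for n in range(3))
    results = []
    while pending:
        # each coroutine starts only when it is awaited here
        results.append(await pending.popleft())
    return results


if __name__ == "__main__":
    print(asyncio.run(drain()))
```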
All of the task creation is handled by RunSome. You pass it coroutines to be executed. It knows how many tasks are currently running, and it decides either to create a new task immediately or add the coroutine to a queue of pending tasks. When a task finishes, it grabs a new coroutine out of the queue, if one is available. The number of running tasks never exceeds the threshold count that was passed to the constructor (default is 5). The tasks are wrappers around the passed coroutines.
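For comparison, the same "start the next one as soon as a slot frees up" behavior can also be sketched with the standard library's asyncio.Queue and a fixed number of worker tasks. This is a different technique from RunSome, not a rewrite of it. The names `process`, `worker` and `run_pool` are assumptions made for this sketch.

```python
import asyncio


async def process(item):
    # hypothetical stand-in for real work such as an aiohttp request
    await asyncio.sleep(0.01)
    return item * 2


async def worker(queue, results):
    # each worker pulls the next item as soon as it finishes the previous one
    while True:
        item = await queue.get()
        try:
            results.append(await process(item))
        except Exception as exc:
            results.append(exc)  # keep the worker alive if one item fails
        finally:
            queue.task_done()


async def run_pool(items, worker_count=5):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    results = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(worker_count)]
    await queue.join()  # wait until every item has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(run_pool(range(10)))))
```

Here the concurrency limit is simply the number of workers, and results arrive in completion order rather than submission order.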
You will have to figure out what to do with the returned values, if any. The error handling here is rudimentary, but it does maintain the correct number of running tasks because of the try:finally: block.
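If the return values matter, one possible approach (again a different technique from RunSome, shown only as a sketch) is to guard each coroutine with asyncio.Semaphore and collect everything with asyncio.gather. The `bounded_gather` and `double` names are hypothetical.

```python
import asyncio


async def bounded_gather(coros, limit=5):
    """Await all coroutines with at most `limit` running at once.

    Results come back in the same order as the input.
    """
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        # the wrapped coroutine only starts once a semaphore slot is free
        async with sem:
            return await coro

    # return_exceptions=True puts a raised exception into the result list
    # instead of propagating the first failure to the caller
    return await asyncio.gather(*(guarded(c) for c in coros),
                                return_exceptions=True)


async def double(n):
    # hypothetical stand-in for real work
    await asyncio.sleep(0.01)
    return n * 2


if __name__ == "__main__":
    print(asyncio.run(bounded_gather([double(n) for n in range(10)])))
```

All wrapper tasks are created up front here, but only `limit` of the wrapped coroutines run at any moment; the rest wait on the semaphore.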
Answered By - Paul Cornelius