Issue
I have a pool of tasks in a queue: I would like to pull completed tasks from it, output their results, and put in new tasks. Is this the correct way to do it? I periodically check whether any task has completed. Is creating a task like this, task = asyncio.create_task(...), asynchronous? (Could it block the loop if we create a large number of more complex tasks?)
import asyncio
from collections import deque
from random import randint


async def show_result(q):
    while True:
        done_task = await q.get()
        result = done_task.result()
        print(result)
        q.task_done()


async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'


async def tasks_worker(q, pool_size):
    """
    extract done tasks and put new ones
    """
    delay = get_task_param()
    tasks = deque([])
    while True:
        await asyncio.sleep(1)
        # top up the pool with new tasks
        tasks_to_add = pool_size - len(tasks)
        print(f"tasks_to_add: {tasks_to_add}")
        if tasks_to_add > 0:
            for _ in range(tasks_to_add):
                n, d = await delay.__anext__()
                print(f"add task: {n} with delay: {d}")
                task = asyncio.create_task(some_work(n, d))
                tasks.append(task)
        # move done tasks to the result queue, keep the rest
        for _ in range(len(tasks)):
            task = tasks.popleft()
            if task.done():
                await q.put(task)
            else:
                tasks.append(task)


async def get_task_param():
    task_n = 0
    while True:
        task_n += 1
        await asyncio.sleep(0)
        yield task_n, randint(5, 10)


async def run(pool_size):
    q = asyncio.Queue()
    task_1 = asyncio.create_task(show_result(q))
    task_2 = asyncio.create_task(tasks_worker(q, pool_size))
    done, pending = await asyncio.wait({task_1, task_2}, return_when=asyncio.ALL_COMPLETED)
    print(done)
    print(pending)


if __name__ == '__main__':
    POOL_SIZE = 50
    try:
        asyncio.run(run(POOL_SIZE))
    except Exception as ex:
        print(ex)
Solution
I need to send POST requests periodically to 1000 servers, but some of them sometimes don't respond quickly (timeout ≈ 10 s). I'd like to collect the responses that have already arrived and add new request tasks.
You should probably use one queue for assigning work and another for emitting results. You don't need to add workers dynamically; you can add tasks dynamically and have a fixed-size pool of workers process them in parallel as they come in. For example:
import asyncio
from random import randint


async def some_work(n, delay):
    await asyncio.sleep(delay)
    return f'task {n} with delay: {delay} completed'


async def worker(tasks, results):
    # individual worker task (sometimes called consumer) -
    # sequentially process tasks as they come into the queue
    # and emit the results
    while True:
        n, d = await tasks.get()
        result = await some_work(n, d)
        await results.put(result)


async def assigner(tasks):
    # come up with tasks dynamically and enqueue them for processing
    task_n = 0
    while True:
        await asyncio.sleep(1)
        task_n += 1
        await tasks.put((task_n, randint(5, 10)))


async def displayer(q):
    # show results of the tasks as they arrive
    while True:
        result = await q.get()
        print(result)


async def main(pool_size):
    tasks = asyncio.Queue(100)
    results = asyncio.Queue(100)
    workers = [asyncio.create_task(worker(tasks, results))
               for _ in range(pool_size)]
    await asyncio.gather(assigner(tasks), displayer(results), *workers)


if __name__ == '__main__':
    POOL_SIZE = 50
    asyncio.run(main(POOL_SIZE))
Queue bounds, arbitrarily chosen here at 100 items, limit the maximum queue size and provide backpressure in case the assigner is consistently faster than the workers, or the workers faster than the displayer. Without a bound, the queue would in that case just accumulate items, which is effectively a memory leak. With the bound, Queue.put will, when the queue is full, wait until a slot frees up before allowing the producer to proceed.
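To make that blocking behaviour concrete, here is a minimal sketch (not part of the original answer) showing Queue.put blocking on a full queue; the bound of 2 and the 0.5 s deadline are arbitrary values chosen purely for the demonstration:

import asyncio

async def demo():
    q = asyncio.Queue(2)          # tiny bound, for demonstration only
    await q.put('a')
    await q.put('b')              # the queue is now full
    try:
        # put() would block indefinitely here, so give it a deadline
        await asyncio.wait_for(q.put('c'), timeout=0.5)
    except asyncio.TimeoutError:
        print('put() blocked: queue is full')
    q.get_nowait()                # drain one item to free a slot...
    await q.put('c')              # ...and the same put() now succeeds
    print('put() proceeded once a slot was free')

asyncio.run(demo())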
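To tie the pattern back to the use case from the question (periodic POST requests to many servers with a ~10 s timeout), the same fixed-size worker pool might look roughly like the sketch below. This is an illustration under stated assumptions, not part of the original answer: it presumes the aiohttp library, and the (n, url, payload) job format and the example.com URLs are made up.

import asyncio
import aiohttp

async def worker(session, tasks, results):
    # same consumer pattern as above, but the unit of work is a POST
    while True:
        n, url, payload = await tasks.get()
        try:
            async with session.post(url, json=payload) as resp:
                await results.put((n, resp.status))
        except (aiohttp.ClientError, asyncio.TimeoutError) as ex:
            # slow or unreachable server; report it and move on
            await results.put((n, repr(ex)))

async def main(pool_size, jobs):
    # cap every request at ~10 seconds total, as in the question
    timeout = aiohttp.ClientTimeout(total=10)
    tasks = asyncio.Queue(100)
    results = asyncio.Queue(100)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        workers = [asyncio.create_task(worker(session, tasks, results))
                   for _ in range(pool_size)]

        async def assign():
            # feed jobs from a separate task so a bounded results
            # queue cannot deadlock against a bounded tasks queue
            for job in jobs:
                await tasks.put(job)

        assigner = asyncio.create_task(assign())
        for _ in jobs:                    # collect one result per job
            print(await results.get())
        await assigner
        for w in workers:                 # all work done, retire the pool
            w.cancel()

if __name__ == '__main__':
    # hypothetical endpoints - replace with the real server URLs
    JOBS = [(n, f'http://server{n}.example.com/ping', {'n': n})
            for n in range(1000)]
    asyncio.run(main(50, JOBS))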
Answered By - user4815162342