Issue
I have a large (1M rows) database resultset for which I want to call a REST API for each row. The API can accept batch requests, but I am not sure how to slice the rows generator so that each task processes a list of rows, say 10. I'd rather not read all rows upfront and would like to stick to a generator. Accommodating my_function to send a list in one HTTP request is easy enough, but what about asyncio.gather? Maybe one of the itertools can help.
See the generic pseudo-code below to illustrate:
import asyncio
import aiohttp

async def main(rows):
    async with aiohttp.ClientSession() as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))
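For the batching part, I imagine slicing the generator with itertools.islice, something like this untested sketch (Python 3.12+ also ships itertools.batched for exactly this):

import itertools

def batched(iterable, n):
    # Yield successive lists of up to n items without materializing
    # the whole iterable.
    it = iter(iterable)
    while batch := list(itertools.islice(it, n)):
        yield batch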
Note: the results are small, basically an acknowledgement value for each row.
On a side note:

- Is there a limit to the number of tasks asyncio.gather() can handle (efficiently)?
- Currently gather() loads all requests/tasks in memory, consuming 50GB (!). How can the rows and tasks be read and passed on-the-go to reduce memory usage? Is this what asyncio.BoundedSemaphore() is used for?
- The TCP connection limit is 500, as the REST web server can accept that much (configured on the client as sketched below). If a semaphore comes into play, what should its value be, i.e. does it make sense to set the semaphore higher than the TCP connection limit?
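For reference, the client-side cap on concurrent connections in aiohttp is set through its connector (standard aiohttp API):

# Cap concurrent TCP connections at the server's limit of 500.
connector = aiohttp.TCPConnector(limit=500)
async with aiohttp.ClientSession(connector=connector) as session:
    ...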
aiohttp and asyncio are great but difficult to follow. I agree with this post:

"asyncio keeps changing all the time, so be wary of old Stack Overflow answers. Many of them are not up to date with the current best practices."
EDIT: I just tried using an asyncio.BoundedSemaphore(100) and memory usage is about the same (45GB) - not sure it has any benefit over the TCP connection limit.
Solution
Semaphore-based solutions won't help with the memory usage of a huge number of tasks, because you'll still be creating all the coroutines and tasks in advance. All the coroutines will start executing, only for most of them to be immediately suspended until the semaphore lets them proceed.
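To illustrate, a sketch of that pattern: the list comprehension still instantiates all one million coroutine objects before gather() ever runs, so memory usage barely changes:

async def main(rows):
    semaphore = asyncio.Semaphore(100)

    async def limited(row, session):
        # Only 100 requests run concurrently, but the coroutine
        # object itself was already allocated by the comprehension.
        async with semaphore:
            return await my_function(row, session)

    async with aiohttp.ClientSession() as session:
        tasks = [limited(row, session) for row in rows]
        return await asyncio.gather(*tasks)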
Instead, you can create a fixed number of workers and feed them database rows through a queue:
async def worker(queue, session, results):
    while True:
        row = await queue.get()
        results.append(await my_function(row, session))
        # Mark the item as processed, allowing queue.join() to keep
        # track of remaining work and know when everything is done.
        queue.task_done()

async def main(rows):
    N_WORKERS = 50
    queue = asyncio.Queue(N_WORKERS)
    results = []
    async with aiohttp.ClientSession() as session:
        # Create 50 workers and feed them tasks.
        workers = [asyncio.create_task(worker(queue, session, results))
                   for _ in range(N_WORKERS)]
        # Feed the database rows to the workers. The fixed capacity of
        # the queue ensures that we never hold all rows in memory at
        # the same time. (When the queue reaches full capacity, this
        # will block until a worker dequeues an item.)
        async for row in rows:
            await queue.put(row)
        # Wait for all enqueued items to be processed.
        await queue.join()
        # The workers are now idly waiting for the next queue item and
        # we no longer need them.
        for w in workers:
            w.cancel()
        return results
Note that rows should be an async generator. If it's an ordinary generator, it will probably block the event loop and become the bottleneck. If your database doesn't support an async interface, see this answer for a way to convert a blocking generator to async by running it in a dedicated thread.
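A minimal sketch of that idea, using asyncio.to_thread (Python 3.9+) to pull items in a worker thread rather than a dedicated long-lived thread:

import asyncio

async def as_async_gen(blocking_gen):
    # Run each next() call in a thread so the blocking database
    # driver never stalls the event loop.
    sentinel = object()
    while True:
        row = await asyncio.to_thread(next, blocking_gen, sentinel)
        if row is sentinel:
            return
        yield row

# usage: results = asyncio.run(main(as_async_gen(rows)))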
To batch items, you can build an intermediate list and dispatch it. Or you can use the excellent aiostream library, which comes with a chunks operator that does just that:
import aiostream

# Replaces the `async for row in rows` loop in main().
async with aiostream.stream.chunks(rows, 10).stream() as chunks:
    async for batch in chunks:
        await queue.put(batch)  # enqueue a batch of 10 rows
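The worker then needs a small adjustment to handle lists; here my_function_batch stands for a hypothetical variant of my_function that sends the whole batch in one request:

async def worker(queue, session, results):
    while True:
        batch = await queue.get()  # a list of up to 10 rows
        # my_function_batch (hypothetical) sends one HTTP request for
        # the whole batch and returns one acknowledgement per row.
        results.extend(await my_function_batch(batch, session))
        queue.task_done()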
Answered By - user4815162342