Issue
I have been looking for a Python equivalent to JavaScript's await Promise.all() functionality, which led me to asyncio.gather(). After reading a few explanations and following a few examples, I haven't managed to get anything working asynchronously.
The task is straightforward: extract values remotely from multiple files in S3, then collect the results when all are finished. I have done this in JS and it takes a little over a second to read from 12 files.
The code is written for FastAPI, and a simplified form of it is below. The reason I know this is not working asynchronously is that the more files in S3 it reads from, the longer it takes.
I have seen documentation for this kind of thing, but as it is not working for me I am not sure whether I am doing something wrong or whether it just won't work in my use case. I am worried that streaming from a remote file using rasterio just doesn't work here.
How can I change the code below so that it calls the functions concurrently and collects all the responses when they are all completed? I haven't used this feature in Python before, so I just need a little more clarification.
import asyncio
import datetime
import rasterio
from rasterio.windows import Window

async def read_from_file(s3_path):
    # The important thing to note here is that it
    # is streaming from a file in S3 given an S3 path
    with rasterio.open(s3_path) as src:
        values = src.read(1, window=Window(1, 2, 1, 1))
    return values[0][0]
@app.get("/get-all")
async def get_all():
    start_time = datetime.datetime.now()
    # example paths
    s3_paths = [
        "s3:file-1",
        "s3:file-2",
        "s3:file-3",
        "s3:file-4",
        "s3:file-5",
        "s3:file-6",
    ]
    values = await asyncio.gather(
        read_from_file(s3_paths[0]),
        read_from_file(s3_paths[1]),
        read_from_file(s3_paths[2]),
        read_from_file(s3_paths[3]),
        read_from_file(s3_paths[4]),
        read_from_file(s3_paths[5]),
    )
    end_time = datetime.datetime.now()
    logger.info(f"duration: {end_time - start_time}")
Solution
Python's asyncio has a mechanism to run non-async code, like the calls to the rasterio library, in other threads, so that the event loop is not blocked.
Try this code:
import asyncio
from functools import partial

import rasterio
from rasterio.windows import Window

async def read_from_file(s3_path):
    # The important thing to note here is that it
    # is streaming from a file in S3 given an S3 path;
    # both the open and the read are blocking calls, so
    # they are handed off to the default thread pool.
    loop = asyncio.get_running_loop()
    src = None
    try:
        src = await loop.run_in_executor(None, rasterio.open, s3_path)
        values = await loop.run_in_executor(
            None, partial(src.read, 1, window=Window(1, 2, 1, 1))
        )
    finally:
        if src is not None:
            src.close()  # might be interesting to parallelize this as well
    return values[0][0]
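With this version of read_from_file, the endpoint from the question works as-is; here is a minimal sketch of it (reusing the example paths and logger from the question), with the per-path calls replaced by argument unpacking so the list can be any length:

@app.get("/get-all")
async def get_all():
    start_time = datetime.datetime.now()
    # example paths, as in the question
    s3_paths = [f"s3:file-{i}" for i in range(1, 7)]
    # unpack one coroutine per path; gather drives them concurrently
    values = await asyncio.gather(*(read_from_file(path) for path in s3_paths))
    end_time = datetime.datetime.now()
    logger.info(f"duration: {end_time - start_time}")
    return values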
If it needs to be faster, you can create a custom executor: the default one caps its worker count based on the CPU count (min(32, os.cpu_count() + 4) in recent Python versions), which might slow things down when the bottleneck is network latency - somewhere around 20 threads might be interesting. This executor should be either a global resource or passed as a parameter to your read_from_file, and is a plain concurrent.futures.ThreadPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor). A sketch follows below.
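A minimal sketch of that, assuming a pool of 20 workers (the size, and the S3_EXECUTOR name, are placeholders to tune and rename for your setup):

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import rasterio
from rasterio.windows import Window

# Global thread pool for the I/O-bound S3 reads; 20 workers is an
# assumption to tune against your network, not a measured optimum.
S3_EXECUTOR = ThreadPoolExecutor(max_workers=20)

async def read_from_file(s3_path, executor=S3_EXECUTOR):
    loop = asyncio.get_running_loop()
    src = None
    try:
        src = await loop.run_in_executor(executor, rasterio.open, s3_path)
        values = await loop.run_in_executor(
            executor, partial(src.read, 1, window=Window(1, 2, 1, 1))
        )
    finally:
        if src is not None:
            src.close()
    return values[0][0]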
As for run_in_executor, see https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
Answered By - jsbueno