Issue
I'm trying the asyncio botocore implementation for the first time. However, I'm quite sure I'm not getting the expected asynchronicity, likely due to my own lack of experience with it. :)
The goal of the method below is to duplicate all files in a bucket, suffixing each new key with a UUID.
async def async_duplicate_files_in_bucket(bucket,
                                          how_many_times=1):
    session = get_session()
    async with session.create_client('s3') as s3_client:
        s3_client: S3Client
        paginator = s3_client.get_paginator('list_objects')
        async for result in paginator.paginate(Bucket=bucket):
            for file in result["Contents"]:
                # it already includes the prefix in the name
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")
                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {
                        'Bucket': bucket,
                        'Key': original_file_name
                    }
                    await s3_client.copy_object(Bucket=bucket,
                                                CopySource=copy_source,
                                                Key=new_file_name)
                    print("-", end="")
When looking at the terminal:
- I see "Duplicating file: file_1" and it does not move on to the next file until it has finished duplicating file_1. Only then do I get a new log line with "Duplicating file: file_2".
- print("-", end="") is not printing anything.
Given my little experience with asyncio, I hypothesize that the for _ in range(how_many_times) loop is blocking the event loop.
I'd appreciate directions on how to better make use of asyncio in Python, as well as how to achieve the goal of the function.
Thanks.
Solution
You're not blocking the event loop. What you are doing is simply not making proper use of asyncio's concurrency!
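To see the difference, here is a minimal, self-contained sketch. It has nothing to do with S3 (asyncio.sleep() stands in for the network calls, and the names are made up for illustration): awaiting coroutines one at a time in a loop runs them strictly one after another, while asyncio.gather() lets them all make progress at the same time.
import asyncio
import time

async def fake_copy(i):
    # stand-in for an S3 copy_object call: just waits one second
    await asyncio.sleep(1)
    return i

async def sequential():
    # awaiting inside the loop: each "copy" starts only after the previous one finished
    return [await fake_copy(i) for i in range(5)]

async def concurrent():
    # asyncio.gather(): all five coroutines are in flight at the same time
    return await asyncio.gather(*(fake_copy(i) for i in range(5)))

for coro_func in (sequential, concurrent):
    start = time.perf_counter()
    asyncio.run(coro_func())
    # prints roughly "sequential 5.0" and "concurrent 1.0"
    print(coro_func.__name__, round(time.perf_counter() - start, 1))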
The asyncio.gather() helper is what you want here: it lets you run a number of async operations concurrently. In this example, we buffer all the desired copy_object operations in one big list, then run them all concurrently in a single huge batch:
# imports the example needs (get_session comes from aiobotocore,
# S3Client from the optional types-aiobotocore-s3 stubs)
import asyncio
import logging
import uuid
from typing import Any, Awaitable

from aiobotocore.session import get_session
from types_aiobotocore_s3 import S3Client

logger = logging.getLogger(__name__)


async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
    session = get_session()
    s3_client: S3Client
    async with session.create_client("s3") as s3_client:
        awaitables: list[Awaitable[Any]] = []
        paginator = s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(Bucket=bucket):
            for file in result["Contents"]:
                # it already includes the prefix in the name
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")
                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {"Bucket": bucket, "Key": original_file_name}
                    awaitable = s3_client.copy_object(
                        Bucket=bucket,
                        CopySource=copy_source,
                        Key=new_file_name,
                    )
                    awaitables.append(awaitable)
                    print("-", end="")
        await asyncio.gather(*awaitables)
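One caveat about asyncio.gather(): by default it propagates the first exception raised, so a single failed copy_object call will make the await raise while the remaining copies keep running in the background. If you would rather let every copy finish and inspect the failures afterwards, pass return_exceptions=True:
results = await asyncio.gather(*awaitables, return_exceptions=True)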
Depending on the number of results in your paginator, you might want to run several smaller batches instead, one per paginator page. This uses less memory, as you only buffer one page worth of copy_object operations at a time. It also limits the concurrency a bit, which might perform better (or worse!):
async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
    session = get_session()
    s3_client: S3Client
    async with session.create_client("s3") as s3_client:
        paginator = s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(Bucket=bucket):
            awaitables: list[Awaitable[Any]] = []
            for file in result["Contents"]:
                # it already includes the prefix in the name
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")
                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {"Bucket": bucket, "Key": original_file_name}
                    awaitable = s3_client.copy_object(
                        Bucket=bucket,
                        CopySource=copy_source,
                        Key=new_file_name,
                    )
                    awaitables.append(awaitable)
                    print("-", end="")
            await asyncio.gather(*awaitables)
You can make even smaller batches too, one per how_many_times group. This has an even lower memory footprint (and concurrency):
async def async_duplicate_files_in_bucket(bucket, how_many_times=1):
    session = get_session()
    s3_client: S3Client
    async with session.create_client("s3") as s3_client:
        paginator = s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(Bucket=bucket):
            for file in result["Contents"]:
                # it already includes the prefix in the name
                original_file_name: str = file["Key"]
                logger.debug(f"Duplicating file: {original_file_name} ")
                awaitables: list[Awaitable[Any]] = []
                for _ in range(how_many_times):
                    new_file_name = original_file_name + "_" + uuid.uuid4().__str__()
                    copy_source = {"Bucket": bucket, "Key": original_file_name}
                    awaitable = s3_client.copy_object(
                        Bucket=bucket,
                        CopySource=copy_source,
                        Key=new_file_name,
                    )
                    awaitables.append(awaitable)
                    print("-", end="")
                await asyncio.gather(*awaitables)
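If you want an explicit cap on how many copies are in flight at once, independent of page or file boundaries, another option is to guard each call with an asyncio.Semaphore. The sketch below is just that, a sketch: the helper name copy_one and the limit of 16 are arbitrary placeholders to tune.
async def async_duplicate_files_in_bucket(bucket, how_many_times=1, max_in_flight=16):
    # max_in_flight is an arbitrary cap; tune it for your workload
    session = get_session()
    semaphore = asyncio.Semaphore(max_in_flight)

    async def copy_one(s3_client, original_file_name):
        # illustrative helper: one copy, gated by the semaphore
        async with semaphore:
            new_file_name = original_file_name + "_" + str(uuid.uuid4())
            copy_source = {"Bucket": bucket, "Key": original_file_name}
            await s3_client.copy_object(
                Bucket=bucket,
                CopySource=copy_source,
                Key=new_file_name,
            )
            print("-", end="")

    s3_client: S3Client
    async with session.create_client("s3") as s3_client:
        awaitables: list[Awaitable[Any]] = []
        paginator = s3_client.get_paginator("list_objects")
        async for result in paginator.paginate(Bucket=bucket):
            for file in result["Contents"]:
                logger.debug(f"Duplicating file: {file['Key']}")
                for _ in range(how_many_times):
                    awaitables.append(copy_one(s3_client, file["Key"]))
        await asyncio.gather(*awaitables)
This still buffers one lightweight coroutine object per copy, but only max_in_flight requests actually run at any moment.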
I'd suggest benchmarking these approaches and seeing what works best for you. Even the smallest-batch version, with the lowest concurrency, should perform better than your original code! 💪
See also: asyncio.gather() docs
Answered By - Dave