Issue
Following up on the question "How to read parquet files from Azure Blobs into Pandas DataFrame?", I wanted to add concurrency by downloading multiple files "in parallel" using asyncio.
I'm stuck on how to use the TaskGroup feature of Python 3.11 to start my tasks and wait for them to complete. How can I retrieve a list of the downloaded streams?
My code so far:
import logging
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
from itertools import product
# Question's async helper around one Azure ContainerClient (indentation was lost in the paste).
class BlobStorageAsync:
# Build the async container client from a connection string and container name.
def __init__(self, connection_string, container_name, logging_enable):
self.connection_string = connection_string
self.container_name = container_name
container_client = ContainerClient.from_connection_string(
conn_str=connection_string,
container_name=container_name,
# When enabled, this client logs detailed information about its HTTP sessions at DEBUG level
logging_enable=logging_enable
)
self.container_client = container_client
# Collect every blob whose name starts with name_starts_with into a list.
async def list_blobs_in_container_async(self, name_starts_with):
blobs_list = []
async for blob in self.container_client.list_blobs(name_starts_with=name_starts_with):
blobs_list.append(blob)
return blobs_list
# Download a single blob completely and return its content as an in-memory stream.
async def download_blob_async(self, blob_name):
blob_client = self.container_client.get_blob_client(blob=blob_name)
async with blob_client:
stream = await blob_client.download_blob()
data = await stream.readall() # data returned as bytes-like object
# wrap the bytes in an in-memory binary stream (BytesIO)
return BytesIO(data)
# Start one download task per blob name and wait until all of them finish.
async def download_blobs_async(self, blobs_list):
tasks = []
for blob_name in blobs_list:
task = asyncio.create_task(self.download_blob_async(blob_name))
tasks.append(task)
# NOTE(review): the list returned by gather() is discarded, so this coroutine
# returns None and callers cannot get the downloaded streams;
# `return await asyncio.gather(*tasks)` would hand them back in blobs_list order.
await asyncio.gather(*tasks)
Now I am stuck: I am not able to create and run the tasks and wait for their results. For example, this does not work:
# Question's driver (as posted it does not work -- see the review notes below).
async def main():
blobs_list = ...
connection_string = ...
container_name = ...
# NOTE(review): logging_enable is not assigned anywhere in the visible code, so this
# line raises NameError unless it exists as a module-level name.
BSA = BlobStorageAsync(connection_string, container_name, logging_enable)
# NOTE(review): asyncio.run() cannot be called while an event loop is already running
# in this thread, which is always the case inside a coroutine; use
# `result = await BSA.download_blobs_async(blobs_list)` instead. Also, the
# download_blobs_async defined above returns None, so result[0] would fail anyway.
result = asyncio.run(BSA.download_blobs_async(blobs_list))
# process the result: read the first stream and print it, for instance
df = pd.read_parquet(result[0])
print(df)
# NOTE(review): BSA.container_client is never closed in this function.
# Script entry point.
if __name__ == '__main__':
try:
# NOTE(review): main is an `async def`, so main() only creates a coroutine object and
# never runs its body (Python warns "coroutine 'main' was never awaited");
# start it with asyncio.run(main()).
main()
except Exception as ex:
print(ex)
Solution
How to read parquet files from Azure Blobs into Pandas DataFrame concurrently with asyncio?
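You can use the code below to read parquet files from Azure blobs into Pandas DataFrames concurrently with asyncio. The key changes compared to the question are:
- `download_blobs_async` now returns the list produced by `asyncio.gather`, whose results are in the same order as `blobs_list`.
- `main` awaits that coroutine instead of calling `asyncio.run` from inside an already-running event loop, and the undefined `logging_enable` argument is dropped.
- The script starts the event loop exactly once, with `asyncio.run(main())`.
- The container client is closed in a `finally` block.

On Python 3.11+ you can use `asyncio.TaskGroup` instead of `create_task` + `gather`: create each task with `tg.create_task(...)` inside `async with asyncio.TaskGroup() as tg:`, then collect `task.result()` from every task after the block exits.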
Code:
import asyncio
import pandas as pd
from azure.storage.blob.aio import ContainerClient
from io import BytesIO
class BlobStorageAsync:
    """Async helper for downloading blobs from one Azure Blob Storage container.

    Wraps a single ``azure.storage.blob.aio.ContainerClient``, exposed as
    ``self.container_client``. This class never closes it; callers must
    ``await obj.container_client.close()`` when they are done.
    """

    def __init__(self, connection_string, container_name):
        """Create the async container client.

        Args:
            connection_string: Azure Storage account connection string.
            container_name: Name of the container whose blobs are accessed.
        """
        self.connection_string = connection_string
        self.container_name = container_name
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )

    async def list_blobs_in_container_async(self, name_starts_with):
        """Return a list of the blobs whose names start with *name_starts_with*.

        The items are exactly what ``ContainerClient.list_blobs`` yields
        (per the Azure SDK docs, blob property objects rather than plain
        name strings; ``get_blob_client`` accepts either form).
        """
        return [
            blob
            async for blob in self.container_client.list_blobs(
                name_starts_with=name_starts_with
            )
        ]

    async def download_blob_async(self, blob_name):
        """Download one blob completely and return it as an in-memory stream.

        Args:
            blob_name: Name of the blob to fetch.

        Returns:
            ``io.BytesIO`` holding the blob's bytes, positioned at offset 0,
            so it can be passed straight to file-like readers such as
            ``pd.read_parquet``.
        """
        blob_client = self.container_client.get_blob_client(blob=blob_name)
        async with blob_client:
            downloader = await blob_client.download_blob()
            data = await downloader.readall()  # the whole blob as bytes
            return BytesIO(data)

    async def download_blobs_async(self, blobs_list, max_concurrency=None):
        """Download every blob in *blobs_list* concurrently.

        Args:
            blobs_list: Iterable of blob names.
            max_concurrency: Optional upper bound on simultaneous downloads.
                ``None`` (the default) starts every download at once.

        Returns:
            List of ``BytesIO`` streams, one per blob, in the same order as
            *blobs_list*. An empty input yields ``[]``.

        Raises:
            ValueError: If *max_concurrency* is given and is less than 1.
            Any exception raised by a download. Before it propagates, the
            other downloads are cancelled and awaited, so none of them keep
            running in the background (e.g. after the caller closes the
            container client). On Python 3.11+, ``asyncio.TaskGroup``
            provides the same cancel-on-failure behaviour.
        """
        if max_concurrency is not None and max_concurrency < 1:
            raise ValueError("max_concurrency must be a positive integer or None")
        semaphore = (
            asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
        )

        async def _download(blob_name):
            # Optionally throttle through the semaphore before downloading.
            if semaphore is None:
                return await self.download_blob_async(blob_name)
            async with semaphore:
                return await self.download_blob_async(blob_name)

        tasks = [asyncio.create_task(_download(name)) for name in blobs_list]
        try:
            return await asyncio.gather(*tasks)
        except BaseException:
            # gather() does not cancel siblings when one task fails, so do it
            # explicitly and wait for them before re-raising the first error.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
async def main():
    """Download the example parquet blobs concurrently and print each one as a DataFrame."""
    # Example values; the empty connection string is presumably redacted -- fill in your own.
    storage = BlobStorageAsync("", "test1")
    try:
        streams = await storage.download_blobs_async(["pqt_file4", "pqt_file5"])
        frames = (pd.read_parquet(buf, engine="pyarrow") for buf in streams)
        for frame in frames:
            print(frame)
    finally:
        # Always release the async client, even if a download or parse fails.
        await storage.container_client.close()
if __name__ == '__main__':
    import sys

    try:
        asyncio.run(main())
    except Exception as ex:
        # Report the failure on stderr (with the exception type, since messages
        # such as a bare KeyError key are ambiguous) and exit non-zero so that
        # shells and CI detect it, instead of printing to stdout and exiting 0.
        print(f"{type(ex).__name__}: {ex}", file=sys.stderr)
        sys.exit(1)
Output:
id name
0 1 Kala
1 2 Arulmozhi
2 6 Rajaraja
id name
0 1 Aditha
1 2 Arulmozhi
2 3 Kundavai
3 6 Rajaraja
Answered By - Venkatesan
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.