Issue
I am trying to read files asynchronously, but I am running into an error saying “await only allowed in async function.” From my understanding, this error occurs when the `await` keyword is used inside a function that was not marked as `async`. However, as shown in the code below, I do mark the `read_blob` function as `async`.
I don’t have any issues running this in a Jupyter notebook (.ipynb), where I run the code cell by cell sequentially, but when I try to run it in VS Code as a .py file, it throws an error.
Here is my code:
from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO
class AsyncContainerClient(ContainerClient):
    """Async container client with a helper that loads a blob as Parquet."""

    async def read_blob(self,
                        blob_name: str,
                        add_blob_name_col=False,
                        add_blob_date_col=False,
                        preprocessing_func=None,
                        zip_regex=r'.+\.gz$',
                        csv_regex=r'.+\.csv(\.gz)?$',
                        parquet_regex=r'.+\.parquet$',
                        regex_string=None,
                        **kwargs):
        """Download ``blob_name`` from this container and parse it as Parquet.

        Args:
            blob_name: Name of the blob to download; must be a ``str``.
            add_blob_name_col: Accepted but not used by this implementation.
            add_blob_date_col: Accepted but not used by this implementation.
            preprocessing_func: Accepted but not used by this implementation.
            zip_regex: Accepted but not used by this implementation.
            csv_regex: Accepted but not used by this implementation.
            parquet_regex: Accepted but not used by this implementation.
            regex_string: Accepted but not used by this implementation.
            **kwargs: Accepted but not used by this implementation.

        Returns:
            The object returned by ``pandas.read_parquet`` for the blob's
            contents, or ``0`` if the blob does not exist.

        Raises:
            AssertionError: If ``blob_name`` is not a string.
        """
        # pandas is not imported at module level, so bring it into scope
        # here; without this the read_parquet call fails with NameError.
        import pandas as pd

        assert isinstance(blob_name, str), f'{blob_name} is not a string'
        try:
            blob = await self.download_blob(blob_name)
            # Buffer the whole blob in memory, then rewind so the Parquet
            # reader starts from the first byte.
            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        except ResourceNotFoundError:
            # A missing blob is reported with a sentinel instead of raising.
            return 0
# Module-level driver: build the client from the container URL and list the
# blob paths to read.
blob_sas_url = "https://proan.blob"
acc = AsyncContainerClient.from_container_url(blob_sas_url)
test_dirs = ["models1/model.parquet", "models2/model.parquet", "models3/model.parquet"]
# NOTE: this `await` sits at module level, outside any `async def`. This is
# the statement the reported "await only allowed in async function" error
# refers to when the file is run as a plain .py script.
res = await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))
Solution
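The entry point to an async program should be `asyncio.run`. Wrap your code in an async function and then call that function with `asyncio.run`. (The notebook works because IPython supports top-level `await` in cells; a plain .py script does not.)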
from azure.storage.blob.aio import ContainerClient
import asyncio
from azure.core.exceptions import ResourceNotFoundError
from io import StringIO, BytesIO
class AsyncContainerClient(ContainerClient):
    """Async container client with a helper that loads a blob as Parquet."""

    async def read_blob(self,
                        blob_name: str,
                        add_blob_name_col=False,
                        add_blob_date_col=False,
                        preprocessing_func=None,
                        zip_regex=r'.+\.gz$',
                        csv_regex=r'.+\.csv(\.gz)?$',
                        parquet_regex=r'.+\.parquet$',
                        regex_string=None,
                        **kwargs):
        """Download ``blob_name`` from this container and parse it as Parquet.

        Args:
            blob_name: Name of the blob to download; must be a ``str``.
            add_blob_name_col: Accepted but not used by this implementation.
            add_blob_date_col: Accepted but not used by this implementation.
            preprocessing_func: Accepted but not used by this implementation.
            zip_regex: Accepted but not used by this implementation.
            csv_regex: Accepted but not used by this implementation.
            parquet_regex: Accepted but not used by this implementation.
            regex_string: Accepted but not used by this implementation.
            **kwargs: Accepted but not used by this implementation.

        Returns:
            The object returned by ``pandas.read_parquet`` for the blob's
            contents, or ``0`` if the blob does not exist.

        Raises:
            AssertionError: If ``blob_name`` is not a string.
        """
        # pandas is not imported at module level, so bring it into scope
        # here; without this the read_parquet call fails with NameError.
        import pandas as pd

        assert isinstance(blob_name, str), f'{blob_name} is not a string'
        try:
            blob = await self.download_blob(blob_name)
            # Buffer the whole blob in memory, then rewind so the Parquet
            # reader starts from the first byte.
            with BytesIO() as byte_stream:
                await blob.readinto(byte_stream)
                byte_stream.seek(0)
                return pd.read_parquet(byte_stream, engine='pyarrow')
        except ResourceNotFoundError:
            # A missing blob is reported with a sentinel instead of raising.
            return 0
async def main():
    """Read the test Parquet blobs concurrently and return the results.

    Returns:
        A list with one entry per path in ``test_dirs``, in the same order;
        each entry is whatever ``read_blob`` returned for that path.
    """
    blob_sas_url = "https://proan.blob"
    test_dirs = ["models1/model.parquet", "models2/model.parquet",
                 "models3/model.parquet"]
    # Use the client as an async context manager so its underlying
    # connections are closed once all downloads have finished.
    async with AsyncContainerClient.from_container_url(blob_sas_url) as acc:
        return await asyncio.gather(*(acc.read_blob(f) for f in test_dirs))


# asyncio.run() is the synchronous entry point: it starts an event loop,
# runs main() to completion and hands back its return value.
res = asyncio.run(main())
Answered By - Ron Serruya
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.