Issue
I am writing an Azure Function App using Python (v2 programming model) that calls an API multiple times to get some data and write the responses to a blob storage. Those operations happen asyncronously, and I am using aiohttp
to make the API calls. This is the code of my function app:
import azure.functions as func
from azure.storage.blob.aio import BlobServiceClient
import asyncio
import aiohttp
import datetime
import logging
import requests
import json
import sys
import os
import ssl
import certifi
_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING = os.environ["KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING"]
_KV_SECRET_API_KEY = os.environ["KV_SECRET_API_KEY"]
CRON_SCHEDULE = os.environ["CRON_SCHEDULE"]
STORAGE_CONTAINER_NAME = os.environ["STORAGE_CONTAINER_NAME"]
def timestamp_utc():
return datetime.datetime.now(datetime.timezone.utc)
def timestamp_for_logs():
return timestamp_utc().isoformat()
def timestamp_for_blob_path():
return timestamp_utc().strftime("%Y/%m/%d/T/%H/%M/%S_%Z")
async def makeAPIcall(api_endpoint, session, request_params):
logging.info(f'attempting to make an API call... with request_params: {request_params}')
try:
async with session.get(api_endpoint, params = request_params) as response:
response.raise_for_status()
response_content = await response.read()
logging.info("Response from the API call: "+str(response_content))
return response_content
except Exception as e:
logging.error(f'Error calling API:')
logging.error(str(sys.exc_info()))
return None
async def writeResponseToBlob(data, container_name, blob_name, connection_string):
logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name}')
try:
# Create the BlobServiceClient object using the connection string
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# Get a ContainerClient object for the specified container
container_client = blob_service_client.get_container_client(container_name)
# Create a blob in the container with the specified name
blob_client = container_client.get_blob_client(blob_name)
# Write the API response to the blob
await blob_client.upload_blob(data, overwrite=True)
except:
logging.error("ERROR during blob upload : "+str(sys.exc_info()))
sys.exit(1)
async def chain(base_url, satellite_id, request_params, context, container_name, session) -> None:
# part A of the chain
api_endpoint = f"{base_url}/{satellite_id}"
response_content = await makeAPIcall(api_endpoint, session, request_params=request_params)
# part B of the chain
output_blob_name = timestamp_for_blob_path()+'_SatID_'+str(satellite_id)+'_InvocID_'+context.invocation_id+'.json'
await writeResponseToBlob(data=response_content,
container_name=container_name,
blob_name=output_blob_name,
connection_string=_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING)
logging.info(f"Finished entire chain for SatID {satellite_id}")
async def main(args, session):
base_url = args["base_url"]
satellite_ids = args["satellite_ids"]
request_params = args["request_params"]
context = args["context"]
container_name = args["container_name"]
# version A
# await asyncio.gather(*(chain(base_url, satellite_id, request_params, context, container_name, session) for satellite_id in satellite_ids))
# version B
for satellite_id in satellite_ids:
await chain(base_url, satellite_id, request_params, context, container_name, session)
#===============
# CORE FUNCTION:
#===============
app = func.FunctionApp()
@app.function_name(name="sendReqToApiWriteToBlobTimeTrigger")
@app.schedule(schedule=CRON_SCHEDULE, arg_name="mytimer", run_on_startup=False, use_monitor=True)
async def eventHandler(mytimer: func.TimerRequest, context: func.Context) -> None:
logging.info('ctx_func_name:\t'+context.function_name),
logging.info('ctx_func_dir:\t'+context.function_directory),
logging.info('ctx_invocation_id:\t'+context.invocation_id),
logging.info("Cron schedule UTC:\t"+CRON_SCHEDULE)
# satellite_ids = [25544, 25544]
satellite_ids = ["Amsterdam", "London"]
# base_url = f"https://api.wheretheiss.at/v1/satellites"
base_url = f"https://worldtimeapi.org/api/timezone/Europe"
request_params = {}
if mytimer.past_due:
logging.info('The timer is past due!')
args = {
"base_url": base_url,
"request_params": request_params,
"satellite_ids" : satellite_ids,
"context" : context,
"container_name" : STORAGE_CONTAINER_NAME,
}
ssl_context = ssl.create_default_context(cafile=certifi.where())
async with aiohttp.TCPConnector(ssl=ssl_context) as connector:
async with aiohttp.ClientSession(connector=connector) as session:
await main(args, session)
# ensure session is closed
await asyncio.sleep(1)
await session.close()
await asyncio.sleep(1)
await connector.close()
logging.info(f"Entire function execution finished")
Now the problem is that, at least when I simulate locally in VSCode, the function gives an error (or warning actually because the debugging does not stop it) that the client session and connector are unclosed:
[2023-12-12T22:41:00.524Z] Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001E9A492CCD0>
[2023-12-12T22:41:00.526Z] Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x000001E9A48D7520>, 982955.203)]']
connector: <aiohttp.connector.TCPConnector object at 0x000001E9A492C7F0>
I did follow the recommendation from aiohttp for graceful shotdown, as well as the recommendation to re-use the session, but strangely at every function execution, I make 2 requests to the API, and I seem to get an error at every request. What can be the issue, and how can I resolve the error?
EDIT 2024.01.15:
When I remove the await
in this function:
async def writeResponseToBlob(data, container_name, blob_name, connection_string):
...
# Write the API response to the blob
# await blob_client.upload_blob(data, overwrite=True)
blob_client.upload_blob(data, overwrite=True)
then the problem disappears, and instead I get a warning:
...\function_app.py: RuntimeWarning: coroutine 'BlobClient.upload_blob' was never awaited
But how to make sure that this function works with both await
on api call, and on blob upload?
Solution
turns out the problem was with the writeResponseToBlob
method and what was missing was async with
for the blob service client.
So this method now looks like this:
async def writeResponseToBlob(data, connection_string, container_name, blob_name):
logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name} ...')
try:
async with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
# Get a ContainerClient object for the specified container
container_client = blob_service_client.get_container_client(container_name)
# Write the API response to the blob
blob_client = await container_client.upload_blob(name=blob_name, data=data, overwrite=True)
return blob_client.blob_name
except:
logging.error("ERROR during blob upload : "+str(sys.exc_info()))
return None
and that fixed the issues with the unclosed aiohttp session/connector.
Answered By - average.everyman
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.