Issue
I have a lambda that consistently takes longer than 1 minute to finish executing. This is a problem with the default LambdaInvokeFunctionOperator
, since by default, it's hook creates a Boto3 connection with a default read_timeout of 60s (meaning that after 60s, if the Lambda has not returned any result, the connection will abort with a Read Timeout error).
I solved the problem by creating a custom operator and hook where I was able to change the connection. But holy moly, surely there is a better way! I am an experienced Software Engineer but I hardly have experience with Python. So I fear I over engineered this and there is a better way.
Here is what I did:
from __future__ import annotations
from typing import Any
from botocore.config import Config
from airflow.providers.amazon.aws.utils import trim_none_values
from cached_property import cached_property
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator as BaseLambdaInvokeFunctionOperator
class LambdaInvokeFunctionOperator(BaseLambdaInvokeFunctionOperator):
@cached_property
def hook(self) -> LambdaHook:
return LambdaHook(aws_conn_id=self.aws_conn_id)
class LambdaHook(AwsBaseHook):
def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "lambda"
super().__init__(*args, **kwargs)
@cached_property
def conn(self):
# All this effort to basically be able to pass this config in
config_dict = {"connect_timeout": 5, "read_timeout": 900, "tcp_keepalive": True}
config = Config(**config_dict)
return self.get_client_type(self.region_name, config)
def invoke_lambda(
self,
*,
function_name: str,
invocation_type: str | None = None,
log_type: str | None = None,
client_context: str | None = None,
payload: str | None = None,
qualifier: str | None = None,
):
invoke_args = {
"FunctionName": function_name,
"InvocationType": invocation_type,
"LogType": log_type,
"ClientContext": client_context,
"Payload": payload,
"Qualifier": qualifier,
}
return self.conn.invoke(**trim_none_values(invoke_args))
The above code works (tested in Airflow 2.5.1) and my Dags do not suffer from a timeout. Is there a better way?
NOTE: For those that say I should just do this asynchronously, you have a point. However, that's a different question ;)
Solution
Wouldn't it be way easier to define the read_timeout in the aws connection? Pass in:
{"connect_timeout": 5, "read_timeout": 900, "tcp_keepalive": True}
to the extras in an aws connection and then use that connection when calling the LambdaInvokeFunctionOperator
Answered By - MM1883
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.