Issue
I am using S3ToRedshiftOperator to load a csv file into a Redshift database. Kindly help me pass an XCom variable to S3ToRedshiftOperator. How can we push an XCom without using a custom function?
Error:
NameError: name 'ti' is not defined
Using the below code:

from airflow.operators.s3_to_redshift_operator import S3ToRedshiftOperator

def export_db_fn(**kwargs):
    session = settings.Session()
    outkey = S3_KEY.format(MWAA_ENV_NAME, name[6:])
    print(outkey)
    s3_client.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    ti.xcom_push(key='FILE_PATH', value=outkey)
    return "OK"
with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
    export_info = PythonOperator(
        task_id="export_info",
        python_callable=export_db_fn,
        provide_context=True
    )
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key="{{ ti.xcom_pull(key='FILE_PATH', task_ids='export_info') }}",
        schema="dw_stage",
        table=REDSHIFT_TABLE,
        copy_options=['csv', "IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        autocommit=True,
        task_id='transfer_s3_to_redshift',
    )
    start >> export_info >> transfer_s3_to_redshift >> end
Solution
The error message you showed tells the problem: ti is not defined.
When you set provide_context=True, Airflow makes the Context available to you in the python callable. One of its attributes is ti, the TaskInstance (see the source code). So you need to either extract it from kwargs or name it in the function signature.
Your code should be:

def export_db_fn(**kwargs):
    ...
    ti = kwargs['ti']
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...
Or, if you want to use ti directly:

def export_db_fn(ti, **kwargs):
    ...
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...
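Both variants rely on the same mechanism: Airflow calls the python callable with the context passed as keyword arguments, so any keyword you name in the signature (such as ti) is picked out, and the rest land in **kwargs. The sketch below illustrates this without needing Airflow installed; FakeTaskInstance is a hypothetical stand-in for Airflow's TaskInstance, used only to show how the two signatures behave.

```python
# Hypothetical stand-in for Airflow's TaskInstance, for illustration only.
class FakeTaskInstance:
    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        # Record the pushed XCom so we can inspect it afterwards
        self.pushed[key] = value

# Variant 1: extract ti from the context kwargs
def export_db_fn(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='FILE_PATH', value='some/outkey.csv')
    return "OK"

# Variant 2: name ti in the signature; it is matched by keyword
def export_db_fn_v2(ti, **kwargs):
    ti.xcom_push(key='FILE_PATH', value='some/outkey.csv')
    return "OK"

# Airflow passes the context as keyword arguments, roughly like this:
fake_ti = FakeTaskInstance()
export_db_fn(ti=fake_ti, ds='2021-01-01')
print(fake_ti.pushed['FILE_PATH'])  # → some/outkey.csv
```

In the real DAG, the value pushed under 'FILE_PATH' is then available to the templated s3_key of the downstream S3ToRedshiftOperator via ti.xcom_pull.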
Note: In Airflow >= 2.0 there is no need to set provide_context=True; the context is passed to the callable automatically.
Answered By - Elad