Issue
I have the following SimpleHttpOperator inside my dag:
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/', # Some Api already configured and checked
method="GET",
response_filter=lambda response: json.loads(response.text),
log_response=True,
do_xcom_push=True,
)
followed by a PythonOperator:
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
The function:
def _processing_user(ti):
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError(f'User is empty')
**More function code**
When I execute airflow tasks test myDag extracting_user 2022-03-02
followed by airflow tasks test myDag processing_user 2022-03-02
I get the value error with users variable equals to an empty array.
I have tested extracting_user task alone and it gets the desired data from the API. I have already queried with sqlite xcom and it is an empty table.
I am using airflow 2.3.0
Solution
I solved the problem changing to the version 2.0.0 of airflow. It seems that the SimpleHttpOperator doesn't store the request response on the xcom table on 2.3.0 version
Answered By - Fran Arenas
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.