Issue
I need to run the following graph on Apache Airflow, but I'm having issues with the parallel steps because each of them has multiple substeps:
        ------> task_1a --> task_1b ---             ------> task_4a --> task_4b ---
       /                               \           /                               \
Start ---------> task_2a --> task_2b --------> Step ---------> task_5a --> task_5b --------> End
       \                               /           \                               /
        ------> task_3a --> task_3b ---             ------> task_6a --> task_6b ---
I'm trying to generate the task_1a >> task_1b blocks in a separate function, but the code would translate roughly to this:
(
    start
    >> [task_1a >> task_1b, task_2a >> task_2b, task_3a >> task_3b]
    >> step
    >> [task_4a >> task_4b, task_5a >> task_5b, task_6a >> task_6b]
    >> end
)
The "step" is a DummyOperator that makes the two parallel groups run one after the other.
What I get instead is this (showing only the first branch of each parallel group for simplicity):
   task_1a                task_4a
          \                      \
Start ---> task_1b ---> Step ---> task_4b ---> End
This happens because task_1a >> task_1b returns task_1b, so start connects directly to task_1b while task_1a is left without any link to start. If I instead run that operation beforehand and put task_1a in the list, then "step" depends on task_1a rather than task_1b, so it is triggered at the same time as task_1b.
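The effect is easy to reproduce without Airflow. The sketch below uses a hypothetical Node class (a stand-in, not Airflow's operator class) whose >> records an edge and returns the right-hand operand, as Airflow's operators do; a list on the left of >> falls back to the right operand's __rrshift__. Evaluating the attempted expression for the first stage shows start linked only to the "b" tasks.

```python
from __future__ import annotations


class Node:
    """Hypothetical stand-in for an Airflow operator; NOT Airflow's real class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream: set[str] = set()
        self.downstream: set[str] = set()

    def _link(self, other: Node) -> None:
        self.downstream.add(other.name)
        other.upstream.add(self.name)

    def __rshift__(self, other):
        # `self >> other` or `self >> [others]`: link, then return the RIGHT
        # operand, which mirrors what Airflow's `>>` returns.
        for target in other if isinstance(other, list) else [other]:
            self._link(target)
        return other

    def __rrshift__(self, other: list[Node]) -> Node:
        # `[nodes] >> self`: lists have no `>>`, so Python falls back to this.
        for source in other:
            source._link(self)
        return self


def attempted_wiring() -> dict[str, Node]:
    """The expression from the question, restricted to the first stage."""
    names = ["start", "step"] + [f"task_{i}{s}" for i in (1, 2, 3) for s in "ab"]
    n = {name: Node(name) for name in names}
    # Each `task_Xa >> task_Xb` is evaluated first and yields task_Xb,
    # so the list that start connects to holds only the "b" tasks.
    n["start"] >> [
        n["task_1a"] >> n["task_1b"],
        n["task_2a"] >> n["task_2b"],
        n["task_3a"] >> n["task_3b"],
    ] >> n["step"]
    return n


if __name__ == "__main__":
    n = attempted_wiring()
    print(sorted(n["start"].downstream))
    print(sorted(n["task_1a"].upstream))
    print(sorted(n["step"].upstream))
```

All three "a" tasks end up with no upstream task at all, which matches the orphaned task_1a in the graph above.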
How do I solve this?
Solution
SubDAGs are deprecated (since Airflow 2.0), and even apart from that they don't really help here: by default a SubDAG runs its tasks sequentially, so you lose the parallelism. You should use Task Groups instead.
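Task Groups avoid the problem because Airflow wires a group as a unit: when a group appears on the right of >>, the upstream task is connected to the group's root tasks (those with no upstream task inside the group), and when it appears on the left, the group's leaf tasks (those with no downstream task inside the group) are connected to the downstream task. The plain-Python sketch below models that rule with a hypothetical Group class and connect helper; it illustrates the idea and is not Airflow's implementation.

```python
from __future__ import annotations


class Group:
    """Hypothetical model of a task group: some tasks plus the edges between them."""

    def __init__(self, tasks: list[str], edges: list[tuple[str, str]]) -> None:
        self.tasks = set(tasks)
        self.edges = list(edges)

    def roots(self) -> list[str]:
        # Tasks with no upstream task inside the group.
        has_upstream = {dst for _, dst in self.edges}
        return sorted(self.tasks - has_upstream)

    def leaves(self) -> list[str]:
        # Tasks with no downstream task inside the group.
        has_downstream = {src for src, _ in self.edges}
        return sorted(self.tasks - has_downstream)


def connect(upstream: str, group: Group, downstream: str) -> list[tuple[str, str]]:
    """Model of `upstream >> group >> downstream`; returns every edge of the result."""
    edges = [(upstream, root) for root in group.roots()]
    edges += group.edges
    edges += [(leaf, downstream) for leaf in group.leaves()]
    return edges


if __name__ == "__main__":
    chain_1 = Group(["task_1a", "task_1b"], [("task_1a", "task_1b")])
    print(connect("start", chain_1, "step"))
```

Here start is attached to task_1a and only task_1b feeds step, which is the wiring the question is after.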
Here is example code for the structure you are after:
from datetime import datetime
from airflow.decorators import task, task_group
from airflow.models.dag import DAG

@task
def task_start():
    """Dummy Task which is First Task of Dag"""
    return '[Task_start]'

@task
def task_1(value: int) -> str:
    """Dummy Task1"""
    return f'[ Task1 {value} ]'

@task
def task_2(value: str) -> str:
    """Dummy Task2"""
    return f'[ Task2 {value} ]'

@task
def task_3(value: str) -> str:
    """Dummy Task3"""
    return f'[ Task3 {value} ]'

@task
def task_4(value: int) -> str:
    """Dummy Task4"""
    return f'[ Task4 {value} ]'

@task
def task_end() -> None:
    """Dummy Task which is Last Task of Dag"""
    print('[ Task_End ]')

@task
def task_step() -> None:
    """Dummy Task which is Step Task of Dag"""
    print('[ Task_Step ]')

# Creating TaskGroups
@task_group
def task_group_function(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_2(task_1(value))

@task_group
def task_group_function2(value: int) -> None:
    """TaskGroup for grouping related Tasks"""
    task_3(task_4(value))
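
# Executing Tasks and TaskGroups
with DAG(
    dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False
) as dag:
    start_task = task_start()
    step_task = task_step()
    end_task = task_end()

    # One pair of TaskGroups per parallel branch (three, as in the question's graph)
    for i in range(3):
        first_task_group = task_group_function(i)
        second_task_group = task_group_function2(i)
        # start feeds every first group; step waits for all of them,
        # then feeds every second group; end waits for all of those.
        start_task >> first_task_group >> step_task >> second_task_group >> end_task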
[Figure: Graph view of the DAG with the Task Groups unfolded]
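To double-check the resulting structure without a running Airflow instance, you can split the graph into waves with a level-by-level topological sort (Kahn's algorithm): each wave contains tasks whose upstream tasks all sit in earlier waves. The sketch below applies it to a hand-written edge list that mirrors the DAG above with three branches; labels such as g1_0.task_1 are hypothetical and are not the task IDs Airflow actually generates.

```python
from __future__ import annotations

from collections import defaultdict


def waves(edges: list[tuple[str, str]]) -> list[list[str]]:
    """Level-by-level topological sort (Kahn's algorithm).

    Every task in a wave depends only on tasks in earlier waves, so the
    tasks inside one wave are allowed to run in parallel.
    """
    indegree: dict[str, int] = defaultdict(int)
    children: dict[str, list[str]] = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1
        indegree.setdefault(src, 0)  # make sure source-only tasks are counted
    current = sorted(t for t, d in indegree.items() if d == 0)
    result: list[list[str]] = []
    while current:
        result.append(current)
        ready: list[str] = []
        for task in current:
            for child in children[task]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        current = sorted(ready)
    return result


def example_edges(branches: int = 3) -> list[tuple[str, str]]:
    """Hypothetical edge list mirroring the DAG above (labels are made up)."""
    edges: list[tuple[str, str]] = []
    for i in range(branches):
        first = (f"g1_{i}.task_1", f"g1_{i}.task_2")   # task_2(task_1(i))
        second = (f"g2_{i}.task_4", f"g2_{i}.task_3")  # task_3(task_4(i))
        edges += [("start", first[0]), first, (first[1], "step")]
        edges += [("step", second[0]), second, (second[1], "end")]
    return edges


if __name__ == "__main__":
    for number, wave in enumerate(waves(example_edges())):
        print(number, wave)
```

With three branches this prints seven waves: start, the three task_1 tasks, the three task_2 tasks, step, the three task_4 tasks, the three task_3 tasks, and end. Whether tasks in the same wave actually run concurrently still depends on the executor and the concurrency settings.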
Answered By - Elad