Issue
In the dag below, sensor A is set to soft_fail = True, because I'd like to skip B and C if A fails. The problem is I'd still like to get an email alert when A fails. But when soft_fail is true, A is marked as success when the sensor doesn't detect anything, and no email alert would be sent out. Could someone please help to point out how to achieve this? Many thanks.
A (sensor, soft_fail = True) >> B >> C
Solution
Airflow sensor is marked as skipped
(not success
) when it fails and soft_fail
is True
.
There is no option to add email on skip not a callback. But you can create a new task from the operator EmailOperator
, which run when the sensor A is marked as skipped. Unfortunately, there is no trigger rule to run a task when upstream is skipped, but you can create a new operator which check the state of A and send the email based on it.
from airflow.operators.email import EmailOperator
from airflow.utils.context import Context
from airflow.utils.state import TaskInstanceState
from airflow.utils.trigger_rule import TriggerRule
class MyNotifier(EmailOperator):
def __int__(self, monitor_task_id: str, notify_on_state: str, *args, **kwargs):
self.monitor_task_id = monitor_task_id
self.notify_on_state = notify_on_state
super().__init__(*args, **kwargs)
def execute(self, context: Context):
task_to_check = context["dag_run"].get_task_instance(task_id=self.monitor_task_id)
if task_to_check.state == self.notify_on_state:
super().execute(context)
notification_task = MyNotifier(
task_id="sensor_skip_notifier",
monitor_task_id="A",
trigger_rule=TriggerRule.ALL_DONE, # to run the task when A is done regardless the state
notify_on_state=TaskInstanceState.SKIPPED,
to="<email>",
subject="<subject>",
html_content="<content>", # you can use jinja to add run info
)
A >> notification_task
Answered By - Hussein Awala
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.