SpecialistOff.NET / Вопросы / Статьи / Фрагменты кода / Резюме / Метки / Помощь / Файлы
НазадМетки: airflow apache airflow
To use this operators, you must do a few things:
Add custom robot to Dingding group which you want to send Dingding message.
Get the webhook token from Dingding custom robot.
Put the Dingding custom robot token in the password field of the
dingding_defaultConnection. Notice that you just need token rather than the whole webhook string.
Use the DingdingOperator to send Dingding message:
airflow/contrib/example_dags/example_dingding_operator.pyVIEW SOURCE
text_msg_remind_none = DingdingOperator(
    task_id='text_msg_remind_none',
    dingding_conn_id='dingding_default',
    message_type='text',
    message='Airflow dingding text message remind none',
    at_mobiles=None,
    at_all=False,
    dag=dag,
)
Use parameters at_mobiles and at_all to remind specific users when you send message,at_mobiles will be ignored When at_all is set to True:
airflow/contrib/example_dags/example_dingding_operator.pyVIEW SOURCE
text_msg_remind_all = DingdingOperator(
    task_id='text_msg_remind_all',
    dingding_conn_id='dingding_default',
    message_type='text',
    message='Airflow dingding text message remind all users in group',
    # list of user phone/email here in the group
    # when at_all is specific will cover at_mobiles
    at_mobiles=['156XXXXXXXX', '130XXXXXXXX'],
    at_all=True,
    dag=dag,
)
The Dingding operator can send rich text messages including link, markdown, actionCard and feedCard. A rich text message can not remind specific users except by using markdown type message:
airflow/contrib/example_dags/example_dingding_operator.pyVIEW SOURCE
markdown_msg = DingdingOperator(
    task_id='markdown_msg',
    dingding_conn_id='dingding_default',
    message_type='markdown',
    message={
        'title': 'Airflow dingding markdown message',
        'text': '# Markdown message title\n'
                'content content .. \n'
                '### sub-title\n'
                ''
    },
    at_mobiles=['156XXXXXXXX'],
    at_all=False,
    dag=dag,
)
Dingding operator could handle task callback by writing a function wrapper dingding operators and then pass the function to sla_miss_callback, on_success_callback, on_failure_callback, or on_retry_callback. Here we use on_failure_callback as an example:
airflow/contrib/example_dags/example_dingding_operator.pyVIEW SOURCE
def failure_callback(context):
    message = 'AIRFLOW TASK FAILURE TIPS:\n' \
              'DAG:    {}\n' \
              'TASKS:  {}\n' \
              'Reason: {}\n' \
        .format(context['task_instance'].dag_id,
                context['task_instance'].task_id,
                context['exception'])
    return DingdingOperator(
        task_id='dingding_success_callback',
        dingding_conn_id='dingding_default',
        message_type='text',
        message=message,
        at_all=True,
    ).execute(context)
args['on_failure_callback'] = failure_callback
The Dingding operator post http requests using default host https://oapi.dingtalk.com, if you need to change the host used you can set the host field of the connection.
See Dingding documentation on how to custom robot.