Yours Ever, Data Chronicles

[Airflow] Airflow에 Slack 연동하여 메시지 호출하기 본문

Skillset/Data Engineering

[Airflow] Airflow에 Slack 연동하여 메시지 호출하기

Everly. 2025. 1. 6. 23:55

회사에서 데이터 배치를 할 때 Airflow를 사용하고 있는데, 일일이 이 배치가 잘 돌아갔는지를 체크하는 것이 번거로울 때가 많습니다.

이 때, Slack과 Airflow를 함께 연결하여 Airflow의 배치 결과를 Slack에 알림이 올 수 있도록 설정하는 방법을 알아봅시다.

 

Slack에 채널 생성하기

우선 Slack 내에서 알림을 받고자 하는 채널을 만듭니다. 저는 "dw-배치확인" 이라는 채널에서 알림을 받고자 합니다.

 

채널을 생성한 후 채널의 '정보'에 들어가면 위와 같이 하단에 "채널 ID"가 표시됩니다. 이 ID를 복사합니다.

 

또한 해당 채널에 AirflowBot을 추가해줍니다.

 

채널 내 '통합'에 들어가 '앱 추가' 버튼을 눌러 추가해줍시다.

 

Airflow Dag에 Slack 채널 ID를 삽입해 연동하기

다음으로는 배치 내에 작업해주는 단계입니다. Airflow dag 코드에 다음과 같이 슬랙 채널 ID를 받을 수 있는 변수인 "slack_channel_id"를 만듭니다. (이 부분은 추후 airflow 어드민을 통해 variables에 설정해줄 것입니다. 뒤에서 설명) 

DAG_ID = os.path.basename(__file__).replace(".py", "")
slack_channel_id = Variable.get("slack_dw_channel_id") # 이 부분이 중요

 

 

다음으로는 메시지 전송 함수를 하나 만들었는데요. 

 SlackConnector 라는 클래스를 아래와 같이 따로 만들어 주었습니다. 

write_tasks_message 함수에서 보면 알 수 있듯 state가 fail이면 배치 실패, fail이 아니면 배치 성공으로 슬랙에 표시되게 만드는 코드입니다.

import json
import requests
from airflow.models import Variable


class SlackConnector:

    def __init__(self, channel_id):
        self.slack_bot_token = Variable.get("slack_bot_token")
        self.channel_id = channel_id

    def post_message(self, text):
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + self.slack_bot_token
        }
        payload = {
            'channel': self.channel_id,
            'text': text
        }
        response = requests.post('https://slack.com/api/chat.postMessage',
                                 headers=headers,
                                 data=json.dumps(payload)
                                 )
        return response

    def write_tasks_message(self, **kwargs):
        title = f"<{kwargs['dag'].dag_id} 수행 결과>\n"
        message = ""
        dag_instance = kwargs["dag_run"]
        task_instances = dag_instance.get_task_instances()
        completed = True
        for ti in task_instances[1:]:
            task_id = ti.task_id
            state = ti.current_state()
            if state == "failed":
                completed = False
            # if state is not None:
            #     message += f"[task] {task_id} state ::: {state}\n"
        if completed:
            status = "--> 배치 성공 :-)\n"
            message += status
        else:
            status = "--> 배치 실패 :-(\n"
            message += status

        return title + message

 

위에 만든 SlackConnector 클래스를 import한 후 메세지 전송 함수를 만들어 줍니다.

import SlackConnector

## 메세지 전송 함수
def _send_message(**kwargs):
    slack = SlackConnector(slack_channel_id) 
    message = slack.write_tasks_message(**kwargs)
    slack.post_message(message)

 

위의 "slack_channel_id" 를 인수로 받습니다.

 

마지막으로 DAG에 send_message 부분에 추가해줍니다.

## DAG에 추가
with DAG(dag_id=DAG_ID, default_args=args, schedule_interval=None,
         catchup=False, start_date=days_ago(1),
         tags=['procedure', 'daily', 'trigger']) as dag:

    start = DummyOperator(
        task_id='start',
        dag=dag
    )

    end = DummyOperator(
        task_id='end',
        dag=dag
    )

    clean_xcom = PythonOperator(
        task_id="clean_xcom",
        python_callable=cleanup_xcom,
        provide_context=True,
    )

    send_message = PythonOperator(
        task_id="send_message",
        python_callable=_send_message,
        provide_context=True,
    )

 

이제 "slack_channel_id" 에 해당하는 값은 Variables에 넣어두어야 하니 아래 경로를 통해 작업합니다.

먼저 Admin -> Variables로 들어갑니다.

 

위와 같이 key와 value를 설정해줍니다. key의 경우 앞에서 만든 변수명인 "slack_channel_id"를, value는 가장 처음에 slack에서 복사한 채널id를 기입해줍니다.

 

끝! 그럼 이제 AirflowBot이 배치가 완료되면 해당 채널에서 결과인 성공 or 실패 메시지를 보내주게 됩니다.

반응형