Yours Ever, Data Chronicles
[Airflow] Airflow에 Slack 연동하여 메시지 호출하기 본문
회사에서 데이터 배치를 할 때 Airflow를 사용하고 있는데, 일일이 이 배치가 잘 돌아갔는지를 체크하는 것이 번거로울 때가 많습니다.
이 때, Slack과 Airflow를 함께 연결하여 Airflow의 배치 결과를 Slack에 알림이 올 수 있도록 설정하는 방법을 알아봅시다.
Slack에 채널 생성하기
우선 Slack 내에서 알림을 받고자 하는 채널을 만듭니다. 저는 "dw-배치확인" 이라는 채널에서 알림을 받고자 합니다.
채널을 생성한 후 채널의 '정보'에 들어가면 위와 같이 하단에 "채널 ID"가 표시됩니다. 이 ID를 복사합니다.
또한 해당 채널에 AirflowBot을 추가해줍니다.
채널 내 '통합'에 들어가 '앱 추가' 버튼을 눌러 추가해줍시다.
Airflow Dag에 Slack 채널 ID를 삽입해 연동하기
다음으로는 배치 내에 작업해주는 단계입니다. Airflow dag 코드에 다음과 같이 슬랙 채널 ID를 받을 수 있는 변수인 "slack_channel_id"를 만듭니다. (이 부분은 추후 airflow 어드민을 통해 variables에 설정해줄 것입니다. 뒤에서 설명)
DAG_ID = os.path.basename(__file__).replace(".py", "")
slack_channel_id = Variable.get("slack_dw_channel_id") # 이 부분이 중요
다음으로는 메시지 전송 함수를 하나 만들었는데요.
SlackConnector 라는 클래스를 아래와 같이 따로 만들어 주었습니다.
write_tasks_message 함수에서 보면 알 수 있듯 state가 fail이면 배치 실패, fail이 아니면 배치 성공으로 슬랙에 표시되게 만드는 코드입니다.
import json
import requests
from airflow.models import Variable
class SlackConnector:
def __init__(self, channel_id):
self.slack_bot_token = Variable.get("slack_bot_token")
self.channel_id = channel_id
def post_message(self, text):
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + self.slack_bot_token
}
payload = {
'channel': self.channel_id,
'text': text
}
response = requests.post('https://slack.com/api/chat.postMessage',
headers=headers,
data=json.dumps(payload)
)
return response
def write_tasks_message(self, **kwargs):
title = f"<{kwargs['dag'].dag_id} 수행 결과>\n"
message = ""
dag_instance = kwargs["dag_run"]
task_instances = dag_instance.get_task_instances()
completed = True
for ti in task_instances[1:]:
task_id = ti.task_id
state = ti.current_state()
if state == "failed":
completed = False
# if state is not None:
# message += f"[task] {task_id} state ::: {state}\n"
if completed:
status = "--> 배치 성공 :-)\n"
message += status
else:
status = "--> 배치 실패 :-(\n"
message += status
return title + message
위에 만든 SlackConnector 클래스를 import한 후 메세지 전송 함수를 만들어 줍니다.
import SlackConnector
## 메세지 전송 함수
def _send_message(**kwargs):
slack = SlackConnector(slack_channel_id)
message = slack.write_tasks_message(**kwargs)
slack.post_message(message)
위의 "slack_channel_id" 를 인수로 받습니다.
마지막으로 DAG에 send_message 부분에 추가해줍니다.
## DAG에 추가
with DAG(dag_id=DAG_ID, default_args=args, schedule_interval=None,
catchup=False, start_date=days_ago(1),
tags=['procedure', 'daily', 'trigger']) as dag:
start = DummyOperator(
task_id='start',
dag=dag
)
end = DummyOperator(
task_id='end',
dag=dag
)
clean_xcom = PythonOperator(
task_id="clean_xcom",
python_callable=cleanup_xcom,
provide_context=True,
)
send_message = PythonOperator(
task_id="send_message",
python_callable=_send_message,
provide_context=True,
)
이제 "slack_channel_id" 에 해당하는 값은 Variables에 넣어두어야 하니 아래 경로를 통해 작업합니다.
먼저 Admin -> Variables로 들어갑니다.
위와 같이 key와 value를 설정해줍니다. key의 경우 앞에서 만든 변수명인 "slack_channel_id"를, value는 가장 처음에 slack에서 복사한 채널id를 기입해줍니다.
끝! 그럼 이제 AirflowBot이 배치가 완료되면 해당 채널에서 결과인 성공 or 실패 메시지를 보내주게 됩니다.
'Skillset > Data Engineering' 카테고리의 다른 글
[Python Crawling] 크롤링 웹사이트 밑으로 내리기 - window height, scrollheight (1) | 2024.02.29 |
---|---|
[Python Crawling] 네이버 쇼핑 검색결과 크롤링하기 (3) - 네이버 API 활용하기 (1) | 2024.02.29 |
[Airflow] DAG에 스케줄 거는 방법 (DAG schedule_interval Setting) (2) | 2024.01.08 |
[Python Crawling] 네이버 쇼핑 검색결과 크롤링하기 (2) - Selenium, BeautifulSoup (2) | 2022.11.22 |
[Python Crawling] 네이버 쇼핑 검색결과 크롤링하기 (1) - Selenium, BeautifulSoup (3) | 2022.11.21 |