Cloud

[GCP] GKE의 Apache Airflow에 Slack 연동하기

융무 2023. 11. 5. 16:13

앞의 포스팅에서 GKE 상에서 Airflow를 배포하였습니다.

이제 Slack 알림을 설정하는 법을 포스팅하려고 합니다.

 

[GCP] GKE에 Apache Airflow 배포

Google Kubernetes Engine (GKE)에 helm 차트를 활용해서 airflow 배포를 하는 법에 대해서 포스팅하려고 합니다. 코드는 아래 github에서 확인할 수 있습니다. https://github.com/mjs1995/muse-data-engineer/blob/main/blog/gcp

mjs1995.tistory.com

Webhooks 생성 및 Slack으로 결과 전달하기

Webhooks 생성하기

  • slack api웹사이트에 접속해서 Webhook URL을 생성합니다.

Slack으로 알림 전달

  • Apache Airflow를 사용하여 Directed Acyclic Graph (DAG)를 정의하고, 작업(task)의 실패 및 성공 시 Slack으로 알림을 보내는 방법은 다음과 같습니다.
  • Airflow UI 설정: Airflow UI에서 'Connections'로 이동하여 새로운 연결(record)을 추가합니다.
    • Connection ID: slack_webhook_conn_id (이 ID는 DAG 소스 코드 내에서 사용됩니다.)
    • Connection Type: Slack Incoming Webhook
    • Webhook Token: Webhooks 생성 시 얻은 URL을 입력합니다.

DAG 실행 및 확인:

  • Slack Incoming Webhook이 Connections에 추가되었으면, 예제 DAG를 실행하여 설정한 Slack 채널에 알림이 올바르게 도착하는지 확인합니다.
  • 이 과정은 Airflow Callbacks 함수와 Airflow Slack Providers를 사용하여 구성되며, Sidecar 패턴을 이용하여 GitHub와 연동됩니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta
import requests
import logging
import json

dag = DAG(
    dag_id="slack_test",
    start_date=days_ago(1),
    max_active_runs=1,
    catchup=False,
    schedule_interval="@once",
)

send_slack_message = SlackWebhookOperator(
    task_id="send_slack",
    slack_webhook_conn_id="slack_webhook",
    message="Hello slack",
    dag=dag,
)

send_slack_message

다음과 같은 알림 메시지를 받게 됩니다.