Cloud

[GCP] GKE의 Apache Airflow에 minio 연동하기

융무 2023. 11. 19. 23:09

MinIO는 고성능, 오픈소스 객체 스토리지 서비스로, 클라우드 네이티브 환경에서 매우 유용합니다. 이번 포스팅에서는 Kubernetes 클러스터 상에서 Airflow와 MinIO를 연동하는 과정을 단계별로 살펴보겠습니다.


MinIO 설치 및 설정

먼저, Kubernetes 클러스터에 MinIO를 설치해야 합니다. MinIO는 쿠버네티스 Pod로 실행되며, 이를 위해 YAML 파일을 사용하여 필요한 리소스를 정의합니다.

MinIO 설치를 위한 minio-dev.yaml 파일 예시입니다.

apiVersion: v1
kind: Namespace
metadata:
  name: minio-dev
---
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: minio
  name: minio
  namespace: minio-dev
spec:
  containers:
  - name: minio
    image: quay.io/minio/minio:latest
    args: 
    - minio server /data --console-address :9090
  volumes:
  - name: localvolume
    emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: minio-service
  namespace: minio-dev
spec:
  type: ClusterIP
  ports:
    - port: 9000
      targetPort: 9000
  selector:
    app: minio
  • 이 파일은 MinIO를 단일 Pod로 실행하며, 클러스터 내부에서만 접근 가능한 서비스를 생성합니다.

minio-dev.yaml 파일에 정의된 설정을 Kubernetes 클러스터에 적용합니다. 이 파일에는 MinIO를 실행하는 데 필요한 Namespace, Pod 및 Service 설정이 포함되어 있습니다.

kubectl apply -f minio-dev.yaml

MinIO가 배포된 minio-dev 네임스페이스 내의 모든 파드의 상태를 확인합니다. 여기서 minio 파드가 성공적으로 실행 중인지 확인할 수 있습니다

kubectl get pods -n minio-dev
NAME    READY   STATUS    RESTARTS   AGE
minio   1/1     Running   0          2d7h

MinIO 서비스의 상태를 확인합니다. 서비스는 파드와 외부 또는 다른 파드 간의 네트워크 통신을 관리합니다

NAME            TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)    AGE
minio-service   ClusterIP   10.52.6.113   <none>        9000/TCP   59m

MinIO CLI (mc) 설정

  • MinIO를 관리하기 위해 mc (MinIO Client)를 사용합니다. 이 클라이언트를 통해 버킷 생성, 파일 업로드 등 다양한 작업을 수행할 수 있습니다.
  • mc 설치 및 설정 과정입니다.
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
./mc --version
./mc alias set k8s-minio-dev http://127.0.0.1:9000 minioadmin minioadmin

MinIO 클라이언트 Alias 설정

  • MinIO 클라이언트(mc)에 k8s-minio-dev라는 이름의 alias(별칭)를 설정합니다. http://127.0.0.1:9000는 MinIO 서버의 주소입니다. 이 주소는 로컬 환경에서 포트 포워딩을 통해 MinIO Pod에 접근하기 위한 것이며 minioadmin은 MinIO의 Access Key와 Secret Key입니다.

Airflow와 MinIO 연동

Airflow에서 MinIO를 사용하기 위해, Airflow의 PythonOperator를 사용하여 데이터 처리 및 저장 로직을 구현합니다.

minio 접속 정보에 대해서 Connection을 추가해 줍니다.

다음은 Airflow DAG의 예시입니다.

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import requests
import pandas as pd
import boto3
import json
from io import BytesIO


def download_and_upload_to_minio():
    url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    response = requests.get(url)
    data = response.content
    df = pd.read_parquet(BytesIO(data))
    csv_data = df.to_csv(index=False).encode("utf-8")
    conn = BaseHook.get_connection("minio-test")
    extra_config = json.loads(conn.extra)

    s3_client = boto3.client(
        "s3",
        endpoint_url=extra_config["host"],
        aws_access_key_id=conn.login,
        aws_secret_access_key=conn.password,
    )

    s3_client.put_object(
        Bucket="airflow-minio", Key="nyc_data.csv", Body=BytesIO(csv_data)
    )


default_args = {
    "owner": "airflow",
    "start_date": datetime(2023, 1, 1),
    "retries": 1,
}

with DAG(
    "download_and_upload_to_minio", default_args=default_args, schedule_interval=None
) as dag:

    upload_task = PythonOperator(
        task_id="download_upload_nyc_data", python_callable=download_and_upload_to_minio
    )

upload_task

이 예시에서는 외부 URL에서 Parquet 파일을 다운로드하고, 이를 CSV 형식으로 변환하여 MinIO에 업로드합니다. Airflow의 PythonOperator를 사용하여 이러한 데이터 처리 및 저장 작업을 자동화합니다.

minio에 잘 적재가 되었는지 확인해 봅니다.

./mc ls k8s-minio-dev/airflow-minio