융무의 기술블로그
article thumbnail
Published 2023. 4. 26. 22:04
Prefect 개요 Data Engeneering/workflow

개인 공부를 위한 포스팅입니다.


Prefect

  • https://docs.prefect.io/latest/
  • Prefect는 Python 기반 워크플로 관리 시스템입니다. Prefect를 사용하면 로깅, 재시도, 동적 매핑, 캐싱, 실패 알림 등을 데이터 파이프라인에 쉽게 추가할 수 있습니다
  • https://discourse.prefect.io/t/what-are-the-components-of-prefect-2-0-architecture/909
  • Prefect는 Dask 위에 구축되었으며 Dask를 사용하여 분산 환경에서 Prefect 워크플로의 실행을 예약하고 관리합니다.
  • Prefect는 워크플로의 일정을 처리 하고 Dask는 각 워크플로 내 작업 의 일정 및 리소스 관리를 처리합니다.
    • 작업 예약: Dask는 워크플로우 내에서 모든 작업 예약을 처리하므로 Prefect는 Dask가 밀리초 대기 시간으로 예약하는 더 작은 작업을 장려할 수 있습니다.
    • Dataflow: Dask가 작업 간의 적절한 정보 직렬화 및 통신을 처리하기 때문에 Prefect는 "데이터 흐름"을 일급 패턴으로 지원할 수 있습니다.
    • 분산 계산: Dask는 클러스터의 작업자에게 작업 할당을 처리하여 사용자가 최소한의 오버헤드로 분산 계산의 이점을 즉시 실현할 수 있도록 합니다.
    • 병렬성: 클러스터에서 실행하든 로컬에서 실행하든 Dask는 선반에서 병렬 작업 실행을 제공합니다.

Task

  • 데이터 파이프라인에서 실제로 수행되는 작업 단위입니다. Task는 단일 작업을 수행하며, 다른 Task에 의존하는 경우가 많습니다.
  • 입력 데이터를 받아서 출력 데이터를 생성할 수 있으며, 실행 가능한 코드로 구성됩니다
from prefect import flow, task

@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def fetch(dataset_url: str) -> pd.DataFrame:
    """Read taxi data from web into pandas DataFrame"""
    df = pd.read_csv(dataset_url)
    return df

@flow()
def etl_web_to_gcs()->None:
    """Main ETL function"""
    color = "yellow"
    year = 2021
    month = 1 
    dataset_file = f"{color}_tripdata_{year}-{month:02}"
    dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"

    df = fetch(dataset_url)

if __name__ == '__main__':
    etl_web_to_gcs()
 

Flow

  • 데이터 파이프라인을 정의하고 실행하기위한 추상화된 개념입니다.
  • Prefect Flow는 단일 태스크 또는 여러 태스크로 구성될 수 있으며, 이러한 태스크는 일련의 파이썬 함수로 작성됩니다.
  • Flow는 다양한 데이터 파이프라인 패턴을 지원합니다.

Blocks

  • 블록은 구성 저장을 활성화하고 외부 시스템과 상호 작용하기 위한 인터페이스를 제공하는 Prefect 내의 기본 요소입니다.
  • 블록을 사용하면 AWS, GitHub, Slack 및 Prefect로 오케스트레이션하려는 기타 시스템과 같은 서비스로 인증하기 위한 자격 증명을 안전하게 저장할 수 있습니다.
  • prefect 내부에서 Block를 눌러 다양한 커넥터를 확인합니다.
  • Google Cloud Platform 용 Prefect 커넥터를 등록합니다. 
prefect block register -m prefect_gcp
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket

@task()
def write_gcs(path: Path) -> None:
    """Upload local parquet file to GCS"""
    gcs_block = GcsBucket.load("zoom-gcs")
    gcs_block.upload_from_path(from_path=path, to_path=path)
    return
  • Prefect built-in blocks

Deployment

  • deployment는 스트림을 캡슐화하고 API를 통해 일정을 예약하거나 시작할 수 있는 서버 측 아티팩트입니다. flow는 여러 Deployment에 속할 수 있으며 프로그래밍하는 데 필요한 모든 것이 포함된 메타데이터가 있는 컨테이너라고 말할 수 있습니다. 명령줄이나 Python으로 만들 수 있습니다.
  • Prefect 워크플로에 대한 배포 생성은 Prefect API를 통해 워크플로를 관리하고 Prefect 에이전트에서 원격으로 실행할 수 있도록 워크플로 코드, 설정 및 인프라 구성을 패키징하는 것을 의미합니다.
  • Prefect CLI 또는 UI를 사용하여 생성, 업데이트, 중지 및 삭제할 수 있습니다. Deployment에 대한 상태 및 로그 정보는 Prefect Cloud 또는 Prefect Server UI에서 볼 수 있으며, 원하는 경우 다운로드하여 검사할 수 있습니다.
  • Deployment를 사용하면 Flow를 쉽게 실행하고 관리할 수 있으며, 코드 변경이나 업데이트를 쉽게 반영할 수 있습니다.
  • prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"
  • 배포 결과로 yaml 파일이 생성되었습니다.

Reference

 

 

 

'Data Engeneering > workflow' 카테고리의 다른 글

dbt(Data Build Tool) 개요  (0) 2023.04.26
airflow 아키텍처  (2) 2023.01.24
Aiflow 개요  (0) 2023.01.18
profile

융무의 기술블로그

@융무

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!