개인 공부를 위한 포스팅입니다.
Prefect
- Prefect는 Python 기반 워크플로 관리 시스템입니다. Prefect를 사용하면 로깅, 재시도, 동적 매핑, 캐싱, 실패 알림 등을 데이터 파이프라인에 쉽게 추가할 수 있습니다
- Prefect는 Dask 위에 구축되었으며 Dask를 사용하여 분산 환경에서 Prefect 워크플로의 실행을 예약하고 관리합니다.
- Prefect는 워크플로의 일정을 처리 하고 Dask는 각 워크플로 내 작업 의 일정 및 리소스 관리를 처리합니다.
- 작업 예약: Dask는 워크플로우 내에서 모든 작업 예약을 처리하므로 Prefect는 Dask가 밀리초 대기 시간으로 예약하는 더 작은 작업을 장려할 수 있습니다.
- Dataflow: Dask가 작업 간의 적절한 정보 직렬화 및 통신을 처리하기 때문에 Prefect는 "데이터 흐름"을 일급 패턴으로 지원할 수 있습니다.
- 분산 계산: Dask는 클러스터의 작업자에게 작업 할당을 처리하여 사용자가 최소한의 오버헤드로 분산 계산의 이점을 즉시 실현할 수 있도록 합니다.
- 병렬성: 클러스터에서 실행하든 로컬에서 실행하든 Dask는 선반에서 병렬 작업 실행을 제공합니다.
Task
- 데이터 파이프라인에서 실제로 수행되는 작업 단위입니다. Task는 단일 작업을 수행하며, 다른 Task에 의존하는 경우가 많습니다.
- 입력 데이터를 받아서 출력 데이터를 생성할 수 있으며, 실행 가능한 코드로 구성됩니다
from prefect import flow, task
@task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def fetch(dataset_url: str) -> pd.DataFrame:
"""Read taxi data from web into pandas DataFrame"""
df = pd.read_csv(dataset_url)
return df
@flow()
def etl_web_to_gcs()->None:
"""Main ETL function"""
color = "yellow"
year = 2021
month = 1
dataset_file = f"{color}_tripdata_{year}-{month:02}"
dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz"
df = fetch(dataset_url)
if __name__ == '__main__':
etl_web_to_gcs()
Flow
- 데이터 파이프라인을 정의하고 실행하기위한 추상화된 개념입니다.
- Prefect Flow는 단일 태스크 또는 여러 태스크로 구성될 수 있으며, 이러한 태스크는 일련의 파이썬 함수로 작성됩니다.
- Flow는 다양한 데이터 파이프라인 패턴을 지원합니다.
Blocks
- 블록은 구성 저장을 활성화하고 외부 시스템과 상호 작용하기 위한 인터페이스를 제공하는 Prefect 내의 기본 요소입니다.
- 블록을 사용하면 AWS, GitHub, Slack 및 Prefect로 오케스트레이션하려는 기타 시스템과 같은 서비스로 인증하기 위한 자격 증명을 안전하게 저장할 수 있습니다.
- prefect 내부에서 Block를 눌러 다양한 커넥터를 확인합니다.
- Google Cloud Platform 용 Prefect 커넥터를 등록합니다.
prefect block register -m prefect_gcp
from prefect import flow, task
from prefect_gcp.cloud_storage import GcsBucket
@task()
def write_gcs(path: Path) -> None:
"""Upload local parquet file to GCS"""
gcs_block = GcsBucket.load("zoom-gcs")
gcs_block.upload_from_path(from_path=path, to_path=path)
return
- Prefect built-in blocks
Deployment
- deployment는 스트림을 캡슐화하고 API를 통해 일정을 예약하거나 시작할 수 있는 서버 측 아티팩트입니다. flow는 여러 Deployment에 속할 수 있으며 프로그래밍하는 데 필요한 모든 것이 포함된 메타데이터가 있는 컨테이너라고 말할 수 있습니다. 명령줄이나 Python으로 만들 수 있습니다.
- Prefect 워크플로에 대한 배포 생성은 Prefect API를 통해 워크플로를 관리하고 Prefect 에이전트에서 원격으로 실행할 수 있도록 워크플로 코드, 설정 및 인프라 구성을 패키징하는 것을 의미합니다.
- Prefect CLI 또는 UI를 사용하여 생성, 업데이트, 중지 및 삭제할 수 있습니다. Deployment에 대한 상태 및 로그 정보는 Prefect Cloud 또는 Prefect Server UI에서 볼 수 있으며, 원하는 경우 다운로드하여 검사할 수 있습니다.
- Deployment를 사용하면 Flow를 쉽게 실행하고 관리할 수 있으며, 코드 변경이나 업데이트를 쉽게 반영할 수 있습니다.
- prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"
- 배포 결과로 yaml 파일이 생성되었습니다.
Reference
'Data Engeneering > workflow' 카테고리의 다른 글
dbt(Data Build Tool) 개요 (0) | 2023.04.26 |
---|---|
airflow 아키텍처 (2) | 2023.01.24 |
Aiflow 개요 (0) | 2023.01.18 |