융무의 기술블로그
article thumbnail

데이터 엔지니어링 줌 캠프 PJT를 진행하면서 관련된 내용을 정리하고자 합니다.

프로젝트의 코드는 github에서 확인할 수 있습니다.

https://github.com/mjs1995/data-engineering-zoomcamp/blob/main/02_workflow_orchestration/prefect_gcp.md

 

GitHub - mjs1995/data-engineering-zoomcamp

Contribute to mjs1995/data-engineering-zoomcamp development by creating an account on GitHub.

github.com


1. Prefect

Cloud Storage 에서 BigQuery 데이터 베이스 로 Parquet 데이터를 수집하기 위해 Prefect을 쓰려고 합니다.

Prefect에 관한 내용은 다음 링크를 참고해주세요.

https://github.com/mjs1995/muse-data-engineer/blob/main/doc/workflow/prefect_base.md

 

GitHub - mjs1995/muse-data-engineer: 데이터 엔지니어로 성장하기

데이터 엔지니어로 성장하기. Contribute to mjs1995/muse-data-engineer development by creating an account on GitHub.

github.com

zoomcamp 가상환경을 생성한 뒤에 필요한 라이브러리를 설치해줍니다.

<bash />
conda create -n zoomcamp python=3.9 conda activate zoomcamp pip install -r requirements.txt

requirements.txt 파일의 경우 다음과 같습니다.

<bash />
pandas==1.5.2 prefect==2.7.7 prefect-sqlalchemy==0.2.2 prefect-gcp[cloud_storage]==0.2.3 protobuf==4.21.11 pyarrow==10.0.1 pandas-gbq==0.19.0

Prefect GUI 환경으로 접속합니다.

<bash />
prefect orion start


2. GCS Pipeline 연결

버킷을 신규로 생성합니다. 버킷의 이름은 prefect-de-zoom으로 설정해줍니다.

prefect 내부에서 Block를 눌러 다양한 커넥터를 확인합니다.

Google Cloud Platform 용 Prefect 커넥터를 등록합니다.

prefect GUI 환경에서 GCS 등록해줍니다.

<bash />
prefect block register -m prefect_gcp

먼저 구글 IAM에서 서비스계정을 신규로 만듭니다.

BigQuery 관리자와 저장소 관리자의 역할을 부여해줍니다.

생성된 서비스 계정에 신규로 키를 등록해주고 prefect에도 등록해줍니다.

prefect에서 Blocks를 만들고 코드를 실행시키는데 ValueError: Unable to find block document named zoom-gcs for block type gcs-bucket 에러가 발생했습니다.

<bash />
pip install prefect --upgrade prefect orion database reset prefect block register -m prefect_gcp prefect block register -m prefect_sqlachemy

그 후에 로컬 터미널이 아닌 vscode ssh 터미널을 이용해서 prefect orion start를 실행하고 다시 Blocks를 등록해줍니다. 여기서 서비스계정의 키도 신규로 추가했습니다.

GCS 버킷에 데이터가 잘 적재된것을 확인할 수 있습니다.

위와 같은 에러가 발생한다면 먼저 Blocks에 Type이 잘 할당 되었는지 확인이 필요할 거같습니다.

<bash />
prefect blocks ls

전체 소스코드는 다음과 같습니다.

<python />
from pathlib import Path import pandas as pd from prefect import flow, task from prefect_gcp.cloud_storage import GcsBucket @task(retries=3) #retries parametes tells how many times our task will restart in case of crash def fetch(dataset_url: str) -> pd.DataFrame: """Read data from web inmto pandas dataframe""" df = pd.read_csv(dataset_url) return df @task(log_prints=True) #log_prints will write logs to our console and UI def clean(df) -> pd.DataFrame: """Fix some dtype issues""" df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"]) df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"]) print(df.head(2)) print(df.dtypes) print('Length of DF:', len(df)) return df @task(log_prints=True) def export_data_local(df, color, dataset_file): path = Path(f"data/{color}/{dataset_file}.parquet") df.to_parquet(path, compression="gzip") return path @task() def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path: """write df as a parquet file""" path = Path(f"data/{color}/{dataset_file}.parquet") if not path.parent.is_dir(): path.parent.mkdir(parents=True) df.to_parquet(path, compression="gzip") return path @task() def write_gcs(path: Path) -> None: """Upload local parquet file to GCS""" gcs_block = GcsBucket.load("zoom-gcs") gcs_block.upload_from_path(from_path=path, to_path=path) return @flow() def etl_web_to_gcs()->None: """Main ETL function""" color = "yellow" year = 2021 month = 1 dataset_file = f"{color}_tripdata_{year}-{month:02}" dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz" df = fetch(dataset_url) df_clean = clean(df) path = write_local(df_clean, color, dataset_file) write_gcs(path) if __name__ == '__main__': etl_web_to_gcs()

3. Cloud Storage에서 BigQuery로 Parquet 수집

데이터를 GCP로 수집하려면 Parquet 파일의 스키마를 사용하여 BigQuery에서 테이블을 생성해야 합니다

GCP > BigQuery > 추가 > Google Cloud Storage를 클릭합니다.

GCS URI 패턴을 사용하세요 옆에 찾아보기를 눌러 GCS 상의 parquet 파일을 선택해줍니다.

아래와 같은 설정으로 빅쿼리 파일을 추가해줍니다.

테이블을 만들 수 없음: Cannot read and write in different locations: source: asia, destination: asia-northeast3 라는 error가 발생하는데 gcs의 리전이 asia이고 bigquery의 리전이 달라서 생기는 에러입니다. 즉, GCS 버킷에서 BigQuery로 데이터를 로드할 수 있도록 모든 리소스가 동일한 리전을 공유해야 합니다.

  • GCP의 데이터 세트 만들기를 한 뒤에 GCS 버킷의 리전이 맞춰주면 되지만 저의 경우에는 GCS의 멀티 리전이 asia이고 빅쿼리에서 asia는 연동이 안되어서 데이터 이전을 해서 해결을 하려고 합니다.

  • 신규로 US 리전의 GCS 버킷을 만들고 기존의 asia 리전에서 데이터를 전송합니다.

다시 GCP에서 테이블을 추가해줍니다. prefect blocks에서도 GCP 버킷의 주소를 변경해 줍니다.

  • 위에서 만든 dataset을 지우고 prefect을 이용해서 etl 합니다.

prefect을 실행시킬 때 Prefect Flow: ERROR | Flow run 'xxxxxx' - Finished in state Failed('Flow run encountered an exception. google.api_core.exceptions.Forbidden: 403 GET: Access Denied: Table xxxxx: Permission bigquery.tables.get denied on table xxxxxx (or it may not exist).\n') 에러가 발생 했는데 python 스크립트 안에서 해당 프로젝트 id와 GcpCredentials의 싱크를 맞추니 해결이 되었습니다.

 

 

실행해보면 데이터가 잘 인입된것을 볼 수 있습니다.

전체 소스 코드는 다음과 같습니다.

<python />
from pathlib import Path import pandas as pd from prefect import flow, task from prefect_gcp.cloud_storage import GcsBucket from prefect_gcp import GcpCredentials @task(retries=3) def extract_from_gcs(color: str, year: int, month: int) -> Path: """Download trip data from GCS""" gcs_path = f"data/{color}/{color}_tripdata_{year}-{month:02}.parquet" gcs_block = GcsBucket.load("zoom-gcs") gcs_block.get_directory(from_path=gcs_path, local_path=f"../data/") return Path(f"../data/{gcs_path}") @task() def transform(path: Path) -> pd.DataFrame: """Data cleaning example""" df = pd.read_parquet(path) print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}") df["passenger_count"].fillna(0, inplace=True) print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}") return df @task() def write_bq(df: pd.DataFrame) -> None: """Write DataFrame to BiqQuery""" gcp_credentials_block = GcpCredentials.load("zoom-gcp-creds") df.to_gbq( destination_table="dezoomcamp.rides", project_id="dtc-de-382512", credentials=gcp_credentials_block.get_credentials_from_service_account(), chunksize=500_000, if_exists="append", ) @flow() def etl_gcs_to_bq(): """Main ETL flow to load data into Big Query""" color = "yellow" year = 2021 month = 1 path = extract_from_gcs(color, year, month) df = transform(path) write_bq(df) if __name__ == "__main__": etl_gcs_to_bq()

4. Parametrizing Flow & Deployments with ETL into GCS flow

ETL을 자동화하기 위해 방금 구축한 파이프라인을 매개변수화합니다.

deployments 폴더를 만들고 그 안에 year, month, color를 매개변수화 한 뒤에 코드를 재실행 합니다.

 

<python />
from pathlib import Path import pandas as pd from prefect import flow, task from prefect_gcp.cloud_storage import GcsBucket from random import randint from prefect.tasks import task_input_hash from datetime import timedelta @task(retries=3, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1)) def fetch(dataset_url: str) -> pd.DataFrame: """Read taxi data from web into pandas DataFrame""" # if randint(0, 1) > 0: # raise Exception df = pd.read_csv(dataset_url) return df @task(log_prints=True) def clean(df: pd.DataFrame) -> pd.DataFrame: """Fix dtype issues""" df["tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"]) df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"]) print(df.head(2)) print(f"columns: {df.dtypes}") print(f"rows: {len(df)}") return df @task() def write_local(df: pd.DataFrame, color: str, dataset_file: str) -> Path: """Write DataFrame out locally as parquet file""" path = Path(f"data/{color}/{dataset_file}.parquet") if not path.parent.is_dir(): path.parent.mkdir(parents=True) df.to_parquet(path, compression="gzip") return path @task() def write_gcs(path: Path) -> None: """Upload local parquet file to GCS""" gcs_block = GcsBucket.load("zoom-gcs") gcs_block.upload_from_path(from_path=path, to_path=path) return @flow() def etl_web_to_gcs(year: int, month: int, color: str) -> None: """The main ETL function""" dataset_file = f"{color}_tripdata_{year}-{month:02}" dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{dataset_file}.csv.gz" df = fetch(dataset_url) df_clean = clean(df) path = write_local(df_clean, color, dataset_file) write_gcs(path) @flow() def etl_parent_flow(months: list[int] = [1, 2], year: int = 2021, color: str = "yellow"): for month in months: etl_web_to_gcs(year, month, color) if __name__ == "__main__": color = "yellow" months = [1, 2, 3] year = 2021 etl_parent_flow(months, year, color)

deployment는 스트림을 캡슐화하고 API를 통해 일정을 예약하거나 시작할 수 있는 서버 측 아티팩트입니다. flow는 여러 배포에 속할 수 있으며 프로그래밍하는 데 필요한 모든 것이 포함된 메타데이터가 있는 컨테이너라고 말할 수 있습니다 . 명령줄이나 Python으로 만들 수 있습니다.

Prefect 워크플로에 대한 배포 생성은 Prefect API를 통해 워크플로를 관리하고 Prefect 에이전트에서 원격으로 실행할 수 있도록 워크플로 코드, 설정 및 인프라 구성을 패키징하는 것을 의미합니다.

<python />
prefect deployment build parameterized_flows.py:etl_parent_flow -n "Parameterized ETL"

배포 하려고 했을 때 Script at './parameterized_flows.py' encountered an exception: FileNotFoundError(2, 'No such file or directory') 다음과 같은 에러가 발생했습니다. 이를 해결하기 위해서 기존 경로를 worflow_orchestration으로 옮겼서 배포했습니다.

  • 배포 결과로 yaml 파일이 생성되었습니다.

<bash />
### ### A complete description of a Prefect Deployment for flow 'etl-parent-flow' ### name: Parameterized ETL description: null version: 5a09a6eee86fd216605c8f0e27ca82c6 # The work queue that will handle this deployment's runs work_queue_name: default work_pool_name: null tags: [] parameters: {} schedule: null is_schedule_active: null infra_overrides: {} infrastructure: type: process env: {} labels: {} name: null command: null stream_output: true working_dir: null block_type_slug: process _block_type_slug: process ### ### DO NOT EDIT BELOW THIS LINE ### flow_name: etl-parent-flow manifest_path: null storage: null path: /home/mjs/data-engineering-zoomcamp/week_2_workflow_orchestration entrypoint: parameterized_flow.py:etl_parent_flow parameter_openapi_schema: title: Parameters type: object properties: months: title: months default: - 1 - 2 position: 0 type: array items: type: integer year: title: year default: 2021 position: 1 type: integer color: title: color default: yellow position: 2 type: string required: null definitions: null timestamp: '2023-04-13T14:50:29.732842+00:00'

생성된 yaml을 실행시켜줍니다.

<bash />
prefect deployment apply etl_parent_flow-deployment.yaml

Prefect GUI로 이동하고 배포 섹션에서 방금 생성한 항목에 액세스합니다

대기열에 저장되는데 실행 시키기 위해서 prefect agent start --pool default-agent-pool --work-queue default 명령어를 입력합니다.

prefect Agent를 시작하면 Deployment 실행이 시작되는 것이 보입니다.


5. Schedules & Docker Storage with Infrastructure

워크플로우의 스케줄링 시간을 설정할 수 있습니다.

CLI 환경에서 설정도 가능합니다.

<bash />
prefect deployment build parameterized_flow.py:etl_parent_flow -n etl2 --cron "0 0 * * *" -a

6. Docker Container

docker-requirementes.txt 파일을 생성해 줍니다.

<bash />
pandas==1.5.2 prefect-gcp[cloud_storage]==0.2.3 protobuf==4.21.11 pyarrow==10.0.1 pandas-gbq==0.18.1

Dockerfile을 설정해 줍니다.

<bash />
FROM prefecthq/prefect:2.7.7-python3.9 COPY docker-requirements.txt . RUN pip install -r docker-requirements.txt --trusted-host pypi.python.org --no-cache-dir COPY flows /opt/prefect/flows RUN mkdir -p /opt/prefect/data/yellow

도커 이미지를 빌드한 뒤에 이미지를 Docker Hub에 게시하기 위해 푸시를 수행합니다.

<bash />
docker image build -t discdiver/prefect:zoom . docker image push discdiver/prefect:zoom

푸시를 수행할 때 denied: requested access to the resource is denied 에러가 발생했습니다. 도커 이미지를 빌드할때 잘못되어서 다시 시작했습니다.

<bash />
docker login

<bash />
docker build -t mjs1995/prefect_cloud:v1 .

<bash />
docker image push mjs1995/prefect_cloud:v1

도커 컨테이너 blocks 등록을 하려고 했는데 blocks에서 조회가 되지 않습니다.

Prefect blocks에 도커를 등록해줍니다.

<bash />
pip install prefect-docker prefect block register -m prefect_docker

등록 결과 조회가 잘 되고 있습니다.

Block Name을 zoom으로 Image는 위에서 빌드했던 이미지를 ImagePullPolicy는 always를 지정합니다.

python flows/03_deployments/docker_deploy.py : 아래의 docker_deploy.py를 만들고 실행시킵니다.

<python />
from prefect.deployments import Deployment from parameterized_flow import etl_parent_flow from prefect.infrastructure.docker import DockerContainer docker_block = DockerContainer.load("zoom") docker_dep = Deployment.build_from_flow( flow=etl_parent_flow, name="docker-flow", infrastructure=docker_block, ) if __name__ == "__main__": docker_dep.apply()

GUI 환경에서 배포가 잘 생성 되었는지 확인합니다.

프로필에 연결 확인한 뒤에 임시 로컬 API를 시작 시 에이전트가 발생시키는 API로 바꾸기 위해서 prefect config set이 되는는 API의 URL 및 명령과 함께 사용할 API를 구성합니다. 그 후에 prefect agent를 실행합니다.

<python />
prefect profiles ls prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api" prefect agent start --work-queue "default"

<bash />
prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]"

prefect에서 배포할 때 문제가 발생했습니다. prefect.exceptions.ScriptError: Script at 'flows/03_deployments/parameterized_flow.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')

에러를 해결 하기 위해서 아래와같이 도커파일을 수정하고 새롭게 이미지를 빌드하고 푸시했습니다.

<bash />
docker build -t mjs1995/prefect_cloud:v1 . docker image push mjs1995/prefect_cloud:v1 prefect deployment run etl-parent-flow/docker-flow -p "months=[1,2]"


7. Reference

데이터 엔지니어링 줌 캠프 깃허브 주소 
youtube
DataTalks.Club
data engineering zoomcamp slack

 

profile

융무의 기술블로그

@융무

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!