융무의 기술블로그
article thumbnail
Published 2023. 5. 21. 15:01
[de zoomcamp] 06_스트리밍 PJT

데이터 엔지니어링 줌 캠프 PJT를 진행하면서 관련된 내용을 정리하고자 합니다.

프로젝트의 코드는 github에서 확인할 수 있습니다.

https://github.com/mjs1995/data-engineering-zoomcamp/tree/main/06_stream_processing

 

GitHub - mjs1995/data-engineering-zoomcamp: PJT

PJT. Contribute to mjs1995/data-engineering-zoomcamp development by creating an account on GitHub.

github.com


1. docker에서 Spark 및 Kafka 실행

https://www.slideshare.net/PaoloCastagna1/introduction-to-apache-kafka-confluent-and-why-they-matter

아파치 카프카(Apache Kafka)는 분산 스트리밍 플랫폼이며 데이터 피드의 분산 스트리밍, 파이프 라이닝 및 재생을 위한 실시간 스트리밍 데이터를 처리하기 위한 목적으로 설계된 오픈 소스 분산형 게시-구독 메시징 플랫폼입니다.

  • 프로젝트에서 도커 파일로 사용한 서비스입니다.
    • Apache Spark Master: Spark 클러스터에서 작업을 조정하고 클러스터 내에서 작업자 노드에 작업을 분배하는 역할을 합니다.
    • Apache Spark Worker: Spark 클러스터에서 실제로 데이터 처리 작업을 수행하는 노드입니다. 마스터 노드로부터 작업을 할당받아 데이터 처리를 수행하게 됩니다.
    • Apache Kafka Broker: 분산 스트리밍 플랫폼인 Apache Kafka에서 메시지 브로커 역할을 수행합니다. Producer와 Consumer가 메시지를 주고 받을 수 있도록 중개 역할을 합니다.
    • Confluent Schema Registry: 데이터 스키마 버전 관리를 위한 서비스로, Kafka 메시지를 전송할 때 메시지의 구조와 형식을 검증하여 데이터 무결성을 유지합니다. 스키마 레지스트리는 스키마 생성, 조회, 관리에 대한 HTTP API를 제공하고 있습니다
    • Apache Kafka ZooKeeper: Apache Kafka 클러스터의 메타 데이터 정보를 저장하고 관리하는 역할을 합니다.
    • Confluent Control Center: Confluent Platform의 모니터링 및 관리 도구로, Kafka와 관련된 모든 작업을 한 곳에서 수행할 수 있도록 도와줍니다. 토픽 생성, 파티션 및 레플리카 관리, 스키마 관리, 모니터링 및 경고 등을 수행할 수 있습니다.
  • docker-compose.yml 파일의 서비스들입니다.
    • jupyterlab: JupyterLab 서비스를 실행합니다.
    • spark-master: Apache Spark 클러스터의 마스터 노드를 실행합니다.
    • spark-worker-1, spark-worker-2: Apache Spark 클러스터의 워커 노드를 실행합니다.
    • broker: Apache Kafka 브로커를 실행합니다.
    • schema-registry: Confluent Schema Registry 서비스를 실행합니다.
    • zookeeper: Apache Kafka의 ZooKeeper를 실행합니다.
    • control-center: Confluent Control Center 서비스를 실행합니다.

도커 데스크탑을 실행한 뒤에 Spark 컨테이너(spark master, spark worker, jupyterlab)를 빌드하는 데 필요한 도커 이미지를 다운로드합니다.

<bash />
./build.sh

Kafka와 Spark가 연결되도록 네트워크와 볼륨을 만들어줍니다.

<bash />
docker network create kafka-spark-network docker volume create --name=hadoop-distributed-file-system docker network ls

Kafka 및 Spark 컨테이너를 시작합니다.

<bash />
docker compose up -d

도커에서 현재 실행 중인 컨테이너는 다음과 같습니다.

<bash />
docker ps -a

컨테이너로 생성된 서비스들과 각각의 포트 번호, 그리고 해당 서비스의 간략한 설명입니다.

Application URL Description
JupyterLab localhost:8888 Cluster interface with built-in Jupyter notebooks
Spark Master localhost:8080 Spark Master node
Spark Worker I localhost:8083 Spark Worker node with 1 core and 512m of memory (default)
Spark Worker II localhost:8084 Spark Worker node with 1 core and 512m of memory (default)
Confluent kafka localhost:9021 Web interface, to monitor, operate, and manage Kafka clusters

Spark Master -&nbsp;localhost:8080
JupyterLab -&nbsp;localhost:8888
Confluent kafka -&nbsp;localhost:9021

현재 실행 중인 모든 컨테이너를 중지하는 명령어 입니다. 컨테이너를 사용하지 않을 때 실행시켜 줍니다.

<bash />
docker stop $(docker ps -a -q)


2. kafka-python

새로운 환경을 설정해줍니다.

<bash />
virtualenv kafka-env source kafka-env/bin/activate

requirements.txt 파일을 생성해 줍니다.

<bash />
kafka-python==1.4.6 confluent_kafka requests avro faust fastavro pyspark

필요한 라이브러리들을 설치해 줍니다.

<bash />
pip install -r requirements.txt

소스 코드는 다음과 같습니다.

settings.py : 환경 구성에 대한 설정 파일입니다.

<python />
import pyspark.sql.types as T INPUT_DATA_PATH = '../../resources/rides.csv' BOOTSTRAP_SERVERS = 'localhost:9092' TOPIC_WINDOWED_VENDOR_ID_COUNT = 'vendor_counts_windowed' PRODUCE_TOPIC_RIDES_CSV = CONSUME_TOPIC_RIDES_CSV = 'rides_csv' RIDE_SCHEMA = T.StructType( [T.StructField("vendor_id", T.IntegerType()), T.StructField('tpep_pickup_datetime', T.TimestampType()), T.StructField('tpep_dropoff_datetime', T.TimestampType()), T.StructField("passenger_count", T.IntegerType()), T.StructField("trip_distance", T.FloatType()), T.StructField("payment_type", T.IntegerType()), T.StructField("total_amount", T.FloatType()), ])

producer.py : 원본 데이터 파일 (rides.csv)에 연결하여 데이터를 가져와 메시지를 토픽에 전달합니다.

<python />
import csv from time import sleep from typing import Dict from kafka import KafkaProducer from settings import BOOTSTRAP_SERVERS, INPUT_DATA_PATH, PRODUCE_TOPIC_RIDES_CSV def delivery_report(err, msg): if err is not None: print("Delivery failed for record {}: {}".format(msg.key(), err)) return print('Record {} successfully produced to {} [{}] at offset {}'.format( msg.key(), msg.topic(), msg.partition(), msg.offset())) class RideCSVProducer: def __init__(self, props: Dict): self.producer = KafkaProducer(**props) # self.producer = Producer(producer_props) @staticmethod def read_records(resource_path: str): records, ride_keys = [], [] i = 0 with open(resource_path, 'r') as f: reader = csv.reader(f) header = next(reader) # skip the header for row in reader: # vendor_id, passenger_count, trip_distance, payment_type, total_amount records.append(f'{row[0]}, {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[9]}, {row[16]}') ride_keys.append(str(row[0])) i += 1 if i == 5: break return zip(ride_keys, records) def publish(self, topic: str, records: [str, str]): for key_value in records: key, value = key_value try: self.producer.send(topic=topic, key=key, value=value) print(f"Producing record for <key: {key}, value:{value}>") except KeyboardInterrupt: break except Exception as e: print(f"Exception while producing record - {value}: {e}") self.producer.flush() sleep(1) if __name__ == "__main__": config = { 'bootstrap_servers': [BOOTSTRAP_SERVERS], 'key_serializer': lambda x: x.encode('utf-8'), 'value_serializer': lambda x: x.encode('utf-8') } producer = RideCSVProducer(props=config) ride_records = producer.read_records(resource_path=INPUT_DATA_PATH) print(ride_records) producer.publish(topic=PRODUCE_TOPIC_RIDES_CSV, records=ride_records)

consumer.py : 토픽에서 메시지를 받아옵니다.

<python />
import argparse from typing import Dict, List from kafka import KafkaConsumer from settings import BOOTSTRAP_SERVERS, CONSUME_TOPIC_RIDES_CSV class RideCSVConsumer: def __init__(self, props: Dict): self.consumer = KafkaConsumer(**props) def consume_from_kafka(self, topics: List[str]): self.consumer.subscribe(topics=topics) print('Consuming from Kafka started') print('Available topics to consume: ', self.consumer.subscription()) while True: try: # SIGINT can't be handled when polling, limit timeout to 1 second. msg = self.consumer.poll(1.0) if msg is None or msg == {}: continue for msg_key, msg_values in msg.items(): for msg_val in msg_values: print(f'Key:{msg_val.key}-type({type(msg_val.key)}), ' f'Value:{msg_val.value}-type({type(msg_val.value)})') except KeyboardInterrupt: break self.consumer.close() if __name__ == '__main__': parser = argparse.ArgumentParser(description='Kafka Consumer') parser.add_argument('--topic', type=str, default=CONSUME_TOPIC_RIDES_CSV) args = parser.parse_args() topic = args.topic config = { 'bootstrap_servers': [BOOTSTRAP_SERVERS], 'auto_offset_reset': 'earliest', 'enable_auto_commit': True, 'key_deserializer': lambda key: int(key.decode('utf-8')), 'value_deserializer': lambda value: value.decode('utf-8'), 'group_id': 'consumer.group.id.csv-example.1', } csv_consumer = RideCSVConsumer(props=config) csv_consumer.consume_from_kafka(topics=[topic])

producer python 스크립트 파일을 실행합니다.

<bash />
python producer.py

consumer python 스크립트 파일을 실행합니다.

<bash />
python consumer.py

Confluent Control Center 에서 위에서 실행한 producer와 consumer에 대한 Topic을 확인합니다.


3. PySpark Streaming

streaming.py는 스파크를 사용하여 Kafka로부터 스트리밍 데이터를 읽어들이고, 파싱하고, 집계한 후 콘솔에 출력하거나 다시 Kafka로 출력하는 코드입니다.

streaming.py의 소스 코드는 다음과 같습니다.

<python />
from pyspark.sql import SparkSession import pyspark.sql.functions as F import findspark findspark.init() from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT def read_from_kafka(consume_topic: str): # Spark Streaming DataFrame, connect to Kafka topic served at host in bootrap.servers option df_stream = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \ .option("subscribe", consume_topic) \ .option("startingOffsets", "earliest") \ .option("checkpointLocation", "checkpoint") \ .load() return df_stream def parse_ride_from_kafka_message(df, schema): """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """ assert df.isStreaming is True, "DataFrame doesn't receive streaming data" df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") # split attributes to nested array in one Column col = F.split(df['value'], ', ') # expand col to multiple top-level columns for idx, field in enumerate(schema): df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType)) return df.select([field.name for field in schema]) def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'): write_query = df.writeStream \ .outputMode(output_mode) \ .trigger(processingTime=processing_time) \ .format("console") \ .option("truncate", False) \ .start() return write_query # pyspark.sql.streaming.StreamingQuery def sink_memory(df, query_name, query_template): query_df = df \ .writeStream \ .queryName(query_name) \ .format("memory") \ .start() query_str = query_template.format(table_name=query_name) query_results = spark.sql(query_str) return query_results, query_df def sink_kafka(df, topic): write_query = df.writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \ .outputMode('complete') \ .option("topic", topic) \ .option("checkpointLocation", "checkpoint") \ .start() return write_query def prepare_df_to_kafka_sink(df, value_columns, key_column=None): columns = df.columns df = df.withColumn("value", F.concat_ws(', ', *value_columns)) if key_column: df = df.withColumnRenamed(key_column, "key") df = df.withColumn("key", df.key.cast('string')) return df.select(['key', 'value']) def op_groupby(df, column_names): df_aggregation = df.groupBy(column_names).count() return df_aggregation def op_windowed_groupby(df, window_duration, slide_duration): df_windowed_aggregation = df.groupBy( F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration), df.vendor_id ).count() return df_windowed_aggregation if __name__ == "__main__": spark = SparkSession.builder.appName('streaming-examples').getOrCreate() spark.sparkContext.setLogLevel('WARN') # read_streaming data df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV) print(df_consume_stream.printSchema()) # parse streaming data df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA) print(df_rides.printSchema()) sink_console(df_rides, output_mode='append') df_trip_count_by_vendor_id = op_groupby(df_rides, ['vendor_id']) df_trip_count_by_pickup_date_vendor_id = op_windowed_groupby(df_rides, window_duration="10 minutes", slide_duration='5 minutes') # write the output out to the console for debugging / testing sink_console(df_trip_count_by_vendor_id) # write the output to the kafka topic df_trip_count_messages = prepare_df_to_kafka_sink(df=df_trip_count_by_pickup_date_vendor_id, value_columns=['count'], key_column='vendor_id') kafka_sink_query = sink_kafka(df=df_trip_count_messages, topic=TOPIC_WINDOWED_VENDOR_ID_COUNT) spark.streams.awaitAnyTermination()

spark-submit을 실행합니다.

<bash />
./spark-submit.sh streaming.py

직접 설정값을 조정할 수 도 있습니다.

<bash />
spark-submit --master spark://localhost:7077 --num-executors 2 --executor-memory 3G --executor-cores 1 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1,org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.1 streaming.py

해당 spark-submit 관련 에러는 블로그  포스팅에서 확인하실 수 있습니다.


4. Reference

데이터 엔지니어링 줌 캠프 깃허브 주소 
youtube
DataTalks.Club
data engineering zoomcamp slack

 

profile

융무의 기술블로그

@융무

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!