Pyspark 를 사용한 테이블 조인, table join with pyspark
두 개의 HBase 테이블을 가지고 PySpark를 통해 통계 데이터를 생성하려면 다음과 같은 과정이 필요합니다. PySpark와 HBase를 연동하려면 Spark와 HBase 간의 통신을 설정하고, 테이블 데이터를 불러온 후 필요한 통계를 계산하는 로직을 구현해야 합니다.
아래는 PySpark와 HBase 연동 및 데이터 처리 과정에 대한 전체 흐름과 예제 코드입니다.
1. 환경 설정 및 의존성 추가
Spark와 HBase 간의 통신을 위해 몇 가지 의존성 패키지가 필요합니다:
hbase-client
: HBase 클라이언트 라이브러리hbase-spark
: Spark와 HBase 간의 통합을 지원하는 모듈
다음과 같은 의존성을 Spark 환경에 추가해야 합니다:
spark-submit \
--jars /path/to/hbase-spark.jar,/path/to/hbase-client.jar \
your_spark_script.py
Spark 환경에 HBase 관련 의존성을 추가하지 않으면 PySpark 코드에서 HBase와의 연동이 제대로 동작하지 않습니다. 사용 중인 클러스터 환경에 따라 이 설정이 달라질 수 있습니다.
2. SparkContext 설정
PySpark에서 HBase와 연동하려면, hbase-site.xml
파일을 참조하거나 HBase의 설정을 명시적으로 지정해야 합니다. 이 예제에서는 SparkSession과 Hadoop 설정을 통해 HBase와의 연동을 준비합니다.
from pyspark.sql import SparkSession
# SparkSession 생성 (HBase와 통신 준비)
spark = SparkSession.builder \
.appName("HBase-Stat-Job") \
.config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
.config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
.config("spark.hadoop.hbase.mapreduce.inputtable", "table1") \
.getOrCreate()
위 코드는 Spark 세션을 초기화하고, HBase의 ZooKeeper 설정을 추가한 것입니다. hbase-site.xml
을 정확한 경로에 배치하면, Spark가 자동으로 HBase 클러스터와 연결될 수 있습니다.
3. HBase 테이블 데이터 로딩
HBase 테이블의 데이터를 PySpark DataFrame으로 불러오기 위해 다음과 같은 방식을 사용할 수 있습니다.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
def load_hbase_table(table_name: str) -> DataFrame:
# HBase 테이블을 DataFrame으로 읽어오기
return spark.read \
.format("org.apache.hadoop.hbase.spark") \
.option("hbase.table", table_name) \
.option("hbase.spark.use.hbasecontext", "true") \
.load()
# 두 개의 HBase 테이블을 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")
# 데이터 확인
df1.show()
df2.show()
위 코드에서는 HBase 테이블 데이터를 Spark DataFrame으로 변환합니다. table1
과 table2
의 데이터를 각각 df1
과 df2
에 로드한 예입니다.
4. 데이터 조인 및 통계 계산
두 테이블 간의 공통 키를 기준으로 조인한 후 필요한 통계 데이터를 계산할 수 있습니다.
예제: 두 테이블의 공통 키(id
)를 기준으로 조인하고, 특정 컬럼에 대한 통계 계산
from pyspark.sql.functions import avg, count
# 'id'를 기준으로 두 테이블을 조인
joined_df = df1.join(df2, on="id", how="inner")
# 특정 컬럼에 대한 평균 및 개수 계산
stats_df = joined_df.groupBy("category").agg(
avg("value1").alias("avg_value1"),
avg("value2").alias("avg_value2"),
count("id").alias("count")
)
# 통계 결과 출력
stats_df.show()
위 코드에서는 두 HBase 테이블을 id
컬럼을 기준으로 조인하고, category
별로 그룹화한 후, value1
과 value2
컬럼의 평균과 id
의 개수를 계산합니다.
5. 결과 저장 (선택)
계산된 통계 데이터를 다시 HBase에 저장하거나, CSV/Parquet 파일로 저장할 수 있습니다.
HBase에 저장하는 예제
stats_df.write \
.format("org.apache.hadoop.hbase.spark") \
.option("hbase.table", "stats_table") \
.save()
CSV 파일로 저장하는 예제
stats_df.write.csv("hdfs:///user/output/stats_data.csv")
6. 전체 코드 예시
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count
# SparkSession 생성 및 HBase 설정
spark = SparkSession.builder \
.appName("HBase-Stat-Job") \
.config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
.config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
.getOrCreate()
# HBase 테이블을 DataFrame으로 불러오기 함수
def load_hbase_table(table_name: str):
return spark.read \
.format("org.apache.hadoop.hbase.spark") \
.option("hbase.table", table_name) \
.option("hbase.spark.use.hbasecontext", "true") \
.load()
# 두 개의 테이블 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")
# 조인 및 통계 계산
joined_df = df1.join(df2, on="id", how="inner")
stats_df = joined_df.groupBy("category").agg(
avg("value1").alias("avg_value1"),
avg("value2").alias("avg_value2"),
count("id").alias("count")
)
# 통계 결과 출력
stats_df.show()
# 통계 결과를 CSV로 저장
stats_df.write.csv("hdfs:///user/output/stats_data.csv")
# 세션 종료
spark.stop()
결론
위의 예제에서는 PySpark를 사용해 두 개의 HBase 테이블에서 데이터를 불러오고, 조인한 후 통계 데이터를 계산하는 과정을 보여드렸습니다. 사용자의 환경에 따라 HBase와의 연동 방식이 다를 수 있으니, Spark 및 HBase 클러스터 환경에 맞게 설정을 조정해야 합니다.
필요에 따라 결과 데이터를 다시 HBase에 저장하거나, 다른 저장소(CSV, Parquet 등)에 저장할 수 있습니다.
댓글
댓글 쓰기