Pyspark 를 사용한 테이블 조인, table join with pyspark

두 개의 HBase 테이블을 가지고 PySpark를 통해 통계 데이터를 생성하려면 다음과 같은 과정이 필요합니다. PySpark와 HBase를 연동하려면 Spark와 HBase 간의 통신을 설정하고, 테이블 데이터를 불러온 후 필요한 통계를 계산하는 로직을 구현해야 합니다.

아래는 PySpark와 HBase 연동데이터 처리 과정에 대한 전체 흐름과 예제 코드입니다.


1. 환경 설정 및 의존성 추가

Spark와 HBase 간의 통신을 위해 몇 가지 의존성 패키지가 필요합니다:

  • hbase-client: HBase 클라이언트 라이브러리
  • hbase-spark: Spark와 HBase 간의 통합을 지원하는 모듈

다음과 같은 의존성을 Spark 환경에 추가해야 합니다:

spark-submit \
  --jars /path/to/hbase-spark.jar,/path/to/hbase-client.jar \
  your_spark_script.py

Spark 환경에 HBase 관련 의존성을 추가하지 않으면 PySpark 코드에서 HBase와의 연동이 제대로 동작하지 않습니다. 사용 중인 클러스터 환경에 따라 이 설정이 달라질 수 있습니다.


2. SparkContext 설정

PySpark에서 HBase와 연동하려면, hbase-site.xml 파일을 참조하거나 HBase의 설정을 명시적으로 지정해야 합니다. 이 예제에서는 SparkSession과 Hadoop 설정을 통해 HBase와의 연동을 준비합니다.

from pyspark.sql import SparkSession

# SparkSession 생성 (HBase와 통신 준비)
spark = SparkSession.builder \
    .appName("HBase-Stat-Job") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .config("spark.hadoop.hbase.mapreduce.inputtable", "table1") \
    .getOrCreate()

위 코드는 Spark 세션을 초기화하고, HBase의 ZooKeeper 설정을 추가한 것입니다. hbase-site.xml을 정확한 경로에 배치하면, Spark가 자동으로 HBase 클러스터와 연결될 수 있습니다.


3. HBase 테이블 데이터 로딩

HBase 테이블의 데이터를 PySpark DataFrame으로 불러오기 위해 다음과 같은 방식을 사용할 수 있습니다.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def load_hbase_table(table_name: str) -> DataFrame:
    # HBase 테이블을 DataFrame으로 읽어오기
    return spark.read \
        .format("org.apache.hadoop.hbase.spark") \
        .option("hbase.table", table_name) \
        .option("hbase.spark.use.hbasecontext", "true") \
        .load()

# 두 개의 HBase 테이블을 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")

# 데이터 확인
df1.show()
df2.show()

위 코드에서는 HBase 테이블 데이터를 Spark DataFrame으로 변환합니다. table1table2의 데이터를 각각 df1df2에 로드한 예입니다.


4. 데이터 조인 및 통계 계산

두 테이블 간의 공통 키를 기준으로 조인한 후 필요한 통계 데이터를 계산할 수 있습니다.

예제: 두 테이블의 공통 키(id)를 기준으로 조인하고, 특정 컬럼에 대한 통계 계산

from pyspark.sql.functions import avg, count

# 'id'를 기준으로 두 테이블을 조인
joined_df = df1.join(df2, on="id", how="inner")

# 특정 컬럼에 대한 평균 및 개수 계산
stats_df = joined_df.groupBy("category").agg(
    avg("value1").alias("avg_value1"),
    avg("value2").alias("avg_value2"),
    count("id").alias("count")
)

# 통계 결과 출력
stats_df.show()

위 코드에서는 두 HBase 테이블을 id 컬럼을 기준으로 조인하고, category별로 그룹화한 후, value1value2 컬럼의 평균과 id의 개수를 계산합니다.


5. 결과 저장 (선택)

계산된 통계 데이터를 다시 HBase에 저장하거나, CSV/Parquet 파일로 저장할 수 있습니다.

HBase에 저장하는 예제

stats_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("hbase.table", "stats_table") \
    .save()

CSV 파일로 저장하는 예제

stats_df.write.csv("hdfs:///user/output/stats_data.csv")

6. 전체 코드 예시

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# SparkSession 생성 및 HBase 설정
spark = SparkSession.builder \
    .appName("HBase-Stat-Job") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()

# HBase 테이블을 DataFrame으로 불러오기 함수
def load_hbase_table(table_name: str):
    return spark.read \
        .format("org.apache.hadoop.hbase.spark") \
        .option("hbase.table", table_name) \
        .option("hbase.spark.use.hbasecontext", "true") \
        .load()

# 두 개의 테이블 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")

# 조인 및 통계 계산
joined_df = df1.join(df2, on="id", how="inner")
stats_df = joined_df.groupBy("category").agg(
    avg("value1").alias("avg_value1"),
    avg("value2").alias("avg_value2"),
    count("id").alias("count")
)

# 통계 결과 출력
stats_df.show()

# 통계 결과를 CSV로 저장
stats_df.write.csv("hdfs:///user/output/stats_data.csv")

# 세션 종료
spark.stop()

결론

위의 예제에서는 PySpark를 사용해 두 개의 HBase 테이블에서 데이터를 불러오고, 조인한 후 통계 데이터를 계산하는 과정을 보여드렸습니다. 사용자의 환경에 따라 HBase와의 연동 방식이 다를 수 있으니, Spark 및 HBase 클러스터 환경에 맞게 설정을 조정해야 합니다.

필요에 따라 결과 데이터를 다시 HBase에 저장하거나, 다른 저장소(CSV, Parquet 등)에 저장할 수 있습니다.

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

How to split a list into chunks of 100 items in JavaScript, 자바스크립트 리스트 쪼개기

HTML Inline divisions at one row by Tailwind

Boilerplate for typescript server programing

가속도 & 속도

Gradle multi-module project

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

CDPEvents in puppeteer

Sparse encoder

Reactjs datetime range picker