Pyspark 를 사용한 테이블 조인, table join with pyspark

두 개의 HBase 테이블을 가지고 PySpark를 통해 통계 데이터를 생성하려면 다음과 같은 과정이 필요합니다. PySpark와 HBase를 연동하려면 Spark와 HBase 간의 통신을 설정하고, 테이블 데이터를 불러온 후 필요한 통계를 계산하는 로직을 구현해야 합니다.

아래는 PySpark와 HBase 연동데이터 처리 과정에 대한 전체 흐름과 예제 코드입니다.


1. 환경 설정 및 의존성 추가

Spark와 HBase 간의 통신을 위해 몇 가지 의존성 패키지가 필요합니다:

  • hbase-client: HBase 클라이언트 라이브러리
  • hbase-spark: Spark와 HBase 간의 통합을 지원하는 모듈

다음과 같은 의존성을 Spark 환경에 추가해야 합니다:

spark-submit \
  --jars /path/to/hbase-spark.jar,/path/to/hbase-client.jar \
  your_spark_script.py

Spark 환경에 HBase 관련 의존성을 추가하지 않으면 PySpark 코드에서 HBase와의 연동이 제대로 동작하지 않습니다. 사용 중인 클러스터 환경에 따라 이 설정이 달라질 수 있습니다.


2. SparkContext 설정

PySpark에서 HBase와 연동하려면, hbase-site.xml 파일을 참조하거나 HBase의 설정을 명시적으로 지정해야 합니다. 이 예제에서는 SparkSession과 Hadoop 설정을 통해 HBase와의 연동을 준비합니다.

from pyspark.sql import SparkSession

# SparkSession 생성 (HBase와 통신 준비)
spark = SparkSession.builder \
    .appName("HBase-Stat-Job") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .config("spark.hadoop.hbase.mapreduce.inputtable", "table1") \
    .getOrCreate()

위 코드는 Spark 세션을 초기화하고, HBase의 ZooKeeper 설정을 추가한 것입니다. hbase-site.xml을 정확한 경로에 배치하면, Spark가 자동으로 HBase 클러스터와 연결될 수 있습니다.


3. HBase 테이블 데이터 로딩

HBase 테이블의 데이터를 PySpark DataFrame으로 불러오기 위해 다음과 같은 방식을 사용할 수 있습니다.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def load_hbase_table(table_name: str) -> DataFrame:
    # HBase 테이블을 DataFrame으로 읽어오기
    return spark.read \
        .format("org.apache.hadoop.hbase.spark") \
        .option("hbase.table", table_name) \
        .option("hbase.spark.use.hbasecontext", "true") \
        .load()

# 두 개의 HBase 테이블을 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")

# 데이터 확인
df1.show()
df2.show()

위 코드에서는 HBase 테이블 데이터를 Spark DataFrame으로 변환합니다. table1table2의 데이터를 각각 df1df2에 로드한 예입니다.


4. 데이터 조인 및 통계 계산

두 테이블 간의 공통 키를 기준으로 조인한 후 필요한 통계 데이터를 계산할 수 있습니다.

예제: 두 테이블의 공통 키(id)를 기준으로 조인하고, 특정 컬럼에 대한 통계 계산

from pyspark.sql.functions import avg, count

# 'id'를 기준으로 두 테이블을 조인
joined_df = df1.join(df2, on="id", how="inner")

# 특정 컬럼에 대한 평균 및 개수 계산
stats_df = joined_df.groupBy("category").agg(
    avg("value1").alias("avg_value1"),
    avg("value2").alias("avg_value2"),
    count("id").alias("count")
)

# 통계 결과 출력
stats_df.show()

위 코드에서는 두 HBase 테이블을 id 컬럼을 기준으로 조인하고, category별로 그룹화한 후, value1value2 컬럼의 평균과 id의 개수를 계산합니다.


5. 결과 저장 (선택)

계산된 통계 데이터를 다시 HBase에 저장하거나, CSV/Parquet 파일로 저장할 수 있습니다.

HBase에 저장하는 예제

stats_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("hbase.table", "stats_table") \
    .save()

CSV 파일로 저장하는 예제

stats_df.write.csv("hdfs:///user/output/stats_data.csv")

6. 전체 코드 예시

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# SparkSession 생성 및 HBase 설정
spark = SparkSession.builder \
    .appName("HBase-Stat-Job") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "zookeeper-server") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()

# HBase 테이블을 DataFrame으로 불러오기 함수
def load_hbase_table(table_name: str):
    return spark.read \
        .format("org.apache.hadoop.hbase.spark") \
        .option("hbase.table", table_name) \
        .option("hbase.spark.use.hbasecontext", "true") \
        .load()

# 두 개의 테이블 불러오기
df1 = load_hbase_table("table1")
df2 = load_hbase_table("table2")

# 조인 및 통계 계산
joined_df = df1.join(df2, on="id", how="inner")
stats_df = joined_df.groupBy("category").agg(
    avg("value1").alias("avg_value1"),
    avg("value2").alias("avg_value2"),
    count("id").alias("count")
)

# 통계 결과 출력
stats_df.show()

# 통계 결과를 CSV로 저장
stats_df.write.csv("hdfs:///user/output/stats_data.csv")

# 세션 종료
spark.stop()

결론

위의 예제에서는 PySpark를 사용해 두 개의 HBase 테이블에서 데이터를 불러오고, 조인한 후 통계 데이터를 계산하는 과정을 보여드렸습니다. 사용자의 환경에 따라 HBase와의 연동 방식이 다를 수 있으니, Spark 및 HBase 클러스터 환경에 맞게 설정을 조정해야 합니다.

필요에 따라 결과 데이터를 다시 HBase에 저장하거나, 다른 저장소(CSV, Parquet 등)에 저장할 수 있습니다.

댓글

이 블로그의 인기 게시물

Install and run an FTP server using Docker

Using the MinIO API via curl

PYTHONPATH, Python 모듈 환경설정

Elasticsearch Ingest API

오늘의 문장2

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

Fundamentals of English Grammar #1

To switch to a specific tag in a Git repository

You can use Sublime Text from the command line by utilizing the subl command

티베트-버마어파 와 한어파(중국어파)의 어순 비교