PySpark Dataframe from HBase

To filter data from HBase and write it back using PySpark, follow these steps:

  1. Read data from HBase into a PySpark DataFrame.
  2. Filter the DataFrame using PySpark transformations.
  3. Write the filtered DataFrame to your desired format (e.g., HBase, Parquet, CSV, etc.).

Below is the complete example covering these tasks.


1. Reading, Filtering, and Writing Data

Step-by-Step Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Initialize Spark Session with HBase Configurations
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars", "/path/to/hbase-client.jar,/path/to/hadoop-common.jar") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()

# Step 2: Define the HBase Table Catalog
hbase_table = "my_table"
catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "{hbase_table}"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Step 3: Read Data from HBase into a PySpark DataFrame
df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", catalog) \
    .load()

# Step 4: Filter the DataFrame (e.g., age > 20)
filtered_df = df.filter(col("age") > 20)

# Step 5: Show the Filtered Data
filtered_df.show()

# Step 6: Write Filtered Data to a New CSV File (or HBase, if needed)
filtered_df.write \
    .mode("overwrite") \
    .csv("output/filtered_hbase_data.csv")

2. Explanation of Steps

1. Spark Session Initialization

  • Configures Spark to interact with HBase using Zookeeper settings.
  • Loads the HBase and Hadoop client libraries (ensure the .jar paths are correct).

2. HBase Table Catalog

  • Defines how PySpark maps HBase columns to a DataFrame structure.

3. Filtering the Data

  • df.filter(col("age") > 20) filters out rows where the age is less than or equal to 20.

4. Writing Data

  • The write operation stores the filtered data in CSV format.
  • You can adjust the format to Parquet, JSON, or write it back to HBase.

3. Writing Data Back to HBase (Optional)

To write the filtered data back to HBase, update the catalog and use the HBase connector.

Example: Writing Data to HBase

# Define a new catalog for the target HBase table
write_catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "filtered_table"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Write the filtered DataFrame back to HBase
filtered_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .mode("overwrite") \
    .save()

4. Summary

  • Read Data from HBase: Use PySpark with the HBase connector to load the data.
  • Filter Data: Apply filter() or where() clauses on the DataFrame.
  • Write Filtered Data: Save the data in CSV, Parquet, or write it back to HBase.

This approach ensures a smooth read, filter, and write pipeline using HBase with PySpark. Let me know if you run into any issues or need further clarification!

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

vsftpd default directory

Using venv in Python

Offset out of range error in Kafka, 카프카 트러블슈팅

리눅스 (cron - 주기적 작업실행 데몬)

[Ubuntu] *.deb 파일 설치 방법

Sparse encoder

Retrieving Open Graph (OG) tags from an HTML document in Java