PySpark Dataframe from HBase

To filter data from HBase and write it back using PySpark, follow these steps:

  1. Read data from HBase into a PySpark DataFrame.
  2. Filter the DataFrame using PySpark transformations.
  3. Write the filtered DataFrame to your desired format (e.g., HBase, Parquet, CSV, etc.).

Below is the complete example covering these tasks.


1. Reading, Filtering, and Writing Data

Step-by-Step Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Initialize Spark Session with HBase Configurations
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars", "/path/to/hbase-client.jar,/path/to/hadoop-common.jar") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()

# Step 2: Define the HBase Table Catalog
hbase_table = "my_table"
catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "{hbase_table}"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Step 3: Read Data from HBase into a PySpark DataFrame
df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", catalog) \
    .load()

# Step 4: Filter the DataFrame (e.g., age > 20)
filtered_df = df.filter(col("age") > 20)

# Step 5: Show the Filtered Data
filtered_df.show()

# Step 6: Write Filtered Data to a New CSV File (or HBase, if needed)
filtered_df.write \
    .mode("overwrite") \
    .csv("output/filtered_hbase_data.csv")

2. Explanation of Steps

1. Spark Session Initialization

  • Configures Spark to interact with HBase using Zookeeper settings.
  • Loads the HBase and Hadoop client libraries (ensure the .jar paths are correct).

2. HBase Table Catalog

  • Defines how PySpark maps HBase columns to a DataFrame structure.

3. Filtering the Data

  • df.filter(col("age") > 20) filters out rows where the age is less than or equal to 20.

4. Writing Data

  • The write operation stores the filtered data in CSV format.
  • You can adjust the format to Parquet, JSON, or write it back to HBase.

3. Writing Data Back to HBase (Optional)

To write the filtered data back to HBase, update the catalog and use the HBase connector.

Example: Writing Data to HBase

# Define a new catalog for the target HBase table
write_catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "filtered_table"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Write the filtered DataFrame back to HBase
filtered_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .mode("overwrite") \
    .save()

4. Summary

  • Read Data from HBase: Use PySpark with the HBase connector to load the data.
  • Filter Data: Apply filter() or where() clauses on the DataFrame.
  • Write Filtered Data: Save the data in CSV, Parquet, or write it back to HBase.

This approach ensures a smooth read, filter, and write pipeline using HBase with PySpark. Let me know if you run into any issues or need further clarification!

댓글

이 블로그의 인기 게시물

Fundamentals of English Grammar #1

The logs of the kubelet service

Firebase with Flutter

Using venv in Python

max_active_runs of Airflow

운영체제를 개발 방법

Auto-populate a calendar in an MUI (Material-UI) TextField component

Decimal (ASCII) value of each letter in the alphabet

"Do you happen to"

php 에서의 static 변수