PySpark DataFrame from HBase

To filter data from HBase and write it back using PySpark, follow these steps:

  1. Read data from HBase into a PySpark DataFrame.
  2. Filter the DataFrame using PySpark transformations.
  3. Write the filtered DataFrame to your desired destination (e.g., HBase, Parquet, or CSV).

Below is a complete example covering these tasks.


1. Reading, Filtering, and Writing Data

Step-by-Step Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

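# Step 1: Initialize Spark Session with HBase Configurations
# The "org.apache.hadoop.hbase.spark" data source used below is provided by the
# hbase-spark connector, so its JAR must be listed too (all paths are placeholders).
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars", "/path/to/hbase-spark.jar,/path/to/hbase-client.jar,/path/to/hadoop-common.jar") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()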

# Step 2: Define the HBase Table Catalog
hbase_table = "my_table"
catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "{hbase_table}"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

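# Step 3: Read Data from HBase into a PySpark DataFrame
# From PySpark no HBaseContext has been created on the JVM side, so the
# connector is usually told to build its own (verify for your connector version).
df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", catalog) \
    .option("hbase.spark.use.hbasecontext", False) \
    .load()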

# Step 4: Filter the DataFrame (e.g., age > 20)
filtered_df = df.filter(col("age") > 20)

# Step 5: Show the Filtered Data
filtered_df.show()

# Step 6: Write Filtered Data as CSV (or to HBase, if needed)
# Spark writes a directory of part files at this path, not a single CSV file.
filtered_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("output/filtered_hbase_data")

2. Explanation of Steps

1. Spark Session Initialization

  • Points Spark at the HBase cluster through its ZooKeeper quorum and client port.
  • Loads the HBase connector and the HBase and Hadoop client libraries (ensure the .jar paths are correct).
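
A wrong JAR path tends to surface late, as a class-not-found error when the read starts. The sketch below checks the paths before the session is built. The helper name build_spark_jars is hypothetical and not part of PySpark; it only assembles the comma-separated string that spark.jars expects.

```python
import os


def build_spark_jars(jar_paths):
    """Return a comma-separated value for spark.jars, failing early on bad paths."""
    # Collect every path that does not point at an existing file.
    missing = [p for p in jar_paths if not os.path.isfile(p)]
    if missing:
        raise FileNotFoundError(f"JAR files not found: {', '.join(missing)}")
    return ",".join(jar_paths)
```

It could then be used as .config("spark.jars", build_spark_jars([...])) in place of the hard-coded string.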

2. HBase Table Catalog

  • A JSON string that maps the HBase row key and each column family/qualifier pair to a named, typed DataFrame column.
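
Writing the catalog as an f-string means doubling every brace, which is easy to get wrong. As an alternative sketch, the same JSON can be generated with json.dumps. The function build_catalog and its argument layout are assumptions made for illustration; the output mirrors the catalog structure used in the code above.

```python
import json


def build_catalog(table, columns, namespace="default", rowkey="key"):
    """Build an HBase catalog JSON string.

    columns maps a DataFrame column name to (column family, qualifier, type).
    """
    catalog = {
        "table": {"namespace": namespace, "name": table},
        "rowkey": rowkey,
        "columns": {
            # As in the catalog above, the row key column uses the cf "rowkey".
            rowkey: {"cf": "rowkey", "col": rowkey, "type": "string"},
        },
    }
    for name, (cf, qualifier, dtype) in columns.items():
        catalog["columns"][name] = {"cf": cf, "col": qualifier, "type": dtype}
    return json.dumps(catalog)


catalog = build_catalog(
    "my_table",
    {"name": ("cf1", "name", "string"), "age": ("cf1", "age", "int")},
)
```

The resulting string can be passed to .option("catalog", catalog) unchanged.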

3. Filtering the Data

  • df.filter(col("age") > 20) keeps only rows where age is greater than 20. Rows where age is 20 or less, or null, are dropped.
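
filter() also accepts a SQL expression string, which can be convenient when conditions are combined. The sketch below wraps that in a small helper; filter_min_age and its require_name flag are hypothetical names, and the column names age and name come from the catalog above.

```python
def filter_min_age(df, min_age, require_name=False):
    """Keep rows with age > min_age; optionally drop rows without a name."""
    # int() keeps arbitrary text out of the SQL expression.
    condition = f"age > {int(min_age)}"
    if require_name:
        condition += " AND name IS NOT NULL"
    # DataFrame.filter accepts a SQL expression string as well as a Column.
    return df.filter(condition)
```

With the DataFrame read from HBase, filtered_df = filter_min_age(df, 20) would give the same rows as the col("age") > 20 version.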

4. Writing Data

  • The write operation stores the filtered data as CSV. Spark writes a directory of part files at the given path, not a single file.
  • You can change the format to Parquet or JSON, or write the data back to HBase.
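
Switching formats only changes the writer call. A minimal sketch, assuming a hypothetical helper named write_filtered that takes any DataFrame and an output path:

```python
def write_filtered(df, path, fmt="parquet", mode="overwrite"):
    """Write df to path in the given file format ("parquet", "json", or "csv")."""
    writer = df.write.mode(mode).format(fmt)
    if fmt == "csv":
        # CSV carries no schema, so keep the column names as a header row.
        writer = writer.option("header", "true")
    writer.save(path)
```

For example, write_filtered(filtered_df, "output/filtered_parquet") writes Parquet, and passing fmt="json" writes JSON lines. Both output paths are placeholders.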

3. Writing Data Back to HBase (Optional)

To write the filtered data back to HBase, define a catalog for the target table and write through the same HBase connector.

Example: Writing Data to HBase

# Define a new catalog for the target HBase table
write_catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "filtered_table"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

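# Write the filtered DataFrame back to HBase
# As with reads from PySpark, the connector is usually told to build its own
# HBaseContext (verify for your connector version).
filtered_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .option("hbase.spark.use.hbasecontext", False) \
    .mode("overwrite") \
    .save()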
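
The write above assumes that the target table already exists with the cf1 column family. To my knowledge the connector also accepts a "newtable" option that asks it to create the table with the given number of regions; treat that option name and its behavior as an assumption to check against your connector version. A sketch with a hypothetical helper, write_to_hbase:

```python
def write_to_hbase(df, catalog, new_table_regions=None):
    """Write df to the HBase table described by catalog.

    new_table_regions: if set, passed as the connector's "newtable" option
    (assumed here to create the table with that many regions).
    """
    options = {"catalog": catalog}
    if new_table_regions is not None:
        options["newtable"] = str(new_table_regions)
    (
        df.write
        .format("org.apache.hadoop.hbase.spark")
        .options(**options)
        .save()
    )
```

A call such as write_to_hbase(filtered_df, write_catalog, new_table_regions=5) would then cover the case where filtered_table has not been created yet.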

4. Summary

  • Read Data from HBase: Use PySpark with the HBase connector to load the data.
  • Filter Data: Apply filter() or where() clauses on the DataFrame.
  • Write Filtered Data: Save the data in CSV, Parquet, or write it back to HBase.

Together, these steps form a read, filter, and write pipeline between HBase and PySpark.
