PySpark Dataframe from HBase

To filter data from HBase and write it back using PySpark, follow these steps:

  1. Read data from HBase into a PySpark DataFrame.
  2. Filter the DataFrame using PySpark transformations.
  3. Write the filtered DataFrame to your desired format (e.g., HBase, Parquet, CSV, etc.).

Below is the complete example covering these tasks.


1. Reading, Filtering, and Writing Data

Step-by-Step Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Initialize Spark Session with HBase Configurations
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars", "/path/to/hbase-client.jar,/path/to/hadoop-common.jar") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()

# Step 2: Define the HBase Table Catalog
hbase_table = "my_table"
catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "{hbase_table}"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Step 3: Read Data from HBase into a PySpark DataFrame
df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", catalog) \
    .load()

# Step 4: Filter the DataFrame (e.g., age > 20)
filtered_df = df.filter(col("age") > 20)

# Step 5: Show the Filtered Data
filtered_df.show()

# Step 6: Write Filtered Data to a New CSV File (or HBase, if needed)
filtered_df.write \
    .mode("overwrite") \
    .csv("output/filtered_hbase_data.csv")

2. Explanation of Steps

1. Spark Session Initialization

  • Configures Spark to interact with HBase using Zookeeper settings.
  • Loads the HBase and Hadoop client libraries (ensure the .jar paths are correct).

2. HBase Table Catalog

  • Defines how PySpark maps HBase columns to a DataFrame structure.

3. Filtering the Data

  • df.filter(col("age") > 20) filters out rows where the age is less than or equal to 20.

4. Writing Data

  • The write operation stores the filtered data in CSV format.
  • You can adjust the format to Parquet, JSON, or write it back to HBase.

3. Writing Data Back to HBase (Optional)

To write the filtered data back to HBase, update the catalog and use the HBase connector.

Example: Writing Data to HBase

# Define a new catalog for the target HBase table
write_catalog = f"""
{{
    "table": {{
        "namespace": "default", 
        "name": "filtered_table"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{ "cf": "rowkey", "col": "key", "type": "string" }},
        "name": {{ "cf": "cf1", "col": "name", "type": "string" }},
        "age": {{ "cf": "cf1", "col": "age", "type": "int" }}
    }}
}}
"""

# Write the filtered DataFrame back to HBase
filtered_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .mode("overwrite") \
    .save()

4. Summary

  • Read Data from HBase: Use PySpark with the HBase connector to load the data.
  • Filter Data: Apply filter() or where() clauses on the DataFrame.
  • Write Filtered Data: Save the data in CSV, Parquet, or write it back to HBase.

This approach ensures a smooth read, filter, and write pipeline using HBase with PySpark. Let me know if you run into any issues or need further clarification!

댓글

이 블로그의 인기 게시물

Install and run an FTP server using Docker

Using the MinIO API via curl

PYTHONPATH, Python 모듈 환경설정

Elasticsearch Ingest API

오늘의 문장2

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

Fundamentals of English Grammar #1

To switch to a specific tag in a Git repository

You can use Sublime Text from the command line by utilizing the subl command

티베트-버마어파 와 한어파(중국어파)의 어순 비교