PySpark DataFrame from HBase
To filter data from HBase and write it back using PySpark, follow these steps:
- Read data from HBase into a PySpark DataFrame.
- Filter the DataFrame using PySpark transformations.
- Write the filtered DataFrame to your desired destination (e.g., HBase, Parquet, or CSV).
Below is the complete example covering these tasks.
1. Reading, Filtering, and Writing Data
Step-by-Step Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Step 1: Initialize Spark Session with HBase Configurations
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars", "/path/to/hbase-client.jar,/path/to/hadoop-common.jar") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()
# Step 2: Define the HBase Table Catalog
hbase_table = "my_table"
catalog = f"""
{{
    "table": {{
        "namespace": "default",
        "name": "{hbase_table}"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{"cf": "rowkey", "col": "key", "type": "string"}},
        "name": {{"cf": "cf1", "col": "name", "type": "string"}},
        "age": {{"cf": "cf1", "col": "age", "type": "int"}}
    }}
}}
"""
# Step 3: Read Data from HBase into a PySpark DataFrame
df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", catalog) \
    .load()
# Step 4: Filter the DataFrame (e.g., age > 20)
filtered_df = df.filter(col("age") > 20)
# Step 5: Show the Filtered Data
filtered_df.show()
# Step 6: Write Filtered Data to a New CSV File (or HBase, if needed)
filtered_df.write \
    .mode("overwrite") \
    .csv("output/filtered_hbase_data.csv")
2. Explanation of Steps
1. Spark Session Initialization
- Configures Spark to interact with HBase using Zookeeper settings.
- Loads the HBase and Hadoop client libraries (ensure the .jar paths point to real files). An alternative that pulls the connector from Maven instead is sketched after this list.
2. HBase Table Catalog
- Defines how PySpark maps HBase columns to a DataFrame structure.
3. Filtering the Data
- df.filter(col("age") > 20) drops rows where age is less than or equal to 20, keeping only rows with age greater than 20. Equivalent ways to write the filter are shown after this list.
4. Writing Data
- The write operation stores the filtered data in CSV format.
- You can adjust the format to Parquet or JSON, or write it back to HBase; short examples follow this list.
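If managing local jar paths is inconvenient, a common alternative is to let Spark fetch the connector from Maven when the session starts. This is a minimal sketch, assuming the hbase-connectors package; the coordinate and version (org.apache.hbase.connectors.spark:hbase-spark:1.0.0) are assumptions and must be matched to your HBase and Spark versions.
from pyspark.sql import SparkSession
# Alternative session setup: fetch the HBase connector from Maven instead of
# listing local jar files. The package coordinate/version below is an assumption.
spark = SparkSession.builder \
    .appName("HBase-PySpark Read-Write") \
    .config("spark.jars.packages", "org.apache.hbase.connectors.spark:hbase-spark:1.0.0") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "localhost") \
    .config("spark.hadoop.hbase.zookeeper.property.clientPort", "2181") \
    .getOrCreate()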
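For reference, the same filter can be written in several equivalent ways; where() is an alias for filter(), and conditions combine with & and |:
from pyspark.sql.functions import col
# All of these keep only rows with age > 20
adults = df.where(col("age") > 20)
adults_sql = df.filter("age > 20")  # SQL expression string
# Combine conditions (wrap each comparison in parentheses)
named_adults = df.filter((col("age") > 20) & (col("name").isNotNull()))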
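Switching the output format only changes the final call on the writer. Note that Spark writes each output path as a directory of part files, not a single file:
# Parquet keeps the schema and compresses well
filtered_df.write.mode("overwrite").parquet("output/filtered_hbase_data_parquet")
# JSON writes one record per line
filtered_df.write.mode("overwrite").json("output/filtered_hbase_data_json")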
3. Writing Data Back to HBase (Optional)
To write the filtered data back to HBase, update the catalog and use the HBase connector.
Example: Writing Data to HBase
# Define a new catalog for the target HBase table
write_catalog = f"""
{{
    "table": {{
        "namespace": "default",
        "name": "filtered_table"
    }},
    "rowkey": "key",
    "columns": {{
        "key": {{"cf": "rowkey", "col": "key", "type": "string"}},
        "name": {{"cf": "cf1", "col": "name", "type": "string"}},
        "age": {{"cf": "cf1", "col": "age", "type": "int"}}
    }}
}}
"""
# Write the filtered DataFrame back to HBase
filtered_df.write \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .mode("overwrite") \
    .save()
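To confirm the write, you can read the target table back with the same catalog, mirroring the read pattern from Step 3:
# Read the rows back from filtered_table to verify the write
check_df = spark.read \
    .format("org.apache.hadoop.hbase.spark") \
    .option("catalog", write_catalog) \
    .load()
check_df.show()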
4. Summary
- Read Data from HBase: Use PySpark with the HBase connector to load the data.
- Filter Data: Apply filter() or where() clauses on the DataFrame.
- Write Filtered Data: Save the data as CSV or Parquet, or write it back to HBase.
This approach ensures a smooth read, filter, and write pipeline using HBase with PySpark. Let me know if you run into any issues or need further clarification!