Developing a Kafka Streams application
A Kafka Streams application that reads data from one topic, transforms it, and writes the result to another topic can be built with the Kafka Streams API. Here's a step-by-step guide to get you started in Java:
Prerequisites
- Apache Kafka installed and running.
- Kafka Streams API included in your project. If using Maven, add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.0</version> <!-- Replace with your Kafka version -->
</dependency>
```
- Basic understanding of Kafka topics and Java.
Steps to Build the Kafka Streams Application
1. Define the Data Conversion Logic
Decide how you want to transform data. For example:
- JSON to a different JSON structure.
- Raw string to JSON or vice versa.
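For instance, the raw-string-to-JSON direction can be sketched with plain Java string handling; the envelope field names (`raw`, `length`) below are illustrative choices, not part of any Kafka API:

```java
public class EnvelopeExample {
    // Wrap a raw string payload in a small JSON envelope.
    // Escape backslashes first, then quotes, so the result stays valid JSON.
    static String toJsonEnvelope(String raw) {
        String escaped = raw.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"raw\": \"" + escaped + "\", \"length\": " + raw.length() + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonEnvelope("hello"));
        // prints {"raw": "hello", "length": 5}
    }
}
```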
2. Write the Application Code
Here's an example of a Kafka Streams application that reads JSON messages from one topic, processes them (adds a field), and writes to another topic. It uses Gson for JSON handling, so add `com.google.code.gson:gson` to your dependencies as well:
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;

public class KafkaStreamApp {
    public static void main(String[] args) {
        // Step 1: Set up Kafka Streams properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Step 2: Build the stream topology
        StreamsBuilder builder = new StreamsBuilder();

        // Read from the source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Process data: parse the JSON and add a new field
        KStream<String, String> transformedStream = sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        });

        // Write to the destination topic
        transformedStream.to("destination-topic");

        // Step 3: Create the KafkaStreams instance and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add a shutdown hook to stop the application gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```
Explanation
- Configuration (`StreamsConfig`):
  - `APPLICATION_ID_CONFIG`: Unique ID for the application (also used as the underlying consumer group ID).
  - `BOOTSTRAP_SERVERS_CONFIG`: Kafka cluster address.
- Stream Topology:
  - `sourceStream`: Reads data from the source topic.
  - `mapValues`: Transforms each record value (here, a JSON transformation).
  - `to`: Sends the transformed data to the destination topic.
- Graceful Shutdown:
  - A shutdown hook ensures the application closes properly.
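In longer-running services, a common refinement of this pattern is to block the main thread on a `CountDownLatch` that the shutdown hook releases after closing the streams. A minimal stand-alone sketch (a worker thread stands in for the shutdown signal so the demo terminates on its own):

```java
import java.util.concurrent.CountDownLatch;

public class ShutdownDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // In the real application the hook would be:
        // Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        //     streams.close();
        //     latch.countDown();
        // }));
        // Here a worker thread simulates that shutdown signal.
        new Thread(() -> {
            System.out.println("closing streams...");
            latch.countDown();
        }).start();

        latch.await(); // keep the main thread alive until shutdown is signalled
        System.out.println("stopped cleanly");
    }
}
```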
Testing the Application
Start Kafka and create the required topics:
```shell
kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
```
Produce test messages to the source topic:
```shell
kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
> {"key": "value"}
```
Consume messages from the destination topic:
```shell
kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning
```

Each consumed message should now contain the added `"processed": true` field.
Extending Further
- Use Custom SerDes for structured data like Avro or Protobuf.
- Implement error handling for malformed messages.
- Add logging for debugging.
- Integrate unit tests using Kafka's `kafka-streams-test-utils` module (`TopologyTestDriver`), which lets you pipe records through the topology without a running broker.
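For the error-handling point, one approach is to make the transformation total: return an empty `Optional` for malformed input so the stream can drop the record (e.g. via `flatMapValues`) instead of crashing. A plain-Java sketch, where the crude brace check stands in for real JSON parsing such as Gson's `JsonParser` wrapped in try/catch:

```java
import java.util.Optional;

public class SafeTransform {
    // Returns the transformed JSON, or Optional.empty() for malformed input.
    // NOTE: the brace check is a simplistic stand-in for real JSON parsing.
    static Optional<String> transform(String value) {
        try {
            if (value == null) throw new IllegalArgumentException("null value");
            String body = value.trim();
            if (!body.startsWith("{") || !body.endsWith("}")) {
                throw new IllegalArgumentException("not a JSON object");
            }
            String inner = body.substring(1, body.length() - 1).trim();
            String sep = inner.isEmpty() ? "" : ", ";
            return Optional.of("{" + inner + sep + "\"processed\": true}");
        } catch (RuntimeException e) {
            return Optional.empty(); // drop malformed records
        }
    }

    public static void main(String[] args) {
        System.out.println(transform("{\"key\": \"value\"}").orElse("<dropped>"));
        System.out.println(transform("not json").orElse("<dropped>"));
    }
}
```

In the topology, `flatMapValues` can then emit either the transformed value or nothing at all for a malformed record, keeping the application running on bad input.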
Let me know if you'd like further guidance!