Developing a Kafka Streams application

A Kafka Streams application that reads data from one topic, transforms it, and writes the result to another topic can be built with the Kafka Streams API. Here's a step-by-step guide in Java:

Prerequisites

  1. Apache Kafka installed and running.
  2. Kafka Streams API included in your project. Add the dependency in your pom.xml (if using Maven):
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.0</version> <!-- Replace with your Kafka version -->
    </dependency>
    
    The example code below also uses Gson for JSON handling, so add it as well:
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version> <!-- Replace with your preferred version -->
    </dependency>
    
  3. Basic understanding of Kafka topics and Java.

Steps to Build the Kafka Streams Application

1. Define the Data Conversion Logic

Decide how you want to transform data. For example:

  • JSON to a different JSON structure.
  • Raw string to JSON or vice versa.
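As a concrete illustration of the second case, here is a minimal, dependency-free sketch that wraps a raw string value in a JSON envelope (a real application would normally use a JSON library such as Gson, as the full example below does):

```java
// Minimal sketch: wrap a raw string value in a JSON envelope.
// Plain string building keeps this dependency-free; a real application
// would normally use a JSON library such as Gson or Jackson.
public class RawToJson {
    static String toJson(String raw) {
        // Escape backslashes and quotes so the result stays valid JSON.
        String escaped = raw.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"value\":\"" + escaped + "\",\"processed\":true}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("hello"));
        // prints {"value":"hello","processed":true}
    }
}
```

A pure function like this plugs directly into `mapValues` in the topology shown next.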

2. Write the Application Code

Here’s an example of a Kafka Streams application that reads JSON messages from one topic, processes them (e.g., adds a field), and writes to another topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;

public class KafkaStreamApp {
    public static void main(String[] args) {
        // Step 1: Set up Kafka Streams properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Step 2: Build the Stream topology
        StreamsBuilder builder = new StreamsBuilder();

        // Read from source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Process data: Convert JSON and add a new field
        KStream<String, String> transformedStream = sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        });

        // Write to destination topic
        transformedStream.to("destination-topic");

        // Step 3: Create KafkaStreams instance and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to stop the application gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
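The `stream` and `to` calls above rely on the default serdes from the configuration. Topic-level serdes can also be pinned explicitly with `Consumed.with` and `Produced.with`; a sketch (the `describe()` call just prints the topology wiring without starting any threads):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ExplicitSerdes {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Per-topic serdes take precedence over DEFAULT_*_SERDE_CLASS_CONFIG.
        KStream<String, String> source =
            builder.stream("source-topic", Consumed.with(Serdes.String(), Serdes.String()));

        source.mapValues(String::trim)
              .to("destination-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Inspect the wiring without starting the application.
        System.out.println(builder.build().describe());
    }
}
```

Explicit serdes make the topology self-documenting and avoid surprises when the defaults change.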

Explanation

  1. Configuration (StreamsConfig):

    • APPLICATION_ID_CONFIG: Unique ID for the application.
    • BOOTSTRAP_SERVERS_CONFIG: Kafka cluster address.
  2. Stream Topology:

    • sourceStream: Reads data from the source topic.
    • mapValues: Processes the data (e.g., JSON transformation).
    • to: Sends the transformed data to the destination topic.
  3. Graceful Shutdown:

    • A shutdown hook ensures the application closes properly.
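The shutdown hook above closes the streams instance, but the official Kafka Streams demos usually also block the main thread on a CountDownLatch until a shutdown signal arrives. A dependency-free sketch of that pattern (the `streams.start()`/`close()` calls are indicated in comments):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownPattern {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        // In the real application this hook would call streams.close()
        // before releasing the latch.
        Runtime.getRuntime().addShutdownHook(new Thread(latch::countDown));

        // streams.start() would go here; main then blocks until shutdown.
        // This sketch uses a timeout so the demo exits on its own.
        latch.await(2, TimeUnit.SECONDS);
        System.out.println("shutting down");
    }
}
```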

Testing the Application

  1. Start Kafka and create the required topics:

    kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
    kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
    
  2. Produce test messages to the source topic:

    kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
    {"key": "value"}
    
  3. Consume messages from the destination topic:

    kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning
    

Extending Further

  • Use Custom SerDes for structured data like Avro or Protobuf.
  • Implement error handling for malformed messages.
  • Add logging for debugging.
  • Integrate unit tests using TopologyTestDriver from the kafka-streams-test-utils artifact.
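On the unit-test point: Kafka ships a kafka-streams-test-utils artifact whose TopologyTestDriver runs a topology in-process, with no broker. A sketch of such a test, where an uppercase transform stands in for the JSON transform above (add the test-utils dependency first):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class TopologySmokeTest {
    // Pipes one value through the topology and returns what lands
    // on destination-topic.
    static String runOnce(String value) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("source-topic")
               .mapValues(v -> v.toUpperCase()) // stand-in for the JSON transform
               .to("destination-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "smoke-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                "source-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                "destination-topic", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k1", value);
            return out.readValue();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("hello")); // prints HELLO
    }
}
```

Because the driver processes records synchronously, assertions can be made immediately after `pipeInput` with no polling or timeouts.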

Let me know if you'd like further guidance!
