Developing a Kafka Streams application

A Kafka Streams application that reads data from one topic, transforms it, and writes the result to another topic can be built with the Kafka Streams API. Here's a step-by-step guide to get you started in Java:

Prerequisites

  1. Apache Kafka installed and running.
  2. Kafka Streams API included in your project. If you use Maven, add the dependency to your pom.xml:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.0</version> <!-- Replace with your Kafka version -->
    </dependency>
    The example below also uses Gson for JSON parsing:
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    
  3. Basic understanding of Kafka topics and Java.

Steps to Build the Kafka Streams Application

1. Define the Data Conversion Logic

Decide how you want to transform data. For example:

  • JSON to a different JSON structure.
  • Raw string to JSON or vice versa.
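To illustrate the first case in isolation, here is a minimal sketch of a JSON-to-JSON transformation using Gson (the same library the full example below relies on); the `processed` field is just an illustrative marker:

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class ValueTransformer {
    // Parse a JSON string, add a marker field, and re-serialize it.
    public static String addProcessedField(String json) {
        JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
        obj.addProperty("processed", true);
        return obj.toString();
    }

    public static void main(String[] args) {
        System.out.println(addProcessedField("{\"key\": \"value\"}"));
        // {"key":"value","processed":true}
    }
}
```

Keeping the transformation in a plain static method like this also makes it trivial to unit-test without any Kafka infrastructure.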

2. Write the Application Code

Here’s an example of a Kafka Streams application that reads JSON messages from one topic, processes them (e.g., adds a field), and writes to another topic:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;

public class KafkaStreamApp {
    public static void main(String[] args) {
        // Step 1: Set up Kafka Streams properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Step 2: Build the Stream topology
        StreamsBuilder builder = new StreamsBuilder();

        // Read from source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Process data: Convert JSON and add a new field
        KStream<String, String> transformedStream = sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        });

        // Write to destination topic
        transformedStream.to("destination-topic");

        // Step 3: Create KafkaStreams instance and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to stop the application gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Explanation

  1. Configuration (StreamsConfig):

    • APPLICATION_ID_CONFIG: Unique ID for the application.
    • BOOTSTRAP_SERVERS_CONFIG: Kafka cluster address.
  2. Stream Topology:

    • sourceStream: Reads data from the source topic.
    • mapValues: Processes the data (e.g., JSON transformation).
    • to: Sends the transformed data to the destination topic.
  3. Graceful Shutdown:

    • A shutdown hook ensures the application closes properly.
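The DEFAULT_KEY_SERDE_CLASS_CONFIG / DEFAULT_VALUE_SERDE_CLASS_CONFIG properties apply to every topic in the topology. If a single stream needs different serialization, the serdes can instead be passed explicitly via `Consumed` and `Produced`. A sketch of the same topology shape with explicit serdes (the uppercase transform here is just a placeholder):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ExplicitSerdesTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Explicit serdes take precedence over the configured defaults.
        KStream<String, String> source = builder.stream(
                "source-topic", Consumed.with(Serdes.String(), Serdes.String()));

        source.mapValues(v -> v.toUpperCase())
              .to("destination-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Explicit serdes are mostly useful once different topics carry differently typed payloads (e.g. Avro values alongside plain strings).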

Testing the Application

  1. Start Kafka and create the required topics:

    kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
    kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
    
  2. Produce test messages to the source topic:

    kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
    {"key": "value"}
    
  3. Consume messages from the destination topic:

    kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning
    
    You should see each message with the added field, e.g. {"key":"value","processed":true}.

Extending Further

  • Use Custom SerDes for structured data like Avro or Protobuf.
  • Implement error handling for malformed messages.
  • Add logging for debugging.
  • Integrate unit tests using kafka-streams-test-utils, which provides TopologyTestDriver.
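On the unit-testing point: Kafka ships an official kafka-streams-test-utils artifact whose TopologyTestDriver pushes records through a topology synchronously, with no broker. A sketch that rebuilds the topology from the example above (same Gson transformation, same topic names) and feeds it one record:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class TopologyTest {
    // Build the topology, pipe a single value through it, return the output value.
    public static String runOnce(String inputValue) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("source-topic")
               .mapValues(value -> {
                   JsonObject obj = JsonParser.parseString(value).getAsJsonObject();
                   obj.addProperty("processed", true);
                   return obj.toString();
               })
               .to("destination-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "source-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "destination-topic", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("k1", inputValue);
            return out.readValue();
        }
    }
}
```

Add the org.apache.kafka:kafka-streams-test-utils dependency (test scope, matching your Kafka version) to use this; from a JUnit test you would simply assert on the value runOnce returns.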

Let me know if you'd like further guidance!
