Spring Boot with Kafka Streams

Using Spring Boot with Kafka Streams simplifies the process further. Spring Boot provides excellent support for Kafka through Spring Kafka, allowing seamless integration and management.

Here’s a step-by-step guide to building a Kafka Streams application with Spring Boot:


1. Setup Your Spring Boot Application

Add Dependencies

Include the necessary dependencies in your pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version> <!-- Version may vary -->
</dependency>

2. Configure Kafka in application.yml

Define Kafka properties:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: kafka-stream-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

3. Create the Kafka Streams Application

Define a Spring Boot Component

Create a component where the stream processing logic resides:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.stereotype.Component;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

@Component
@EnableKafkaStreams
public class KafkaStreamProcessor {

    @Bean
    public KStream<String, String> kStreamProcessor(StreamsBuilder builder) {
        // Read messages from the source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Transform the data
        sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        }).to("destination-topic"); // Write to the destination topic

        return sourceStream;
    }
}

4. Start the Application

  • Ensure Kafka and your topics are running:

    kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
    kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
    
  • Run your Spring Boot application (mvn spring-boot:run).


5. Produce and Consume Messages

Produce Messages

Send test messages to the source topic:

kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
{"name": "John", "age": 30}

Consume Messages

Read transformed messages from the destination topic:

kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning

You should see the transformed messages, e.g.:

{"name":"John","age":30,"processed":true}

Key Features with Spring Boot

  1. Configuration: Using application.yml, managing Kafka properties is straightforward.
  2. Component Scanning: Spring Boot automatically detects Kafka-related beans.
  3. Error Handling: You can add GlobalKTable, Transformers, or error topics to handle failures.
  4. Testing: Use Spring Kafka Test for integration testing.

Example Test with Embedded Kafka:

Add the dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Test template:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"source-topic", "destination-topic"})
public class KafkaStreamProcessorTest {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Add tests here
}

Would you like further assistance with testing or advanced configurations?

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

How to split a list into chunks of 100 items in JavaScript, 자바스크립트 리스트 쪼개기

HTML Inline divisions at one row by Tailwind

Boilerplate for typescript server programing

가속도 & 속도

Gradle multi-module project

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

CDPEvents in puppeteer

Sparse encoder

Reactjs datetime range picker