Spring Boot with Kafka Streams

Spring Boot simplifies building Kafka Streams applications. Through the Spring for Apache Kafka project (spring-kafka), it configures the streams infrastructure from your application properties and manages the lifecycle of the stream processing application for you.

Here’s a step-by-step guide to building a Kafka Streams application with Spring Boot:


1. Set Up Your Spring Boot Application

Add Dependencies

Include the necessary dependencies in your pom.xml:

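<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- spring-kafka declares kafka-streams as optional, so it must be added explicitly -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version> <!-- Version may vary -->
</dependency>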

2. Configure Kafka in application.yml

Define Kafka properties:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: kafka-stream-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
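
To make the mapping concrete, here is a minimal sketch of the plain Kafka Streams properties that the YAML above is expected to resolve to, assuming Spring Boot's usual mapping of spring.kafka.* keys. The class name StreamsPropertiesSketch is hypothetical and only for illustration; Spring Boot builds the real configuration for you.

```java
import java.util.Properties;

public class StreamsPropertiesSketch {

    // Builds the plain Kafka Streams properties the YAML above is assumed to resolve to.
    public static Properties streamsProperties() {
        Properties props = new Properties();
        // spring.kafka.streams.application-id -> application.id
        props.put("application.id", "kafka-stream-app");
        // spring.kafka.bootstrap-servers -> bootstrap.servers
        props.put("bootstrap.servers", "localhost:9092");
        // Entries under spring.kafka.streams.properties are passed through with the same keys
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        // Print each resolved property as key=value
        streamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Seeing the flat keys can help when you compare the YAML against the Kafka Streams configuration reference, which documents the property names in this dotted form.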

3. Create the Kafka Streams Application

Define a Configuration Class

Create a configuration class where the stream processing logic resides:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

// @Configuration is the conventional home for @Bean methods
@Configuration
@EnableKafkaStreams // Registers the default StreamsBuilder injected below
public class KafkaStreamProcessor {

    @Bean
    public KStream<String, String> kStreamProcessor(StreamsBuilder builder) {
        // Read messages from the source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Transform the data (this lambda throws if a value is not a JSON object)
        sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        }).to("destination-topic"); // Write to the destination topic

        return sourceStream;
    }
}
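
The transformation above throws an exception when a record's value is not a JSON object, and an uncaught exception in a Kafka Streams topology typically stops processing. One way to guard against this is to wrap the transformation so that a failure is flagged instead of thrown. The following is a minimal sketch in plain Java; the class SafeValueMapper and its Result type are hypothetical names, not part of Spring Kafka or Kafka Streams, and the stand-in transform in main only imitates the Gson-based one.

```java
import java.util.function.UnaryOperator;

public class SafeValueMapper {

    // Outcome for one record: the resulting value and whether the transform failed.
    public static final class Result {
        public final String value;
        public final boolean failed;

        Result(String value, boolean failed) {
            this.value = value;
            this.failed = failed;
        }

        @Override
        public String toString() {
            return "Result{value=" + value + ", failed=" + failed + "}";
        }
    }

    // Applies the transform; on any runtime failure keeps the original value and flags it.
    public static Result apply(String value, UnaryOperator<String> transform) {
        try {
            return new Result(transform.apply(value), false);
        } catch (RuntimeException e) {
            return new Result(value, true);
        }
    }

    public static void main(String[] args) {
        // Stand-in transform (assumption): rejects anything that does not look like a JSON object
        UnaryOperator<String> transform = v -> {
            if (v == null || !v.trim().startsWith("{")) {
                throw new IllegalArgumentException("not a JSON object: " + v);
            }
            return v;
        };
        System.out.println(apply("{\"name\":\"John\"}", transform));
        System.out.println(apply("not json", transform));
    }
}
```

In the processor, the Gson-based lambda could be passed as the transform, and records whose result is flagged as failed could then be filtered out or written to a separate error topic of your choosing.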

4. Start the Application

  • Ensure Kafka is running and that both topics exist (in the Apache Kafka distribution the script is named kafka-topics.sh):

    kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
    kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
    
  • Run your Spring Boot application (mvn spring-boot:run).


5. Produce and Consume Messages

Produce Messages

Send test messages to the source topic:

kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
{"name": "John", "age": 30}

Consume Messages

Read transformed messages from the destination topic:

kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning

You should see the transformed messages, e.g.:

{"name":"John","age":30,"processed":true}

Key Features with Spring Boot

  1. Configuration: Using application.yml, managing Kafka properties is straightforward.
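  2. Auto-configuration: When @EnableKafkaStreams is present and kafka-streams is on the classpath, Spring Boot builds the Kafka Streams configuration from spring.kafka.streams.* and starts the topology together with the application context.
  3. Error Handling: You can configure a deserialization exception handler (for example, Kafka Streams' LogAndContinueExceptionHandler) or route records that fail processing to a separate error topic.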
  4. Testing: Use Spring Kafka Test for integration testing.

Example Test with Embedded Kafka:

Add the dependency:

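<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Provides @SpringBootTest, if it is not already in your pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>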

Test template:

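import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
// bootstrapServersProperty points the application at the embedded broker instead of localhost:9092
@EmbeddedKafka(partitions = 1, topics = {"source-topic", "destination-topic"},
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class KafkaStreamProcessorTest {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Add tests here
}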
