Spring Boot with Kafka Streams

Using Spring Boot with Kafka Streams simplifies the process further. Spring Boot provides excellent support for Kafka through Spring Kafka, allowing seamless integration and management.

Here’s a step-by-step guide to building a Kafka Streams application with Spring Boot:


1. Setup Your Spring Boot Application

Add Dependencies

Include the necessary dependencies in your pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version> <!-- Version may vary -->
</dependency>

2. Configure Kafka in application.yml

Define Kafka properties:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: kafka-stream-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde

3. Create the Kafka Streams Application

Define a Spring Boot Component

Create a component where the stream processing logic resides:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.stereotype.Component;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

@Component
@EnableKafkaStreams
public class KafkaStreamProcessor {

    @Bean
    public KStream<String, String> kStreamProcessor(StreamsBuilder builder) {
        // Read messages from the source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Transform the data
        sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        }).to("destination-topic"); // Write to the destination topic

        return sourceStream;
    }
}

4. Start the Application

  • Ensure Kafka and your topics are running:

    kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
    kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
    
  • Run your Spring Boot application (mvn spring-boot:run).


5. Produce and Consume Messages

Produce Messages

Send test messages to the source topic:

kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
{"name": "John", "age": 30}

Consume Messages

Read transformed messages from the destination topic:

kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning

You should see the transformed messages, e.g.:

{"name":"John","age":30,"processed":true}

Key Features with Spring Boot

  1. Configuration: Using application.yml, managing Kafka properties is straightforward.
  2. Component Scanning: Spring Boot automatically detects Kafka-related beans.
  3. Error Handling: You can add GlobalKTable, Transformers, or error topics to handle failures.
  4. Testing: Use Spring Kafka Test for integration testing.

Example Test with Embedded Kafka:

Add the dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Test template:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"source-topic", "destination-topic"})
public class KafkaStreamProcessorTest {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Add tests here
}

Would you like further assistance with testing or advanced configurations?

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

vsftpd default directory

[Ubuntu] *.deb 파일 설치 방법

Offset out of range error in Kafka, 카프카 트러블슈팅

리눅스 (cron - 주기적 작업실행 데몬)

리눅스 (하드링크&소프트링크)

CDPEvents in puppeteer

Using venv in Python