Spring Boot with Kafka Streams
Using Spring Boot with Kafka Streams simplifies the process further. Spring Boot provides excellent support for Kafka through Spring Kafka, allowing seamless integration and management.
Here’s a step-by-step guide to building a Kafka Streams application with Spring Boot:
1. Setup Your Spring Boot Application
Add Dependencies
Include the necessary dependencies in your pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Required for Kafka Streams: spring-kafka does not pull this in transitively -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version> <!-- Version may vary -->
</dependency>
2. Configure Kafka in application.yml
Define Kafka properties:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    streams:
      application-id: kafka-stream-app
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
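If you prefer Java configuration over application.yml, the same properties can be registered as a `KafkaStreamsConfiguration` bean under Spring Kafka's default bean name. This is a sketch of the equivalent setup, not a required step — the YAML above is sufficient on its own:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import static org.springframework.kafka.config.KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME;

@Configuration
public class KafkaStreamsConfig {

    // Mirrors the application.yml properties above, expressed in Java.
    @Bean(name = DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}
```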
3. Create the Kafka Streams Application
Define a Configuration Class
Create a configuration class where the stream processing logic resides (@EnableKafkaStreams bootstraps the StreamsBuilder that Spring injects into the bean method):

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class KafkaStreamProcessor {

    @Bean
    public KStream<String, String> kStreamProcessor(StreamsBuilder builder) {
        // Read messages from the source topic
        KStream<String, String> sourceStream = builder.stream("source-topic");

        // Transform the data: parse each value as JSON and add a flag
        sourceStream.mapValues(value -> {
            JsonObject jsonObject = JsonParser.parseString(value).getAsJsonObject();
            jsonObject.addProperty("processed", true); // Example transformation
            return jsonObject.toString();
        }).to("destination-topic"); // Write to the destination topic

        return sourceStream;
    }
}
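The topology above is stateless. Kafka Streams also supports stateful operations such as grouping and counting. As an illustration (the "count-topic" name is hypothetical), here is a sketch of counting messages per key:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class CountExample {

    // Counts messages per key and writes the running totals to "count-topic".
    public static void buildCountTopology(StreamsBuilder builder) {
        KStream<String, String> source = builder.stream("source-topic");

        KTable<String, Long> counts = source
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .count();

        // The value type changed to Long, so an explicit value serde is needed.
        counts.toStream()
              .to("count-topic", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
```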
4. Start the Application
Ensure Kafka is running and create your topics:
kafka-topics --create --topic source-topic --bootstrap-server localhost:9092
kafka-topics --create --topic destination-topic --bootstrap-server localhost:9092
Then run your Spring Boot application (mvn spring-boot:run).
5. Produce and Consume Messages
Produce Messages
Send test messages to the source topic:
kafka-console-producer --topic source-topic --bootstrap-server localhost:9092
{"name": "John", "age": 30}
Consume Messages
Read transformed messages from the destination topic:
kafka-console-consumer --topic destination-topic --bootstrap-server localhost:9092 --from-beginning
You should see the transformed messages, e.g.:
{"name":"John","age":30,"processed":true}
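Besides the console producer, messages can be sent from application code via Spring Kafka's KafkaTemplate, which Spring Boot auto-configures from the same bootstrap-servers property. A minimal sketch (the MessageProducer class name is illustrative):

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends a JSON payload to the source topic; the record key is omitted here.
    public void send(String payload) {
        kafkaTemplate.send("source-topic", payload);
    }
}
```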
Key Features with Spring Boot
- Configuration: Using application.yml, managing Kafka properties is straightforward.
- Component Scanning: Spring Boot automatically detects Kafka-related beans.
- Error Handling: You can add GlobalKTables, Transformers, or dead-letter (error) topics to handle failures.
- Testing: Use Spring Kafka Test for integration testing.
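To make the error-handling point concrete: one common pattern is routing unparseable messages to a dead-letter topic instead of letting a JsonSyntaxException crash the stream thread. A sketch, assuming a "source-topic-dlt" dead-letter topic (the name is illustrative):

```java
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DeadLetterExample {

    // Valid JSON continues to the destination topic; anything unparseable
    // goes to the dead-letter topic for later inspection.
    public static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> source = builder.stream("source-topic");

        source.filter((key, value) -> isValidJson(value))
              .to("destination-topic");

        source.filterNot((key, value) -> isValidJson(value))
              .to("source-topic-dlt");
    }

    private static boolean isValidJson(String value) {
        try {
            JsonParser.parseString(value);
            return true;
        } catch (JsonSyntaxException e) {
            return false;
        }
    }
}
```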
Example Test with Embedded Kafka:
Add the dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
Test template:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"source-topic", "destination-topic"})
public class KafkaStreamProcessorTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    // Point spring.kafka.bootstrap-servers at the embedded broker in your test
    // properties (e.g. ${spring.embedded.kafka.brokers}), then add tests here.
}
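Embedded Kafka starts a real broker, which is useful for end-to-end integration tests. For unit-testing the topology logic alone, kafka-streams-test-utils provides TopologyTestDriver, which runs without any broker. A sketch, assuming the org.apache.kafka:kafka-streams-test-utils test dependency is on the classpath:

```java
import java.util.Properties;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

class KafkaStreamTopologyTest {

    @Test
    void addsProcessedFlag() {
        // Rebuild the same topology as KafkaStreamProcessor (in a real project,
        // extract the topology into a shared method to avoid duplication).
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("source-topic")
                .mapValues(value -> {
                    JsonObject obj = JsonParser.parseString(value).getAsJsonObject();
                    obj.addProperty("processed", true);
                    return obj.toString();
                })
                .to("destination-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in = driver.createInputTopic(
                    "source-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "destination-topic", new StringDeserializer(), new StringDeserializer());

            in.pipeInput("k1", "{\"name\":\"John\",\"age\":30}");
            assertTrue(out.readValue().contains("\"processed\":true"));
        }
    }
}
```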