Kafka consumer in a Spring Boot application using a scheduled task
Here’s how to develop a Kafka consumer in a Spring Boot application using a scheduled task for consuming messages. This approach uses @Scheduled to poll Kafka messages at fixed intervals.
Step 1: Add Maven Dependencies
Add the required dependencies to your pom.xml. Scheduling needs no extra artifact because it ships with the core Spring Boot starter:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
Step 2: Configure Kafka Consumer in application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: schedule-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
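For orientation, Spring Boot turns these spring.kafka.* entries into ordinary Kafka client properties. The sketch below lists the property names the keys above correspond to, using only java.util.Properties. The class name ConsumerPropsSketch is made up for illustration, and Spring Boot builds the equivalent configuration for you, so this is not something you need to add to the project.

```java
import java.util.Properties;

// Illustration only: the native Kafka consumer properties behind the YAML above.
class ConsumerPropsSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "schedule-consumer-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Offsets are committed by the application code, not in the background
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```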
Step 3: Create a Scheduled Kafka Consumer
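Enable Scheduling: Add the @EnableScheduling annotation to a Spring-managed class. The example below places it on the consumer service itself; the main application class works equally well.
Write the Consumer Service: Use a scheduled method to poll messages at regular intervals.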
Code Example
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;

@EnableScheduling
@Service
public class ScheduledKafkaConsumer implements DisposableBean {

    private final Consumer<String, String> kafkaConsumer;

    @Autowired
    public ScheduledKafkaConsumer(ConsumerFactory<String, String> consumerFactory) {
        // Create a Kafka consumer using the Spring ConsumerFactory
        this.kafkaConsumer = consumerFactory.createConsumer();
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
    }

    // Poll every 5 seconds. The Kafka consumer is not thread-safe,
    // so polling and closing are synchronized on this bean.
    @Scheduled(fixedRate = 5000)
    public synchronized void consumeMessages() {
        System.out.println("Polling for messages...");
        try {
            // Wait up to 1 second for records from the subscribed topic
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            records.forEach(record ->
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset()));
            // Commit offsets manually, only after the records were processed
            kafkaConsumer.commitSync();
        } catch (Exception e) {
            System.err.println("Error consuming messages: " + e.getMessage());
        }
    }

    // Called by Spring on shutdown, so the consumer is actually closed
    @Override
    public synchronized void destroy() {
        kafkaConsumer.close();
    }
}
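The no-argument commitSync() used above commits the positions reached by the last poll(). If you ever commit offsets per partition yourself, Kafka expects the offset of the next record to read, which is the last processed offset plus one. The sketch below shows only that bookkeeping in plain Java; PolledRecord and OffsetBookkeeping are hypothetical stand-ins, not Kafka classes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetBookkeeping {

    // Hypothetical stand-in for the partition and offset of a consumed record
    static final class PolledRecord {
        final int partition;
        final long offset;

        PolledRecord(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For each partition, the offset to commit is the highest processed offset + 1
    static Map<Integer, Long> offsetsToCommit(List<PolledRecord> processed) {
        Map<Integer, Long> next = new HashMap<>();
        for (PolledRecord r : processed) {
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<PolledRecord> batch = List.of(
                new PolledRecord(0, 41), new PolledRecord(0, 42), new PolledRecord(1, 7));
        // Partition 0 was processed up to offset 42, partition 1 up to offset 7
        System.out.println(offsetsToCommit(batch));
    }
}
```

Committing the last processed offset itself, instead of that offset plus one, would cause the final record to be delivered again after a restart.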
Step 4: Main Application Class
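Scheduling is already enabled by the @EnableScheduling annotation on the consumer service, so the main application class is a plain Spring Boot entry point: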
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaScheduledConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaScheduledConsumerApplication.class, args);
    }
}
Key Points
Scheduling Interval:
- Controlled by the @Scheduled annotation (fixedRate or fixedDelay).
- fixedRate: Schedules runs at a fixed period measured from start to start, regardless of how long the previous run took. With the default scheduler, a run that overruns the period delays the next run instead of overlapping it.
- fixedDelay: Waits for the previous run to complete, then waits the configured delay before starting the next.
Manual Offset Management:
- enable-auto-commit is set to false, so offsets are committed only when the code calls commitSync().
Polling Duration:
- The poll() method blocks for at most the specified Duration (1 second in the example) and returns as soon as records are available.
Resource Management:
- Ensure the consumer is closed when the application shuts down to avoid resource leaks.
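To make the difference between fixedRate and fixedDelay concrete, here is a small plain-Java model of when each run would start. It is a simplification, not Spring code: it assumes a single scheduler thread, no initial delay, and hypothetical run durations, and the class name ScheduleTimeline is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

class ScheduleTimeline {

    // fixedRate: run n is due at n * period, but cannot begin before run n-1 has finished
    static List<Long> fixedRateStarts(long period, long[] durations) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int n = 0; n < durations.length; n++) {
            long start = Math.max(n * period, previousEnd);
            starts.add(start);
            previousEnd = start + durations[n];
        }
        return starts;
    }

    // fixedDelay: each run begins `delay` ms after the previous run ends
    static List<Long> fixedDelayStarts(long delay, long[] durations) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long duration : durations) {
            starts.add(start);
            start = start + duration + delay;
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] durations = {1000, 7000, 1000, 1000}; // the second run is slow
        System.out.println("fixedRate : " + fixedRateStarts(5000, durations));
        // prints "fixedRate : [0, 5000, 12000, 15000]"
        System.out.println("fixedDelay: " + fixedDelayStarts(5000, durations));
        // prints "fixedDelay: [0, 6000, 18000, 24000]"
    }
}
```

With fixedRate the schedule catches up after the slow run, while with fixedDelay every slow run pushes all later runs back. For a consumer that must poll regularly, that difference decides how long the gap between polls can grow.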
Advantages of Scheduled Consumption
- Full control over polling intervals.
- Decouples Kafka message fetching from the default listener thread model.
- Useful for batch processing or integration with other scheduled tasks.
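For the batch-processing case, one possible refinement is to poll repeatedly within a single scheduled run until nothing more comes back, rather than polling once per run as the example does. The sketch below shows that loop in plain Java. The Supplier stands in for a call to the consumer's poll(), and BatchDrain, drain, and maxPolls are hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

class BatchDrain {

    // Keep polling until a poll comes back empty or maxPolls is reached,
    // and return everything collected as one batch.
    static <T> List<T> drain(Supplier<List<T>> poll, int maxPolls) {
        List<T> batch = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            List<T> chunk = poll.get();
            if (chunk.isEmpty()) {
                break; // nothing left for now
            }
            batch.addAll(chunk);
        }
        return batch;
    }

    public static void main(String[] args) {
        // Fake source that hands out two chunks and then runs dry
        Deque<List<String>> chunks =
                new ArrayDeque<>(List.of(List.of("a", "b"), List.of("c")));
        Supplier<List<String>> fakePoll =
                () -> chunks.isEmpty() ? List.<String>of() : chunks.poll();
        System.out.println(drain(fakePoll, 10)); // prints "[a, b, c]"
    }
}
```

The maxPolls cap keeps a single run bounded, so a busy topic cannot hold the scheduler thread indefinitely.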