Kafka consumer in a Spring Boot application using a scheduled task

Here’s how to develop a Kafka consumer in a Spring Boot application using a scheduled task. Instead of the usual listener-driven model, this approach uses @Scheduled to poll Kafka for messages at fixed intervals.


Step 1: Add Maven Dependencies

Add the required dependencies in your pom.xml. spring-kafka provides the Kafka client and ConsumerFactory support, while spring-boot-starter already includes the scheduling support from spring-context:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

Step 2: Configure Kafka Consumer in application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: schedule-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false      # offsets are committed manually in the consumer
      auto-offset-reset: earliest    # new consumer groups start from the beginning of the topic
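
Spring Boot auto-configures a ConsumerFactory bean from these properties. If you prefer explicit Java configuration over application.yml, the following is a minimal sketch; the property values simply mirror the YAML above, and defining this bean replaces the auto-configured one:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "schedule-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}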

Step 3: Create a Scheduled Kafka Consumer

  1. Enable Scheduling: Add the @EnableScheduling annotation to your main application class.

  2. Write the Consumer Service: Use a scheduled method to poll messages at regular intervals.

Code Example

import jakarta.annotation.PreDestroy; // use javax.annotation.PreDestroy on Spring Boot 2

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;

@Service
public class ScheduledKafkaConsumer {

    private final Consumer<String, String> kafkaConsumer;

    @Autowired
    public ScheduledKafkaConsumer(ConsumerFactory<String, String> consumerFactory) {
        // Create a Kafka consumer using the Spring ConsumerFactory
        this.kafkaConsumer = consumerFactory.createConsumer();
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
    }

    // KafkaConsumer is not thread-safe; this works because Spring's default
    // scheduler is single-threaded, so poll() is never invoked concurrently.
    @Scheduled(fixedRate = 5000) // Poll messages every 5 seconds
    public void consumeMessages() {
        System.out.println("Polling for messages...");
        try {
            // Poll messages from the topic
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> {
                System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            });

            // Commit offsets manually (auto-commit is disabled in the config)
            if (!records.isEmpty()) {
                kafkaConsumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Error consuming messages: " + e.getMessage());
        }
    }

    // Close the consumer on application shutdown to avoid resource leaks
    @PreDestroy
    public void closeConsumer() {
        kafkaConsumer.close();
    }
}
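
To verify the pipeline end to end, a small startup producer can publish test messages. This is a minimal sketch, assuming Boot's auto-configured KafkaTemplate (Boot defaults to StringSerializer for keys and values) and the same my-topic topic used above:

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class TestProducerConfig {

    // Publishes a few test messages at startup so the scheduled poll has data to pick up
    @Bean
    public CommandLineRunner testProducer(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            for (int i = 0; i < 3; i++) {
                kafkaTemplate.send("my-topic", "key-" + i, "test message " + i);
            }
        };
    }
}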

Step 4: Main Application Class

Enable scheduling in your Spring Boot application:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling // enables processing of @Scheduled annotations application-wide
@SpringBootApplication
public class KafkaScheduledConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaScheduledConsumerApplication.class, args);
    }
}

Key Points

  1. Scheduling Interval:

    • Controlled by the @Scheduled annotation (fixedRate or fixedDelay).
    • fixedRate: Measures the interval from the start of each execution; with the default single-threaded scheduler, a long-running poll delays the next run rather than overlapping it.
    • fixedDelay: Measures the interval from the end of the previous execution, so there is always a full pause between polls (see the sketch after this list).
  2. Manual Offset Management:

    • enable-auto-commit is set to false to allow manual control of offset commits.
  3. Polling Duration:

    • The poll() method blocks for up to the specified Duration waiting for records and returns as soon as records are available.
  4. Resource Management:

    • Ensure the consumer is closed on shutdown to avoid resource leaks; the @PreDestroy hook above handles this.
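
A minimal sketch contrasting the two interval styles referenced in point 1 (the 5000 ms values are examples):

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class IntervalExamples {

    // fixedRate: intervals measured from the start of each run
    @Scheduled(fixedRate = 5000)
    public void pollAtFixedRate() { /* poll here */ }

    // fixedDelay: intervals measured from the end of the previous run
    @Scheduled(fixedDelay = 5000)
    public void pollWithFixedDelay() { /* poll here */ }
}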

Advantages of Scheduled Consumption

  • Full control over polling intervals.
  • Decouples Kafka message fetching from the default listener thread model.
  • Useful for batch processing or integration with other scheduled tasks (see the tuning sketch below).
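
For batch-style processing, capping how many records a single poll() can return is a common tweak. A sketch of the relevant property in the Step 2 style (Kafka's default is 500; 100 is just an example value):

spring:
  kafka:
    consumer:
      max-poll-records: 100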

Let me know if you’d like to expand this with advanced features like error handling, retries, or message transformation!
