Kafka consumer in a Spring Boot application using a scheduled task

Here’s how to build a Kafka consumer in a Spring Boot application that consumes messages from a scheduled task. Instead of the usual @KafkaListener, this approach uses @Scheduled to poll Kafka at fixed intervals.


Step 1: Add Maven Dependencies

Add the required dependencies for Spring Kafka and Scheduling in your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

Step 2: Configure Kafka Consumer in application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: schedule-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
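Since auto-commit is disabled, it also helps to state where a brand-new consumer group should start reading. The auto-offset-reset setting below is a common addition for this kind of setup (an assumption for this example, not a requirement of the approach):

```yaml
spring:
  kafka:
    consumer:
      # Where a new group starts when it has no committed offset:
      # "earliest" replays the topic from the beginning; "latest"
      # (the default) only reads messages produced after the group connects.
      auto-offset-reset: earliest
```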

Step 3: Create a Scheduled Kafka Consumer

  1. Enable Scheduling: Add the @EnableScheduling annotation to your main application class.

  2. Write the Consumer Service: Use a scheduled method to poll messages at regular intervals.

Code Example

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import jakarta.annotation.PreDestroy; // javax.annotation.PreDestroy on Spring Boot 2.x

import java.time.Duration;
import java.util.Collections;

@Service
public class ScheduledKafkaConsumer {

    // KafkaConsumer is not thread-safe. This works because Spring's default
    // scheduler runs all @Scheduled methods on a single thread; if you
    // configure a multi-threaded scheduler, guard access to the consumer.
    private final Consumer<String, String> kafkaConsumer;

    public ScheduledKafkaConsumer(ConsumerFactory<String, String> consumerFactory) {
        // Create a Kafka consumer from the auto-configured ConsumerFactory
        this.kafkaConsumer = consumerFactory.createConsumer();
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
    }

    @Scheduled(fixedRate = 5000) // Poll messages every 5 seconds
    public void consumeMessages() {
        System.out.println("Polling for messages...");
        try {
            // Block for up to 1 second waiting for records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> System.out.printf(
                    "Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                    record.key(), record.value(), record.partition(), record.offset()));

            // Commit offsets manually (enable-auto-commit is false)
            kafkaConsumer.commitSync();
        } catch (Exception e) {
            System.err.println("Error consuming messages: " + e.getMessage());
        }
    }

    // Close the consumer when the application shuts down
    @PreDestroy
    public void closeConsumer() {
        kafkaConsumer.close();
    }
}

Step 4: Main Application Class

Enable scheduling in your Spring Boot application:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class KafkaScheduledConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaScheduledConsumerApplication.class, args);
    }
}

Key Points

  1. Scheduling Interval:

    • Controlled by the @Scheduled annotation (fixedRate or fixedDelay).
    • fixedRate: measures the interval from the start of the previous run; with the default single-threaded scheduler, an overrunning task delays the next run rather than overlapping it.
    • fixedDelay: waits the configured delay after the previous run completes before starting the next.
  2. Manual Offset Management:

    • enable-auto-commit is set to false to allow manual control of offset commits.
  3. Polling Duration:

    • poll() blocks for at most the given Duration while waiting for records to arrive.
  4. Resource Management:

    • Close the consumer on shutdown (e.g. via a @PreDestroy method) to avoid resource leaks.
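The fixedRate/fixedDelay distinction mirrors plain Java's ScheduledExecutorService, which can make the timing easier to reason about. The sketch below (not part of the consumer above; class and field names are made up for illustration) runs a 150 ms task on a 100 ms schedule both ways and counts how often each variant fires:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RateVsDelay {

    public static final class Result {
        public final int rateTicks;
        public final int delayTicks;
        Result(int rateTicks, int delayTicks) {
            this.rateTicks = rateTicks;
            this.delayTicks = delayTicks;
        }
    }

    public static Result run() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicInteger rate = new AtomicInteger();
        AtomicInteger delay = new AtomicInteger();

        // A task that takes longer (150 ms) than the 100 ms schedule.
        Runnable slowTask = () -> {
            try { Thread.sleep(150); } catch (InterruptedException ignored) {}
        };

        // fixedRate analogue: next run is due 100 ms after the previous one
        // *started*; since the task takes 150 ms, runs end up back to back.
        scheduler.scheduleAtFixedRate(
                () -> { rate.incrementAndGet(); slowTask.run(); },
                0, 100, TimeUnit.MILLISECONDS);

        // fixedDelay analogue: next run starts 100 ms after the previous one
        // *finished*, so runs are roughly 250 ms apart.
        scheduler.scheduleWithFixedDelay(
                () -> { delay.incrementAndGet(); slowTask.run(); },
                0, 100, TimeUnit.MILLISECONDS);

        try { Thread.sleep(1000); } catch (InterruptedException ignored) {}
        scheduler.shutdownNow();
        return new Result(rate.get(), delay.get());
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("fixedRate ticks: " + r.rateTicks
                + ", fixedDelay ticks: " + r.delayTicks);
    }
}
```

Over the same second, the fixed-rate variant fires more often than the fixed-delay one because it does not wait for the previous run to finish before the next is due.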

Advantages of Scheduled Consumption

  • Full control over polling intervals.
  • Decouples Kafka message fetching from the default listener thread model.
  • Useful for batch processing or integration with other scheduled tasks.
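For the batch-processing case, Spring Boot also exposes consumer fetch tuning in application.yml. The values below are illustrative assumptions, not recommendations:

```yaml
spring:
  kafka:
    consumer:
      max-poll-records: 500   # cap on records returned by a single poll()
      fetch-min-size: 1MB     # let the broker accumulate ~1 MB per fetch...
      fetch-max-wait: 500ms   # ...but wait at most 500 ms for it
```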

