Kafka consumer in a Spring Boot application using a scheduled task

Here’s how to build a Kafka consumer in a Spring Boot application using a scheduled task. Instead of relying on the usual @KafkaListener container model, this approach uses @Scheduled to poll Kafka for messages at fixed intervals.


Step 1: Add Maven Dependencies

Add the required dependencies for Spring Kafka and Scheduling in your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
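
If you build with Gradle instead of Maven, the equivalent dependencies (same coordinates, with versions managed by the Spring Boot plugin) would be:

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter'
}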

Step 2: Configure Kafka Consumer in application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: schedule-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
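
If you prefer configuring the consumer in Java rather than application.yml, a minimal sketch of an equivalent ConsumerFactory bean (same broker address, group id, and deserializers as above) could look like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Mirrors the application.yml properties above
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "schedule-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}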

Step 3: Create a Scheduled Kafka Consumer

  1. Enable Scheduling: Add the @EnableScheduling annotation to your main application class.

  2. Write the Consumer Service: Use a scheduled method to poll messages at regular intervals.

Code Example

import jakarta.annotation.PreDestroy; // javax.annotation.PreDestroy on Spring Boot 2.x

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;

@Service
public class ScheduledKafkaConsumer {

    private final Consumer<String, String> kafkaConsumer;

    @Autowired
    public ScheduledKafkaConsumer(ConsumerFactory<String, String> consumerFactory) {
        // Create a Kafka consumer using the Spring ConsumerFactory
        this.kafkaConsumer = consumerFactory.createConsumer();
        kafkaConsumer.subscribe(Collections.singletonList("my-topic"));
    }

    @Scheduled(fixedRate = 5000) // Poll messages every 5 seconds
    public void consumeMessages() {
        System.out.println("Polling for messages...");
        try {
            // Poll messages from the topic
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> {
                System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            });

            // Commit offsets manually
            kafkaConsumer.commitSync();
        } catch (Exception e) {
            System.err.println("Error consuming messages: " + e.getMessage());
        }
    }

    // Close the consumer when the application shuts down to avoid resource leaks
    @PreDestroy
    public void closeConsumer() {
        kafkaConsumer.close();
    }
}
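
To try the consumer against a local broker, you can publish a few test messages with the console producer shipped in Kafka’s bin directory (assuming the broker runs on localhost:9092 and the topic my-topic exists):

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
> first message
> second message

Each line you type is sent as a message; within a few seconds the scheduled poll should print it on the consumer side.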

Step 4: Main Application Class

Enable scheduling in your Spring Boot application:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // activates @Scheduled processing for ScheduledKafkaConsumer
public class KafkaScheduledConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaScheduledConsumerApplication.class, args);
    }
}

Key Points

  1. Scheduling Interval:

    • Controlled by the @Scheduled annotation (fixedRate or fixedDelay); see the sketch after this list.
    • fixedRate: Measures the interval from the start of the previous execution. With Spring’s default single-threaded scheduler, runs still never overlap.
    • fixedDelay: Starts the next run only after the previous one completes, which is the safer choice when processing time varies.
  2. Manual Offset Management:

    • enable-auto-commit is set to false so offsets are committed explicitly with commitSync() only after the records have been processed.
  3. Polling Duration:

    • poll() blocks for at most the specified Duration and returns whatever records arrived within it.
  4. Resource Management:

    • The @PreDestroy method closes the consumer on shutdown to avoid resource leaks. Note that KafkaConsumer is not thread-safe, so all polling must stay on the scheduler thread.
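
As a quick illustration of the interval styles, here is a minimal sketch (with hypothetical method names, plus a cron variant):

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class PollingIntervalExamples {

    // fixedRate: the next run is scheduled 5 seconds after the previous run started
    @Scheduled(fixedRate = 5000)
    public void pollAtFixedRate() {
        // poll and process here
    }

    // fixedDelay: the next run starts 5 seconds after the previous run finished
    @Scheduled(fixedDelay = 5000)
    public void pollWithFixedDelay() {
        // poll and process here
    }

    // Cron alternative: fires at second 0 of every minute
    @Scheduled(cron = "0 * * * * *")
    public void pollEveryMinute() {
        // poll and process here
    }
}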

Advantages of Scheduled Consumption

  • Full control over when and how often the application polls.
  • Decouples message fetching from the @KafkaListener container thread model.
  • Useful for batch processing or coordinating Kafka consumption with other scheduled tasks.

Let me know if you’d like to expand this with advanced features like error handling, retries, or message transformation!
