The Consumer.wakeup() method in the Kafka Consumer

The Consumer.wakeup() method in the Kafka Consumer API is used to interrupt a long-running operation (like a poll() call) in another thread safely. It’s primarily useful for shutting down a Kafka consumer gracefully during application shutdown or when handling external signals like SIGTERM.


How Consumer.wakeup() Works

  1. Interrupts a Blocking poll():

    • If a consumer thread is blocked in poll(), calling wakeup() causes the poll() to throw a WakeupException.
  2. Does Not Close the Consumer:

    • After wakeup() is called, you still need to close the consumer explicitly using consumer.close().
  3. Thread-Safe:

    • You can safely call wakeup() from any thread.

Typical Use Case

  • You want to stop the consumer when the application is shutting down.
  • Handle external interruptions like signals or user requests gracefully.

Example Implementation

Here’s an example of how to use Consumer.wakeup() for graceful shutdown:

Java Code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithWakeup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Shutdown hook to handle termination signals
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown signal received...");
            consumer.wakeup(); // Interrupt poll()
        }));

        try {
            while (true) {
                // Poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record: key=%s, value=%s%n", record.key(), record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Handle the wakeup signal (expected during shutdown)
            System.out.println("Consumer interrupted by wakeup.");
        } finally {
            consumer.close(); // Ensure the consumer is properly closed
            System.out.println("Consumer closed.");
        }
    }
}

Key Steps in the Code

  1. Runtime.getRuntime().addShutdownHook:

    • Registers a shutdown hook to detect termination signals and call consumer.wakeup().
  2. Handle WakeupException:

    • Catch and ignore WakeupException during shutdown. This is a normal part of the shutdown process.
  3. Explicit Consumer Close:

    • Ensure the consumer is closed in the finally block to release resources.

Why Use Consumer.wakeup()?

  • To interrupt a blocking poll() safely without relying on thread interruptions.
  • To ensure graceful shutdown of the consumer and avoid potential resource leaks or uncommitted offsets.

Let me know if you need more advanced examples or additional details!

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

Install and run an FTP server using Docker

PYTHONPATH, Python 모듈 환경설정

Elasticsearch Ingest API

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

Fundamentals of English Grammar #1

You can use Sublime Text from the command line by utilizing the subl command

How to start computer vision ai

Catch multiple exceptions in Python

git 명령어