The Consumer.wakeup() method in the Kafka Consumer

The Consumer.wakeup() method in the Kafka Consumer API lets one thread safely abort a blocking call, such as poll(), that another thread is making on the consumer. It’s primarily useful for shutting down a Kafka consumer gracefully during application shutdown or when handling external signals like SIGTERM.


How Consumer.wakeup() Works

  1. Interrupts a Blocking poll():

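    • If a consumer thread is blocked in poll(), calling wakeup() causes that poll() to throw a WakeupException. If no thread is blocked at that moment, the next call that can throw WakeupException raises it instead.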
  2. Does Not Close the Consumer:

    • After wakeup() is called, you still need to close the consumer explicitly using consumer.close().
  3. Thread-Safe:

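    • You can safely call wakeup() from any thread. It is the exception: the other KafkaConsumer methods, including poll() and close(), are not thread-safe.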
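
To make these semantics concrete, here is a toy stand-in written with only the JDK. It is not Kafka code and not how Kafka implements wakeup(); ToyConsumer, ToyWakeupException, and deliver() are hypothetical names invented for this sketch. It only mimics two behaviors described above: a wakeup() from another thread aborts a blocked poll(), and a wakeup() issued while nothing is blocked is remembered and raised by the next poll().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ToyWakeupDemo {

    /** Hypothetical stand-in for Kafka's WakeupException. */
    public static class ToyWakeupException extends RuntimeException {
    }

    /** Hypothetical stand-in for a consumer; NOT Kafka's implementation. */
    public static class ToyConsumer {
        private final BlockingQueue<String> records = new LinkedBlockingQueue<>();
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

        /** Safe to call from any thread: it only sets a flag. */
        public void wakeup() {
            wakeupRequested.set(true);
        }

        /** Makes a record available to the next poll(). */
        public void deliver(String record) {
            records.add(record);
        }

        /** Waits up to timeoutMs for a record; returns null on timeout. */
        public String poll(long timeoutMs) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (true) {
                // A pending wakeup is consumed and reported exactly once
                if (wakeupRequested.getAndSet(false)) {
                    throw new ToyWakeupException();
                }
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null;
                }
                try {
                    // Wait in short slices so the flag is rechecked regularly
                    long slice = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(10));
                    String record = records.poll(slice, TimeUnit.NANOSECONDS);
                    if (record != null) {
                        return record;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();

        // Another thread requests the wakeup while main is blocked in poll()
        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            consumer.wakeup();
        });
        waker.start();

        try {
            consumer.poll(10_000);
            System.out.println("poll() timed out");
        } catch (ToyWakeupException e) {
            System.out.println("poll() aborted by wakeup()");
        }
    }
}
```

Because the flag is cleared when the exception is thrown, a single wakeup() aborts exactly one call in this model. Later calls to poll() behave normally again.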

Typical Use Case

  • You want to stop the consumer when the application is shutting down.
  • Handle external interruptions like signals or user requests gracefully.

Example Implementation

Here’s an example of how to use Consumer.wakeup() for graceful shutdown:

Java Code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithWakeup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

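        // Shutdown hook: wake the consumer, then wait for the main thread
        // to finish closing it before the JVM exits
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown signal received...");
            consumer.wakeup(); // Interrupt poll()
            try {
                mainThread.join(); // Let the finally block run consumer.close()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));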

        try {
            while (true) {
                // Poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record: key=%s, value=%s%n", record.key(), record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Handle the wakeup signal (expected during shutdown)
            System.out.println("Consumer interrupted by wakeup.");
        } finally {
            consumer.close(); // Ensure the consumer is properly closed
            System.out.println("Consumer closed.");
        }
    }
}

Key Steps in the Code

  1. Runtime.getRuntime().addShutdownHook:

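    • Registers a shutdown hook that runs when the JVM receives a termination signal and calls consumer.wakeup(). The hook should also wait for the main thread to finish (for example with Thread.join()). Otherwise the JVM may exit before consumer.close() completes.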
  2. Handle WakeupException:

    • Catch and ignore WakeupException during shutdown. This is a normal part of the shutdown process.
  3. Explicit Consumer Close:

    • Ensure the consumer is closed in the finally block to release resources.

Why Use Consumer.wakeup()?

  • To interrupt a blocking poll() safely without relying on thread interruptions.
  • To ensure the polling thread reaches consumer.close(), which releases network resources and, if auto-commit is enabled, commits the current offsets where possible.
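
If auto-commit is disabled, the final commit has to be done by hand. The following is a minimal sketch of one way to do that, not the only one. It assumes enable.auto.commit=false, reuses the broker address, group, and topic from the example above, and introduces a hypothetical class ManualCommitShutdown with a hypothetical helper runLoop(). The helper takes the Consumer interface so it can also be driven by a test double.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitShutdown {

    /** Polls until wakeup() is called, then commits once more and closes. */
    public static void runLoop(Consumer<String, String> consumer, String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Processed offset %d%n", record.offset());
                }
                // Commit only after the whole batch has been processed
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // In this loop the exception can only come from poll() or
            // commitSync(), so every record returned so far has been
            // processed and it is safe to commit the current position.
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "false"); // assumption of this sketch
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Wake the consumer on shutdown and wait for runLoop() to finish
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        runLoop(consumer, "my-topic");
    }
}
```

The final commit sits in the catch block rather than in finally on purpose. If record processing itself threw an exception, a commit in finally would also mark the unprocessed remainder of that batch as consumed.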
