The Consumer.wakeup() method in the Kafka Consumer

The Consumer.wakeup() method in the Kafka Consumer API lets one thread safely abort a blocking operation, such as a long poll() call, that is running in another thread. It is mainly useful for shutting down a Kafka consumer gracefully, for example during application shutdown or in response to an external signal such as SIGTERM.


How Consumer.wakeup() Works

  1. Interrupts a Blocking poll():

    • If a consumer thread is blocked in poll(), calling wakeup() causes that poll() to throw a WakeupException. If no thread is blocked at that moment, the next call to poll() throws it instead.
  2. Does Not Close the Consumer:

    • After wakeup() is called, you still need to close the consumer explicitly using consumer.close().
  3. Thread-Safe:

    • You can safely call wakeup() from any thread. KafkaConsumer is otherwise not safe for multi-threaded access; wakeup() is the one exception.
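
To make the three points above concrete, here is a small toy model of the wakeup behavior written with plain Java only. It is not Kafka's implementation: ToyConsumer and ToyWakeupException are hypothetical names invented for this sketch, and the rule that one wakeup request triggers exactly one throw is an assumption of the toy.

```java
import java.util.concurrent.TimeUnit;

// Toy stand-in for a consumer. NOT Kafka's implementation; all names are hypothetical.
class ToyConsumer {
    // Plays the role that WakeupException plays in the text above.
    static class ToyWakeupException extends RuntimeException {}

    private final Object lock = new Object();
    private boolean wakeupRequested = false;

    // Waits up to timeoutMillis. Throws if a wakeup is already pending
    // or arrives while waiting; otherwise returns normally on timeout.
    void poll(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (!wakeupRequested) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    return; // timed out; this toy never has records to return
                }
                try {
                    lock.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            wakeupRequested = false; // assumption of this toy: one request triggers one throw
            throw new ToyWakeupException();
        }
    }

    // Safe to call from any thread: records the request and wakes a waiting poll().
    void wakeup() {
        synchronized (lock) {
            wakeupRequested = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.wakeup(); // nobody is blocked yet, so the request stays pending
        try {
            consumer.poll(100);
            System.out.println("poll returned normally");
        } catch (ToyWakeupException e) {
            System.out.println("next poll threw ToyWakeupException");
        }
    }
}
```

Note that wakeup() in this toy only signals; nothing is closed or released, which mirrors point 2 above.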

Typical Use Case

  • You want to stop the consumer when the application is shutting down.
  • Handle external interruptions like signals or user requests gracefully.
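
For the second case, a stop requested by a user or by another component, one possible shape is a Runnable loop that exposes a shutdown() method. The sketch below uses only plain Java so it can run without a broker: Source, WakeupSignal and FakeSource are hypothetical stand-ins for KafkaConsumer, WakeupException and a real consumer instance, not part of the Kafka API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ConsumerLoopSketch implements Runnable {
    // Hypothetical interface covering the three consumer calls the loop needs.
    interface Source {
        void poll();   // may throw WakeupSignal
        void wakeup();
        void close();
    }

    // Hypothetical stand-in for WakeupException.
    static class WakeupSignal extends RuntimeException {}

    // Fake source so the sketch runs without Kafka.
    static class FakeSource implements Source {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
        final AtomicBoolean closed = new AtomicBoolean(false);

        public void poll() {
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupSignal();
            }
            try {
                Thread.sleep(5); // pretend to wait for records
            } catch (InterruptedException e) {
                throw new IllegalStateException("interrupted while polling", e);
            }
        }

        public void wakeup() { wakeupRequested.set(true); }

        public void close() { closed.set(true); }
    }

    private final Source source;
    private final CountDownLatch stopped = new CountDownLatch(1);

    ConsumerLoopSketch(Source source) { this.source = source; }

    @Override
    public void run() {
        try {
            while (true) {
                source.poll(); // record processing would go here
            }
        } catch (WakeupSignal e) {
            // Expected: shutdown() was called.
        } finally {
            source.close();
            stopped.countDown();
        }
    }

    // May be called from any thread, e.g. a request handler or a UI callback.
    void shutdown() { source.wakeup(); }

    // Returns true if the loop finished its cleanup within the timeout.
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FakeSource source = new FakeSource();
        ConsumerLoopSketch loop = new ConsumerLoopSketch(source);
        new Thread(loop).start();
        loop.shutdown();
        System.out.println("stopped=" + loop.awaitStopped(2000) + ", closed=" + source.closed.get());
    }
}
```

The only call that crosses threads is shutdown(), which forwards to wakeup(); polling and closing both stay on the loop's own thread.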

Example Implementation

Here’s an example of how to use Consumer.wakeup() for graceful shutdown:

Java Code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithWakeup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

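        // Shutdown hook to handle termination signals
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown signal received...");
            consumer.wakeup(); // Interrupt poll()
            try {
                // Wait for the main thread to finish closing the consumer;
                // the JVM halts as soon as all shutdown hooks have returned.
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));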

        try {
            while (true) {
                // Poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record: key=%s, value=%s%n", record.key(), record.value());
                }
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // Handle the wakeup signal (expected during shutdown)
            System.out.println("Consumer interrupted by wakeup.");
        } finally {
            consumer.close(); // Ensure the consumer is properly closed
            System.out.println("Consumer closed.");
        }
    }
}

Key Steps in the Code

  1. Runtime.getRuntime().addShutdownHook:

    • Registers a shutdown hook that the JVM runs when it begins shutting down (for example on SIGTERM or Ctrl+C); the hook calls consumer.wakeup().
  2. Handle WakeupException:

    • Catch WakeupException and treat it as the expected shutdown signal rather than as an error. It is a normal part of the shutdown process.
  3. Explicit Consumer Close:

    • Ensure the consumer is closed in the finally block to release resources.
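
One detail of step 1 is worth spelling out: the JVM halts once all shutdown hooks have finished, so a hook that only signals the loop can return before the finally block has run. A common remedy is for the hook to signal and then join the loop thread. The sketch below shows that ordering with plain Java only; ShutdownHookSketch, newHook and demo are hypothetical names, and the short sleeps merely stand in for poll() and a slow close().

```java
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownHookSketch {
    // Builds a hook thread that signals the loop, then waits for its cleanup to finish.
    static Thread newHook(Runnable wakeup, Thread loopThread) {
        return new Thread(() -> {
            wakeup.run(); // with a real consumer this could be consumer::wakeup
            try {
                loopThread.join(); // returns only after the loop's finally block is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the hook body directly and reports whether cleanup finished before it returned.
    static boolean demo() {
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        AtomicBoolean cleanedUp = new AtomicBoolean(false);
        Thread loop = new Thread(() -> {
            try {
                while (!stopRequested.get()) {
                    sleepQuietly(5); // stands in for poll()
                }
            } finally {
                sleepQuietly(50); // stands in for a slow close()
                cleanedUp.set(true);
            }
        });
        loop.start();
        Thread hook = newHook(() -> stopRequested.set(true), loop);
        hook.run(); // run synchronously here; the JVM would start it as a thread
        return cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup finished before hook returned: " + demo());
    }
}
```

In an application whose polling loop runs on the main thread, the hook could be registered along the lines of Runtime.getRuntime().addShutdownHook(newHook(consumer::wakeup, Thread.currentThread())).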

Why Use Consumer.wakeup()?

  • To interrupt a blocking poll() safely without relying on thread interruptions.
  • To ensure a graceful shutdown: once poll() has been interrupted, the consumer can reach close(), which releases its resources and, if auto-commit is enabled, commits the current offsets.
