The Consumer.wakeup() method in the Kafka Consumer API
The Consumer.wakeup() method in the Kafka Consumer API is used to interrupt a long-running operation (like a poll() call) in another thread safely. It’s primarily useful for shutting down a Kafka consumer gracefully during application shutdown or when handling external signals like SIGTERM.
How Consumer.wakeup() Works
Interrupts a Blocking poll():
- If a consumer thread is blocked in poll(), calling wakeup() causes the poll() to throw a WakeupException.
Does Not Close the Consumer:
- After wakeup() is called, you still need to close the consumer explicitly using consumer.close().
Thread-Safe:
- You can safely call wakeup() from any thread.
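The behavior described above can be modeled with a plain atomic flag. The sketch below is a toy model only, not Kafka's actual implementation: WakeupModel and WakeupSignal are hypothetical names standing in for the consumer and WakeupException, and the 5 ms sleep stands in for waiting on the network. It also illustrates the documented case where wakeup() is called while no poll() is in progress, so that the next poll() throws instead. The model assumes a wakeup request is consumed by the one poll() that throws.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for WakeupException; this is not a Kafka class.
class WakeupSignal extends RuntimeException {
    WakeupSignal() {
        super("wakeup requested");
    }
}

// Toy model of the wakeup flag. Assumption: a simplified illustration,
// not how KafkaConsumer is implemented internally.
class WakeupModel {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

    // Safe to call from any thread: it only sets an atomic flag.
    void wakeup() {
        wakeupRequested.set(true);
    }

    // Blocks for up to timeoutMs, checking the flag between short sleeps.
    // Throws WakeupSignal if wakeup() was called, and clears the flag.
    void poll(long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        do {
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupSignal();
            }
            try {
                Thread.sleep(5); // stands in for waiting on broker responses
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        } while (System.nanoTime() < deadline);
    }

    public static void main(String[] args) throws InterruptedException {
        WakeupModel model = new WakeupModel();

        // Case 1: another thread wakes a poll that is already blocked.
        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            model.wakeup();
        });
        waker.start();
        try {
            model.poll(10_000);
            System.out.println("poll timed out");
        } catch (WakeupSignal e) {
            System.out.println("blocked poll was woken up");
        }
        waker.join();

        // Case 2: wakeup() with nothing blocked makes the next poll throw.
        model.wakeup();
        try {
            model.poll(10_000);
        } catch (WakeupSignal e) {
            System.out.println("next poll was woken up");
        }
    }
}
```

Note that nothing in the model releases resources when the signal is thrown, which mirrors the point that waking up a consumer is separate from closing it.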
Typical Use Case
- You want to stop the consumer when the application is shutting down.
- You need to handle external interruptions, such as signals or user requests, gracefully.
Example Implementation
Here’s an example of how to use Consumer.wakeup() for graceful shutdown:
Java Code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerWithWakeup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Keep a reference to the polling thread so the hook can wait for it
        final Thread mainThread = Thread.currentThread();

        // Shutdown hook to handle termination signals
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutdown signal received...");
            consumer.wakeup(); // Interrupt poll()
            try {
                // Wait for the finally block below to close the consumer;
                // otherwise the JVM may exit before close() has run
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                // Poll for messages
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record: key=%s, value=%s%n", record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // Handle the wakeup signal (expected during shutdown)
            System.out.println("Consumer interrupted by wakeup.");
        } finally {
            consumer.close(); // Ensure the consumer is properly closed
            System.out.println("Consumer closed.");
        }
    }
}
Key Steps in the Code
Runtime.getRuntime().addShutdownHook:
- Registers a shutdown hook to detect termination signals and call consumer.wakeup().
Handle WakeupException:
- Catch and ignore WakeupException during shutdown. This is a normal part of the shutdown process.
Explicit Consumer Close:
- Ensure the consumer is closed in the finally block to release resources.
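One detail worth illustrating is the ordering between the shutdown hook and the cleanup in the finally block. The JVM can halt as soon as all shutdown hooks have returned, so a hook that only signals the polling thread may not leave time for close() to run. A common remedy is to have the hook also join that thread. The following is a minimal sketch of that idea in plain Java, with no Kafka dependency. PollingWorker, requestStop(), and stopAndWait() are hypothetical names: the sleep stands in for poll(), the flag for wakeup(), and the finally block for consumer.close().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a thread that runs a consumer poll loop.
class PollingWorker implements Runnable {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicBoolean cleanedUp = new AtomicBoolean(false);

    @Override
    public void run() {
        try {
            while (!stopRequested.get()) {
                Thread.sleep(10); // stands in for consumer.poll(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            cleanedUp.set(true); // stands in for consumer.close()
        }
    }

    // Stands in for consumer.wakeup(); safe to call from another thread.
    void requestStop() {
        stopRequested.set(true);
    }

    boolean isCleanedUp() {
        return cleanedUp.get();
    }
}

class JoinOnShutdownSketch {
    // What the body of a shutdown hook would do: signal, then wait.
    static void stopAndWait(PollingWorker worker, Thread thread) {
        worker.requestStop();
        try {
            thread.join(); // returns only after the worker's finally block ran
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        PollingWorker worker = new PollingWorker();
        Thread thread = new Thread(worker, "polling-thread");
        thread.start();

        // In a real application this call would sit inside
        // Runtime.getRuntime().addShutdownHook(new Thread(...)).
        stopAndWait(worker, thread);
        System.out.println("cleaned up before exit: " + worker.isCleanedUp());
    }
}
```

Because join() returns only once the worker thread has terminated, the cleanup is guaranteed to have finished by the time stopAndWait() returns, which is the property a shutdown hook needs.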
Why Use Consumer.wakeup()?
- To interrupt a blocking poll() safely without relying on thread interruptions.
- To ensure graceful shutdown of the consumer and avoid potential resource leaks or uncommitted offsets.