The Consumer.wakeup() method in the Kafka Consumer
The Consumer.wakeup()
method in the Kafka Consumer API is used to interrupt a long-running operation (like a poll()
call) in another thread safely. It’s primarily useful for shutting down a Kafka consumer gracefully during application shutdown or when handling external signals like SIGTERM
.
How Consumer.wakeup()
Works
Interrupts a Blocking
poll()
:- If a consumer thread is blocked in
poll()
, callingwakeup()
causes thepoll()
to throw aWakeupException
.
- If a consumer thread is blocked in
Does Not Close the Consumer:
- After
wakeup()
is called, you still need to close the consumer explicitly usingconsumer.close()
.
- After
Thread-Safe:
- You can safely call
wakeup()
from any thread.
- You can safely call
Typical Use Case
- You want to stop the consumer when the application is shutting down.
- Handle external interruptions like signals or user requests gracefully.
Example Implementation
Here’s an example of how to use Consumer.wakeup()
for graceful shutdown:
Java Code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerWithWakeup {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "example-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));
// Shutdown hook to handle termination signals
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutdown signal received...");
consumer.wakeup(); // Interrupt poll()
}));
try {
while (true) {
// Poll for messages
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed record: key=%s, value=%s%n", record.key(), record.value());
}
}
} catch (org.apache.kafka.common.errors.WakeupException e) {
// Handle the wakeup signal (expected during shutdown)
System.out.println("Consumer interrupted by wakeup.");
} finally {
consumer.close(); // Ensure the consumer is properly closed
System.out.println("Consumer closed.");
}
}
}
Key Steps in the Code
Runtime.getRuntime().addShutdownHook
:- Registers a shutdown hook to detect termination signals and call
consumer.wakeup()
.
- Registers a shutdown hook to detect termination signals and call
Handle
WakeupException
:- Catch and ignore
WakeupException
during shutdown. This is a normal part of the shutdown process.
- Catch and ignore
Explicit Consumer Close:
- Ensure the consumer is closed in the
finally
block to release resources.
- Ensure the consumer is closed in the
Why Use Consumer.wakeup()
?
- To interrupt a blocking
poll()
safely without relying on thread interruptions. - To ensure graceful shutdown of the consumer and avoid potential resource leaks or uncommitted offsets.
Let me know if you need more advanced examples or additional details!
댓글
댓글 쓰기