Develop a Kafka producer to send messages to a Kafka topic
Here’s a guide and code examples to develop a Kafka producer to send messages to a Kafka topic. The examples cover both plain Kafka Producer API and Spring Kafka Producer for convenience.
1. Plain Kafka Producer API
Maven Dependencies
Add the Kafka client dependency in your pom.xml
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version> <!-- Replace with your Kafka version -->
</dependency>
Java Code Example
This example demonstrates producing messages to a Kafka topic using the plain Kafka Producer API.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaMessageProducer {
public static void main(String[] args) {
// Configure Kafka producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka server address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Create Kafka producer
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
String topic = "my-topic";
String key = "key1";
String value = "Hello, Kafka!";
// Create a producer record
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
// Send the message and handle response
producer.send(record, (RecordMetadata metadata, Exception exception) -> {
if (exception == null) {
System.out.printf("Sent message to topic:%s partition:%d offset:%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
// Optionally flush producer
producer.flush();
}
}
}
2. Kafka Producer with Spring Boot
Maven Dependencies
Add the Spring Kafka dependency in your pom.xml
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot Configuration (application.yml
)
Set up Kafka producer configurations:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
Java Code Example
Using Spring Kafka's KafkaTemplate
simplifies producing messages.
Producer Service
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String value) {
kafkaTemplate.send(topic, key, value).addCallback(
success -> {
if (success != null) {
System.out.printf("Sent message to topic:%s partition:%d offset:%d%n",
success.getRecordMetadata().topic(),
success.getRecordMetadata().partition(),
success.getRecordMetadata().offset());
}
},
failure -> {
System.err.printf("Message send failed: %s%n", failure.getMessage());
}
);
}
}
Test the Producer
You can call this service from a Spring REST controller or directly in your application:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducerRunner implements CommandLineRunner {
@Autowired
private KafkaProducerService kafkaProducerService;
@Override
public void run(String... args) throws Exception {
kafkaProducerService.sendMessage("my-topic", "key1", "Hello from Spring Kafka!");
}
}
Key Differences Between Plain API and Spring Kafka
Feature | Plain Kafka Producer API | Spring Kafka Producer |
---|---|---|
Ease of Use | Manual setup, more boilerplate | Simplified with KafkaTemplate |
Configuration Management | Properties in code | Managed via application.yml or .properties |
Error Handling | Explicit handling via callbacks | Spring-managed callbacks for retries |
Dependency | Minimal | Requires Spring Boot and Spring Kafka |
Let me know if you’d like to explore advanced features like transactional producers, custom serializers, or error handling strategies!
댓글
댓글 쓰기