Develop a Kafka producer to send messages to a Kafka topic

Here’s a guide and code examples to develop a Kafka producer to send messages to a Kafka topic. The examples cover both plain Kafka Producer API and Spring Kafka Producer for convenience.


1. Plain Kafka Producer API

Maven Dependencies

Add the Kafka client dependency in your pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version> <!-- Replace with your Kafka version -->
</dependency>

Java Code Example

This example demonstrates producing messages to a Kafka topic using the plain Kafka Producer API.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaMessageProducer {
    public static void main(String[] args) {
        // Configure Kafka producer properties
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // Kafka server address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create Kafka producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String topic = "my-topic";
            String key = "key1";
            String value = "Hello, Kafka!";

            // Create a producer record
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            // Send the message and handle response
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception == null) {
                    System.out.printf("Sent message to topic:%s partition:%d offset:%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });

            // Optionally flush producer
            producer.flush();
        }
    }
}

2. Kafka Producer with Spring Boot

Maven Dependencies

Add the Spring Kafka dependency in your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Spring Boot Configuration (application.yml)

Set up Kafka producer configurations:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Java Code Example

Using Spring Kafka's KafkaTemplate simplifies producing messages.

Producer Service

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value).addCallback(
            success -> {
                if (success != null) {
                    System.out.printf("Sent message to topic:%s partition:%d offset:%d%n",
                            success.getRecordMetadata().topic(),
                            success.getRecordMetadata().partition(),
                            success.getRecordMetadata().offset());
                }
            },
            failure -> {
                System.err.printf("Message send failed: %s%n", failure.getMessage());
            }
        );
    }
}

Test the Producer

You can call this service from a Spring REST controller or directly in your application:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducerRunner implements CommandLineRunner {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @Override
    public void run(String... args) throws Exception {
        kafkaProducerService.sendMessage("my-topic", "key1", "Hello from Spring Kafka!");
    }
}

Key Differences Between Plain API and Spring Kafka

Feature Plain Kafka Producer API Spring Kafka Producer
Ease of Use Manual setup, more boilerplate Simplified with KafkaTemplate
Configuration Management Properties in code Managed via application.yml or .properties
Error Handling Explicit handling via callbacks Spring-managed callbacks for retries
Dependency Minimal Requires Spring Boot and Spring Kafka

Let me know if you’d like to explore advanced features like transactional producers, custom serializers, or error handling strategies!

댓글

이 블로그의 인기 게시물

Using the MinIO API via curl

Install and run an FTP server using Docker

PYTHONPATH, Python 모듈 환경설정

Elasticsearch Ingest API

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

Fundamentals of English Grammar #1

You can use Sublime Text from the command line by utilizing the subl command

How to start computer vision ai

Catch multiple exceptions in Python

git 명령어