Consume a topic from kafka by Python

Sure, I can help you develop Python code to consume topics from Apache Kafka using the confluent-kafka library. Make sure you have Kafka installed and running, and you have the confluent-kafka library installed using pip install confluent-kafka.

Here's an example of how you can consume messages from a Kafka topic:

from confluent_kafka import Consumer, KafkaError

# Kafka configuration
config = {
    'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka broker addresses
    'group.id': 'my-group',  # Consumer group ID
    'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
}

# Create Kafka consumer instance
consumer = Consumer(config)

# Subscribe to a topic
topic = 'my-topic'  # Replace with the topic you want to consume from
consumer.subscribe([topic])

# Consume messages
try:
    while True:
        msg = consumer.poll(1.0)  # Poll for new messages with a timeout
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('Reached end of partition')
            else:
                print('Error: {}'.format(msg.error()))
        else:
            print('Received message: {}'.format(msg.value().decode('utf-8')))

except KeyboardInterrupt:
    pass

finally:
    consumer.close()

Replace 'localhost:9092' with your Kafka broker addresses, 'my-group' with your desired consumer group, and 'my-topic' with the topic you want to consume from.

Remember to handle messages, errors, and implement your own processing logic based on your use case.

Keep in mind that Kafka topics can have multiple partitions, and you might want to consider using multiple consumers in a group to parallelize the processing of messages from different partitions for better scalability.

Also, make sure you have the necessary Kafka server and topic configurations set up correctly before running the consumer code.

댓글

이 블로그의 인기 게시물

Install and run an FTP server using Docker

Using the MinIO API via curl

PYTHONPATH, Python 모듈 환경설정

Elasticsearch Ingest API

오늘의 문장2

How to checkout branch of remote git, 깃 리모트 브랜치 체크아웃

Fundamentals of English Grammar #1

To switch to a specific tag in a Git repository

You can use Sublime Text from the command line by utilizing the subl command

티베트-버마어파 와 한어파(중국어파)의 어순 비교