Kafka 命令行消费可以通过以下步骤进行设置:
安装 Kafka: 确保你已经安装了 Kafka,并且 Kafka 服务正在运行。你可以通过以下命令检查 Kafka 服务的状态:
sudo systemctl status kafka
创建消费者组:
在消费之前,你需要创建一个消费者组。你可以使用 kafka-consumer-groups.sh
脚本来创建和管理消费者组。例如:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --create --group my-consumer-group
编写消费脚本:
你可以编写一个简单的脚本来消费 Kafka 消息。以下是一个示例脚本,使用 Python 和 confluent_kafka
库:
from confluent_kafka import Consumer, KafkaError
# 创建消费者实例
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-consumer-group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(conf)
# 订阅主题
consumer.subscribe(['my-topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
运行消费脚本:
确保你已经安装了 confluent_kafka
库。你可以使用以下命令安装:
pip install confluent_kafka
然后运行你的消费脚本:
python consume_kafka.py
验证消费:
你可以使用 kafka-console-producer.sh
脚本来发送消息到 Kafka 主题,然后观察你的消费脚本是否接收到这些消息。例如:
kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
在另一个终端中运行你的消费脚本,你应该能看到接收到的消息。
通过以上步骤,你就可以成功设置 Kafka 命令行消费。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
推荐阅读:kafka命令行消费如何查看