是的,Kafka的消费者组ID(group ID)可以手动指定。在创建消费者时,可以通过设置group.id
属性来指定消费者所属的消费者组ID。这个属性是可选的,如果未指定,Kafka会为该消费者生成一个默认的消费者组ID。
以下是一个使用Python的confluent_kafka
库创建Kafka消费者的示例,其中手动指定了消费者组ID:
from confluent_kafka import Consumer, KafkaError
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_custom_group'
}
consumer = Consumer(conf)
consumer.subscribe(['my_topic'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print(f"End of partition reached {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
else:
raise KafkaException(msg.error())
else:
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
在这个示例中,我们将group.id
设置为my_custom_group
,以便在Kafka集群中识别该消费者所属的消费者组。