Kafka 定时消费可以通过设置消费者的 auto.offset.reset
属性为 earliest
或 latest
,以及设置消费者的 enable.auto.commit
属性为 true
或 false
来实现。
具体来说,如果要将 Kafka 消息的消费时间设置为每隔 5 秒进行一次,可以使用以下代码:
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my_group',
value_deserializer=lambda v: v.decode('utf-8')
)
while True:
for msg in consumer:
print(msg.value)
time.sleep(5) # 每隔 5 秒消费一条消息
在上面的代码中,auto_offset_reset='earliest'
表示从最早的消息开始消费,enable_auto_commit=True
表示自动提交消费偏移量。在循环中,我们使用 time.sleep(5)
来控制每隔 5 秒消费一条消息。
需要注意的是,如果设置了 enable_auto_commit=True
,则会在每次消费完一条消息后自动提交偏移量,因此不需要手动提交偏移量。如果设置为 false
,则需要手动提交偏移量,否则可能会导致重复消费或者漏消费消息。