温馨提示×

kafka定时消费如何配置

小樊
81
2024-12-16 22:47:19
栏目: 大数据

Kafka 定时消费可以通过设置消费者的 auto.offset.reset 属性为 earliestlatest,以及设置消费者的 enable.auto.commit 属性为 truefalse 来实现。

具体来说,如果要将 Kafka 消息的消费时间设置为每隔 5 秒进行一次,可以使用以下代码:

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_group',
    value_deserializer=lambda v: v.decode('utf-8')
)

while True:
    for msg in consumer:
        print(msg.value)
        time.sleep(5)  # 每隔 5 秒消费一条消息

在上面的代码中,auto_offset_reset='earliest' 表示从最早的消息开始消费,enable_auto_commit=True 表示自动提交消费偏移量。在循环中,我们使用 time.sleep(5) 来控制每隔 5 秒消费一条消息。

需要注意的是,如果设置了 enable_auto_commit=True,则会在每次消费完一条消息后自动提交偏移量,因此不需要手动提交偏移量。如果设置为 false,则需要手动提交偏移量,否则可能会导致重复消费或者漏消费消息。

0