Kafka中的消息确认(acknowledgment)是通过消费者与Kafka集群之间的交互来实现的。当消费者处理完一个消息后,它会向Kafka发送一个确认信号,表明该消息已经被成功处理。这样可以确保消息不会被重复消费,并且可以帮助Kafka跟踪消费者的处理进度。
Kafka支持两种消息确认机制:
props
中设置enable.auto.commit
属性为true
。例如:Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "true");
需要注意的是,自动确认模式不适用于需要确保消息不被重复消费的场景。
Consumer
接口的acknowledge()
方法来实现。要使用手动确认,你需要在消费者的props
中设置enable.auto.commit
属性为false
,并实现一个AcknowledgingConsumer
。例如:Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
// 创建一个AcknowledgingConsumer
AcknowledgingConsumer<String, String> acknowledgingConsumer = new AbstractConsumer<String, String>(props) {
@Override
public void onConsume(Collection<ConsumerRecord<String, String>> records, ConsumerContext context) {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
// 发送确认信号
context.commitSync();
}
}
};
在这个例子中,我们在onConsume()
方法中处理消息,并在处理完每个消息后调用context.commitSync()
来发送确认信号。这样可以确保消息不会被重复消费,并且可以帮助Kafka跟踪消费者的处理进度。