Apache Kafka 消费者可以通过提交偏移量来跟踪它们已经处理过的消息。偏移量是消费者在消费消息时记录的一个位置,用于表示该消费者已经读取到了哪一条消息。当消费者处理完一条消息后,它会提交这条消息的偏移量到 Kafka 的内置主题中,以便 Kafka 知道该消费者已经处理了哪些消息。
以下是使用 offset 进行偏移量提交的基本步骤:
subscribe
方法订阅一个或多个 Kafka 主题。这将使消费者开始监听这些主题上的消息。commitSync
或 commitAsync
方法来提交这条消息的偏移量。commitSync
方法将立即提交偏移量,而 commitAsync
方法则将偏移量提交到 Kafka,但不会等待提交完成。以下是使用 offset 进行偏移量提交的示例代码(Java):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// 提交偏移量
consumer.commitSync();
}
}
在上面的示例中,我们首先创建了一个 Kafka 消费者,并订阅了名为 “my-topic” 的主题。然后,我们使用一个无限循环来持续消费消息。在每次循环中,我们从 Kafka 代理拉取一条或多条消息,并对这些消息进行处理。最后,我们调用 commitSync
方法来提交每条消息的偏移量。
需要注意的是,提交偏移量应该在处理完消息之后进行,以确保消息不会被重复消费。另外,可以根据实际需求选择使用 commitSync
或 commitAsync
方法来提交偏移量。如果使用 commitAsync
方法,需要注意处理提交失败的情况,以避免消息丢失或重复消费。