Kafka 消息重试可以通过以下几种方式实现:
客户端重试:
max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
等参数来控制消费者每次拉取的消息数量和时间,从而间接控制重试次数。try-catch
块捕获异常并重新尝试处理消息。消息确认机制:
死信队列(DLQ):
幂等性处理:
外部重试系统:
下面是一个简单的示例代码,展示了如何在 Kafka 消费者中实现手动重试:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class RetryableKafkaConsumer {
private final KafkaConsumer<String, String> consumer;
private final int maxRetries;
public RetryableKafkaConsumer(String bootstrapServers, String groupId, String topic, int maxRetries) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
this.maxRetries = maxRetries;
}
public void consume() {
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
int retries = 0;
boolean success = false;
while (!success && retries < maxRetries) {
try {
// 处理消息的逻辑
System.out.printf("Processing record: key = %s, value = %s%n", record.key(), record.value());
success = true; // 假设处理成功
} catch (Exception e) {
retries++;
System.err.printf("Error processing record: key = %s, value = %s, retry count = %d%n", record.key(), record.value(), retries);
}
}
if (!success) {
System.err.printf("Failed to process record after %d retries: key = %s, value = %s%n", maxRetries, record.key(), record.value());
}
}
}
}
public void close() {
consumer.close();
}
public static void main(String[] args) {
RetryableKafkaConsumer consumer = new RetryableKafkaConsumer("localhost:9092", "test-group", "test-topic", 3);
consumer.consume();
consumer.close();
}
}
在这个示例中,消费者在处理消息时,如果遇到异常,会进行重试,最多重试 maxRetries
次。如果重试次数达到上限仍然失败,则记录错误信息。