在 Spring Boot 中整合 Kafka 时,可以通过配置 RetryTemplate
和 KafkaListenerEndpointRegistrar
来实现错误重试机制。以下是一个简单的示例:
pom.xml
文件中添加 Kafka 和 Spring Retry 的依赖:<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Spring Retry -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
</dependencies>
application.yml
或 application.properties
文件中配置 Kafka 和 Retry 的相关属性:spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retry:
enabled: true
max-attempts: 3
backoff:
initial-interval: 1000
multiplier: 2
max-interval: 10000
RetryTemplate
和 KafkaListenerEndpointRegistrar
:@Configuration
public class KafkaRetryConfig {
@Bean
public RetryTemplate retryTemplate() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000L);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(10000L);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
return retryTemplate;
}
@Bean
public KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar(RetryTemplate retryTemplate) {
KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
registrar.setRetryTemplate(retryTemplate);
return registrar;
}
}
@KafkaListener
注解标注。在这个例子中,我们创建一个简单的 ConsumerRecord
处理类:@Service
public class KafkaConsumerListener {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("Received message: key = %s, value = %s, partition = %d, offset = %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
现在,当消费者接收到消息时,如果处理过程中发生错误,Spring Retry 将自动重试,直到达到最大重试次数。