在Spring整合Kafka中,实现消息过滤的方法主要有两种:使用Kafka Message Filter(基于Kafka的消费者拦截器ConsumerInterceptor实现),或者使用Spring Kafka的ConsumerAwareErrorHandler。下面是这两种方法的详细说明和示例代码。
Kafka Message Filter(即消费者拦截器)允许你在消费者端对消息进行过滤。要实现这个功能,你需要创建一个实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口的类,并重写onConsume方法。在这个方法中,你可以根据需要对消息进行过滤,并返回过滤后的消息集合。
首先,创建一个实现ConsumerInterceptor接口的类:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
 * Consumer-side interceptor that drops unwanted records before the application sees them.
 *
 * <p>Kafka passes each polled batch to {@link #onConsume(ConsumerRecords)} and hands the
 * returned batch to the caller of {@code poll()}. Filtering is therefore done by returning a
 * new {@link ConsumerRecords} that holds only the accepted records.
 *
 * <p>The class is activated by listing it in the {@code interceptor.classes} consumer property.
 */
public class MessageFilterInterceptor implements ConsumerInterceptor<String, String> {

    /** Only records carrying this key are passed on. */
    private static final String ACCEPTED_KEY = "exampleKey";

    /**
     * Builds a copy of the polled batch that contains only the accepted records.
     *
     * @param records the batch returned by the current poll
     * @return a batch with every record rejected by {@link #shouldFilter} removed; partitions
     *     left without any accepted record are omitted
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> accepted = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> kept = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                if (shouldFilter(record)) {
                    // Rejected: simply leave it out of the batch that is returned.
                    continue;
                }
                System.out.printf("Consumed record: key = %s, value = %s, partition = %d, offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
                kept.add(record);
            }
            if (!kept.isEmpty()) {
                accepted.put(partition, kept);
            }
        }
        return new ConsumerRecords<>(accepted);
    }

    /** No action is needed when offsets are committed. */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Intentionally empty.
    }

    /** Holds no resources, so there is nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /** This interceptor has no configurable settings. */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }

    /**
     * Decides whether a record must be dropped.
     *
     * @param record the record to inspect
     * @return {@code true} when the record must be discarded, i.e. its key is missing or is
     *     not {@code "exampleKey"}
     */
    private boolean shouldFilter(ConsumerRecord<String, String> record) {
        // Put your own filtering rule here. Constant-first equals keeps null keys from throwing.
        return !ACCEPTED_KEY.equals(record.key());
    }
}
接下来,在Spring配置类中注册这个拦截器:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig implements KafkaListenerConfigurer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setId("exampleEndpoint");
endpoint.setTopics("exampleTopic");
endpoint.setMessageHandlerMethodFactory(kafkaListenerContainerFactory().getMessageHandlerMethodFactory());
endpoint.setBean(new KafkaMessageFilterConsumer());
endpoint.setMethod(KafkaMessageFilterConsumer.class.getDeclaredMethods()[0]);
registrar.registerEndpoint(endpoint);
}
@Override
public void configureMessageConverters(List<KafkaListenerEndpoint> endpoints) {
// 不需要实现此方法
}
}
最后,创建一个消费者类,并使用@KafkaListener注解指定要监听的端点:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * Listener bean that receives the records which made it through the filter.
 */
@Component
public class KafkaMessageFilterConsumer {

    /**
     * Prints each received message value to standard output.
     *
     * @param message the record value
     */
    // A listener must name what it subscribes to; "exampleTopic" is the topic used in this example.
    @KafkaListener(id = "exampleEndpoint", topics = "exampleTopic", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
ConsumerAwareErrorHandler允许你在发生错误时对消息进行处理。你可以在这个处理器中实现消息过滤逻辑。
首先,创建一个实现org.springframework.kafka.listener.ConsumerAwareErrorHandler接口的类:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaException;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.stereotype.Component;
@Component
public class MessageFilterErrorHandler implements ConsumerAwareErrorHandler {
private final Consumer<String, String> consumer;
public MessageFilterErrorHandler(Consumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
// 在这里对消息进行过滤
if (shouldFilter(data)) {
consumer.seekToCurrentPosition(new TopicPartition(data.topic(), data.partition()));
} else {
// 如果消息满足条件,则抛出异常
throw new KafkaException("Error processing message: " + data, thrownException);
}
}
private boolean shouldFilter(ConsumerRecord<?, ?> record) {
// 在这里实现你的过滤逻辑
// 例如,只处理键为"exampleKey"的消息
return record.key().equals("exampleKey");
}
}
接下来,在Spring配置类中注册这个错误处理器:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConsumerConfig implements KafkaListenerConfigurer {
// ... 其他配置 ...
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new MessageFilterErrorHandler(consumerFactory().getConsumer()));
return factory;
}
// ... 其他配置 ...
}
这样,当发生错误时,MessageFilterErrorHandler会拦截错误并对消息进行过滤。如果消息满足过滤条件,则忽略该消息;否则,抛出异常。