温馨提示×

spring整合kafka如何实现消息过滤

小樊
81
2024-12-14 12:56:23
栏目: 大数据

在Spring整合Kafka中,实现消息过滤的方法主要有两种:使用基于Kafka消费者拦截器(ConsumerInterceptor)实现的消息过滤器(Message Filter),或者使用Spring Kafka的ConsumerAwareErrorHandler。下面是这两种方法的详细说明和示例代码。

  1. 使用Kafka Message Filter

这里所说的Message Filter并不是Kafka内置的组件,而是借助Kafka的消费者拦截器在消费者端对消息进行过滤。要实现这个功能,你需要创建一个实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口的类,并重写onConsume方法。在这个方法中,你可以根据需要对消息进行过滤。

首先,创建一个实现ConsumerInterceptor接口的类:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Consumer-side interceptor that removes unwanted records from each polled batch
 * before the batch is handed to the application.
 *
 * <p>The interceptor is instantiated by the Kafka client itself (via the
 * {@code interceptor.classes} consumer property), so it must keep a public
 * no-argument constructor.
 */
public class MessageFilterInterceptor implements ConsumerInterceptor<String, String> {

    /** Only records carrying this key are passed on; everything else is dropped. */
    private static final String ACCEPTED_KEY = "exampleKey";

    /**
     * Builds a new batch that contains only the records accepted by the filter.
     *
     * @param records the batch fetched by the consumer
     * @return a batch holding the accepted records, grouped by partition in their original order
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> accepted = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> kept = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                if (shouldFilter(record)) {
                    // Filtered out: simply do not copy the record into the returned batch.
                    continue;
                }
                System.out.printf("Consumed record: key = %s, value = %s, partition = %d, offset = %d%n",
                        record.key(), record.value(), record.partition(), record.offset());
                kept.add(record);
            }
            // Skip partitions whose records were all filtered out.
            if (!kept.isEmpty()) {
                accepted.put(partition, kept);
            }
        }
        return new ConsumerRecords<>(accepted);
    }

    /** No commit-time behavior is needed for filtering. */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Intentionally empty.
    }

    /** The interceptor holds no resources, so there is nothing to release. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /** The interceptor reads no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }

    /**
     * Decides whether a record must be dropped.
     *
     * @param record the record to inspect
     * @return {@code true} when the record should be filtered out, i.e. its key is
     *         anything other than {@code "exampleKey"} (including {@code null})
     */
    private boolean shouldFilter(ConsumerRecord<String, String> record) {
        // Constant-first comparison keeps records without a key from causing an NPE.
        return !ACCEPTED_KEY.equals(record.key());
    }
}

接下来,在Spring配置类中注册这个拦截器:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId("exampleEndpoint");
        endpoint.setTopics("exampleTopic");
        endpoint.setMessageHandlerMethodFactory(kafkaListenerContainerFactory().getMessageHandlerMethodFactory());
        endpoint.setBean(new KafkaMessageFilterConsumer());
        endpoint.setMethod(KafkaMessageFilterConsumer.class.getDeclaredMethods()[0]);
        registrar.registerEndpoint(endpoint);
    }

    @Override
    public void configureMessageConverters(List<KafkaListenerEndpoint> endpoints) {
        // 不需要实现此方法
    }
}

最后,创建一个消费者类,并使用@KafkaListener注解指定要监听的端点:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Listener bean that receives the records which passed the filter.
 */
@Component
public class KafkaMessageFilterConsumer {

    /**
     * Prints every message delivered to this listener.
     *
     * <p>A {@code @KafkaListener} must name what it listens to; without
     * {@code topics}, {@code topicPattern} or {@code topicPartitions} the endpoint
     * is rejected at startup.
     *
     * @param message the record value
     */
    @KafkaListener(id = "exampleEndpoint", topics = "exampleTopic", groupId = "${spring.kafka.consumer.group-id}")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
  2. 使用ConsumerAwareErrorHandler

ConsumerAwareErrorHandler允许你在发生错误时对消息进行处理。你可以在这个处理器中实现消息过滤逻辑。

首先,创建一个实现org.springframework.kafka.listener.ConsumerAwareErrorHandler接口的类:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaException;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.stereotype.Component;

@Component
public class MessageFilterErrorHandler implements ConsumerAwareErrorHandler {

    private final Consumer<String, String> consumer;

    public MessageFilterErrorHandler(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // 在这里对消息进行过滤
        if (shouldFilter(data)) {
            consumer.seekToCurrentPosition(new TopicPartition(data.topic(), data.partition()));
        } else {
            // 如果消息满足条件,则抛出异常
            throw new KafkaException("Error processing message: " + data, thrownException);
        }
    }

    private boolean shouldFilter(ConsumerRecord<?, ?> record) {
        // 在这里实现你的过滤逻辑
        // 例如,只处理键为"exampleKey"的消息
        return record.key().equals("exampleKey");
    }
}

接下来,在Spring配置类中注册这个错误处理器:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpointRegistrar;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    // ... 其他配置 ...

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new MessageFilterErrorHandler(consumerFactory().getConsumer()));
        return factory;
    }

    // ... 其他配置 ...
}

这样,当发生错误时,MessageFilterErrorHandler会拦截错误并对消息进行过滤。如果消息满足过滤条件,则忽略该消息;否则,抛出异常。

0