温馨提示×

spring整合kafka如何进行消息转换

小樊
84
2024-12-14 12:55:13
栏目: 大数据

在Spring整合Kafka时,消息转换是一个重要的环节。为了实现消息转换,你可以使用Kafka的MessageConverter接口。Spring提供了两种常用的MessageConverter实现:StringHttpMessageConverterByteArrayHttpMessageConverter。但在Kafka消费者端,我们通常使用KafkaDeserializer来处理消息的反序列化。

以下是一个简单的示例,展示了如何在Spring整合Kafka时进行消息转换:

  1. 首先,创建一个自定义的KafkaDeserializer,用于实现消息的反序列化。在这个例子中,我们将把字符串消息转换为MyMessage对象:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.databind.ObjectMapper;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.io.IOException;

public class MyKafkaDeserializer extends ErrorHandlingDeserializer<MyMessage> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    public MyKafkaDeserializer() {
        this(null);
    }

    public MyKafkaDeserializer(String keyDeserializer) {
        super(new JsonDeserializer<>(MyMessage.class));
    }

    @Override
    protected MyMessage deserialize(String topic, byte[] data) throws IOException {
        return objectMapper.readValue(data, MyMessage.class);
    }
}
  1. 在你的Kafka消费者配置类中,将自定义的MyKafkaDeserializer添加到KafkaListenerEndpointRegistry中:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, MyMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaDeserializer.class);
        return props;
    }

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        MethodKafkaListenerEndpoint<String, MyMessage> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setId("myListener");
        endpoint.setTopics("my-topic");
        endpoint.setMessageHandlerMethodFactory(kafkaListenerContainerFactory().getMessageHandlerMethodFactory());
        endpoint.setBean(new MyKafkaConsumer());
        registrar.registerEndpoint(endpoint);
    }

    @Override
    public void configureKafkaListenerEndpointRegistrars(KafkaListenerEndpointRegistry registry) {
        registry.registerEndpoints(configureKafkaListeners(new KafkaListenerEndpointRegistrar()));
    }
}
  1. 创建一个Kafka消费者监听器类MyKafkaConsumer,用于处理接收到的消息:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaConsumer {

    @KafkaListener(id = "myListener", groupId = "my-group")
    public void listen(MyMessage message) {
        System.out.println("Received message: " + message);
    }
}
  1. 最后,确保你的MyMessage类已经正确实现了序列化和反序列化。你可以使用Jackson库来实现这个功能:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MyMessage {

    private String content;

    // Getters and setters

    public static MyMessage fromJson(String json) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(json, MyMessage.class);
    }

    public String toJson() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.writeValueAsString(this);
    }
}

现在,当你的Spring应用从Kafka消费消息时,消息将被转换为MyMessage对象,然后由MyKafkaConsumer处理。

0