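Message conversion is an important part of integrating Kafka with Spring. Spring for Apache Kafka provides the MessageConverter interface (in org.springframework.kafka.support.converter) for this purpose, and two commonly used implementations are StringJsonMessageConverter and ByteArrayJsonMessageConverter. On the consumer side, however, deserialization is usually handled by an implementation of Kafka's Deserializer interface.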
The following simple example shows how to convert messages when integrating Kafka with Spring:
First, create a custom Deserializer to handle message deserialization. In this example, JSON string messages are converted into MyMessage objects:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.Deserializer;
    import java.io.IOException;

    public class MyKafkaDeserializer implements Deserializer<MyMessage> {
        // ObjectMapper is thread-safe once configured, so a single instance is reused
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public MyMessage deserialize(String topic, byte[] data) {
            if (data == null) {
                return null; // records without a value (tombstones) stay null
            }
            try {
                return objectMapper.readValue(data, MyMessage.class);
            } catch (IOException e) {
                // Deserializer.deserialize cannot throw checked exceptions, so wrap the failure
                throw new SerializationException("Failed to deserialize MyMessage from topic " + topic, e);
            }
        }
    }
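To make the contract of such a deserializer concrete, here is a minimal, dependency-free sketch. ValueDeserializer, SketchMessage and SketchMessageDeserializer are hypothetical stand-ins for Kafka's Deserializer, MyMessage and the custom deserializer, and the regular expression is a deliberately naive substitute for Jackson that does not handle escaped characters. It only illustrates the three cases a value deserializer has to deal with: a valid payload, a null value, and a malformed payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for Kafka's Deserializer<T>, reduced to its main method.
interface ValueDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Hypothetical stand-in for MyMessage with a single field.
final class SketchMessage {
    private final String content;

    SketchMessage(String content) {
        this.content = content;
    }

    String getContent() {
        return content;
    }
}

final class SketchMessageDeserializer implements ValueDeserializer<SketchMessage> {
    // Naive matcher for {"content":"..."}; a real implementation would use a JSON library.
    private static final Pattern CONTENT =
            Pattern.compile("^\\s*\\{\\s*\"content\"\\s*:\\s*\"([^\"]*)\"\\s*\\}\\s*$");

    @Override
    public SketchMessage deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // a record without a value stays null
        }
        String json = new String(data, StandardCharsets.UTF_8);
        Matcher matcher = CONTENT.matcher(json);
        if (!matcher.matches()) {
            // Signal the failure with an unchecked exception
            throw new IllegalArgumentException("Cannot deserialize record from topic " + topic);
        }
        return new SketchMessage(matcher.group(1));
    }
}
```

Calling deserialize("my-topic", bytes) with the UTF-8 bytes of {"content":"hello"} yields a SketchMessage whose getContent() is "hello"; a null payload yields null, and any other payload raises IllegalArgumentException.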
Next, configure a consumer factory and a listener container factory that use MyKafkaDeserializer as the value deserializer:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import java.util.HashMap;
    import java.util.Map;

    // @EnableKafka turns on detection of @KafkaListener methods,
    // so no programmatic endpoint registration is needed
    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        // The bean name kafkaListenerContainerFactory is the default that @KafkaListener looks up
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MyMessage> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<String, MyMessage> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Record values are deserialized by the custom deserializer
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaDeserializer.class);
            return props;
        }
    }
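The consumer settings above pass the deserializer as a class rather than as an instance, because the Kafka client creates the deserializer itself through a no-argument constructor. The sketch below imitates that step in plain Java, under simplifying assumptions: ConsumerConfigSketch, DemoDeserializer and instantiate are hypothetical names, the broker address localhost:9092 is assumed, and the real client does more work (for example calling configure on the new instance). The string keys are the literal property names behind the ConsumerConfig constants.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerConfigSketch {
    // Hypothetical deserializer class; all it needs here is a public no-argument constructor.
    public static final class DemoDeserializer {
        public DemoDeserializer() {
        }
    }

    // The same kind of settings as in the configuration class, keyed by literal property names.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");
        // A deserializer may be given as a fully qualified class name...
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // ...or as a Class object
        props.put("value.deserializer", DemoDeserializer.class);
        return props;
    }

    // Simplified imitation of how a configured class (or class name) becomes an instance.
    static Object instantiate(Object configured) {
        try {
            Class<?> type = configured instanceof Class
                    ? (Class<?>) configured
                    : Class.forName((String) configured);
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + configured, e);
        }
    }
}
```

This is why a custom deserializer configured by class should keep a public no-argument constructor; a deserializer that needs constructor arguments would have to be supplied to the consumer factory as an instance instead.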
Then, create MyKafkaConsumer to handle the received messages:

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class MyKafkaConsumer {
        // The value has already been deserialized into MyMessage by MyKafkaDeserializer
        @KafkaListener(id = "myListener", topics = "my-topic", groupId = "my-group")
        public void listen(MyMessage message) {
            System.out.println("Received message: " + message);
        }
    }
Finally, make sure the MyMessage class can be serialized and deserialized correctly. You can use the Jackson library for this:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MyMessage {
        // Shared mapper: ObjectMapper is thread-safe once configured
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        private String content;

        // Jackson uses the implicit no-argument constructor plus these accessors
        public String getContent() {
            return content;
        }

        public void setContent(String content) {
            this.content = content;
        }

        public static MyMessage fromJson(String json) throws JsonProcessingException {
            return OBJECT_MAPPER.readValue(json, MyMessage.class);
        }

        public String toJson() throws JsonProcessingException {
            return OBJECT_MAPPER.writeValueAsString(this);
        }
    }
Now, when your Spring application consumes messages from Kafka, each message is converted into a MyMessage object and then handled by MyKafkaConsumer.