在 Spring Boot 中整合 Kafka 并实现消息优先级,你需要遵循以下步骤:
在你的 pom.xml
文件中添加 Kafka 和 Spring Kafka 相关的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-support</artifactId>
</dependency>
在你的 application.yml
或 application.properties
文件中配置 Kafka 相关参数:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
创建一个配置类,用于设置 Kafka 生产者的消息优先级属性:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.producer.properties.priority.enable}")
private boolean priorityEnable;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
if (priorityEnable) {
configProps.put(ProducerConfig.PRIORITY_CONFIG, 1); // 设置消息优先级,数值越大优先级越高
}
return configProps;
}
}
创建一个 Kafka 生产者类,使用上面配置的属性发送消息:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message, int priority) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
record.headers().add("priority", String.valueOf(priority).getBytes());
kafkaTemplate.send(record);
}
}
创建一个 Kafka 消费者类,订阅指定主题并处理消息:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
@Component
public class KafkaConsumerService {
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Bean
public ConsumerConfig consumerConfig() {
return new ConsumerConfig(
Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, groupId),
Collections.singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"),
Collections.singletonMap(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer),
Collections.singletonMap(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer),
Duration.ofMillis(3000)
);
}
@KafkaListener(topics = "${spring.kafka.consumer.topic}", groupId = groupId)
public void listen(ConsumerRecords<String, String> records) {
records.forEach(record -> {
// 处理消息逻辑
System.out.println("Received message: " + record.value() + " with priority: " + record.headers().get("priority"));
});
}
}
现在,你已经成功整合了 Kafka 并实现了消息优先级。当你使用 KafkaProducerService
发送消息时,可以通过设置 priority
参数来指定消息优先级。消费者在处理消息时,可以从消息头中获取优先级信息。