To integrate Kafka with Spring for message distribution, follow these steps:
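Add the Spring for Apache Kafka dependency to your project. Spring Boot 2.x and 3.x do not provide a spring-boot-starter-kafka artifact; use spring-kafka instead, whose version is managed by Spring Boot. In a Maven project, add the following to pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>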
Configure the Kafka properties in application.properties or application.yml, for example:
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
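To make the relationship between these Spring Boot property names and the native Kafka client keys concrete, the following JDK-only sketch translates the consumer-related entries by hand. Spring Boot performs this binding itself; the class ConsumerPropertySketch and its methods are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: Spring Boot does this binding internally.
// Class and method names are hypothetical.
class ConsumerPropertySketch {

    // Map the Spring Boot property names onto the Kafka consumer config keys.
    static Map<String, Object> toConsumerConfig(Properties p) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", p.getProperty("spring.kafka.bootstrap-servers"));
        config.put("group.id", p.getProperty("spring.kafka.consumer.group-id"));
        config.put("auto.offset.reset", p.getProperty("spring.kafka.consumer.auto-offset-reset"));
        config.put("key.deserializer", p.getProperty("spring.kafka.consumer.key-deserializer"));
        config.put("value.deserializer", p.getProperty("spring.kafka.consumer.value-deserializer"));
        return config;
    }

    // Parse text in .properties format into a Properties object.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
            "spring.kafka.bootstrap-servers=localhost:9092",
            "spring.kafka.consumer.group-id=my-group",
            "spring.kafka.consumer.auto-offset-reset=earliest");
        // Entries missing from the input show up with a null value.
        toConsumerConfig(parse(text)).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The keys on the left-hand side are the same strings that the ConsumerConfig constants resolve to in a hand-written configuration class.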
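Create a Kafka configuration class that defines the producer factory, the KafkaTemplate, the consumer factory, and the listener container factory. Spring Boot auto-configures equivalent beans from the properties above, so this class is only needed when you want to customize them.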
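import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Producer side: String keys and values, matching spring.kafka.producer.*.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer side: the group id is read from the properties file instead of being hard-coded.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Factory for the listener containers that poll Kafka and invoke listeners.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}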
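Create an error handler class for failures raised while records are being processed. In Spring Kafka 2.x it can implement the ConsumerAwareErrorHandler interface, whose handle method also receives the Consumer. That interface was deprecated in 2.8 and removed in 3.0 in favor of CommonErrorHandler. The handler only takes effect once it is registered on the listener container factory (setErrorHandler in 2.x).
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
import org.springframework.stereotype.Service;

@Service
public class MyKafkaListener implements ConsumerAwareErrorHandler {
    // Invoked by the container when the listener throws; this interface requires the three-argument form.
    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        System.out.println("Error occurred while processing message: " + thrownException.getMessage());
    }
}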
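The division of labor between a listener and an error handler can be modeled without Kafka at all. The sketch below is a simplified, JDK-only illustration and not Spring Kafka's implementation: a loop hands each record to the listener, and the error handler is consulted only when the listener throws. The class ErrorHandlingSketch and the method deliver are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Simplified, hypothetical model of how a container routes failures.
class ErrorHandlingSketch {

    // Deliver each record to the listener. If the listener throws, pass the
    // exception and the record to the error handler and continue with the next record.
    // Returns the number of records whose processing failed.
    static <T> int deliver(List<T> records, Consumer<T> listener, BiConsumer<Exception, T> errorHandler) {
        int failures = 0;
        for (T record : records) {
            try {
                listener.accept(record);
            } catch (RuntimeException ex) {
                failures++;
                errorHandler.accept(ex, record);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        int failures = deliver(
            List.of("ok-1", "bad", "ok-2"),
            r -> { if (r.equals("bad")) throw new IllegalArgumentException("cannot parse " + r); },
            (ex, r) -> failed.add(r + ": " + ex.getMessage()));
        System.out.println(failures + " failure(s): " + failed);
    }
}
```

A handler that only logs, like the one above, lets consumption continue past the failed record; whether that is acceptable depends on the application.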
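Create a class that processes received messages. It implements the MessageListener interface with explicit key and value types, and is registered as a Spring bean so that it can be injected elsewhere.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class MyKafkaMessageListener implements MessageListener<String, String> {
    // Called once for every record delivered by the listener container.
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("Received message: " + record.value());
    }
}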
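Create a class that distributes messages. It injects the KafkaTemplate for sending, plus the listener container factory and MyKafkaMessageListener for receiving. KafkaTemplate is a producer-side API and cannot consume, so listening is done by creating a listener container from the factory.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageDistributor {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;
    @Autowired
    private MyKafkaMessageListener myKafkaMessageListener;

    // Publish a message to the given topic.
    public void distributeMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    // Start a listener container for the topic and route its records to the listener.
    // A container created this way is not a Spring-managed bean, so stop() it on shutdown.
    public void startListening(String topic) {
        ConcurrentMessageListenerContainer<String, String> container =
                containerFactory.createContainer(topic);
        container.setupMessageListener(myKafkaMessageListener);
        container.start();
    }
}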
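In the main application class, obtain the KafkaMessageDistributor and start listening once the application has started.
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    // Runs after the application context is ready, then starts consuming from my-topic.
    @Bean
    public ApplicationRunner startKafkaListener(KafkaMessageDistributor kafkaMessageDistributor) {
        return args -> kafkaMessageDistributor.startListening("my-topic");
    }
}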
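Now, whenever a message arrives on the my-topic topic, MyKafkaMessageListener processes it. You can adapt these classes to implement your own message distribution logic. When the topics are known at startup, annotating a method with @KafkaListener is usually simpler than starting a container by hand.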
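As one possible starting point for custom distribution logic, the sketch below routes a payload to every handler registered for its topic. It is a JDK-only illustration under stated assumptions: TopicRouterSketch, register, and dispatch are hypothetical names and not part of Spring Kafka.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory router; not part of Spring Kafka.
class TopicRouterSketch {

    // Topic name -> handlers interested in that topic.
    private final Map<String, List<Consumer<String>>> handlers = new ConcurrentHashMap<>();

    // Register a handler for a topic; several handlers per topic are allowed.
    void register(String topic, Consumer<String> handler) {
        handlers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Pass the payload to every handler registered for the topic.
    // Returns the number of handlers that were invoked.
    int dispatch(String topic, String payload) {
        List<Consumer<String>> targets = handlers.getOrDefault(topic, List.of());
        targets.forEach(h -> h.accept(payload));
        return targets.size();
    }

    public static void main(String[] args) {
        TopicRouterSketch router = new TopicRouterSketch();
        router.register("my-topic", msg -> System.out.println("audit: " + msg));
        router.register("my-topic", msg -> System.out.println("billing: " + msg));
        router.dispatch("my-topic", "order-42");
        router.dispatch("other-topic", "ignored"); // no handlers registered, nothing happens
    }
}
```

Inside onMessage, a listener could call such a router with record.topic() and record.value(), which keeps the Kafka-facing code separate from the business handlers.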