RabbitMQ发布确认高级问题的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
1. 存在的问题
再生产环境中由于一些不明原因导致rabbitmq
重启,在RabbitMQ
重启期间生产者消息投递失败,会导致消息丢失。
当消息不能正常被接收的时候,我们需要将消息存放在缓存中。
spring.rabbitmq.host=192.168.123.129 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123 spring.rabbitmq.publisher-confirm-type=correlated
NONE
:禁用发布确认模式,是默认值。
CORRELATED
:发布消息成功到交换机会触发回调方方法。
CORRELATED
:就是发布一个就确认一个。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回调接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的ID及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }else{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } } }
import com.xiao.springbootrabbitmq.utils.MyCallBack; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("发送得内容是:{}",message); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("接收到队列" + CONFIRM_QUEUE_NAME + "消息:{}",msg); } }
1. 第一种情况
ID
为1
的消息正常送达,ID
为2
的消息由于RoutingKey
的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收。
2. 第二种情况
我们再上一种情况下修改了ID
为1
的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息。
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(就是消息被交换机成功接收后,无法到达队列),那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个事件的。
通过设置该参数可以在消息传递过程中不可达目的地时将消息返回给生产者。
spring.rabbitmq.publisher-returns=true
需要在配置文件种开启返回回调
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); log.info("发送得内容是:{}",message + routingKey1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("发送得内容是:{}",message + routingKey2); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回调接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的ID及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }else{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { Message message = returnedMessage.getMessage(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String replyText = returnedMessage.getReplyText(); log.error("消息{},被交换机{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }
其他类的代码与上一小节案例相同
ID
为2
的消息由于RoutingKey
不可路由,但是还是被回调函数处理了。
这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange"; public static final String BACKUP_QUEUE_NAME = "backup_queue"; public static final String WARNING_QUEUE_NAME = "warning_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } @Bean public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning_queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("报警发现不可路由的消息内容为:{}",msg); } }
mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。