小编给大家分享一下如何设置RabbitMQ延迟队列,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
rabbitmq的消息TTL和死信Exchange结合
1.消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。
2.Dead Letter Exchanges
Exchage的概念在这里就不在赘述。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
①.一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
②. 上面的消息的TTL到了,消息过期了。
③. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
3.实现延迟队列
我们先设置好各个配置的字符串
public interface TestMq {/** * 队列名 */ String TEST_QUEUE = "test";; /** * 服务添加routing key */ String ROUTING_KEY_TEST = "post.test"; /** * 死信队列 */ String DEAD_QUEUE = "dead"; String ROURING_KEY_DEAD = "dead.routing.key"; String MQ_EXCHANGE_DEAD = "dead.exchange";}
配置信息
/** * rabbitmq配置 * */@Configurationpublic class RabbitmqConfig { /** * 死信队列 * @return */ @Bean public Queue deadQueue() { Map<String,Object> arguments = new HashMap<>(); //此处填入死信交换机 arguments.put("x-dead-letter-exchange", TestMq.MQ_EXCHANGE_DEAD); //此处填入消息队列的路由,而非死信队列自己的路由 arguments.put("x-dead-letter-routing-key", TestMq.ROUTING_KEY_TEST); return new Queue(TestMq.DEAD_QUEUE,true,false,false,arguments); } /** * 死信交换机 * @return */ @Bean public DirectExchange deadExchange() { return new DirectExchange(TestMq.MQ_EXCHANGE_DEAD); } /** * 绑定死信队列到死信交换机 * @return */ @Bean public Binding bindingDeadExchange() { return BindingBuilder.bind(deadQueue()).to(deadExchange()) .with(TestMq.ROURING_KEY_DEAD); } /** * 被消费者侦听的获取消息的队列 * @return */ @Bean public Queue testQueue() { return new Queue(TestMq.TEST_QUEUE,true,false,false); } /** * 将消息队列绑定到死信交换机,跟死信队列的路由不同 * @return */ @Bean public Binding bindingTest() { return BindingBuilder.bind(testQueue()).to(deadExchange()) .with(TestMq.ROUTING_KEY_TEST); } }
消息生产者
@Slf4j@Componentpublic class TestSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); MessagePostProcessor processor = message -> {//给消息设置的过期时间,我们这里为10秒 message.getMessageProperties().setExpiration(10000 + ""); return message; }; this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor); }/** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 对消息对象进行二进制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
消费者
@Slf4j@Component@RabbitListener(queues = TestMq.TEST_QUEUE)public class TestConsumer {@RabbitHandler public void receice(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); Integer orderNo = unSerialize(data); log.info(orderNo + "为收到的消息"); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private Integer unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,Integer.class); }finally { input.close(); } } }
我们随便写个测试
@Servicepublic class TestService {@Autowired private TestSender sender; @PostConstruct public void test() {//此处顺序为死信交换机,死信队列路由,消息 sender.send(TestMq.MQ_EXCHANGE_DEAD,TestMq.ROURING_KEY_DEAD,1); } }
经测试
2019-10-11 17:26:18.079 INFO 879 --- [ main] c.g.rabbitdelay.config.TestSender : send content=1
2019-10-11 17:26:18.098 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [xxx.xxx.xxx.xxx:5672]
2019-10-11 17:26:18.227 INFO 879 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2301b75:0/SimpleConnection@243f003c [delegate=amqp://admin@xxx.xxx.xxx.xxx:5672/, localPort= 52345]
2019-10-11 17:26:18.337 INFO 879 --- [39.9.225.2:5672] c.g.rabbitdelay.config.TestSender : send ack success
2019-10-11 17:26:18.446 INFO 879 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-10-11 17:26:18.751 INFO 879 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-10-11 17:26:18.959 INFO 879 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-11 17:26:18.962 INFO 879 --- [ main] c.g.rabbitdelay.RabbitdelayApplication : Started RabbitdelayApplication in 17.093 seconds (JVM running for 27.45)
2019-10-11 17:26:28.342 INFO 879 --- [ntContainer#0-1] c.g.rabbitdelay.consumer.TestConsumer : 1为收到的消息
通过日志可以看到,发送消息是18秒,收到消息消费为28秒,中间隔了10秒钟。
以上是“如何设置RabbitMQ延迟队列”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。