这篇文章给大家介绍如何理解Java RabbitMQ的TTL和DLX,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:
第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;
第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。
如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。
如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。
还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。
具体代码如下:
//设置队列上所有的消息的有效期,单位为毫秒 Map<String, Object> argss = new HashMap<String , Object>(); arguments.put("x-message-ttl " , 5000);//5秒钟 channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;
查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。
过了几秒钟后发现消息已经不存在了。
也可以用RabbitMQ的命令行模式来设置:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
还可以通过HTTP接口调用:
$ curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' http://ip:15672/api/queues/{vhost}/{queuename}
发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示
Builder bd = new AMQP.BasicProperties().builder(); bd.deliveryMode(2);//持久化 bd.expiration("100000");//设置消息有效期100秒钟 BasicProperties pros = bd.build(); String message = "测试ttl消息"; channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
另外也可以通过HTTPAPI 接口设置:
$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。
package cn.wkp.rabbitmq.newest.ttl; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import cn.wkp.rabbitmq.util.ConnectionUtil; /** * * @ClassName: Send * @Description: 消息有效期 * @author wkg * @date: 2021年9月1日 下午11:28:22 */ public class Send { private final static String EXCHANGE_NAME = "ttl_exchange"; private final static String QUEUE_NAME = "ttl_queue"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 从连接中创建通道 Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); //*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准***** //声明队列 Map<String, Object> arguments=new HashMap<String,Object>(); //设置队列上所有的消息的有效期,单位为毫秒 arguments.put("x-message-ttl", 5000);//5秒钟 channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); //绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); Builder bd = new AMQP.BasicProperties().builder(); bd.deliveryMode(2);//持久化 bd.expiration("100000");//设置消息有效期100秒钟 BasicProperties pros = bd.build(); String message = "测试ttl消息"; channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes()); System.out.println("Sent message:" + message); // 关闭通道和连接 channel.close(); connection.close(); } }
上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。
服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 18000); //队列有效期18秒 channel.queueDeclare("myqueue", false, false, false, args);
DLX是Dead-Letter-Exchange的简写,意思是死信交换机。
它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
消息过期
队列达到最大长度
当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。
因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。
下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。
客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示
channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLX args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置) //为队列myqueue 添加DLX channel.queueDeclare("myqueue" , false , false , false , args);
上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:
package cn.wkp.rabbitmq.newest.dlx; import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import cn.wkp.rabbitmq.util.ConnectionUtil; public class SendDLX { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明一个交换机,做死信交换机用 channel.exchangeDeclare("dlx_exchange", "topic", true, false, null); //声明一个队列,做死信队列用 channel.queueDeclare("dlx_queue", true, false, false, null); //队列绑定到交换机上 channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*"); channel.exchangeDeclare("normal_exchange", "fanout", true, false, null); Map<String, Object> arguments=new HashMap<String, Object>(); arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置) //为队列normal_queue 添加DLX channel.queueDeclare("normal_queue", true, false, false, arguments); channel.queueBind("normal_queue", "normal_exchange", ""); channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes()); System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date())); channel.close(); connection.close(); } }
上面是发送者的代码,运行后观察控制台可以看到如下所示:
死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"
队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定
queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:
消息发送到交换机normal_exchange,然后路由到队列normal_queue上
因为队列normal_queue没有消费者,消息过期后成为死信消息
死信消息携带设置的x-dead-letter-routing-key=dlx.test进入到死信交换机dlx_exechage
dlx_exechage与dlx_queue绑定的routing key为"dlx.*",死信消息的路由键dlx.test符合该规则被路由到dlx.queue上面。
然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除
package cn.wkp.rabbitmq.newest.dlx; import java.io.IOException; import java.util.Date; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.wkp.rabbitmq.util.ConnectionUtil; public class RecvDLX { public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare("dlx_exchange", "topic", true, false, null); channel.queueDeclare("dlx_queue", true, false, false, null); channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*"); // 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date())); // 消费者手动发送ack应答 channel.basicAck(envelope.getDeliveryTag(), false); } }; System.out.println("消费死信队列中的消息======================"); // 监听队列 channel.basicConsume("dlx_queue", false, consumer); } }
运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。
消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2021-09-24 16:30:05:740
发送消息时间:2021-09-24 17:57:00:730
关于如何理解Java RabbitMQ的TTL和DLX就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。