这篇文章主要讲解了“ActiveMq死信队列怎么监听”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“ActiveMq死信队列怎么监听”吧!
想试试能不能把mq搞死,就试了下模拟并发10000个请求,最高时等待队列数量为53,也就是说高并发时必然会出现实际业务与数据库的数据不一致的情况,搭配redis使用时,尽可能的应当(必须)从redis里面取数据。
此时发现一条死信队列的消息,具体产生原因不详,分析和解决下此问题。
死信队列的产生
activemq默认使用异步发送模式,如果设置了持久化但没开启事务的话,会发生消息丢失的情况,mq宕机也会丢失。
2.具体哪些情况会引发消息重发?
①Client用了transactions且在session中调用了rollback
②Client用了transactions且在调用commit之前关闭或者没有commit
③Client在CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover(允许消息重发模式下)
消息的重发时间间隔为1秒,重发次数默认最高6次。
一个消息被redelivedred超过6次,消费端会给mq一个"poison ack",表示这个消息有毒,此时这个消息会被放入DLQ死信队列。
由于我是本地mq,我可能搞下测试环境的mq,本地是重试4次后就进入死信队列了。
在哪里改?
redeliveryPolicy.setMaximumRedeliveries(3);
public ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerurl); //信任所有包下的序列化对象,解决无法发送对象消息 factory.setTrustAllPackages(true); factory.setUserName(userName); factory.setPassword(password); RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(3);// 修改重发次数为3次 factory.setRedeliveryPolicy(redeliveryPolicy); return factory; }
也可在配置文件中修改
如何处理死信队列中的消息?
1:人工处理(太累)
2:定时任务(太耗性能)
3:监听死信队列
4:死信队列写库
如何监听死信队列?
思路:死信队列也是一个队列,创建一个监听器,监听死信队列ActiveMQ.DLQ队列是否有消息,有消息就进行消费。
/**
* @author zhaokkstart
* @create 2020-09-07 16:27
*/
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQDestination;;
public class QueueMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
try {
ActiveMQDestination queues=(ActiveMQDestination)message.getJMSDestination();
if(queues.getPhysicalName().equalsIgnoreCase("ActiveMQ.DLQ")){
System.out.println("监听队列:"+queues.getPhysicalName()+"消费了消息:");
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("from get textMessage:\t" + tm.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
try {
System.out.println("from get MapMessage:\t" + mm.getString("msgId"));
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
// 如果是Object消息
if (message instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage) message;
System.out.println("from get ObjectMessage:\t");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
使用监听还需要启动消费者吗?
可以不启动消费者,需要启动生产者
消息监听器MessageListener
1.MessageListener2.SessionAwareMessageListener,SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。3.MessageListenerAdapter,MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。其另外一个主要的功能是可以自动的发送返回消息。
1.队列长时间无人监听时,自动删除该队列,在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它可以删除“没有未处理消息、并且没有消费者的Destination”。我就觉得我们的业务队列太多了,没计算过,但至少实在50个以上的,没有必要~,不常用的可以手动去启停。
本文参考资料
ActiveMQ队列消息积压问题 https://blog.51cto.com/winters1224/2049432如何监听死信队列?https://www.cnblogs.com/zhoading/p/12009116.htmlactivemq 中如何消费已经在死信队列中的消息 https://q.cnblogs.com/q/90513/推荐博文 https://blog.csdn.net/vop444/article/details/85222353
感谢各位的阅读,以上就是“ActiveMq死信队列怎么监听”的内容了,经过本文的学习后,相信大家对ActiveMq死信队列怎么监听这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。