Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。
1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)
2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)
DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
producer.setNamesrvAddr("10.76.0.38:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
Order order = new Order();
order.orderId = i;
order.status = "生成";
Message msg1 = new Message("local-test-producer",
"TagA",
JsonUtils.toJson(order).getBytes()
);
SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
}, order.orderId);
log.info("sendResult1={}",sendResult1);
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
order.status="付款";
Message msg2 = new Message("local-test-producer",
"TagA",
JsonUtils.toJson(order).getBytes()
);
SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
}, order.orderId);
log.info("sendResult2={}",sendResult2);
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
order.status="发货";
Message msg3 = new Message("local-test-producer",
"TagA",
JsonUtils.toJson(order).getBytes()
);
producer.send(msg2, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
}, order.orderId);
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
//MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。
}, order.orderId);
log.info("sendResult3={}",sendResult1);
}
消费端主要逻辑如下,主要MessageListenerOrderly回调实现同一个MessageQueue里面的消息不会被并发消费:
//同一个MessageQueue里面的消息要顺序消费,不能并发消费。
//但是同一个Topic的不同MessageQueue是可以同时消费的
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
consumer.setNamesrvAddr("10.76.0.38:9876");
consumer.subscribe("test", "");
consumer.setPullBatchSize(1);
consumer.setConsumeThreadMin(1);
consumer.setConsumeThreadMax(1);
// consumer.registerMessageListener(new MessageListenerConcurrently() {
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
List<String> messages = new ArrayList<>();
for (MessageExt msg : msgs) {
messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
}
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
Thread.currentThread().join();
源码分析:
我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,
那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?
就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。
看完上述内容,你们掌握Message Queue Selector如何实现顺序消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/yangzhongyu/blog/3073211