Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。
1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)
2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)
DefaultMQProducer producer = new DefaultMQProducer("local-test-producer"); producer.setNamesrvAddr("10.76.0.38:9876"); producer.start(); for (int i = 0; i < 1000; i++) { Order order = new Order(); order.orderId = i; order.status = "生成"; Message msg1 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult1={}",sendResult1); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="付款"; Message msg2 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult2={}",sendResult2); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="发货"; Message msg3 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } //MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。 }, order.orderId); log.info("sendResult3={}",sendResult1); }
消费端主要逻辑如下,主要MessageListenerOrderly回调实现同一个MessageQueue里面的消息不会被并发消费:
//同一个MessageQueue里面的消息要顺序消费,不能并发消费。 //但是同一个Topic的不同MessageQueue是可以同时消费的 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2"); consumer.setNamesrvAddr("10.76.0.38:9876"); consumer.subscribe("test", ""); consumer.setPullBatchSize(1); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); // consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { List<String> messages = new ArrayList<>(); for (MessageExt msg : msgs) { messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost()); } System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); Thread.currentThread().join();
源码分析:
我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,
那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?
就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。
看完上述内容,你们掌握Message Queue Selector如何实现顺序消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。