这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:
在broker对message queue加锁,保证当前client占有该队列
consumer端对MessageQueue加锁,保证当前线程占有该队列
consumer端对ProcessQueue加锁,保证当前线程占有该队列
对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:
// ConsumeMessageOrderlySerivce
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
public synchronized void lockMQPeriodically() {
if (!this.stopped) {
// 通过LOCK_BATCH_MQ请求在broker批量锁定mq
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}
ConsumeMessageOrderlyService中的消费请求提交:
// ConsumeMessageOrderlySerivce
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
// 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
顺序处理了逻辑:
// ConsumeMessageOrderlyService.ConsumeRequest
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
// 1.获取MessageQueue上的锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) { // 循环处理
// ...
// 单个ConsumeRequest最长处理时间默认60s
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
// 2. 从ProcessQueue顺序获取batchSize个消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
// ....
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
// 3. 获取ProcessQueue上的锁
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
// 4. 推给业务处理逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock(); // 解锁
}
// ...
// 5. 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false; // ProcessQueue为空,停止本次推送
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
在processConsumeResult中主要会执行2步操作:
在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除
更新OffsetStore
感谢你能够认真阅读完这篇文章,希望小编分享的“RocketMQ中如何实现push consumer顺序消费”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/zhuhui/blog/4562918