这篇文章主要介绍RocketMQ中如何实现push consumer消息拉取,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
RebalanceImpl.updateProcessQueueTableInRebalance方法的末尾,对于每一个新生成的ProcessQueue都会创建一个PullRequest执行首次消息拉取操作。PullRequest会通过RebalanceImpl.dispatchPullRequest方法达到DefaultMQPushConsumerImpl.executePullRequestImmediately,然后被投递到PullMessageService的本地队列中。
PulMessageService会启动一个服务线程,不断消费投递到本地队列中的PullRequest,最终调用到DefaultMQPushConsumerImpl.pullMessage方法。PullMessageService被MQClientInstance持有,同一个客户端实例中所有的push consumer产生的PullRequest都会被投递到同一个PullMessageService本地队列中排队等待执行。
DefaultMQPushConsumerImpl.pullMessage是消息拉取的核心方法。该方法首先会执行一系列的限流判断,若命中限流条件则本次执行结束,等待一个固定时间之后会再次将同一个PullRequest投递到PullMessageService中重新触发消息拉取。
DefaultMQPushConsumerImpl.pullMessage核心逻辑:
public void pullMessage(final PullRequest pullRequest) {
// 限流判断 ....
final long beginTimestamp = System.currentTimeMillis();
// 消息拉取callback
PullCallback pullCallback = new PullCallback() {...};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 获取当前队列已经被消费到最新的offset,通过本次pull请求附带在broker上commit该offset
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString(); // 消息过滤表达式
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// 发起pull请求,成功后异步回调pullCallback
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
pullMessage方法中创建的匿名PullCallback用来处理拉取到的消息列表:
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 1. 反序列化,并执行过滤
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 2. 保存到本地ProcessQueue中缓存
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 3. 提交到ConsumeMessageService中,被push到message listener执行业务处理
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 4. 提交下一次PullRequest
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
// ....
}
}
}
// ....
};
上面的第3步consumeMessageService.submitConsumeRequest中将根据并行或串行不同的方式将message提交给listener执行业务处理动作。
消息拉取的整体流程如下:
以上是“RocketMQ中如何实现push consumer消息拉取”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/zhuhui/blog/4562618