本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
ConsumerManager public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { //通知同组内的其他consumer this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; } public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group); } } if (isNotifyConsumerIdsChangedEnable) { //单向通知channel this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } }
DefaultConsumerIdsChangeListener @Override public void handle(ConsumerGroupEvent event, String group, Object... args) { case CHANGE: if (args == null || args.length < 1) { return; } List<Channel> channels = (List<Channel>) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { //对组内的其他consumer的channel连接发送单向通知(不管对方有木有收到) for (Channel chl : channels) { this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } } break; }
Broker2Client public void notifyConsumerIdsChanged( final Channel channel, final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; } NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { //发送异常,只是打印log log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); } }
通知channel是单向的,也就是不管对方有没有答复,都认为发送成功了,这样会有两种情况发生:
channel收到消息:收到消息后,channel会触发rebalance,正常逻辑
channel没收到消息:该consumer不会触发rebalance,存在问题!
register:该consumer不知道已经有新的consumer加入,造成同一个mq会有多个consumer进行消费
unregister:该consumer不知道有consumer下线,造成部分mq没有consumer负责消费
我们先看unregister这种情况
在consumer启动时,会同时启动一个RebalanceService线程,这个线程做的事就是每隔20秒主动进行一次rebalance,这样就能把unregister这种影响降低,最多导致该mq的消息会延迟20秒之后才有consumer负责消费
RebalanceService private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")); @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
接下来分析比较大条的Register
同一个mq在同一组内有不同的consumer消费,这种情况在clustering模式下是有大问题的,会造成重复消费,消费进度错误等问题,带着rocketmq应该不至于犯如此低级错误的想法再继续看代码,果然别有洞天
RebalanceImpl private void rebalanceByTopic(final String topic, final boolean isOrder) { //rebalance过程 //关键点在这,在上面rebalance完之后, 就能知道自己该负责哪些mq的消费 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); } private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { for (MessageQueue mq : mqSet) { //如果是新增的mq,会尝试调用远程broker lock mq,获取锁失败,则说明有其他consumer获取了锁,自己应该放弃消费该mq if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } } } }
以上就是consumer数量变化会怎样,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。