这篇文章给大家分享的是有关RocketMQ设计之故障规避机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
NameServer
为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer
不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外
MQFaultStrategy类的:
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //是否开启故障延迟机制 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //判断Queue是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } //默认轮询 return tpInfo.selectOneMessageQueue(lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } }
在选择查找路由时,选择消息队列的关键步骤:
先按轮询算法选择一个消息队列
从故障列表判断该消息队列是否可用
LatencyFaultToleranceImpl中判断是否可用:
@Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
判断是否在故障列表中,不在故障列表中代表可用。
在故障列表中判断当前时间是否大于等于故障规避的开始时间startTimestamp
在消息发送结束后和发送出现异常时调用updateFaultItem()
方法来更新故障列表,computeNotAvailableDuration()
根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。
LatencyFaultToleranceImpl类的updateFaultItem方法:
@Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //加入故障列表 old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
FaultItem
存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用
感谢各位的阅读!关于“RocketMQ设计之故障规避机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。