选举的父接口为Election,其定义了lookForLeader和shutdown两个方法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。
1、AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,在3.4.0版本后已经被弃用。
2、FastLeaderElection,是标准的fast paxos算法的实现,基于TCP协议进行选举。
3、LeaderElection,在3.4.0版本后已经被弃用。
Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了投票中被选举者的服务器sid、zxid、选举周期epoch,选举者的服务器sid,状态,选周期epoch
static public class Notification {
/*
* Format version, introduced in 3.4.6
*/
public final static int CURRENTVERSION = 0x2;
int version;
/*
* Proposed leader 被选举者的服务器id
*/
long leader;
/*
* zxid of the proposed leader 被选举者的事务zxid
*/
long zxid;
/*
* Epoch 选举者的选举周期
*/
long electionEpoch;
/*
* current state of sender 选举者的节点状态
* 总共有4中
* LOOKING 寻找leader状态
* FOLLOWING 跟随者
* LEADING leader状态
*OBSERVING 不参与操作和选举
*/
QuorumPeer.ServerState state;
/*
* Address of sender 选举者的服务器id
*/
long sid;
QuorumVerifier qv;
/*
* epoch of the proposed leader 被选举者的选举周期
*/
long peerEpoch;
}
ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的sid、zxid、选举周期等信息。
static public class ToSend {
static enum mType {crequest, challenge, notification, ack}
ToSend(mType type,
long leader,
long zxid,
long electionEpoch,
ServerState state,
long sid,
long peerEpoch,
byte[] configData) {
this.leader = leader;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
this.state = state;
this.sid = sid;
this.peerEpoch = peerEpoch;
this.configData = configData;
}
/*
* Proposed leader in the case of notification 被推举的leader的sid
*/
long leader;
/*
* id contains the tag for acks, and zxid for notifications
* 被推举的leader的最大事务id
*/
long zxid;
/*
* Epoch 选举者的选举周期
*/
long electionEpoch;
/*
* Current state; 选举者的节点状态
*/
QuorumPeer.ServerState state;
/*
* Address of recipient选举者的服务器sid
*/
long sid;
/*
* Used to send a QuorumVerifier (configuration info)
*/
byte[] configData = dummyData;
/*
* Leader epoch 被选举者的选举周期
*/
long peerEpoch;
}
Messenger包含了WorkerReceiver和WorkerSender两个内部类
1、WorkerReceiver继承了ZooKeeperThread,是选票接收器。
2、其会不断地从QuorumCnxManager中的recvQueue获取其他服务器发来的选举消息,类型是Message
WorkerReceiver(QuorumCnxManager manager) {
super("WorkerReceiver");
this.stop = false;
this.manager = manager;
}
//从QuorumCnxManager中的recvQueue中获取投票消息
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
并将其转换成一个选票消息Notification,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票,把投票信息组装成ToSend加入到sendqueue队列。
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock.get(),
self.getPeerState(),
response.sid,
v.getPeerEpoch(),
qv.toString().getBytes());
sendqueue.offer(notmsg);
3、WorkerSender也继承了ZooKeeperThread,为选票发送器,其会不断地从sendqueue中获取待发送的选票
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
m.leader,
m.zxid,
m.electionEpoch,
m.peerEpoch,
m.configData);
manager.toSend(m.sid, requestBuffer);
}
Messenger(QuorumCnxManager manager) {
//创建WorkerSender
this.ws = new WorkerSender(manager);
// 新创建线程
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
// 设置为守护线程
this.wsThread.setDaemon(true);
// 创建WorkerReceiver
this.wr = new WorkerReceiver(manager);
// 新创建线程
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
// 设置为守护线程
this.wrThread.setDaemon(true);
}
// 完成Leader选举之后需要等待时长
final static int finalizeWait = 200;
// 两个连续通知检查之间的最大时长
final static int maxNotificationInterval = 60000;
// 管理服务器之间的连接
QuorumCnxManager manager;
// 选票发送队列,用于保存待发送的选票
LinkedBlockingQueue<ToSend> sendqueue;
// 选票接收队列,用于保存接收到的外部投票
LinkedBlockingQueue<Notification> recvqueue;
//投票者
QuorumPeer self;
Messenger messenger;
//逻辑始终,当前选举周期
AtomicLong logicalclock = new AtomicLong(); /* Election instance */
//被选举者服务器sid
long proposedLeader;
//被选举者服务器zxid
long proposedZxid;
//被选举者服务器选举周期
long proposedEpoch;
其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送
private void sendNotifications() {
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
// 构造发送消息
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
// 将发送消息放置于队列
sendqueue.offer(notmsg);
}
}
该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
// 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader
// 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader
// 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
1、该函数用于开始新一轮的Leader选举,其首先会将逻辑时钟自增,然后更新本服务器的选票信息(初始化选票),之后将选票信息放入sendqueue等待发送给其他服务器
2、每台服务器会不断地从recvqueue队列中获取外部选票,处理外部选票。
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
3、判断选举轮次,选票PK,更新选票
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
4、归档选票,统计选票,返回最后的选票
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
//设置leading状态,否则设置为flowing
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
//最终选票
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
// 清空recvqueue队列的选票
leaveInstance(endVote);
return endVote;
}
FastLeaderElection的算法,其是ZooKeeper的核心部分,比较复杂,梳理了一下大概的流程,好多细节没有展开。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。