Zookeeper 使用 Zookeeper Atomic Broadcast (ZAB) 协议来保障分布式数据一致性。
ZAB是一种支持崩溃恢复的消息广播协议,采用类似2PC的广播模式保证正常运行时性能,并使用基于 Paxos 的策略保证崩溃恢复时的一致性。
有的Follower服务器分发Commit消息,要求其将前一个Proposal进行提交。
ZAB一些包括两种基本的模式:崩溃恢复和消息广播。
1、当整个服务框架启动过程中或Leader服务器出现网络中断、崩溃退出与重启等异常情况时,ZAB协议就会进入恢复模式并选举产生新的Leader服务器。当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步之后,ZAB协议就会退出恢复模式,状态同步是指数据同步,用来保证集群在过半的机器能够和Leader服务器的数据状态保持一致。
2、当集群中已经有过半的Follower服务器完成了和Leader服务器的状态同步,那么整个服务框架就可以进入消息广播模式,当一台同样遵守ZAB协议的服务器启动后加入到集群中,如果此时集群中已经存在一个Leader服务器在负责进行消息广播,那么加入的服务器就会自觉地进入数据恢复模式:找到Leader所在的服务器,并与其进行数据同步,然后一起参与到消息广播流程中去。Zookeeper只允许唯一的一个Leader服务器来进行事务请求的处理,Leader服务器在接收到客户端的事务请求后,会生成对应的事务提议并发起一轮广播协议,而如果集群中的其他机器收到客户端的事务请求后,那么这些非Leader服务器会首先将这个事务请求转发给Leader服务器。
3、当Leader服务器出现崩溃或者机器重启、集群中已经不存在过半的服务器与Leader服务器保持正常通信时,那么在重新开始新的一轮的原子广播事务操作之前,所有进程首先会使用崩溃恢复协议来使彼此到达一致状态,于是整个ZAB流程就会从消息广播模式进入到崩溃恢复模式。一个机器要成为新的Leader,必须获得过半机器的支持,同时由于每个机器都有可能会崩溃,因此,ZAB协议运行过程中,前后会出现多个Leader,并且每台机器也有可能会多次成为Leader,进入崩溃恢复模式后,只要集群中存在过半的服务器能够彼此进行正常通信,那么就可以产生一个新的Leader并再次进入消息广播模式。如一个由三台机器组成的ZAB服务,通常由一个Leader、2个Follower服务器组成,某一个时刻,加入其中一个Follower挂了,整个ZAB集群是不会中断服务的。
ZAB协议中节点存在四种状态:
Leading: 当前节点为集群 Leader,负责协调事务
Following: 当前节点为 Follower 在 Leader 协调下执行事务
Looking: 集群没有正在运行的 Leader, 正处于选举过程
Observing: 节点跟随 Leader 保存系统最新的状态提供读服务,但不参与选举和事务投票
Zab协议消息广播有以下4个步骤组成:
- Leader发送PROPOSAL给集群中所有的节点。
- 节点在收到PROPOSAL之后,把PROPOSAL落盘,发送一个ACK给Leader。
- Leader在收到大多数节点的ACK之后,发送COMMIT给集群中所有的Follower节点。
- 如果存在Observer节点,Leader同时发送INFORM信息给Observer服务节点同步数据,Observer只接收Leader的INFORM消息同步数据,不参与Leader选举和事务提交。
在Leader服务器出现崩溃,或者由于网络原因导致Leader服务器失去了与过半Follower的联系,那么就会进入崩溃恢复模式,在ZAB协议中,为了保证程序的正确运行,整个恢复过程结束后需要选举出一个新的Leader服务器,因此,ZAB协议需要一个高效且可靠的Leader选举算法,从而保证能够快速地选举出新的Leader,同时,Leader选举算法不仅仅需要让Leader自身知道已经被选举为Leader,同时还需要让集群中的所有其他机器也能够快速地感知到选举产生的新的Leader服务器。
ZAB协议的基本原则
假设一个事务在Leader服务器上被提交了,并且已经得到了过半Follower服务器的Ack反馈,但是在它Commit消息发送给所有Follower机器之前,Leader服务挂了。如下图所示:
在集群正常运行过程中的某一个时刻,Server1是Leader服务器,其先后广播了P1、P2、C1、P3、C2(C2是Commit Of Proposal2的缩写),其中,当Leader服务器发出C2后就立即崩溃退出了,针对这种情况,ZAB协议就需要确保事务Proposal2最终能够在所有的服务器上都被提交成功,否则将出现不一致。
如果在崩溃恢复过程中出现一个需要被丢弃的提议,那么在崩溃恢复结束后需要跳过该事务Proposal,如下图所示:
假设初始的Leader服务器Server1在提出一个事务Proposal3之后就崩溃退出了,从而导致集群中的其他服务器都没有收到这个事务Proposal,于是,当Server1恢复过来再次加入到集群中的时候,ZAB协议需要确保丢弃Proposal3这个事务。
能够确保提交已经被Leader提交的事务的Proposal,同时丢弃已经被跳过的事务Proposal。如果让Leader选举算法能够保证新选举出来的Leader服务器拥有集群中所有机器最高编号(ZXID最大)的事务Proposal,那么就可以保证这个新选举出来的Leader一定具有所有已经提交的提议,更为重要的是如果让具有最高编号事务的Proposal机器称为Leader,就可以省去Leader服务器查询Proposal的提交和丢弃工作这一步骤了。
完成Leader选举后,在正式开始工作前,Leader服务器首先会确认日志中的所有Proposal是否都已经被集群中的过半机器提交了,即是否完成了数据同步。Leader服务器需要确所有的Follower服务器都能够接收到每一条事务Proposal,并且能够正确地将所有已经提交了的事务Proposal应用到内存数据库中。Leader服务器会为每个Follower服务器维护一个队列,并将那些没有被各Follower服务器同步的事务以Proposal消息的形式逐个发送给Follower服务器,并在每一个Proposal消息后面紧接着再发送一个Commit消息,以表示该事务已经被提交,等到Follower服务器将所有其尚未同步的事务Proposal都从Leader服务器上同步过来并成功应用到本地数据库后,Leader服务器就会将该Follower服务器加入到真正的可用Follower列表并开始之后的其他流程。
1、 发现,选举产生Leader,产生最新的epoch(每次选举产生新Leader的同时产生新epoch)。
2、 同步,各Follower和Leader完成数据同步。
3、广播,Leader处理客户端的写操作,并将状态变更广播至Follower,Follower多数通过之后Leader发起将状态变更落地Commit。
在正常运行过程中,ZAB协议会一直运行于阶段三来反复进行消息广播流程,如果出现崩溃或其他原因导致Leader缺失,那么此时ZAB协议会再次进入发现阶段,选举新的Leader。
ProposalRequestProcessor.proce***equest()方法发送PROPOSAL 给每一个节点。它调用Leader.propose()方法把PROPOSAL
入队到各个follower的queuedPackets,然后直接把PROPOSAL提交给leader节点自己的SyncRequestProcessor 。
以下是大概的代码路径:
ProposalRequestProcessor.proce***equest(request)
zks.getLeader().propose(request)
sendPacket(pp)
for f in forwardingFollowers
f.queuePacket(qp)
queuedPackets.add(p)
syncProcessor.proce***equest(request)
SyncRequestProcessor先处理
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
然后是Leader的ACK处理器处理,返回给Leader自己ACK结果
AckRequestProcessor.proce***equest()
proce***equest()
leader.processAck(self.getId(), request.zxid, null)
Follower. followLeader()方法处理接收到的QuorumPacket, case Leader.PROPOSAL分支处理的就是PROPOSAL。
Follower.followLeader()
loop
readPacket(qp)
leaderIs.readRecord(pp, "packet")
processPacket(qp)
case Leader.PROPOSAL
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr)
fzk.logRequest(hdr, txn)
syncProcessor.proce***equest(request)
case Leader.COMMIT:
fzk.commit(qp.getZxid())
commitProcessor.commit(request)
SyncRequestProcessor的处理逻辑
SyncRequestProcessor.run()
zks.getZKDatabase().append(si)
flush(toFlush)
zks.getZKDatabase().commit()
while (!toFlush.isEmpty())
Request i = toFlush.remove()
if (nextProcessor != null)
nextProcessor.proce***equest(i)
QuorumPacket qp = new QuorumPacket(Leader.ACK)
learner.writePacket(qp, false)
leaderOs.writeRecord(pp, "packet")
((Flushable)nextProcessor).flush()
learner.writePacket(null, true)
bufferedOutput.flush()
Leader的processAck()处理ACK消息,如果收到大多数节点的ACK,发送COMMIT给所有的follower节点,并调用leader自己 的CommitProcessor。 processAck()有两个调用入口:1. LeaderHandler的run()方法处理来自follower的ACK。2. AckRequestProcessor的proce***equest方法处理leader自己的ACK。
Leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress())
Proposal p = outstandingProposals.get(zxid)
p.addAck(sid)
tryToCommit(p, zxid, followerAddr)
if !p.hasAllQuorums()
return false;
// Commit on all followers
commit(zxid)
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null)
sendPacket(qp)
// Commit on Leader
zk.commitProcessor.commit(p.request)
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
已经提交的请求,交给ToBeAppliedRequestProcessor准备应用到内存数据库
ToBeAppliedRequestProcessor.proce***equest()
next.proce***equest(request)
最后交给FinalRequestProcessor,返回响应结果
CommitProcessor.run()
request = queuedRequests.poll()
processCommitted()
sendToNextProcessor(pending)
//返回响应结果
FinalRequestProcessor.proce***equest()
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。