本篇内容主要讲解“zk中QuorumPeer的原理和使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“zk中QuorumPeer的原理和使用”吧!
内部类
AddressTuple 地址组
QuorumServer
ServerState状态looking f o l
ZabState ZabState当前状态
SyncMode 同步机制
LearnerType 学习类型
属性
Vote currentVote | 节点认为当前服务是谁 |
方法
构造函数
public QuorumPeer() throws SaslException {
super("QuorumPeer");
quorumStats = new QuorumStats(this);
jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
adminServer = AdminServerFactory.createAdminServer();
x509Util = createX509Util();
initialize();
}
启动方法
选举状态
epoch持久化
QuorumServer记录服务相关的属性信息等
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
this.id = id;
this.addr = addr;
this.electionAddr = electionAddr;
this.type = type;
this.clientAddr = clientAddr;
setMyAddrs();
}
构造函数
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
this();
this.cnxnFactory = cnxnFactory;
this.electionType = electionType;
this.myid = myid;
this.tickTime = tickTime;
this.initLimit = initLimit;
this.syncLimit = syncLimit;
this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
this.quorumListenOnAllIPs = quorumListenOnAllIPs;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
this.zkDb = new ZKDatabase(this.logFactory);
if (quorumConfig == null) {
quorumConfig = new QuorumMaj(quorumPeers);
}
setQuorumVerifier(quorumConfig, false);
adminServer = AdminServerFactory.createAdminServer();
}
获取最大的事务id
public long getLastLoggedZxid() {
if (!zkDb.isInitialized()) {
loadDataBase();
}
return zkDb.getDataTreeLastProcessedZxid();
}
线程启动
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//加载数据库
loadDataBase();
//启动服务连接工厂
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//开始选举
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
private void loadDataBase() {
try {
zkDb.loadDataBase();
// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch);
writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
}
if (epochOfZxid > currentEpoch) {
throw new IOException("The current epoch, "
+ ZxidUtils.zxidToString(currentEpoch)
+ ", is older than the last zxid, "
+ lastProcessedZxid);
}
try {
acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
} catch (FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
acceptedEpoch = epochOfZxid;
LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", acceptedEpoch);
writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
}
if (acceptedEpoch < currentEpoch) {
throw new IOException("The accepted epoch, "
+ ZxidUtils.zxidToString(acceptedEpoch)
+ " is less than the current epoch, "
+ ZxidUtils.zxidToString(currentEpoch));
}
} catch (IOException ie) {
LOG.error("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
}
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
//正在寻找leader,创建选票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//创建选举算法
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}
从文件读取long值
private long readLongFromFile(String name) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
BufferedReader br = new BufferedReader(new FileReader(file));
String line = "";
try {
line = br.readLine();
return Long.parseLong(line);
} catch (NumberFormatException e) {
throw new IOException("Found " + line + " in " + file);
} finally {
br.close();
}
}
写入文件long值
private void writeLongToFile(String name, final long value) throws IOException {
File file = new File(logFactory.getSnapDir(), name);
new AtomicFileWritingIdiom(file, new WriterStatement() {
@Override
public void write(Writer bw) throws IOException {
bw.write(Long.toString(value));
}
});
}
到此,相信大家对“zk中QuorumPeer的原理和使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/iioschina/blog/3118198