本篇内容主要讲解“zk中QuorumPeer的原理和使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“zk中QuorumPeer的原理和使用”吧!
内部类
AddressTuple 地址组
QuorumServer
ServerState状态looking f o l
ZabState ZabState当前状态
SyncMode 同步机制
LearnerType 学习类型
属性
Vote currentVote | 节点认为当前服务是谁 |
方法
构造函数
public QuorumPeer() throws SaslException { super("QuorumPeer"); quorumStats = new QuorumStats(this); jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>(); adminServer = AdminServerFactory.createAdminServer(); x509Util = createX509Util(); initialize(); }
启动方法 选举状态 epoch持久化 QuorumServer记录服务相关的属性信息等 public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) { this.id = id; this.addr = addr; this.electionAddr = electionAddr; this.type = type; this.clientAddr = clientAddr; setMyAddrs(); } 构造函数 public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException { this(); this.cnxnFactory = cnxnFactory; this.electionType = electionType; this.myid = myid; this.tickTime = tickTime; this.initLimit = initLimit; this.syncLimit = syncLimit; this.connectToLearnerMasterLimit = connectToLearnerMasterLimit; this.quorumListenOnAllIPs = quorumListenOnAllIPs; this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir); this.zkDb = new ZKDatabase(this.logFactory); if (quorumConfig == null) { quorumConfig = new QuorumMaj(quorumPeers); } setQuorumVerifier(quorumConfig, false); adminServer = AdminServerFactory.createAdminServer(); } 获取最大的事务id public long getLastLoggedZxid() { if (!zkDb.isInitialized()) { loadDataBase(); } return zkDb.getDataTreeLastProcessedZxid(); } 线程启动 @Override public synchronized void start() { if (!getView().containsKey(myid)) { throw new RuntimeException("My id " + myid + " not in the peer list"); } //加载数据库 loadDataBase(); //启动服务连接工厂 startServerCnxnFactory(); try { adminServer.start(); } catch (AdminServerException e) { LOG.warn("Problem starting AdminServer", e); System.out.println(e); } //开始选举 startLeaderElection(); startJvmPauseMonitor(); super.start(); } private void loadDataBase() { try { zkDb.loadDataBase(); // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME); } catch (FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version currentEpoch = epochOfZxid; LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch); writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch); } if (epochOfZxid > currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } try { acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME); } catch (FileNotFoundException e) { // pick a reasonable epoch number // this should only happen once when moving to a // new code version acceptedEpoch = epochOfZxid; LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", acceptedEpoch); writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch); } if (acceptedEpoch < currentEpoch) { throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch)); } } catch (IOException ie) { LOG.error("Unable to load database on disk", ie); throw new RuntimeException("Unable to run quorum server ", ie); } } public synchronized void startLeaderElection() { try { if (getPeerState() == ServerState.LOOKING) { //正在寻找leader,创建选票 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch (IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } //创建选举算法 this.electionAlg = createElectionAlgorithm(electionType); } protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException { return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb)); } protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception { return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb)); } protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException { return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb)); } 从文件读取long值 private long readLongFromFile(String name) throws IOException { File file = new File(logFactory.getSnapDir(), name); BufferedReader br = new BufferedReader(new FileReader(file)); String line = ""; try { line = br.readLine(); return Long.parseLong(line); } catch (NumberFormatException e) { throw new IOException("Found " + line + " in " + file); } finally { br.close(); } } 写入文件long值 private void writeLongToFile(String name, final long value) throws IOException { File file = new File(logFactory.getSnapDir(), name); new AtomicFileWritingIdiom(file, new WriterStatement() { @Override public void write(Writer bw) throws IOException { bw.write(Long.toString(value)); } }); }
到此,相信大家对“zk中QuorumPeer的原理和使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。