温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

zk中QuorumPeer的原理和使用

发布时间:2021-06-29 14:58:20 来源:亿速云 阅读:193 作者:chen 栏目:大数据

本篇内容主要讲解“zk中QuorumPeer的原理和使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“zk中QuorumPeer的原理和使用”吧!

内部类

AddressTuple 地址组

QuorumServer

ServerState状态looking f o l

ZabState ZabState当前状态

SyncMode 同步机制

LearnerType 学习类型

zk中QuorumPeer的原理和使用

属性

Vote currentVote

节点认为当前服务是谁

方法

构造函数

public QuorumPeer() throws SaslException {
    super("QuorumPeer");
    quorumStats = new QuorumStats(this);
    jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
    adminServer = AdminServerFactory.createAdminServer();
    x509Util = createX509Util();
    initialize();
}
启动方法
选举状态
epoch持久化

QuorumServer记录服务相关的属性信息等
public QuorumServer(long id, InetSocketAddress addr, InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
    this.id = id;
    this.addr = addr;
    this.electionAddr = electionAddr;
    this.type = type;
    this.clientAddr = clientAddr;
    setMyAddrs();
}

构造函数
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir, File dataLogDir, int electionType, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit, boolean quorumListenOnAllIPs, ServerCnxnFactory cnxnFactory, QuorumVerifier quorumConfig) throws IOException {
    this();
    this.cnxnFactory = cnxnFactory;
    this.electionType = electionType;
    this.myid = myid;
    this.tickTime = tickTime;
    this.initLimit = initLimit;
    this.syncLimit = syncLimit;
    this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
    this.quorumListenOnAllIPs = quorumListenOnAllIPs;
    this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
    this.zkDb = new ZKDatabase(this.logFactory);
    if (quorumConfig == null) {
        quorumConfig = new QuorumMaj(quorumPeers);
    }
    setQuorumVerifier(quorumConfig, false);
    adminServer = AdminServerFactory.createAdminServer();
}

获取最大的事务id
public long getLastLoggedZxid() {
    if (!zkDb.isInitialized()) {
        loadDataBase();
    }
    return zkDb.getDataTreeLastProcessedZxid();
}


线程启动
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    //加载数据库
    loadDataBase();
    //启动服务连接工厂
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    //开始选举
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}


private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch)
                                  + ", is older than the last zxid, "
                                  + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, "
                                  + ZxidUtils.zxidToString(acceptedEpoch)
                                  + " is less than the current epoch, "
                                  + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}


public synchronized void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            //正在寻找leader,创建选票
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    //创建选举算法
    this.electionAlg = createElectionAlgorithm(electionType);
}


protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
    return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
    return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}


从文件读取long值
private long readLongFromFile(String name) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    BufferedReader br = new BufferedReader(new FileReader(file));
    String line = "";
    try {
        line = br.readLine();
        return Long.parseLong(line);
    } catch (NumberFormatException e) {
        throw new IOException("Found " + line + " in " + file);
    } finally {
        br.close();
    }
}

写入文件long值
private void writeLongToFile(String name, final long value) throws IOException {
    File file = new File(logFactory.getSnapDir(), name);
    new AtomicFileWritingIdiom(file, new WriterStatement() {
        @Override
        public void write(Writer bw) throws IOException {
            bw.write(Long.toString(value));
        }
    });
}


到此,相信大家对“zk中QuorumPeer的原理和使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zk
AI