温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

zk中learner的作用是什么

发布时间:2021-06-28 17:39:32 来源:亿速云 阅读:205 作者:chen 栏目:大数据

这篇文章主要介绍“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zk中learner的作用是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

learner时observer,follower的父类,定义了公共属性和方法

子类 Follower 和Observer

内部类:

PacketInFlight表示在提议中还没有commit的消息

static class PacketInFlight { TxnHeader hdr; Record rec; }

属性:

QuorumPeer

服务器节点

LearnerZooKeeperServer

learner的服务节点

BufferedOutputStream

输出流

Socket

端口套接字

InetSocketAddress

地址信息

InputArchive

输入存档

OutputArchive

输出存档

leaderProtocolVersion

leader协议版本

BUFFERED_MESSAGE_SIZE

缓存信息大小

MessageTracker

顺序接收和发送信息

方法

validateSession(ServerCnxn cnxn, long clientId, int timeout)

验证session有效性

writePacket(QuorumPacket pp, boolean flush)

发送包给leader

readPacket(QuorumPacket pp)

从leader读取message

request(Request request)

发送request给leader

findLeader

查找认为是leader的地址信息

createSocket()

创建socket对象

registerWithLeader(int pktType)

执行handshake protocal建立follower/observer连接

到服务器验证session有效性

void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
    LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeLong(clientId);
    dos.writeInt(timeout);
    dos.close();
    QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
    pendingRevalidations.put(clientId, cnxn);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "To validate session 0x" + Long.toHexString(clientId));
    }
    writePacket(qp, true);
}

void writePacket(QuorumPacket pp, boolean flush) throws IOException {
    synchronized (leaderOs) {
        if (pp != null) {
            messageTracker.trackSent(pp.getType());
            leaderOs.writeRecord(pp, "packet");
        }
        if (flush) {
            bufferedOutput.flush();
        }
    }
}


void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte[] b = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true);
}

查找当前的leader信息
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = " + current.getId());
    }
    return leaderServer;
}



连接套接字
sockConnect(Socket sock, InetSocketAddress addr, int timeout) 


建立和leader的连接
/**
 * Establish a connection with the LearnerMaster found by findLearnerMaster.
 * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
 * Retries until either initLimit time has elapsed or 5 tries have happened.
 * @param addr - the address of the Peer to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * if there is an authentication failure while connecting to leader
 * @throws X509Exception
 * @throws InterruptedException
 */
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
    this.sock = createSocket();
    this.leaderAddr = addr;

    // leader connection timeout defaults to tickTime * initLimit
    int connectTimeout = self.tickTime * self.initLimit;

    // but if connectToLearnerMasterLimit is specified, use that value to calculate
    // timeout instead of using the initLimit value
    if (self.connectToLearnerMasterLimit > 0) {
        connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
    }

    int remainingTimeout;
    long startNanoTime = nanoTime();

    for (int tries = 0; tries < 5; tries++) {
        try {
            // recalculate the init limit time because retries sleep for 1000 milliseconds
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            if (remainingTimeout <= 0) {
                LOG.error("connectToLeader exceeded on retries.");
                throw new IOException("connectToLeader exceeded on retries.");
            }

            sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
            if (self.isSslQuorum()) {
                //开始握手
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            //出现异常
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            //剩余超时时间
            if (remainingTimeout <= 1000) {
                //打印错误日志
                LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;

                //尝试次数大于4
            } else if (tries >= 4) {
                //打印错误日志
                LOG.error("Unexpected exception, retries exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;
            } else {
                //发出警告
                LOG.warn("Unexpected exception, tries=" + tries
                         + ", remaining init limit=" + remainingTimeout
                         + ", connecting to " + addr, e);
                //重新尝试建立socket连接
                this.sock = createSocket();
            }
        }
        //读取配置延时时间,默认100ns
        Thread.sleep(leaderConnectDelayDuringRetryMs);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}

到此,关于“zk中learner的作用是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

zk
AI