这篇文章主要介绍“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zk中learner的作用是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
learner时observer,follower的父类,定义了公共属性和方法
子类 Follower 和Observer
内部类:
PacketInFlight表示在提议中还没有commit的消息
static class PacketInFlight { TxnHeader hdr; Record rec; }
属性:
QuorumPeer | 服务器节点 |
LearnerZooKeeperServer | learner的服务节点 |
BufferedOutputStream | 输出流 |
Socket | 端口套接字 |
InetSocketAddress | 地址信息 |
InputArchive | 输入存档 |
OutputArchive | 输出存档 |
leaderProtocolVersion | leader协议版本 |
BUFFERED_MESSAGE_SIZE | 缓存信息大小 |
MessageTracker | 顺序接收和发送信息 |
方法
validateSession(ServerCnxn cnxn, long clientId, int timeout) | 验证session有效性 |
writePacket(QuorumPacket pp, boolean flush) | 发送包给leader |
readPacket(QuorumPacket pp) | 从leader读取message |
request(Request request) | 发送request给leader |
findLeader | 查找认为是leader的地址信息 |
createSocket() | 创建socket对象 |
registerWithLeader(int pktType) | 执行handshake protocal建立follower/observer连接 |
到服务器验证session有效性
void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException { LOG.info("Revalidating client: 0x" + Long.toHexString(clientId)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); dos.writeLong(clientId); dos.writeInt(timeout); dos.close(); QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null); pendingRevalidations.put(clientId, cnxn); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, ZooTrace.SESSION_TRACE_MASK, "To validate session 0x" + Long.toHexString(clientId)); } writePacket(qp, true); } void writePacket(QuorumPacket pp, boolean flush) throws IOException { synchronized (leaderOs) { if (pp != null) { messageTracker.trackSent(pp.getType()); leaderOs.writeRecord(pp, "packet"); } if (flush) { bufferedOutput.flush(); } } } void request(Request request) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream oa = new DataOutputStream(baos); oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); if (request.request != null) { request.request.rewind(); int len = request.request.remaining(); byte[] b = new byte[len]; request.request.get(b); request.request.rewind(); oa.write(b); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); writePacket(qp, true); } 查找当前的leader信息 protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader's correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn't find the leader with id = " + current.getId()); } return leaderServer; } 连接套接字 sockConnect(Socket sock, InetSocketAddress addr, int timeout) 建立和leader的连接 /** * Establish a connection with the LearnerMaster found by findLearnerMaster. * Followers only connect to Leaders, Observers can connect to any active LearnerMaster. * Retries until either initLimit time has elapsed or 5 tries have happened. * @param addr - the address of the Peer to connect to. * @throws IOException - if the socket connection fails on the 5th attempt * if there is an authentication failure while connecting to leader * @throws X509Exception * @throws InterruptedException */ protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception { this.sock = createSocket(); this.leaderAddr = addr; // leader connection timeout defaults to tickTime * initLimit int connectTimeout = self.tickTime * self.initLimit; // but if connectToLearnerMasterLimit is specified, use that value to calculate // timeout instead of using the initLimit value if (self.connectToLearnerMasterLimit > 0) { connectTimeout = self.tickTime * self.connectToLearnerMasterLimit; } int remainingTimeout; long startNanoTime = nanoTime(); for (int tries = 0; tries < 5; tries++) { try { // recalculate the init limit time because retries sleep for 1000 milliseconds remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); if (remainingTimeout <= 0) { LOG.error("connectToLeader exceeded on retries."); throw new IOException("connectToLeader exceeded on retries."); } sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout)); if (self.isSslQuorum()) { //开始握手 ((SSLSocket) sock).startHandshake(); } sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { //出现异常 remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000); //剩余超时时间 if (remainingTimeout <= 1000) { //打印错误日志 LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; //尝试次数大于4 } else if (tries >= 4) { //打印错误日志 LOG.error("Unexpected exception, retries exceeded. tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); throw e; } else { //发出警告 LOG.warn("Unexpected exception, tries=" + tries + ", remaining init limit=" + remainingTimeout + ", connecting to " + addr, e); //重新尝试建立socket连接 this.sock = createSocket(); } } //读取配置延时时间,默认100ns Thread.sleep(leaderConnectDelayDuringRetryMs); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
到此,关于“zk中learner的作用是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。