这篇文章主要介绍“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zk中learner的作用是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
learner时observer,follower的父类,定义了公共属性和方法
子类 Follower 和Observer
内部类:
PacketInFlight表示在提议中还没有commit的消息
static class PacketInFlight { TxnHeader hdr; Record rec; }
属性:
QuorumPeer | 服务器节点 |
LearnerZooKeeperServer | learner的服务节点 |
BufferedOutputStream | 输出流 |
Socket | 端口套接字 |
InetSocketAddress | 地址信息 |
InputArchive | 输入存档 |
OutputArchive | 输出存档 |
leaderProtocolVersion | leader协议版本 |
BUFFERED_MESSAGE_SIZE | 缓存信息大小 |
MessageTracker | 顺序接收和发送信息 |
方法
validateSession(ServerCnxn cnxn, long clientId, int timeout) | 验证session有效性 |
writePacket(QuorumPacket pp, boolean flush) | 发送包给leader |
readPacket(QuorumPacket pp) | 从leader读取message |
request(Request request) | 发送request给leader |
findLeader | 查找认为是leader的地址信息 |
createSocket() | 创建socket对象 |
registerWithLeader(int pktType) | 执行handshake protocal建立follower/observer连接 |
到服务器验证session有效性
void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
dos.writeLong(clientId);
dos.writeInt(timeout);
dos.close();
QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
pendingRevalidations.put(clientId, cnxn);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.SESSION_TRACE_MASK,
"To validate session 0x" + Long.toHexString(clientId));
}
writePacket(qp, true);
}
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
leaderOs.writeRecord(pp, "packet");
}
if (flush) {
bufferedOutput.flush();
}
}
}
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte[] b = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
writePacket(qp, true);
}
查找当前的leader信息
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = " + current.getId());
}
return leaderServer;
}
连接套接字
sockConnect(Socket sock, InetSocketAddress addr, int timeout)
建立和leader的连接
/**
* Establish a connection with the LearnerMaster found by findLearnerMaster.
* Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
* Retries until either initLimit time has elapsed or 5 tries have happened.
* @param addr - the address of the Peer to connect to.
* @throws IOException - if the socket connection fails on the 5th attempt
* if there is an authentication failure while connecting to leader
* @throws X509Exception
* @throws InterruptedException
*/
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
this.leaderAddr = addr;
// leader connection timeout defaults to tickTime * initLimit
int connectTimeout = self.tickTime * self.initLimit;
// but if connectToLearnerMasterLimit is specified, use that value to calculate
// timeout instead of using the initLimit value
if (self.connectToLearnerMasterLimit > 0) {
connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
}
int remainingTimeout;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
if (remainingTimeout <= 0) {
LOG.error("connectToLeader exceeded on retries.");
throw new IOException("connectToLeader exceeded on retries.");
}
sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
if (self.isSslQuorum()) {
//开始握手
((SSLSocket) sock).startHandshake();
}
sock.setTcpNoDelay(nodelay);
break;
} catch (IOException e) {
//出现异常
remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
//剩余超时时间
if (remainingTimeout <= 1000) {
//打印错误日志
LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries
+ ", remaining init limit=" + remainingTimeout
+ ", connecting to " + addr, e);
throw e;
//尝试次数大于4
} else if (tries >= 4) {
//打印错误日志
LOG.error("Unexpected exception, retries exceeded. tries=" + tries
+ ", remaining init limit=" + remainingTimeout
+ ", connecting to " + addr, e);
throw e;
} else {
//发出警告
LOG.warn("Unexpected exception, tries=" + tries
+ ", remaining init limit=" + remainingTimeout
+ ", connecting to " + addr, e);
//重新尝试建立socket连接
this.sock = createSocket();
}
}
//读取配置延时时间,默认100ns
Thread.sleep(leaderConnectDelayDuringRetryMs);
}
self.authLearner.authenticate(sock, hostname);
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
到此,关于“zk中learner的作用是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/iioschina/blog/3119990