这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
服务端处理请求的代码有两种NIOServerCnxnFactory和NettyServerCnxnFactory,默认是NIOServerCnxnFactory,可以通过指定zookeeper.serverCnxnFactory参数来修改。
这两个类逻辑是一样的,只是一个用的java原生的NIO,一个用的netty,这里我们就分析下NIOServerCnxnFactory。
NIOServerCnxnFactory实现了Runnable接口,看下它的run方法,循环处理请求
//NIOServerCnxnFactory.java
//第200行
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
//如果是连接请求
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
//获取IP地址对应的客户端连接数
int cnxncount = getClientCnxnCount(ia);
//如果超出则关闭
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
//每一个连接都是一个NIOServerCnxn
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//在第二个循环的时候,会进入这里,处理真正的连接请求
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
closeAll();
LOG.info("NIOServerCnxn factory exited run method");
}
//NIOServerCnxn.java
//第237行
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
readPayload();
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
//省略部分代码
} catch (CancelledKeyException e) {
} catch (CloseRequestException e) {
} catch (EndOfStreamException e) {
} catch (IOException e) {
}
}
//NIOServerCnxn.java
//第194行
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
//NIOServerCnxn.java
//第434行
private void readConnectRequest() throws IOException, InterruptedException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
zkServer.processConnectRequest(this, incomingBuffer);
initialized = true;
}
//ZookeeperServer.java
//第886行
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request from client "
+ cnxn.getRemoteSocketAddress()
+ " client's lastZxid is 0x"
+ Long.toHexString(connReq.getLastZxidSeen()));
}
boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// this is ok -- just a packet from an old client which
// doesn't contain readOnly field
LOG.warn("Connection request from old client "
+ cnxn.getRemoteSocketAddress()
+ "; will be dropped if server is in r-o mode");
}
//如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接
if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client "
+ cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg);
}
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
//协商session超时时间
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
if (sessionId != 0) {
//如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
}
}
//ZookeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
//创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
//响应客户端
submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
return sessionId;
}
//ZookeeperServer.java
//第728行
public void submitRequest(Request si) {
//省略部分代码
try {
//刷新session的超时时间
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
//提交给PrepRequestProcessor进一步处理
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
//PrepRequestProcessor.java
//第294行
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
//省略部分代码
case OpCode.createSession:
request.request.rewind();
int to = request.request.getInt();
request.txn = new CreateSessionTxn(to);
request.request.rewind();
//这里又调用了一次addSession,但是之前的代码其实已经新增过了,不太明白为什么
zks.sessionTracker.addSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
//省略部分代码
default:
LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
}
}
关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/icebergxty/blog/3105530