温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎样建立连接Zookeeper中的服务端

发布时间:2021-09-13 11:55:30 来源:亿速云 阅读:206 作者:柒染 栏目:大数据

这篇文章给大家介绍Zookeeper之怎样建立连接服务端,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

服务端处理请求的代码有两种实现:NIOServerCnxnFactory和NettyServerCnxnFactory。默认是NIOServerCnxnFactory,可以通过指定zookeeper.serverCnxnFactory参数来修改。

这两个类的逻辑是一样的,只是一个用的是Java原生的NIO,一个用的是Netty,这里我们就分析一下NIOServerCnxnFactory。

NIOServerCnxnFactory实现了Runnable接口,下面看它的run方法,它在循环中处理请求:

//NIOServerCnxnFactory.java
//第200行
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            selector.select(1000);
            Set<SelectionKey> selected;
            synchronized (this) {
                selected = selector.selectedKeys();
            }
            ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
                selected);
            Collections.shuffle(selectedList);
            for (SelectionKey k : selectedList) {
                //如果是连接请求
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k
                                        .channel()).accept();
                    InetAddress ia = sc.socket().getInetAddress();
                    //获取IP地址对应的客户端连接数
                    int cnxncount = getClientCnxnCount(ia);
                    //如果超出则关闭
                    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                        LOG.warn("Too many connections from " + ia
                                 + " - max is " + maxClientCnxns );
                        sc.close();
                    } else {
                        LOG.info("Accepted socket connection from "
                                 + sc.socket().getRemoteSocketAddress());
                        sc.configureBlocking(false);
                        SelectionKey sk = sc.register(selector,
                                                      SelectionKey.OP_READ);
                        //每一个连接都是一个NIOServerCnxn
                        NIOServerCnxn cnxn = createConnection(sc, sk);
                        sk.attach(cnxn);
                        addCnxn(cnxn);
                    }
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    //在第二个循环的时候,会进入这里,处理真正的连接请求
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k);
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "
                                  + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}

//NIOServerCnxn.java
//第237行
void doIO(SelectionKey k) throws InterruptedException {
    try {
        if (isSocketOpen() == false) {
            LOG.warn("trying to do i/o on a null socket for session:0x"
                     + Long.toHexString(sessionId));

            return;
        }
        if (k.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
            }
            if (incomingBuffer.remaining() == 0) {
                boolean isPayload;
                if (incomingBuffer == lenBuffer) { // start of next request
                    incomingBuffer.flip();
                    isPayload = readLength(k);
                    incomingBuffer.clear();
                } else {
                    // continuation
                    isPayload = true;
                }
                if (isPayload) { // not the case for 4letterword
                    readPayload();
                }
                else {
                    // four letter words take care
                    // need not do anything else
                    return;
                }
            }
        }
        //省略部分代码
    } catch (CancelledKeyException e) {
        
    } catch (CloseRequestException e) {
        
    } catch (EndOfStreamException e) {
        
    } catch (IOException e) {
        
    }
}

//NIOServerCnxn.java
//第194行
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from client sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        packetReceived();
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

//NIOServerCnxn.java
//第434行
private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

//ZooKeeperServer.java
//第886行
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    ConnectRequest connReq = new ConnectRequest();
    connReq.deserialize(bia, "connect");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request from client "
                  + cnxn.getRemoteSocketAddress()
                  + " client's lastZxid is 0x"
                  + Long.toHexString(connReq.getLastZxidSeen()));
    }
    boolean readOnly = false;
    try {
        readOnly = bia.readBool("readOnly");
        cnxn.isOldClient = false;
    } catch (IOException e) {
        // this is ok -- just a packet from an old client which
        // doesn't contain readOnly field
        LOG.warn("Connection request from old client "
                 + cnxn.getRemoteSocketAddress()
                 + "; will be dropped if server is in r-o mode");
    }
    //如果客户端没有设置readOnly,但是服务端是只读的,直接抛出异常关闭连接
    if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
        String msg = "Refusing session request for not-read-only client "
            + cnxn.getRemoteSocketAddress();
        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
        String msg = "Refusing session request for client "
            + cnxn.getRemoteSocketAddress()
            + " as it has seen zxid 0x"
            + Long.toHexString(connReq.getLastZxidSeen())
            + " our last zxid is 0x"
            + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
            + " client must try another server";

        LOG.info(msg);
        throw new CloseRequestException(msg);
    }
    //协商session超时时间
    int sessionTimeout = connReq.getTimeOut();
    byte passwd[] = connReq.getPasswd();
    int minSessionTimeout = getMinSessionTimeout();
    if (sessionTimeout < minSessionTimeout) {
        sessionTimeout = minSessionTimeout;
    }
    int maxSessionTimeout = getMaxSessionTimeout();
    if (sessionTimeout > maxSessionTimeout) {
        sessionTimeout = maxSessionTimeout;
    }
    cnxn.setSessionTimeout(sessionTimeout);
    // We don't want to receive any packets until we are sure that the
    // session is setup
    cnxn.disableRecv();
    long sessionId = connReq.getSessionId();
    if (sessionId != 0) {
        //如果sessionId不是0,说明是之前已经连接过的客户端因为掉线等原因重新连接的情况
        long clientSessionId = connReq.getSessionId();
        LOG.info("Client attempting to renew session 0x"
                 + Long.toHexString(clientSessionId)
                 + " at " + cnxn.getRemoteSocketAddress());
        serverCnxnFactory.closeSession(sessionId);
        cnxn.setSessionId(sessionId);
        reopenSession(cnxn, sessionId, passwd, sessionTimeout);
    } else {
        LOG.info("Client attempting to establish new session at "
                 + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
    }
}

//ZooKeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    //创建一个session,zookeeper的session管理比较复杂,具体情况下一章分析
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    //响应客户端
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

//ZooKeeperServer.java
//第728行
public void submitRequest(Request si) {
    //省略部分代码
    
    try {
        //刷新session的超时时间
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            //提交给PrepRequestProcessor进一步处理
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

//PrepRequestProcessor.java
//第294行
/**
 * Prepares the transaction for a request: sets the request's transaction header
 * and, depending on the op type, its transaction body.
 *
 * <p>Only the {@code createSession} case is shown in this excerpt; other cases
 * are omitted. Unrecognized types are logged as an error.
 *
 * @param type the op code of the request
 * @param zxid the zxid recorded in the transaction header
 * @param request the request being prepared; {@code hdr} and {@code txn} are set on it
 * @param record not used by the visible code
 * @param deserialize not used by the visible code
 */
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
            throws KeeperException, IOException, RequestProcessorException {
    // Header carries session id, cxid, zxid, the current wall time and the op type.
    request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                Time.currentWallTime(), type);

    switch (type) {
        // (other cases are omitted in this excerpt)
        
        case OpCode.createSession:
            // Read the int stored at the start of the request buffer
            // (presumably the session timeout written by createSession -- confirm),
            // rewinding before and after so the buffer position is left at 0.
            request.request.rewind();
            int to = request.request.getInt();
            request.txn = new CreateSessionTxn(to);
            request.request.rewind();
            // Author's note: addSession is called again here even though the
            // earlier code already added the session; the reason is unclear.
            zks.sessionTracker.addSession(request.sessionId, to);
            zks.setOwner(request.sessionId, request.getOwner());
            break;
            
        // (other cases are omitted in this excerpt)
            
        default:
            LOG.error("Invalid OpCode: {} received by PrepRequestProcessor", type);
    }
}

关于Zookeeper之怎样建立连接服务端就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI