这篇文章主要介绍了zk工厂方法如何实现NIOServerCnxnFactory,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
NIOServerCnxnFactory类
内部类
AbstractSelectThread
AcceptThread
SelectorThread
属性
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT | 10s session过期时间 |
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS | selector 线程数 |
ZOOKEEPER_NIO_NUM_WORKER_THREADS | worker 线程数 |
directBuffer | buffer用来线程间数据交互 |
ipMap | 限制ip上连接数 |
cnxnExpiryQueue | 连接失效时间分桶队列 |
workerPool | WorkerService worker执行服务 |
acceptThread | 接收新连接,simple round-robin 分配到选择线程 |
selectorThreads | |
方法
停止接收
private void pauseAccept(long millisecs) {
acceptKey.interestOps(0);
try {
selector.select(millisecs);
} catch (IOException e) {
// ignore
} finally {
acceptKey.interestOps(SelectionKey.OP_ACCEPT);
}
}
private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null;
try {
sc = acceptSocket.accept();
accepted = true;
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
}
LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
// Round-robin assign this connection to a selector thread
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException("Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
acceptErrorLogger.flush();
} catch (IOException e) {
// accept, maxClientCnxns, configureBlocking
ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
}
return accepted;
}
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
key = accepted.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
} catch (IOException e) {
// register, createConnection
cleanupSelectionKey(key);
fastCloseSock(accepted);
}
}
}
configure
获取客户端连接数
private int getClientCnxnCount(InetAddress cl) {
Set<NIOServerCnxn> s = ipMap.get(cl);
if (s == null) {
return 0;
}
return s.size();
}
创建连接
protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException {
return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
}
创建连接
private void addCnxn(NIOServerCnxn cnxn) throws IOException {
InetAddress addr = cnxn.getSocketAddress();
if (addr == null) {
throw new IOException("Socket of " + cnxn + " has been closed");
}
Set<NIOServerCnxn> set = ipMap.get(addr);
if (set == null) {
// in general we will see 1 connection from each
// host, setting the initial cap to 2 allows us
// to minimize mem usage in the common case
// of 1 entry -- we need to set the initial cap
// to 2 to avoid rehash when the first entry is added
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(cnxn);
cnxns.add(cnxn);
touchCnxn(cnxn);
}
思考:
为什么单机和集群模式启动不一样
单机可以直接从日志,快照恢复数据
集群根据角色划分,涉及到数据同步
感谢你能够认真阅读完这篇文章,希望小编分享的“zk工厂方法如何实现NIOServerCnxnFactory”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!
亿速云「云数据库 MySQL」免部署即开即用,比自行安装部署数据库高出1倍以上的性能,双节点冗余防止单节点故障,数据自动定期备份随时恢复。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/iioschina/blog/3118739