Zookeeper源码中session管理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
//ZookeeperServer.java
//第617行
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
long sessionId = sessionTracker.createSession(timeout);
//省略部分代码
}
//SessionTrackerImpl.java
//第236行
synchronized public long createSession(int sessionTimeout) {
addSession(nextSessionId, sessionTimeout);
return nextSessionId++;
}
//SessionTrackerImpl.java
//第241行
synchronized public void addSession(long id, int sessionTimeout) {
//保存sessionId和过期时间的关系
sessionsWithTimeout.put(id, sessionTimeout);
//如果session不存在,就新建一个
if (sessionsById.get(id) == null) {
SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
sessionsById.put(id, s);
//省略日志打印
} else {
//省略日志打印
}
//将session按照一定规则聚合
touchSession(id, sessionTimeout);
}
//SessionTrackerImpl.java
//第166行
synchronized public boolean touchSession(long sessionId, int timeout) {
if (LOG.isTraceEnabled()) {
//省略日志打印
}
SessionImpl s = sessionsById.get(sessionId);
// Return false, if the session doesn't exists or marked as closing
if (s == null || s.isClosing()) {
return false;
}
long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
//如果当前session的过期时间大于这个值,不需要操作
if (s.tickTime >= expireTime) {
// Nothing needs to be done
return true;
}
//将session从旧的桶中移出,并放入(剩余超时时间更长的)新的桶
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
set.sessions.remove(s);
}
s.tickTime = expireTime;
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}
//SessionTrackerImpl.java
//第89行
private long roundToInterval(long time) {
//expirationInterval就是zookeeper的心跳周期(tickTime),默认值是3000
//这段计算的意思是将过期时间每3000ms分一个段
//比如200ms、500ms、3000ms返回0,3001ms、5000ms返回3000
//由于这里的time是加了Time.currentElapsedTime()的,所以不会出现0的情况
return (time / expirationInterval + 1) * expirationInterval;
}
//ZookeeperServer.java
//第728行
public void submitRequest(Request si) {
//省略部分代码
try {
touch(si.cnxn);
//省略部分代码
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
//ZookeeperServer.java
//第368行
void touch(ServerCnxn cnxn) throws MissingSessionException {
//省略部分代码
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException(
"No session with sessionid 0x" + Long.toHexString(id)
+ " exists, probably expired and removed");
}
}
zookeeper服务端响应客户端的请求时,都会调用submitRequest方法,最终会调用到touchSession方法,这里会将session移动到新的桶中
//SessionTrackerImpl.java
//第142行
synchronized public void run() {
try {
while (running) {
currentTime = Time.currentElapsedTime();
//在SessionTrackerImpl初始化的时候,会给nextExpirationTime赋一个初值
//nextExpirationTime = roundToInterval(Time.currentElapsedTime());
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
//如果到达了过期时间,则移除对应桶中的所有session
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
//ZookeeperServer.java
//第353行
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
close(sessionId);
}
//ZookeeperServer.java
//第329行
private void close(long sessionId) {
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
//PrepRequestProcessor.java
//第294行
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
//省略代码
case OpCode.closeSession:
// We don't want to do this check since the session expiration thread
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
HashSet<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
//删除session关联的所有临时节点
synchronized (zks.outstandingChanges) {
//zookeeper的大部分操作都会记录并放入列表
for (ChangeRecord c : zks.outstandingChanges) {
//c.stat == null表示这是删除操作
if (c.stat == null) {
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path3Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path3Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}
LOG.info("Processed session termination for sessionid: 0x"
+ Long.toHexString(request.sessionId));
break;
//省略代码
}
}
看完上述内容,你们掌握Zookeeper源码中session管理的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/icebergxty/blog/3105730