Zookeeper源码中session管理的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
//ZookeeperServer.java //第617行 long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { long sessionId = sessionTracker.createSession(timeout); //省略部分代码 } //SessionTrackerImpl.java //第236行 synchronized public long createSession(int sessionTimeout) { addSession(nextSessionId, sessionTimeout); return nextSessionId++; } //SessionTrackerImpl.java //第241行 synchronized public void addSession(long id, int sessionTimeout) { //保存sessionId和过期时间的关系 sessionsWithTimeout.put(id, sessionTimeout); //如果session不存在,就新建一个 if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); //省略日志打印 } else { //省略日志打印 } //将session按照一定规则聚合 touchSession(id, sessionTimeout); } //SessionTrackerImpl.java //第166行 synchronized public boolean touchSession(long sessionId, int timeout) { if (LOG.isTraceEnabled()) { //省略日志打印 } SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } long expireTime = roundToInterval(Time.currentElapsedTime() + timeout); //如果当前session的过期时间大于这个值,不需要操作 if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } //将session从旧的桶中移出,并放入(剩余超时时间更长的)新的桶 SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; } //SessionTrackerImpl.java //第89行 private long roundToInterval(long time) { //expirationInterval就是zookeeper的心跳周期(tickTime),默认值是3000 //这段计算的意思是将过期时间每3000ms分一个段 //比如200ms、500ms、3000ms返回0,3001ms、5000ms返回3000 //由于这里的time是加了Time.currentElapsedTime()的,所以不会出现0的情况 return (time / expirationInterval + 1) * expirationInterval; }
//ZookeeperServer.java //第728行 public void submitRequest(Request si) { //省略部分代码 try { touch(si.cnxn); //省略部分代码 } catch (MissingSessionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Dropping request: " + e.getMessage()); } } catch (RequestProcessorException e) { LOG.error("Unable to process request:" + e.getMessage(), e); } } //ZookeeperServer.java //第368行 void touch(ServerCnxn cnxn) throws MissingSessionException { //省略部分代码 if (!sessionTracker.touchSession(id, to)) { throw new MissingSessionException( "No session with sessionid 0x" + Long.toHexString(id) + " exists, probably expired and removed"); } }
zookeeper服务端响应客户端的请求时,都会调用submitRequest方法,最终会调用到touchSession方法,这里会将session移动到新的桶中
//SessionTrackerImpl.java //第142行 synchronized public void run() { try { while (running) { currentTime = Time.currentElapsedTime(); //在SessionTrackerImpl初始化的时候,会给nextExpirationTime赋一个初值 //nextExpirationTime = roundToInterval(Time.currentElapsedTime()); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; } SessionSet set; //如果到达了过期时间,则移除对应桶中的所有session set = sessionSets.remove(nextExpirationTime); if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } } nextExpirationTime += expirationInterval; } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!"); } //ZookeeperServer.java //第353行 public void expire(Session session) { long sessionId = session.getSessionId(); LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of " + session.getTimeout() + "ms exceeded"); close(sessionId); } //ZookeeperServer.java //第329行 private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); } //PrepRequestProcessor.java //第294行 protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type); switch (type) { //省略代码 case OpCode.closeSession: // We don't want to do this check since the session expiration thread // queues up this operation without being the session owner. // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); HashSet<String> es = zks.getZKDatabase().getEphemerals(request.sessionId); //删除session关联的所有临时节点 synchronized (zks.outstandingChanges) { //zookeeper的大部分操作都会记录并放入列表 for (ChangeRecord c : zks.outstandingChanges) { //c.stat == null表示这是删除操作 if (c.stat == null) { es.remove(c.path); } else if (c.stat.getEphemeralOwner() == request.sessionId) { es.add(c.path); } } for (String path3Delete : es) { addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path3Delete, null, 0, null)); } zks.sessionTracker.setSessionClosing(request.sessionId); } LOG.info("Processed session termination for sessionid: 0x" + Long.toHexString(request.sessionId)); break; //省略代码 } }
看完上述内容,你们掌握Zookeeper源码中session管理的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。