这篇文章给大家介绍zk中FinalRequestProcessor的作用是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。
FinalRequestProcessor 是请求处理器链中的最后一个环节。
FinalRequestProcessor implements RequestProcessor
作为处理器链的最后一个环节,它同时负责事务请求和非事务请求的最终处理,并把响应返回给客户端。
构造器
/**
 * Creates a final request processor bound to the given server.
 *
 * <p>Besides remembering the server itself, the constructor also keeps a
 * reference to the server's request-path metrics collector, so that
 * {@code processRequest} can register per-path metrics without asking the
 * server for the collector on every request.
 *
 * @param zks the server whose requests this processor finishes
 */
public FinalRequestProcessor(ZooKeeperServer zks) {
    this.requestPathMetricsCollector = zks.getRequestPathMetricsCollector();
    this.zks = zks;
}
处理请求(processRequest 方法)
public void processRequest(Request request) { LOG.debug("Processing request:: {}", request); // request.addRQRec(">final"); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.SERVER_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); } ProcessTxnResult rc = zks.processTxn(request); // ZOOKEEPER-558: // In some cases the server does not close the connection (e.g., closeconn buffer // was not being queued — ZOOKEEPER-558) properly. This happens, for example, // when the client closes the connection. The server should still close the session, though. // Calling closeSession() after losing the cnxn, results in the client close session response being dropped. if (request.type == OpCode.closeSession && connClosedByClient(request)) { // We need to check if we can close the session id. // Sometimes the corresponding ServerCnxnFactory could be null because // we are just playing diffs from the leader. if (closeSession(zks.serverCnxnFactory, request.sessionId) || closeSession(zks.secureServerCnxnFactory, request.sessionId)) { return; } } if (request.getHdr() != null) { /* * Request header is created only by the leader, so this must be * a quorum request. Since we're comparing timestamps across hosts, * this metric may be incorrect. However, it's still a very useful * metric to track in the happy case. If there is clock drift, * the latency can go negative. Note: headers use wall time, not * CLOCK_MONOTONIC. 
*/ long propagationLatency = Time.currentWallTime() - request.getHdr().getTime(); if (propagationLatency >= 0) { ServerMetrics.getMetrics().PROPAGATION_LATENCY.add(propagationLatency); } } if (request.cnxn == null) { return; } ServerCnxn cnxn = request.cnxn; long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); String lastOp = "NA"; // Notify ZooKeeperServer that the request has finished so that it can // update any request accounting/throttling limits zks.decInProcess(); zks.requestFinished(request); Code err = Code.OK; Record rsp = null; String path = null; try { if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) { /* * When local session upgrading is disabled, leader will * reject the ephemeral node creation due to session expire. * However, if this is the follower that issue the request, * it will have the correct error code, so we should use that * and report to user */ if (request.getException() != null) { throw request.getException(); } else { throw KeeperException.create(KeeperException.Code.get(((ErrorTxn) request.getTxn()).getErr())); } } KeeperException ke = request.getException(); if (ke instanceof SessionMovedException) { throw ke; } if (ke != null && request.type != OpCode.multi) { throw ke; } LOG.debug("{}", request); if (request.isStale()) { ServerMetrics.getMetrics().STALE_REPLIES.add(1); } switch (request.type) { case OpCode.ping: { lastOp = "PING"; updateStats(request, lastOp, lastZxid); cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response"); return; } case OpCode.createSession: { lastOp = "SESS"; updateStats(request, lastOp, lastZxid); zks.finishSessionInit(request.cnxn, true); return; } case OpCode.multi: { lastOp = "MULT"; rsp = new MultiResponse(); for (ProcessTxnResult subTxnResult : rc.multiResult) { OpResult subResult; switch (subTxnResult.type) { case OpCode.check: subResult = new CheckResult(); break; case OpCode.create: subResult = new CreateResult(subTxnResult.path); break; case 
OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: subResult = new CreateResult(subTxnResult.path, subTxnResult.stat); break; case OpCode.delete: case OpCode.deleteContainer: subResult = new DeleteResult(); break; case OpCode.setData: subResult = new SetDataResult(subTxnResult.stat); break; case OpCode.error: subResult = new ErrorResult(subTxnResult.err); if (subTxnResult.err == Code.SESSIONMOVED.intValue()) { throw new SessionMovedException(); } break; default: throw new IOException("Invalid type of op"); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.multiRead: { lastOp = "MLTR"; MultiOperationRecord multiReadRecord = new MultiOperationRecord(); ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { try { Record rec; switch (readOp.getType()) { case OpCode.getChildren: rec = handleGetChildrenRequest(readOp.toRequestRecord(), cnxn, request.authInfo); subResult = new GetChildrenResult(((GetChildrenResponse) rec).getChildren()); break; case OpCode.getData: rec = handleGetDataRequest(readOp.toRequestRecord(), cnxn, request.authInfo); GetDataResponse gdr = (GetDataResponse) rec; subResult = new GetDataResult(gdr.getData(), gdr.getStat()); break; default: throw new IOException("Invalid type of readOp"); } } catch (KeeperException e) { subResult = new ErrorResult(e.code().intValue()); } ((MultiResponse) rsp).add(subResult); } break; } case OpCode.create: { lastOp = "CREA"; rsp = new CreateResponse(rc.path); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { lastOp = "CREA"; rsp = new Create2Response(rc.path, rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.delete: case OpCode.deleteContainer: { lastOp = "DELE"; err = Code.get(rc.err); 
requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.setData: { lastOp = "SETD"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.reconfig: { lastOp = "RECO"; rsp = new GetDataResponse( ((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(), rc.stat); err = Code.get(rc.err); break; } case OpCode.setACL: { lastOp = "SETA"; rsp = new SetACLResponse(rc.stat); err = Code.get(rc.err); requestPathMetricsCollector.registerRequest(request.type, rc.path); break; } case OpCode.closeSession: { lastOp = "CLOS"; err = Code.get(rc.err); break; } case OpCode.sync: { lastOp = "SYNC"; SyncRequest syncRequest = new SyncRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; } case OpCode.check: { lastOp = "CHEC"; rsp = new SetDataResponse(rc.stat); err = Code.get(rc.err); break; } case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); } Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? 
cnxn : null); rsp = new ExistsResponse(stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.setWatches: { lastOp = "SETW"; SetWatches setWatches = new SetWatches(); // TODO We really should NOT need this!!!! request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase() .setWatches( relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), cnxn); break; } case OpCode.getACL: { lastOp = "GETA"; GetACLRequest getACLRequest = new GetACLRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ | ZooDefs.Perms.ADMIN, request.authInfo, path, null); Stat stat = new Stat(); List<ACL> acl = zks.getZKDatabase().getACL(path, stat); requestPathMetricsCollector.registerRequest(request.type, getACLRequest.getPath()); try { zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.ADMIN, request.authInfo, path, null); rsp = new GetACLResponse(acl, stat); } catch (KeeperException.NoAuthException e) { List<ACL> acl1 = new ArrayList<ACL>(acl.size()); for (ACL a : acl) { if ("digest".equals(a.getId().getScheme())) { Id id = a.getId(); Id id1 = new Id(id.getScheme(), id.getId().replaceAll(":.*", ":x")); acl1.add(new ACL(a.getPerms(), id1)); } else { acl1.add(a); } } rsp = new GetACLResponse(acl1, 
stat); } break; } case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); int number = zks.getZKDatabase().getAllChildrenNumber(path); rsp = new GetAllChildrenNumberResponse(number); break; } case OpCode.getChildren2: { lastOp = "GETC"; GetChildren2Request getChildren2Request = new GetChildren2Request(); ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { throw new KeeperException.NoNodeException(); } zks.checkACL( request.cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo, path, null); List<String> children = zks.getZKDatabase() .getChildren(path, stat, getChildren2Request.getWatch() ? 
cnxn : null); rsp = new GetChildren2Response(children, stat); requestPathMetricsCollector.registerRequest(request.type, path); break; } case OpCode.checkWatches: { lastOp = "CHKW"; CheckWatchesRequest checkWatches = new CheckWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); if (!containsWatcher) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, checkWatches.getPath()); break; } case OpCode.removeWatches: { lastOp = "REMW"; RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); if (!removed) { String msg = String.format(Locale.ENGLISH, "%s (type: %s)", path, type); throw new KeeperException.NoWatcherException(msg); } requestPathMetricsCollector.registerRequest(request.type, removeWatches.getPath()); break; } case OpCode.getEphemerals: { lastOp = "GETE"; GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals); String prefixPath = getEphemerals.getPrefixPath(); Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List<String> ephemerals = new ArrayList<>(); if (StringUtils.isBlank(prefixPath) || "/".equals(prefixPath.trim())) { ephemerals.addAll(allEphems); } else { for (String p : allEphems) { if (p.startsWith(prefixPath)) { ephemerals.add(p); } } } rsp = new GetEphemeralsResponse(ephemerals); break; } } } catch (SessionMovedException e) { // session moved is 
a connection level error, we need to tear // down the connection otw ZOOKEEPER-710 might happen // ie client on slow follower starts to renew session, fails // before this completes, then tries the fast follower (leader) // and is successful, however the initial renew is then // successfully fwd/processed by the leader and as a result // the client and leader disagree on where the client is most // recently attached (and therefore invalid SESSION MOVED generated) cnxn.sendCloseSession(); return; } catch (KeeperException e) { err = e.code(); } catch (Exception e) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process " + request, e); StringBuilder sb = new StringBuilder(); ByteBuffer bb = request.request; bb.rewind(); while (bb.hasRemaining()) { sb.append(Integer.toHexString(bb.get() & 0xff)); } LOG.error("Dumping request buffer: 0x" + sb.toString()); err = Code.MARSHALLINGERROR; } ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue()); updateStats(request, lastOp, lastZxid); try { if (request.type == OpCode.getData && path != null && rsp != null) { // Serialized read responses could be cached by the connection object. // Cache entries are identified by their path and last modified zxid, // so these values are passed along with the response. GetDataResponse getDataResponse = (GetDataResponse) rsp; Stat stat = null; if (getDataResponse.getStat() != null) { stat = getDataResponse.getStat(); } cnxn.sendResponse(hdr, rsp, "response", path, stat); } else { cnxn.sendResponse(hdr, rsp, "response"); } if (request.type == OpCode.closeSession) { cnxn.sendCloseSession(); } } catch (IOException e) { LOG.error("FIXMSG", e); } }
关于zk中FinalRequestProcessor的作用是什么就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。