这篇文章主要介绍“elasticsearch节点的transport请求发送怎么处理”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点的transport请求发送怎么处理”文章能帮助大家解决问题。
前一篇分析对nettytransport的启动及连接,本篇主要分析transport请求的发送和处理过程。
cluster中各个节点之间需要相互发送很多信息,如master检测其它节点是否存在,node节点定期检测master节点是否存储,cluster状态的发布及搜索数据请求等等。为了保证信息传输,elasticsearch定义了一个19字节长度的信息头HEADER_SIZE = 2 + 4 + 8 + 1 + 4,以'E','S'开头,接着是4字节int信息长度,然后是8字节long型信息id,接着是一个字节的status,最后是4字节int型version。
所有的节点间的信息都是以这19个字节开始。同时elasticsearch对于节点间的所有action都定义 了名字,如对master的周期检测action,internal:discovery/zen/fd/master_ping,每个action对应着相应的messagehandler。接下来会进行详分析。
代码在nettytransport中如下所示:
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { //参数说明:node发送的目的节点,requestId请求id,action action名称,request请求,options包括以下几种操作 RECOVERY,BULK,REG,STATE,PING; Channel targetChannel = nodeChannel(node, options);//获取对应节点的channel,channel在连接节点时初始化完成(请参考上一篇) if (compress) { options.withCompress(true); } byte status = 0; //设置status 包括以下几种STATUS_REQRES = 1 << 0; STATUS_ERROR = 1 << 1; STATUS_COMPRESS = 1 << 2; status = TransportStatus.setRequest(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);//初始写出流 boolean addedReleaseListener = false; try { bStream.skip(NettyHeader.HEADER_SIZE);//留出message header的位置 StreamOutput stream = bStream; // only compress if asked, and, the request is not bytes, since then only // the header part is compressed, and the "body" can't be extracted as compressed if (options.compress() && (!(request instanceof BytesTransportRequest))) { status = TransportStatus.setCompress(status); stream = CompressorFactory.defaultCompressor().streamOutput(stream); } stream = new HandlesStreamOutput(stream); // we pick the smallest of the 2, to support both backward and forward compatibility // note, this is the only place we need to do this, since from here on, we use the serialized version // as the version to use also when the node receiving this request will send the response with Version version = Version.smallest(this.version, node.version()); stream.setVersion(version); stream.writeString(transportServiceAdapter.action(action, version)); ReleasableBytesReference bytes; ChannelBuffer buffer; // it might be nice to somehow generalize this optimization, maybe a smart "paged" bytes output // that create paged channel buffers, but its tricky to know when to do it (where this option is // more explicit). if (request instanceof BytesTransportRequest) { BytesTransportRequest bRequest = (BytesTransportRequest) request; assert node.version().equals(bRequest.version()); bRequest.writeThin(stream); stream.close(); bytes = bStream.bytes(); ChannelBuffer headerBuffer = bytes.toChannelBuffer(); ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer(); buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer); } else { request.writeTo(stream); stream.close(); bytes = bStream.bytes(); buffer = bytes.toChannelBuffer(); } NettyHeader.writeHeader(buffer, requestId, status, version);//写信息头 ChannelFuture future = targetChannel.write(buffer);//写buffer同时获取future,发送信息发生在这里 ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes); future.addListener(listener);//添加listener addedReleaseListener = true; transportServiceAdapter.onRequestSent(node, requestId, action, request, options); } finally { if (!addedReleaseListener) { Releasables.close(bStream.bytes()); } } }
以上就是request的发送过程,获取目标node的channel封装请求写入信息头,然后发送并使用listener监听,这里transportRequest是一个抽象类,它继承了TransportMessage同时实现了streamable接口。cluster中对它的实现非常多,各个功能都有相应的request,这里就不一一列举,后面的代码分析中会时常涉及。
request发送只是transport的一部分功能,有发送就要有接收,这样transport的功能才完整。接下来就是对接收过程的分析。上一篇中简单介绍过netty的使用,message的处理是通过MessageHandler处理,因此nettyTransport的信息处理逻辑都在MessageChannelHandler的messageReceived()方法中,代码如下所示:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Transports.assertTransportThread(); Object m = e.getMessage(); if (!(m instanceof ChannelBuffer)) {//非buffer之间返回 ctx.sendUpstream(e); return; } //解析message头 ChannelBuffer buffer = (ChannelBuffer) m; int size = buffer.getInt(buffer.readerIndex() - 4); transportServiceAdapter.received(size + 6); // we have additional bytes to read, outside of the header boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0; int markedReaderIndex = buffer.readerIndex(); int expectedIndexReader = markedReaderIndex + size; // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh // buffer, or in the cumlation buffer, which is cleaned each time StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size); //读取信息头中的几个重要元数据 long requestId = buffer.readLong(); byte status = buffer.readByte(); Version version = Version.fromId(buffer.readInt()); StreamInput wrappedStream; ………… if (TransportStatus.isRequest(status)) {//处理请求 String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version); if (buffer.readerIndex() != expectedIndexReader) { if (buffer.readerIndex() < expectedIndexReader) { logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action); } else { logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action); } buffer.readerIndex(expectedIndexReader); } } else {//处理响应 TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId); // ignore if its null, the adapter logs it if (handler != null) { if (TransportStatus.isError(status)) { handlerResponseError(wrappedStream, handler); } else { handleResponse(ctx.getChannel(), wrappedStream, handler); } } else { // if its null, skip those bytes buffer.readerIndex(markedReaderIndex + size); } ………… wrappedStream.close(); }
以上就是信息处理逻辑,这个方法基础自netty的SimpleChannelUpstreamHandler类。作为MessageHandler会在client和server启动时加入到handler链中,在信息到达后netty会自动调用handler链依次处理。这是netty的内容,就不详细说明,请参考netty文档。
代码如下所示:
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString();//读出action的名字 transportServiceAdapter.onRequestReceived(requestId, action); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName); try { final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);//获取处理该信息的handler if (handler == null) { throw new ActionNotFoundTransportException(action); } final TransportRequest request = handler.newInstance(); request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); request.readFrom(buffer); if (handler.executor() == ThreadPool.Names.SAME) { //noinspection unchecked handler.messageReceived(request, transportChannel);//使用该handler处理信息。 } else { threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); } } catch (Throwable e) { try { transportChannel.sendResponse(e); } catch (IOException e1) { logger.warn("Failed to send error message back to client for action [" + action + "]", e); logger.warn("Actual Exception", e1); } } return action; }
几个关键部分在代码中进行了标注。这里仍旧不能看到请求是如何处理的。因为cluster中的请求各种各样,如ping,discovery,index等等,因此不可能使用同一种处理方式。因此request最终又被提交给handler处理。每个功能请求都实现了自己的handler,当请求被提交给handler时会做对应的处理。这里再说一下transportServiceAdapter,消息的处理都是通过它适配转发完成。request的完整处理流程是:messageReceived()方法收到信息判断是request会将其转发到transportServiceAdapter的handler方法,handler方法查找对应的requesthandler,使用将信息转发给该handler进行处理。这里就不举例说明,在后面的discover分析中我们会看到发现,ping等请求的处理过程。
response通过handleResponse方法进行处理,代码如下:
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) { final TransportResponse response = handler.newInstance(); response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); response.remoteAddress(); try { response.readFrom(buffer); } catch (Throwable e) { handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e)); return; } try { if (handler.executor() == ThreadPool.Names.SAME) { //noinspection unchecked handler.handleResponse(response);//转发给对应的handler } else { threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response)); } } catch (Throwable e) { handleException(handler, new ResponseHandlerFailureTransportException(e)); } }
response的处理过程跟request很类似。每个request都会对应一个handler和一个response的处理handler,会在时候的时候注册到transportService中。请求到达时根据action名称获取到handler处理request,根据requestId获取对应的response handler进行响应。
关于“elasticsearch节点的transport请求发送怎么处理”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注亿速云行业资讯频道,小编每天都会为大家更新不同的知识点。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。