本篇内容介绍了“Netty是如何绑定端口和启动服务的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
服务端启动DEMO
先从一个简单的服务端启动`DEMO`开始,以下是一个标准的`Netty`服务端代码
public final class NettyServer {public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) { ChannelPipeline channelPipeline = channel.pipeline(); channelPipeline.addLast("decoder", new StringDecoder()); channelPipeline.addLast("encoder", new StringEncoder()); channelPipeline.addLast("handler", new ServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}br
注:ServerBootstrap.childHandler()用于指定处理新连接数据的读写处理逻辑,同时ServerBootstrap还提供handler()用于指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法
`ServerHandler`代码如下:
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("service receive msg:" + msg); }}br
当有新连接接入时,控制台打印出
reStructuredTexthandlerAddedchannelRegisteredchannelActivebr
但接收到新消息时,控制台打印出
reStructuredTextservice receive msg:xxxchannelReadCompletebr
本文主要分析服务端的启动过程,而新连接接入 新消息的读取会在后续章节中说明
服务端启动源码分析
`ServerBootstrap`是`Netty`为方便开发者使用而设计的一个启动类,其核心代码入口在`bind()`,代码如下
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}//通过端口号创建一个`InetSocketAddress`,然后继续调用重载的`bind()`public ChannelFuture bind(SocketAddress localAddress) {// ...return doBind(localAddress);}br
由于博客篇幅有限所以有些健壮性分支会以`// ...`略过,健壮性分支不会影响对`Netty`主流程的理解。
`Netty`服务端启动最后会调用到`doBind()`方法,代码如下
private ChannelFuture doBind(final SocketAddress localAddress) {//...final ChannelFuture regFuture = initAndRegister();//...final Channel channel = regFuture.channel();//... doBind0(regFuture, channel, localAddress, promise);//...return promise;br
在`doBind()`中我们关注两个核心方法`initAndRegister()`以及`doBind0()`
服务端`Channel`创建
final ChannelFuture initAndRegister() { Channel channel = null;// 新建Channel channel = channelFactory.newChannel();// 初始化Channelinit(channel);// 将这个Channel Register到某个对象 ChannelFuture regFuture = config().group().register(channel);return regFuture;}br
`Channel`是`Netty`的核心概念之一,它是`Netty`网络通信的主体由它负责同对端进行网络通信、注册和数据操作等功能。
`Channel`的创建是由`channelFactory.newChannel()`完成的,接下来跟踪`channelFactory`是在何时被初始化,我们层层回溯最终发现是在这个函数中
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass"); }return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}br
在`Demo`程序调用`.channel()`方法并传入`NioServerSocketChannel.class`
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz"); }this.clazz = clazz; }@Overridepublic T newChannel() {try {return clazz.newInstance(); } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t); } }}br
即在`Netty`服务端启动的时候通过反射方式(调用默认构造函数)来创建一个`NioServerSocketChannel`对象,
加下来我们继续跟进`NioServerSocketChannel`的默认构造函数
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {// 利用SelectorProvider产生一个ServerSocketChannel对象return provider.openServerSocketChannel(); } catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e); }}br
通过`newSocket(DEFAULT_SELECTOR_PROVIDER)`创建一条`server`端`channel`,然后进入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}br
该方法主要完成两个功能,首先是调用父类的构造方法然后初始化`NioServerSocketChannelConfig`属性
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;// 设置SelectionKey.OP_ACCEPT事件this.readInterestOp = readInterestOp;// 设置ServerSocketChannel为非阻塞的 ch.configureBlocking(false);}br
这里将前面通过`provider.openServerSocketChannel()`创建出来的`ServerSocketChannel`保存到成员变量,然后调用将该`channel`为非阻塞模式,这是个标准的`JDK NIO`编程的玩法。这里的`readInterestOp`即前面层层传入的`SelectionKey.OP_ACCEPT`,接下来继续跟进`super(parent)`
protected AbstractChannel(Channel parent) {this.parent = parent; id = newId();unsafe = newUnsafe(); pipeline = newChannelPipeline();}br
在`AbstractChannel`的构造方法中主要是初始化了`id`,`unsafe`,`pipeline`属性
服务端`Channel`初始化
在创建完`Channel`后,我们在`init`方法中对`Channel`进行初始化操作,代码如下
void init(Channel channel) throws Exception {// 给channel设置optionfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) { channel.config().setOptions(options); }// 给channel设置attrfinal Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {// 设置新接入channel的options currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); }synchronized (childAttrs) {// 设置新接入channel的attrs currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 设置handler到pipeline上// 这里的handler()返回的就是.handler()所设置的值 ChannelHandler handler = config.handler();if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// p.addLast()向serverChannel的流水线处理器中加入了一个ServerBootstrapAcceptor// 从名字上就可以看出来这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器 pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}br
以上代码主要完成如下功能:
ChannelFuture regFuture = config().group().register(channel);```此处`config().group()`返回的对象为`NioEventLoopGroup````java@Overridepublic ChannelFuture register(Channel channel) {// 调用了NioEventLoop对象中的register方法// NioEventLoop extends SingleThreadEventLoopreturn next().register(channel);}br
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this));}public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}br
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ... AbstractChannel.this.eventLoop = eventLoop;// ... register0(promise);}br
private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}//这一段其实也很清晰,先调用`doRegister()`进行注册protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // ... } }}br
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}//在`dobind0()`方法中通过`EventLoop`执行一个任务,接下来我们进入到`channel.bind()`方法public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}br
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {unsafe.bind(localAddress, promise);}//这里的`unsafe`就是前面提到的`AbstractUnsafe`, 准确点应该是`NioMessageUnsafe`@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ... boolean wasActive = isActive();// ... doBind(localAddress);if (!wasActive && isActive()) { invokeLater(new Runnable() { @Overridepublic void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise);}//在`doBind`方法中完成绑定操作,代码如下protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}br
最终调用到了`JDK`里面的`bind`方法真正进行了端口的绑定。按照正常流程我们前面已经分析到`isActive()`方法返回`false`,进入到`doBind()`之后如果`channel`被激活了,就发起`pipeline.fireChannelActive()`调用
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead();}//`pipeline.channelActive`会逐一调用`pipeline`中每一个节点的`channelActive`方法,所以`HeadContext`的`channelActive`将会被调用,即`readIfIsAutoRead`方法将会被调用public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) { channel.read(); }}//最终这个方法会调用到`AbstractNioChannel`的`doBeginRead`方法protected void doBeginRead() throws Exception {final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return; } readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }}br
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 建NioServerSocketChannelserverSocketChannel.configureBlocking(false);//AbstractNioChannel中ch.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));// NioServerSocketChannel.doBind()Selector selector = Selector.open();// NioEventLoop.openSelector()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) // AbstractNioChannel.doBeginRead()br
“Netty是如何绑定端口和启动服务的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。