温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

netty服务端处理请求联合pipeline源码分析

发布时间:2023-04-25 17:07:05 来源:亿速云 阅读:143 作者:iii 栏目:开发技术

本篇内容主要讲解“netty服务端处理请求联合pipeline源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“netty服务端处理请求联合pipeline源码分析”吧!

    两个问题

    • 在客户端接入的时候, NioMessageUnsaferead方法中pipeline.fireChannelRead(readBuf.get(i))为什么会调用到ServerBootstrap的内部类ServerBootstrapAcceptor中的channelRead()方法。

    • 客户端handler是什么时候被添加的?

    先分析第一个问题。回到netty处理客户端请求分析_1中服务端接收到accpet事件后,进行读取的方法NioMessageUnsafe.read()

    NioMessageUnsafe.read()

    public void read() {
        //必须是NioEventLoop方法调用的, 不能通过外部线程调用
        assert eventLoop().inEventLoop();
        //服务端channel的config
        final ChannelConfig config = config();
        //服务端channel的pipeline
        final ChannelPipeline pipeline = pipeline();
        //处理服务端接入的速率
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        //设置配置
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //创建jdk底层的channel
                    //readBuf用于临时承载读到链接
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }
                    //分配器将读到的链接进行计数
                    allocHandle.incMessagesRead(localRead);
                    //连接数是否超过最大值
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            int size = readBuf.size();
            //遍历每一条客户端连接
            for (int i = 0; i < size; i ++) {
                readPending = false;
                //传递事件, 将创建NioSokectChannel进行传递
                //最终会调用ServerBootstrap的内部类ServerBootstrapAcceptor的channelRead()方法
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            //代码省略
        } finally {
            //代码省略
        }
    }

    重点看pipeline.fireChannelRead(readBuf.get(i))

    首先, 这里pipeline是服务端channelpipeline, 也就是NioServerSocketChannelpipeline

    我们学习过pipeline之后, 对这种写法并不陌生, 就是传递channelRead事件, 这里通过传递channelRead事件走到了ServerBootstrapAcceptorchannelRead()方法, 说明在这步之前, ServerBootstrapAcceptor作为一个handler添加到了服务端channelpipeline中, 那么这个handler什么时候添加的呢?

    我们回顾下第一章, 初始化NioServerSocketChannel的时候, 调用了ServerBootstrap的init方法 回顾下ServerBootstrap.init的调用链路:

    ServerBootstrap.bind(8899) ---> AbstractBootstrap.doBind(final SocketAddress localAddress) ---> AbstractBootstrap.initAndRegister() ---> ServerBootstrap.init(Channel channel)

    ServerBootstrap.init(Channel channel)

    void init(Channel channel) throws Exception {
        //获取用户定义的选项(1)
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }
        //获取用户定义的属性(2)
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //获取channel的pipline(3)
        ChannelPipeline p = channel.pipeline();
        //work线程组(4)
        final EventLoopGroup currentChildGroup = childGroup;
        //用户设置的Handler(5)
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        //选项转化为Entry对象(6)
        synchronized (childOptions) { 
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        //属性转化为Entry对象(7)
        synchronized (childAttrs) { 
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        //添加服务端handler(8)
        p.addLast(new ChannelInitializer<Channel>() {
            //初始化channel
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) { 
                    pipeline.addLast(handler);
                } 
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() { 
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

    我们重点关注第8步, 添加服务端channel, 这里的pipeline, 是服务服务端channelpipeline, 也就是NioServerSocketChannel绑定的pipeline, 这里添加了一个ChannelInitializer类型的handler

    ChannelInitializer的继承关系

    public abstract class ChannelInitializer&lt;C extends Channel&gt; extends ChannelInboundHandlerAdapter {
        //省略类体
    }

    我们看到其继承了ChannelInboundHandlerAdapter, 说明是一个inbound类型的handler

    这里我们可能会想到, 添加完handler会执行handlerAdded, 然后在handlerAdded方法中做了添加ServerBootstrapAcceptor这个handler

    但是, 实际上并不是这样的, 当程序执行到这里, 并没有马上执行handlerAdded, 我们紧跟addLast方法

    最后执行到DefualtChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //判断handler是否被重复添加(1)
            checkMultiplicity(handler);
            //创建一个HandlerContext并添加到列表(2)
            newCtx = newContext(group, filterName(name, handler), handler);
            //添加HandlerContext(3)
            addLast0(newCtx);
            //是否已注册
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                //回调用户事件
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        //回调添加事件(4)
        callHandlerAdded0(newCtx);
        return this;
    }

    首先完成了handler的添加, 但是并没有马上执行回调

    这里我们重点关注if (!registered)这个条件判断, 其实在注册完成, registered会变成true, 但是走到这一步的时候NioServerSockeChannel并没有完成注册(可以回顾第一章看注册在哪一步), 所以会进到if里并返回自身

    DefualtChannelPipeline.callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added)

    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered;
        //判断是否已添加, 未添加, 进行添加, 已添加进行删除
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        //获取第一个Callback任务
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        //如果第一个Callback任务为空
        if (pending == null) {
            //将第一个任务设置为刚创建的任务
            pendingHandlerCallbackHead = task;
        } else {
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

    因我们调用这个方法的时候added传的true, 所以PendingHandlerCallback task赋值为new PendingHandlerAddedTask(ctx)

    PendingHandlerAddedTask这个类, 我们从名字可以看出, 这是一个handler添加的延迟任务, 用于执行handler延迟添加的操作, 同样也对应一个名字为PendingHandlerRemovedTask的类, 用于执行延迟删除handler的操作, 这两个类都继承抽象类PendingHandlerCallback

    PendingHandlerAddedTask构造方法
    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    进入super(ctx)

    PendingHandlerCallback构造方法
    PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    在父类中, 保存了要添加的context, 也就是ChannelInitializer类型的包装类

    回到callHandlerCallbackLater方法
    PendingHandlerCallback pending = pendingHandlerCallbackHead;

    这表示获取第一个PendingHandlerCallback的任务, 其实PendingHandlerCallback是一个单向链表, 自身维护一个PendingHandlerCallback类型的next, 指向下一个任务, 在DefaultChannelPipeline这个类中, 定义了个PendingHandlerCallback类型的引用pendingHandlerCallbackHead, 用来指向延迟回调任务的中的第一个任务。

    之后判断这个任务是为空, 如果是第一次添加handler, 那么这里就是空, 所以将第一个任务赋值为我们刚创建的添加任务。

    如果不是第一次添加handler, 则将我们新创建的任务添加到链表的尾部, 因为这里我们是第一次添加, 所以第一个回调任务就指向了我们创建的添加handler的任务。

    完成这一系列操作之后, addLast方法返归, 此时并没有完成添加操作。

    而什么时候完成添加操作的呢?

    回到在服务端channel注册时候的会走到AbstractChannel.register0方法 回顾下AbstractChannel.register0的调用链路:

    ServerBootstrap.bind(8899) ---> AbstractBootstrap.doBind(final SocketAddress localAddress) ---> AbstractBootstrap.initAndRegister() ---> config().group().register(channel) ---> SingleThreadEventLoop.register(final ChannelPromise promise) ---> AbstractChannel.register(EventLoop eventLoop, final ChannelPromise promise) ---> AbstractChannel.register0(ChannelPromise promise)

    AbstractChannel.register0(ChannelPromise promise)

    private void register0(ChannelPromise promise) {
        try {
            //做实际的注册(1)
            doRegister();
            neverRegistered = false;
            registered = true;
            //触发事件(2)
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            //触发注册成功事件(3)
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    //传播active事件(4)
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            //省略代码
        }
    }

    重点关注第二步pipeline.invokeHandlerAddedIfNeeded(), 这里已经通过doRegister()方法完成了实际的注册, 我们跟到该方法中

    pipeline.invokeHandlerAddedIfNeeded()

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            callHandlerAddedForAllHandlers();
        }
    }

    这里会判断是否第一次注册, 这里返回true, 然后会执行callHandlerAddedForAllHandlers()方法, 我们跟进去

    DefaultChannelPipeline.callHandlerAddedForAllHandlers
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;
        }
        //获取task
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            //执行添加handler方法
            task.execute();
            task = task.next;
        }
    }

    这里拿到第一个延迟执行handler添加的task其实就是我们之前剖析过的, 延迟执行handler添加的task, 就是PendingHandlerAddedTask对象

    while循环中, 通过执行execute()方法将handler添加

    进入PendingHandlerAddedTask.execute()
    void execute() {
        //获取当前eventLoop线程
        EventExecutor executor = ctx.executor();
        //是当前执行的线程
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                //添加到队列
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                //代码省略
            }
        }
    }

    再进入callHandlerAdded0方法

    callHandlerAdded0(final AbstractChannelHandlerContext ctx)
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
            //省略...
        }
    }

    终于在这里, 我们看到了执行回调的方法

    再回到ServerBootstrap.init(Channel channel)

    void init(Channel channel) throws Exception {
        //获取用户定义的选项(1)
        final Map&lt;ChannelOption&lt;?&gt;, Object&gt; options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }
        //获取用户定义的属性(2)
        final Map&lt;AttributeKey&lt;?&gt;, Object&gt; attrs = attrs0();
        synchronized (attrs) {
            for (Entry&lt;AttributeKey&lt;?&gt;, Object&gt; e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey&lt;Object&gt; key = (AttributeKey&lt;Object&gt;) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        //获取channel的pipline(3)
        ChannelPipeline p = channel.pipeline();
        //work线程组(4)
        final EventLoopGroup currentChildGroup = childGroup;
        //用户设置的Handler(5)
        final ChannelHandler currentChildHandler = childHandler;
        final Entry&lt;ChannelOption&lt;?&gt;, Object&gt;[] currentChildOptions;
        final Entry&lt;AttributeKey&lt;?&gt;, Object&gt;[] currentChildAttrs;
        //选项转化为Entry对象(6)
        synchronized (childOptions) { 
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        //属性转化为Entry对象(7)
        synchronized (childAttrs) { 
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        //添加服务端handler(8)
        p.addLast(new ChannelInitializer&lt;Channel&gt;() {
            //初始化channel
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) { 
                    pipeline.addLast(handler);
                } 
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() { 
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

    我们继续看第8步添加服务端handler

    因为这里的handlerChannelInitializer, 所以完成添加之后会调用ChannelInitializerhandlerAdded方法

    跟到handlerAdded方法

    ChannelInitializer.handlerAdded(ChannelHandlerContext ctx)

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //默认情况下, 会返回true
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

    因为执行到这步服务端channel已经完成注册, 所以会执行到initChannel方法

    ChannelInitializer.initChannel(ChannelHandlerContext ctx)
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        //这段代码是否被执行过
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                //调用之后会删除当前节点
                remove(ctx);
            }
            return true;
        }
        return false;
    }

    我们关注initChannel这个方法, 这个方法是在ChannelInitializer的匿名内部来实现的, 这里我们注意, 在initChannel方法执行完毕之后会调用remove(ctx)删除当前节点

    继续跟进initChannel方法
    public void initChannel(Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) { 
            pipeline.addLast(handler);
        } 
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() { 
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

    这里首先添加用户自定义的handler, 这里如果用户没有定义, 则添加不成功, 然后, 会调用addLastServerBootstrapAcceptor这个handler添加了进去, 同样这个handler也继承了ChannelInboundHandlerAdapter, 在这个handler中, 重写了channelRead方法, 所以, 这就是第一个问题的答案

    紧接着我们看第二个问题:客户端handler是什么时候被添加的?

    看ServerBootstrapAcceptor的channelRead方法

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        //添加channelHadler, 这个channelHandler, 就是用户代码添加的ChannelInitializer
        child.pipeline().addLast(childHandler);
        //代码省略
        try {
            //work线程注册channel
            childGroup.register(child).addListener(new ChannelFutureListener() {
                //代码省略
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

    这里真相可以大白了, 服务端再创建完客户端channel之后, 将新创建的NioSocketChannel作为参数触发channelRead事件(可以回顾NioMessageUnsafe.read方法, 代码这里就不贴了), 所以这里的参数msg就是NioSocketChannel

    拿到channel时候再将客户端的handler添加进去, 我们回顾客户端handler的添加过程:

    .childHandler(new ChannelInitializer&lt;SocketChannel&gt;() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new StringEncoder());
            ch.pipeline().addLast(new ServerHandler());
        }
    });

    和服务端channel的逻辑一样, 首先会添加ChannelInitializer这个handler但是没有注册所以没有执行添加handler的回调, 将任务保存到一个延迟回调的task

    等客户端channel注册完毕, 会将执行添加handler的回调, 也就是handlerAdded方法, 在回调中执行initChannel方法将客户端handler添加进去, 然后删除ChannelInitializer这个handler

    因为在服务端channel中这块逻辑已经进行了详细的剖析, 所以这边就不在赘述, 同学们可以自己跟进去走一遍流程

    这里注意, 因为每创建一个NioSoeketChannel都会调用服务端ServerBootstrapAcceptorchannelRead方法, 所以这里会将每一个NioSocketChannelhandler进行添加。

    到此,相信大家对“netty服务端处理请求联合pipeline源码分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

    向AI问一下细节

    免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

    AI