zoukankan      html  css  js  c++  java
  • Netty 源码分析——ChannelPipeline

    Netty 源码分析——ChannelPipeline


    通过前面的两章我们分析了客户端和服务端的流程代码,其中在初始化 Channel 的时候一定会看到一个 ChannelPipeline。所以在 Netty 每个 Channel 中有且仅有一个 ChannelPipeline。

    比如我们来看 NioSocketChannel 的构造器初始化流程是
    NioSocketChannel -> AbstractNioByteChannel -> AbstractNioChannel -> AbstractChannel

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    

    我们发现 AbstractChannel 内部维护了一个 pipeline 属性并且在构造器中调用了 newChannelPipeline();

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    

    在 newChannelPipeline(); 方法中实例化了一个 DefaultChannelPipeline 对象,传递的 this 呢就是我们初始化的 NioSocketChannel 的实例了。从代码上就验证了每个 Channel 只对应一个 DefaultChannelPipeline 实例。

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }
    

    上面代码主要代码是 tail 和 head 的赋值。实例化两个 ChannelHandlerContext, 一个是 HeadContext 实例 head, 另一个是 TailContext 实例 tail 接着将 head 和 tail 互相指向, 构成一个双向链表。

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    }
    
    final class HeadContext extends AbstractChannelHandlerContext
                implements ChannelOutboundHandler, ChannelInboundHandler {
    }
    

    我们可以发现 TailContext、HeadContext 都继承了 AbstractChannelHandlerContext 各自都有实现了 Out 或 In 的 Handler 接口,因此它们有 Context 和 Handler 的双重属性。

    那么现在 Channel 和 ChannelPipeline 的对应关系就是如下图
    在这里插入图片描述

    ChannelInitializer 的添加

    Channel 和 ChannelPipeline 的关系我们已经了解了,那么我们定义的 handle 中明明是一个 ChannelInitializer 的实例啊,那么我们就来看一下。

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("clientHandle", new SimpleChannelInboundHandler<String>() {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
          }
        });
      }
    });
    

    上面代码在调用 handle 方法时传递了 ChannelInitializer 对象,提供了一个 initChannel 函数来给我们重写。我们首先来看下 ChannelInitializer 类结构。

    public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    }
    
    public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    }
    
    public abstract class ChannelHandlerAdapter implements ChannelHandler {
    }
    

    从上面代码我们可以看到 ChannelInitializer 实现了 ChannelHandler。

    那么我们往回看代码看回到 io.netty.bootstrap.Bootstrap#init 中。

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());
        // 下面代码省略
    }
    

    init 方法中调用了 channel 属性中 pipeline() 来获取到 pipeline 并且调用 pipeline 的 addLast 函数将 handler 传进去,其实穿进去的就是 ChannelInitializer 实例。

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        // 内部继续调用 addLast 第一个参数为 null  第二个参数 ChannelInitializer 实例
        return addLast(null, handlers);
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        // 取出我们的 ChannelInitializer 实例内部继续调用 addLast 并且前两个参数都是 null
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查 handler 是否重复添加
            checkMultiplicity(handler);
            // 创建一个 Context 实例,第一个参数是 null 上面传的,第二个是获取 name 规则是实例的 simpleClassName + #0, 第三个是 ChannelInitializer 实例
            newCtx = newContext(group, filterName(name, handler), handler);
            // 重点添加到末尾
            addLast0(newCtx);
    
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
    
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    

    上面的 addLast 方法中,很多重载的方法, 我们关注最后这个比较重要的方法就可以了。上面代码我们可以看到 newContext 这个函数。

     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    

    newContext 函数是返回了一个 DefaultChannelHandlerContext 的实例 this 就是 ChannelPipeline, group 是 null,name 是生成的 name,handler 就是 ChannelInitializer 实例。

    我们来进入 DefaultChannelHandlerContext 构造器器看看发生了什么。

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.executionMask = mask(handlerClass);
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    

    上面代码都是一些属性赋值,但是我们注意 this.executionMask = mask(handlerClass); 这一行我们继续看

    static int mask(Class<? extends ChannelHandler> clazz) {
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }
    

    先是从缓存获取第一次肯定为 null,为 null 后的调用了 mask0(clazz); 获取出 int 然后再放入缓存中,那么重点就是在 mask0(clazz); 中了,里面代码很多省略了不少保留了主要的几行。

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_INBOUND;
            }
    
            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_OUTBOUND;
            }
        } catch (Exception e) {
        }
        return mask;
    }
    

    上面代码我们发现会先去判断传入的 class 对象所表示的类或接口与指定的 Class 参数所表示的类或接口是否相同,那么指定的 class 参数就是 ChannelInboundHandler 了。因为 ChannelInitializer 仅仅实现了 ChannelInboundHandler 所以会执行 mask |= MASK_ALL_INBOUND; 将他标识为是一个 MASK_ALL_INBOUND 返回。而这个 int 值会传递到父类 AbstractChannelHandlerContext 中,并初始化 this.executionMask 字段。

    创建好 DefaultChannelHandlerContext 后我们回来 addLast(); 函数中,会内部继续调用 addLast0(); 方法。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    

    上面代码是将创建好的 newCtx 前置引用指向 head 后置引用指向 tail,head 后置引用指向 newCtx,tail 前置引用指向 newCtx。显然, 这个代码就是典型的双向链表的插入操作了. 当调用了 addLast 方法后, Netty 就会将此 handler 添加到双向链表中 tail 元素之前的位置.

    我们知道了 ChannelInitializer 是什么时候添加进去的了,那么我们在 ChannelInitializer 自定义的 handle 是什么时候添加的呢?

    自定义 handle 添加

    我们来回顾一下客户端在 channel 注册的过程,最后我们发现真正执行的是在 NioSocketChannel$NioSocketChannelUnsafe.register() 中完成的。仔细看里面的代码我们会发现在注册完之后有一行 pipeline.fireChannelRegistered();

    @Override
    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        // 判断当前线程是不是一个 EventLoop 如果不是就异步执行
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
    

    上面代码执行了 invokeChannelRegistered. 我们继续看看

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
    

    上面代码主要做了判断当前的 handler 状态是否正确,正确执行 channelRegistered(),不正确执行 fireChannelRegistered().我们先来看看 fireChannelRegistered();

    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
    

    看到上面代码 findContextInbound() 函数传入了一个 MASK_CHANNEL_REGISTERED 那么这个 MASK_CHANNEL_REGISTERED 是什么呢?

    static final int MASK_CHANNEL_REGISTERED = 1 << 1;
    
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }
    

    我们发现 findContextInbound() 函数中是找到一个 executionMask 为 MASK_CHANNEL_REGISTERED 的 context.那么 MASK_CHANNEL_REGISTERED 到底是什么?

    private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
                MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
                MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
    

    还记得 MASK_ALL_INBOUND 吗,我们创建 new DefaultChannelHandlerContext(this, childExecutor(group), name, handler); 的时候不就是设置为 MASK_ALL_INBOUND 吗?看 MASK_ALL_INBOUND 第二个参数,不就是传递到 findContextInbound() 的值吗?由此我们可以得出结论 findContextInbound() 就是要找到一个 InboundHandler 出来传入 invokeChannelRegistered() 中然后执行 channelRegistered() 函数。我们继续跟进。发现 io.netty.channel.ChannelInitializer#channelRegistered 这个方法不就是 ChannelInitializer 里的吗。

        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            if (initChannel(ctx)) {
                ctx.pipeline().fireChannelRegistered();
                removeState(ctx);
            } else {
                ctx.fireChannelRegistered();
            }
        }
    

    我们先关注 initChannel(); 方法将自己传递了进去。

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) {
            try {
                // 执行重写的 initChannel
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                // 将自己重链表中删除
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
    

    看到上面代码我们就一目了然了调用我们重写的 initChannel 方法将 NioSocketChannel 传递进去。

    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("stringDecoder", new StringDecoder());
        pipeline.addLast("stringEncoder", new StringEncoder());
        pipeline.addLast("clientHandle", new SimpleChannelInboundHandler<String>() {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
          }
        });
      }
    });
    

    上面代码就正式的将我们自定义的一些 handle 给放入了 Channel 对应的 ChannelPipeline 中。之后在 finally 中将自己从链表中给删除。

    到了这里, 我们的 自定义 ChannelHandler 的添加过程 也分析的查不多了.本章结束,谢谢观看!

  • 相关阅读:
    vue2 下载scss依赖包
    fastjson使用
    vscode format
    flutter 中涉的深拷贝
    通过pom给maven添加编译插件
    IDEA添加动态模板(Live Templates)
    Maven启动tomcat:run异常
    Redis
    tomcat启动时启动窗口出现乱码的解决方案
    无效的源发行版,解决方案
  • 原文地址:https://www.cnblogs.com/liufeichn/p/11961611.html
Copyright © 2011-2022 走看看