zoukankan      html  css  js  c++  java
  • 9.ChannelHandlerContext

    继承关系

    该接口与 给大动脉来一刀 - ChannelPipeLine 接口一样都继承了 ChannelOutboundInvokerChannelInboundInvoker, 这两个接口下面有说.

    关于 AttributeMap 接口可以储存自定义的属性.

    ChannelInboundInvoker & ChannelOutboundInvoker

    这两个接口和 ChannelInboundHandler & ChannelOutboundHandler 接口类似.

    Handler 关键字的这两个接口, 就是最终的入站和出站处理类; 而 Invoker 关键字的接口, 它虽然也有和 Handler 关键字接口中类似的方法, 但在最终都会委托给 AbstractChannelHandlerContext 抽象类中的对应静态方法.

    但是有 Invoker 关键字的接口中, 会有一些特有的方法, 这些方法根据具体实现类有不同的实现, 例如:

    // ChannelInboundInvoker 中的 newPromise() 方法.
    ChannelPromise newPromise();
    
    // 该方法在 DefaultChannelPipeline 中的实现.
    public final ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel);
    }
    
    // 该方法在 AbstractChannelHandlerContext 中的实现.
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
    

    AbstractChannelHandlerContext

    构造方法

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                    String name, Class<? extends ChannelHandler> handlerClass) {
    
        // 下面这三个没啥好说的就是赋值
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
    
        // 调用 ChannelHandlerMask 中的静态方法 mask.
        this.executionMask = mask(handlerClass);
    
        // 如果 EventLoop 或者 Executor 实例实现了 OrderedEventExecutor 接口, 则顺序执行.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    
    // 用来获取和计算 ChannelHandler 类型的 mask 值.
    // 该值表示了 ChannelHandler 是 入站还是出站, 并且对那些事件感兴趣.
    static int mask(Class<? extends ChannelHandler> clazz) {
        // 获取缓存
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            // 如果缓存中没有, 则计算.
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }
    
    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        // mask = 1
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            // 判断是否是 ChannelInboundHandler 类型.
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                // 或上 MASK_ALL_INBOUND(511), 添加所有 inbound 关心的事件位.
                mask |= MASK_ALL_INBOUND;
    
                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    // 表示此 Handler 并不关心此事件, 将对应位上的数字变为相反, 即 1->0
                    // 这里 registered 事件为二进制第二位为 1, 则跳过的话, 将第二位变为 0
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
    
                // ...
            }
    
            // 判断是否是 ChannelOutboundHandler 类型.
            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                // MASK_ALL_OUTBOUND(130561)
                mask |= MASK_ALL_OUTBOUND;
    
                // 下面这里与入站类似
                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                                SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
    
                // ...
            }
    
            // 无关 Inbound 和 Outbound, 都可以关心的事件.
            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        }
        catch (Exception e) {
            PlatformDependent.throwException(e);
        }
        return mask;
    }
    

    mask0 方法中的代码太多就不复制全部的了, 举个例子当有一个 ChannelHandler 实例为 ChannelInboundHandler 类型, 并且只关心 channelRead 方法, 这个时候 mask 的值为 33.

    主要属性和方法

    该抽象类有一个默认实现为 DefaultChannelHandlerContext, 该实现类的初始化可以在 给大动脉来一刀 - ChannelPipeLine 文章中找到.

    下面是一些该类比较重要的属性和方法:

    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    

    还记得前面说过的 ChannelPipleline 吗? ChannelHandler 的管理容器, 它内部维护了一个 ChannelHandler 的链表, 可以方便的实现 ChannelHandler 的查找、添加、删除、替换、遍历等.

    在内部维护的 ChannelHandler 链表, 是一个双向链表, 并且只会维护相同的头和尾; 而对于上一个或下一个 ChannelHandler, 是根据当前抽象类中的这两个属性维护的.

    换句话说, AbstractChannelHandlerContext 是 Netty 实现 Pipeline 机制的关键.

    读消息

    给大动脉来一刀-ChannelPipeLine#OP-WRITE-事件传播 文章中的代码处有说, 第一个 channelRead 方法是通过调用 ChannelPipeline#fireChannelRead(); 方法执行.

    该方法的源码为:

    public final ChannelPipeline fireChannelRead(Object msg) {
        // head 是 ChannelPipleline 中所维护的内部类 HeadContext.
        // 不多说对应文章中有介绍.
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    这里画重点, 就堵一台考面包机.

    也就是说 ChannelPipeline 想要执行 ChannelHandler 链表, 靠的就是 AbstractChannelHandlerContext 抽象类中的相关静态方法. 值得注意的是: 这些静态方法的作用域全部为 protected.

    除了上面说的这个静态方法外, 还有 invokeChannelRegistered 或者 invokeChannelReadComplete, 其实它就是包含了与 ChannelInboundInvoker 接口相对应的静态方法.

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        // 由于是让 ChannelHandler 从头开始执行, 所以这里的 next 为 HeadContext 实例.
        // 判断是否要做资源检测. 值得注意的是: 在这些静态方法中只有该静态方法才会做资源检测.
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        // 判断该方法是不是在 ChannelHandler 所关联的执行器中调用的.
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                // 这段代码被执行, 是因为 ChannelHandler 没有被移除并且不是有序执行.
                // channelRead 方法就是 ChannelHandler 实现类中的方法了.
                // this 就是和 ChannelHandler 实现类关联的 ChannelHandlerContext 了.
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            // 该方法下面有讲.
            fireChannelRead(msg);
        }
    }
    
    /**
     * ordered == true: 表示该 ChannelHandler 没有关联执行器 或者 执行器实现了 OrderedEventExecutor 接口.
     * 注意下面是该变量取反.
     *
     * 返回 true: 
     *        当前已经执行了 ChannelHandler#handlerAdded(ChannelHandlerContext) 方法 或者
     *        关联的执行器没有实现 OrderedEventExecutor 接口 并且 
     *        该 ChannelHandler 的状态是不确定有没有执行 handlerAdded 方法.
     *
     * 而且这里也保证了该 ChannelHandler 没有被移除, 也就是 handlerState != REMOVE_COMPLETE.
     */
    private boolean invokeHandler() {
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }
    

    当前例子中 invokeChannelRead 静态方法中的第一个参数传递的是 HeadContext 也就是调用 ChannelHandler 双向链表中的第一个; 当然了也可以从中途执行, 这要看具体的业务逻辑了.

    但是也要注意 ordered 变量, 该变量表示是否有序执行; 通过 invokeHandler() 方法可以判断出, 有序执行指的是当前 ChannelHandler 的状态为 ADD_COMPLETE 时则执行对应的方法; 而无序指的是当前 ChannelHandler 的状态为 ADD_PENDING 时也可以执行对应的方法. 如果是有序执行但是当前 ChannelHandler 的状态为 ADD_PENDING, 则调用 fireChannelRead(Object) 方法执行下一个 ChannelHandler.

    因为当调用 ChannelHandler#handlerAdded(ChannelHandlerContext) 方法时, 才说明 ChannelHandler 被真正添加到了 ChannelPipeline 中.

    如果想要调用下一个 ChannelHandler 实例中的 channelRead 方法时(也就是继续传播), 只需要直接调用 ctx.fireChannelRead(msg) 方法, 也就是 AbstractChannelHandlerContext#fireChannelRead(Object) 方法.

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        // MASK_CHANNEL_READ = 32
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }
    

    ChannelHandlerContext 是一个双向链表, findContextInbound 方法的作用就是找到下一个符合条件的 ChannelHandler.

    所谓的条件有两种, 第一种: 根据方法名可以看出下一个必须是 Inbound 类型; 第二种: 根据参数可以看出必须关注了 MASK_CHANNEL_READ 事件, 也就是实现了 channelRead 方法. 而 invokeChannelRead 方法就是前面说的静态方法了.

    也就是说, 想不想传递给下一个 ChannelHandler 来继续处理这条消息, 完全是靠 AbstractChannelHandlerContext 类中的 fireChannelRead(Object) 方法决定.

    写消息

    当消息处理完成后, 一般都是需要回复. 可以通过 AbstractChannelHandlerContext#write(...) 方法进行回复.

    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
    
    // 这个 write 方法并不是真正的在写出数据, 而是将数据存放到一个成员变量中.
    // 其 flush 方法才是真正写出数据.
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);
        return promise;
    }
    
    // 该方法相当于 write 和 flush 方法一起调用.
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        write(msg, true, promise);
        return promise;
    }
    

    上面这四个发送数据的方法, 并不是最终方法. 最终方法是由私有的 write 方法实现.

    private void write(Object msg, Boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            // 这里是做安全检查.
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                return;
            }
        }
        catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
    
        // findContextOutbound 方法与 findContextInbound 方法类似, 只不过是用来查询下一个 Outbound.
        // 注意这里: 如果 flush 为 true, 则会找到实现了 write 和 flush 方法的 Outbound.
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                        (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                // 下面有说.
                next.invokeWriteAndFlush(m, promise);
            } else {
                // 该方法中的逻辑, 如果 invokeHandler() 返回 true, 则直接执行 invokeWrite0 方法.
                // 否则就会重新调用 write(Object, ChannelPromise) 方法, 
                // 找到下一个满足条件的 ChannelHandler.
                next.invokeWrite(m, promise);
            }
        } else {
            // 这里为什么不直接调用 executor.execute? 这里先留个坑.
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }
    
    void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            // 内部直接调用 ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            invokeWrite0(msg, promise);
    
            // 内部直接调用 ((ChannelOutboundHandler) handler()).flush(this);
            invokeFlush0();
        } else {
            // 这里也是重新调用.
            writeAndFlush(msg, promise);
        }
    }
    

    现在需要返回一条消息, 所以在下面箭头指向的 ChannelHandler 中调用了 write 方法, 那么找到的第一个 Outbound 是 4 还是 2?

    是 4 还是 2 这要看 write 方法是通过哪种方式调用的了, 这与上面说的读消息一样都是有两种方式; 这两种方式也就是别的文章说的 从 Pipeline 传播 和 从 Context 传播, 但我个人更倾向理解为: 在 Pipeline 中传播但是传播的方式不同, 第一种方式是从 Pipeline 最开始进行传播, 第二种方式为从某个 ChannelHandler 处直接传播.

    如果在 3 中调用动态的 write 方法, 那么找到的第一个 Outbound 是 2, 这里没什么好解释的. 但是在大多数情况下需要从第一个 Outbound 开始执行, 也就是上图中的 4, 然后再执行 2 最后执行 1.

    // 这两种方法都是一样的, 最终都会调用 pipeline 中的 write 方法.
    
    channelHandlerContext.channel().write(...);
    channelHandlerContext.pipeline().write(...);
    

    pipeline 中的 write 方法源码如下.

    // tail 就是 TailContext 对象.
    public final ChannelFuture write(Object msg) {
        // 这里就是 AbstractChannelHandlerContext#write(Object) 方法, 也就是上面说的那个.
        return tail.write(msg);
    }
    

    值得注意的是, HeadContext 是必须要执行的, 不管是读还是写数据时. 这是因为写数据时该类的 write 方法会调用 AbstractChannel#write(Object, ChannelPromise) 方法将要发送的数据写入缓冲区. 可以把这个缓冲区理解为出站缓冲区.

    public final void write(Object msg, ChannelPromise promise) {
        // 该类是在 AbstractChannel 内部使用的数据结构, 用于存储准备发送的出站写请求.
        // 该类的源码在 https://www.cnblogs.com/scikstack/p/13524675.html 查看 
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    
        // ...
    
        int size;
        try {
            // 这里调用的是 AbstractNioByteChannel 中的 filterOutboundMessage 方法.
            // 该方法中会判断参数 msg 是不是以下类型:
            //   - ByteBuf: 则会判断是否为 DirectBuffer, 如果不是则转换为 DirectBuffer 后返回.
            //   - FileRegion: 则直接返回 msg.
            //
            // 对于其它类型则会抛出 UnsupportedOperationException 异常.
            msg = filterOutboundMessage(msg);
    
            // 这里调用的是 DefaultMessageSizeEstimator.HandleImpl#size() 方法. 获取 msg 的可读字节数.
            //
            // 如果是 ByteBuf 或 ByteBufHolder 类型则调用 readableBytes() 方法.
            // 如果是 FileRegion 类型, 则会返回 0.
            // 其它类型则返回 8.
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }
        outboundBuffer.addMessage(msg, size, promise);
    }
    
    flush 缓冲区

    该方法与 write 方法的调用方式一样, 都是可以通过下面两种方式调用, 以及直接调用动态方法 flush().

    channelHandlerContext.channel().flush(...);
    channelHandlerContext.pipeline().flush(...);
    

    最终也都是要调用HeadContext#flush(ChannelHandlerContext)方法, 在该方法中会调用AbstractChannel.AbstractUnsafe#flush()方法来发送数据.

    public final void flush() {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
    
        outboundBuffer.addFlush();
    
        // inFlush0 为 true, 或者 ChannelOutboundBuffer#isEmpty() 为 true, 则直接返回不会继续执行.
        // 否则会先判断通道如果没有打开或关闭会抛出对应的异常.
        // 会调用 NioSocketChannel 实现类中的 doWrite(outboundBuffer); 方法.
        flush0();
    }
    

    outboundBuffer.addFlush(); 关于该方法的具体操作, 可以通过 10.ChannelOutboundBuffer 文章查看.

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        // 通过 writeSpinCount 变量控制最大循环次数, 以防止单次写入操作消耗过多时间阻塞线程.
        // 也就是说, 最多循环 16 次, 没写完也暂时不管了.
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // 全部写完后清除 OP_WRITE 事件.
                clearOpWrite();
                return;
            }
    
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();
    
            switch (nioBufferCnt) {
                case 0:
                    writeSpinCount -= doWrite0(in);
                break;
                case 1: {
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    long attemptedBytes = in.nioBufferSize();
                    
                    // 该方法是线程安全的.
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
    
                    // 待写入数据: 缓冲区中的内容, 写入数据: 已发送的内容.
                    //
                    // 这里有个 adjustMaxBytesPerGatheringWrite(..) 该方法的作用是, 通过本次写入数据和待写入数据进行动态调整 ByteBuffer 大小;
                    // 如果 待写入数据 等于 写入数据, 那么下次就扩大 ByteBuffer, 会扩大一倍.
                    // 如果 待写入数据 大于 阀值(4M), 并且本次写入的还没有待写入数据的一半多, 就将 ByteBuffer 调整为, 待写入数据 的一半大小.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite);
    
                    // 获取 flushedEntry 中 msg 消息, 如果 msg 类型不是 io.netty.buffer.ByteBuf 则直接返回.
                    // 如果 msg 的可读字节数, 小于等于了发送字节数, 就说明消息发送完毕就会将 flushedEntry 指向下一个 Entry.
                    // 否则就说明 msg 只发送了部分数据, 那么就会调整 msg 的可读字节数.
                    // 最后清除 NIO_BUFFERS 缓存的 ByteBuffer 引用, 以便可以进行 GC 处理.
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
        incompleteWrite(writeSpinCount < 0);
    }
    

    ChannelHandler 生命周期

    • handlerAdded: 新建立的连接会按照初始化策略, 把 handler 添加到 ChannelPipeline 中, 也就是 ChannelPipeline#add*, 例如: addLast addBefore 方法执行完成后的回调, 因为这些方法中都会调用 callHandlerAdded0 方法. ;
    • channelRegistered: 当该连接分配到具体的 worker 线程后, 该回调会被调用, AbstractChannel#register 方法中调用 ChannelPipeline#fireChannelRegistered 方法;
    • channelActive: AbstractChannel#bind 方法中添加任务调用 ChannelPipeline#fireChannelActive 方法;
    • channelRead: 客户端向服务端发来数据, 每次都会回调此方法, 表示有数据可读;
    • channelReadComplete: 服务端每次读完一次完整的数据之后, 回调该方法, 表示数据读取完毕;
    • channelInactive: 断开连接时, 在 AbstractChannel.AbstractUnsafe#disconnect 和 AbstractChannel.AbstractUnsafe#deregister 方法中调用 ChannelPipeline#fireChannelInactive, 另外再 ChannelInboundHandlerAdapter#channelUnregistered 方法中会直接调用 ChannelHandlerContext#fireChannelInactive() 方法;
    • channelUnRegistered: 断开连接时, 还是在 ChannelPipeline#deregister 方法中当调用完 AbstractChannel.AbstractUnsafe#fireChannelInactive 方法后会调用 ChannelPipeline#fireChannelUnregistered, 另外再 ChannelInboundHandlerAdapter#channelUnregistered 方法中会直接调用 ChannelHandlerContext#fireChannelUnregistered() 方法;
    • handlerRemoved: 对应 handlerAdded, 将 handler 从该 channel 的 pipeline 移除后的回调方法 ChannelPipeline#remove.

    参考资料

    Netty详解之九:使用ByteBuf
    Netty使用案例 -发送队列积压导致内存泄漏(二)
    Netty详解之六:Channel
    Netty源码分析系列之writeAndFlush()下
    3. Netty源码阅读之Channel
    Netty-11-channelHandler的生命周期
    Netty学习笔记之ChannelHandler
    ChannelHandler生命周期
    Netty中ChannelHandler的生命周期
    类AbstractChannelHandlerContext
    netty 通道处理器上下文
    Netty pipeline分析(一)
    Netty源码解析之pipeline传播事件机制
    Netty学习笔记(番外篇) - ChannelHandler、ChannelPipeline和ChannelHandlerContext的联系

  • 相关阅读:
    加法问题,编程入门课
    三.Python变量,常量,注释
    二、python介绍
    一. python 安装
    Syntax behind sorted(key=lambda :)
    Java动态数组
    Package name does not correspond to the file path (IntelliJ IDEA)
    理解Java构造器中的"this"
    The Constructor with No Arguments
    160229-01、web页面常用功能js实现
  • 原文地址:https://www.cnblogs.com/scikstack/p/13524561.html
Copyright © 2011-2022 走看看