zoukankan      html  css  js  c++  java
  • 处理业务:事件是如何在 pipeline 中传播的

    处理业务:事件是如何在 pipeline 中传播的

    Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

    在上一节 接收数据:自适应缓冲区和连接读是为了解决什么问题 中,我们知道 NioEventLoop 不断的轮询,接收 OP_READ 事件;然后将读取到的数据通过 pipeline.fireChannelRead(byteBuf) 传播出去。所以业务的处理实际上是在 pipeline 的 channelRead() 上处理的,本小节也主要关注事件是在 pipeline 中传播行为。

    事件在 pipeline 中的传播,需要关注事件的传播行为和执行线程:

    1. 传播行为:事件执行到某个 Handler 后,如果不手动触发 ctx.fireChannelRead,则传播中断。
    2. 执行线程:业务线程默认是在 NioEventLoop 中执行。如果业务处理有阻塞,需要考虑另起线程执行。

    1. 主线分析

    1.1 主线

    主线其实在上一节已经讲了,在读取数据时会触发 pipeline.fireChannelRead(byteBuf) 把读取到的数据传播出去。我们现在重点分析事件是如何在 pipeline 中传播的。

    Handler 执行资格:

    • 实现 ChannelInboundHandler
    • 实现方法 channelRead 不能加注解 @Skip
    • Handler 执行链是可以被中断的。如果不主动触发 ctx.fireChannelRead 方法,则不会再继续往下执行。

    1.2 知识点

    (1)处理业务本质

    数据在 pipeline 中所有的 ChannelInboundHandler 的 channelRead() 执行。并且执行可以被中断。

    (2)业务处理线程

    默认处理线程是 Channel 绑定的 NioEventLoop 线程,也可以设置其他线程:

    pipeline.addLast(new UnorderedThreadPoolEventExecutor(10), serverHandler);
    

    2. 源码分析

    2.1 fireChannelRead 在 pipeline 中的传播

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    说明: 可以看到 fireChannelRead 是从 head -> tail 一直身后传播。

    2.1 传播行为

    pipeline 中一旦被中间某个 Handler 执行,则传播行为中断。如果需要继续执行下去,则需要主动调用 ctx.fireChannelRead。

    // AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
    

    说明: 可以看到要么执行 channelRead 方法,要么执行 fireChannelRead 方法直到找到一个对应的 Handler 为止。如果不主要调用 ctx.fireChannelRead,则传播行为会中断。

    可能会奇怪,通过 findContextInbound() 不是找到对应的 Handler 了,为什么还需要通过 invokeHandler() 再判断一次?其实,findContextInbound 方法是查找有 channelRead 的 Handler,而 invokeHandler 方法则是判断这个 Handler 是否被删除了。

    2.3 执行线程

    每个 Handler 都是在自己对应的 executor 中执行,默认为 NioEventLoop 线程。当然也可以自己指定其它线程。

    // AbstractChannelHandlerContext
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    

    说明: 我们需要重点关注业务的执行线程,因为如果业务占用时间过长,会影响 Netty IO 的吞吐率。

    2.4 ChannelPipeline vs ChannelHandlerContext

    Netty 将每个 Handler 都包装成 ChannelHandlerContext 添加到 ChannelPipeline 中。虽然说 ChannelPipeline 也就调用 ChannelHandlerContext 中的方法,但 pipeline 会从 head 或 tail 开始遍历,而 ctx 只会从当前 hander 开始遍历。

    我们还是拿以下四个 read 对比:channel.read()、pipeline.read()、ctx.read()、unsafe.read()

    • channel.read():直接调用 pipeline.read()。
    • pipeline.read():调用 tail.read(),从 head 或 tail 经历全部的 Handler。实际上,最后的 head.read() 调用 unsafe.beginRead(),这个方法会注册 OP_ACCEPT 或 OP_READ 事件从而激活 Channel。
    • ctx.read():从当前 ctx 开始之后的全部的 Handler。如果发送数据,需要使用 ctx.write 而不是 ctx.channel().write。
    • unsafe.read():最底层的 API。和 unsafe.beginRead() 不同,unsafe#read 会真正从 socket revbuf 读取数据。

    2.5 HeadContext vs TailContext

    • HeadContext:
      • inbound 由事件触发,从 head -> tail,所以需要调用 ctx.firexxx 将事件传播下去。
      • outbound 由用户触发,从 tail -> head,通常会直接调用到 head。如果中间某个 Handler 重新自定义实现该方法,则不会再向下调,见 AbstractChannelHandlerContext#invokeWrite0。
    • TailContext:基本上没什么功能,一直向下传播事件即可。
    HeadContext 功能
    方法 执行顺序 功能说明
    bind outbound unsafe.bind
    connect outbound unsafe.connect
    disconnect outbound unsafe.disconnect
    close outbound unsafe.close
    deregister outbound unsafe.deregister
    read outbound unsafe.beginRead:注册感兴趣事件
    write outbound unsafe.write
    flush outbound unsafe.flush
    exceptionCaught inbound ctx.fireExceptionCaught
    channelRegistered inbound callHandlerAddedForAllHandlers
    ctx.fireChannelRegistered
    channelUnregistered inbound ctx.fireChannelUnregistered
    destroy
    channelActive inbound ctx.fireChannelActive
    readIfIsAutoRead:调用unsafe.beginRead
    channelInactive inbound ctx.fireChannelInactive()
    channelRead inbound ctx.fireChannelRead
    channelReadComplete inbound ctx.fireChannelReadComplete
    readIfIsAutoRead
    userEventTriggered inbound ctx.fireUserEventTriggered
    channelWritabilityChanged inbound ctx.fireChannelWritabilityChanged
    TailContext 功能
    方法 执行顺序 功能说明
    channelRead inbound ReferenceCountUtil.release(msg)

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    Python基础学习九 数据库备份
    Python基础学习八 写日志
    Python 小练习三 发邮件
    Python基础补充(二) 多核CPU上python多线程并行的一个假象【转】
    pat 1118 Birds in Forest (25分) 并查集
    Java Map实现按value从大到小排序
    java实现排列组合(通俗易懂)
    java实现24点游戏代码
    eclipse搭建struts2环境及所遇到的问题
    java非常好用的读取文件的流的代码
  • 原文地址:https://www.cnblogs.com/binarylei/p/12641011.html
Copyright © 2011-2022 走看看