zoukankan      html  css  js  c++  java
  • Netty源码分析第4章(pipeline)---->第4节: 传播inbound事件

     

    Netty源码分析第四章: pipeline

     

    第四节: 传播inbound事件

     

     

    有关于inbound事件, 在概述中做过简单的介绍, 就是以自己为基准, 流向自己的事件, 比如最常见的channelRead事件, 就是对方发来数据流的所触发的事件, 己方要对这些数据进行处理, 这一小节, 以激活channelRead为例讲解有关inbound事件的处理流程

     

    在业务代码中, 我们自己的handler往往会通过重写channelRead方法来处理对方发来的数据, 那么对方发来的数据是如何走到channelRead方法中了呢, 也是我们这一小节要剖析的内容

     

    在业务代码中, 传递channelRead事件方式是通过fireChannelRead方法进行传播的

     

    这里给大家看两种写法:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }

    这里重写了channelRead方法, 并且方法体内继续通过fireChannelRead方法进行传播channelRead事件, 那么这两种写法有什么异同?

    我们先以写法2为例, 将这种写法进行剖析

    这里首先获取当前context的pipeline对象, 然后通过pipeline对象调用自身的fireChannelRead方法进行传播, 因为默认创建的DefaultChannelpipeline

    我们跟到DefaultChannelpipeline的fireChannelRead方法中:

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    这里首先调用的是AbstractChannelHandlerContext类的静态方法invokeChannelRead, 参数传入head节点和事件的消息

    我们跟进invokeChannelRead方法:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    这里的Object m m通常就是我们传入的msg, 而next, 目前是head节点, 然后再判断是否为当前eventLoop线程, 如果不是则将方法包装成task交给eventLoop线程处理

    我们跟到invokeChannelRead方法中:

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    首先通过invokeHandler()判断当前handler是否已添加, 如果添加, 则执行当前handler的chanelRead方法, 其实这里我们基本上就明白了, 通过fireChannelRead方法传递事件的过程中, 其实就是找到相关handler执行其channelRead方法, 由于我们在这里的handler就是head节点, 所以我们跟到HeadContext的channelRead方法中:

    HeadContext的channelRead方法:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //向下传递channelRead事件
        ctx.fireChannelRead(msg);
    }

    在这里我们看到, 这里通过fireChannelRead方法继续往下传递channelRead事件, 而这种调用方式, 就是我们刚才分析用户代码的第一种调用方式:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }

    这里直接通过context对象调用fireChannelRead方法, 那么和使用pipeline调用有什么区别的, 我会回到HeadConetx的channelRead方法, 我们来剖析ctx.fireChannelRead(msg)这句, 大家就会对这个问题有答案了, 跟到ctx的fireChannelRead方法中, 这里会走到AbstractChannelHandlerContext类中的fireChannelRead方法中

    跟到AbstractChannelHandlerContext类中的fireChannelRead方法:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    这里我们看到, invokeChannelRead方法中传入了一个findContextInbound()参数, 而这findContextInbound方法其实就是找到当前Context的下一个节点

    跟到findContextInbound方法:

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    这里的逻辑也比较简单, 是通过一个doWhile循环, 找到当前handlerContext的下一个节点, 这里要注意循环的终止条件, while (!ctx.inbound)表示下一个context标志的事件不是inbound的事件, 则循环继续往下找, 言外之意就是要找到下一个标注inbound事件的节点

    有关事件的标注, 之前的小节已经剖析过了, 如果是用户定义的handler, 是通过handler继承的接口而定的, 如果tail或者head, 那么是在初始化的时候就已经定义好, 这里不再赘述

    回到AbstractChannelHandlerContext类的fireChannelRead方法中:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

    找到下一个节点后, 继续调用invokeChannelRead方法, 传入下一个和消息对象:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        //第一次执行next其实就是head
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    这里的逻辑我们又不陌生了, 因为我们传入的是当前context的下一个节点, 所以这里会调用下一个节点invokeChannelRead方法, 因我们刚才剖析的是head节点, 所以下一个节点有可能是用户添加的handler的包装类HandlerConext的对象

    这里我们跟进invokeChannelRead方法中去:

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try { 
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                //发生异常的时候在这里捕获异常
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    又是我们熟悉的逻辑, 调用了自身handler的channelRead方法, 如果是用户自定义的handler, 则会走到用户定义的channelRead()方法中去, 所以这里就解释了为什么通过传递channelRead事件, 最终会走到用户重写的channelRead方法中去

    同样, 也解释了该小节最初提到过的两种写法的区别:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //写法1:
        ctx.fireChannelRead(msg);
        //写法2
        ctx.pipeline().fireChannelRead(msg);
    }

    写法1是通过当前节点往下传播事件

    写法2是通过头节点往下传递事件

    所以, 在handler中如果如果要在channelRead方法中传递channelRead事件, 一定要采用写法2的方式向下传递, 或者交给其父类处理, 如果采用1的写法则每次事件传输到这里都会继续从head节点传输, 从而陷入死循环或者发生异常

    这里有一点需要注意, 如果用户代码中channelRead方法, 如果没有显示的调用ctx.fireChannelRead(msg)那么事件则不会再往下传播, 则事件会在这里终止, 所以如果我们写业务代码的时候要考虑有关资源释放的相关操作

    如果ctx.fireChannelRead(msg)则事件会继续往下传播, 如果每一个handler都向下传播事件, 当然, 根据我们之前的分析channelRead事件只会在标识为inbound事件的HandlerConetext中传播, 传播到最后, 则最终会调用到tail节点的channelRead方法

    我们跟到tailConext的channelRead方法中:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

    我们跟进到onUnhandledInboundMessage方法中:

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            //释放资源
            ReferenceCountUtil.release(msg);
        }
    }

    这里做了释放资源的相关的操作

    至此, channelRead事件传输相关罗辑剖析完整, 其实对于inbound事件的传输流程都会遵循这一逻辑, 小伙伴们可以自行剖析其他inbound事件的传输流程

     

    上一节: handler的删除

    下一节: 传播outbound事件

  • 相关阅读:
    日志类
    sql查询数据并导出问题
    高并发系统设计(十七):【系统架构】微服务化后,系统架构要如何改造?
    高并发系统设计(十五):【消息队列】如何降低消息队列系统中消息的延迟?
    高并发系统设计(十四):【消息队列】如何消息不丢失?并且保证消息仅仅被消费一次?
    高并发系统设计(十三):消息队列的三大作用:削峰填谷、异步处理、模块解耦
    高并发系统设计(十二):【缓存的正确使用姿势】缓存穿透了怎么办?如何最大程度避免缓存穿透
    高并发系统设计(十一):【缓存的正确使用姿势】缓存如何做到高可用?
    ThinkPad X1 Carbon无法识别第二屏幕
    如何设置两个TPLink路由器桥接
  • 原文地址:https://www.cnblogs.com/xiangnan6122/p/10204431.html
Copyright © 2011-2022 走看看