zoukankan      html  css  js  c++  java
  • Netty源码分析第4章(pipeline)---->第5节: 传播outbound事件

     

    Netty源码分析第五章: pipeline

     

    第五节: 传播outBound事件

    了解了inbound事件的传播过程, 对于学习outbound事件传输的流程, 也不会太困难

    在我们业务代码中, 有可能使用wirte方法往写数据:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().write("test data");
    }

    当然, 直接调用write方法是不能往对方channel中写入数据的, 因为这种方式只能写入到缓冲区, 还要调用flush方法才能将缓冲区数据刷到channel中, 或者直接调用writeAndFlush方法, 有关逻辑, 我们会在后面章节中详细讲解, 这里只是以wirte方法为例为了演示outbound事件的传播的流程

    这里我们同样给出两种写法:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //写法1
        ctx.channel().write("test data");
        //写法2
        ctx.write("test data");
    }

    这两种写法有什么区别, 我们首先跟到第一种写法中去:

    ctx.channel().write("test data");

    这里获取ctx所绑定的channel

    我们跟到AbstractChannel的write方法中:

    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

    这里pipeline是DefaultChannelPipeline

    跟到其write方法中:

    public final ChannelFuture write(Object msg) {
        //从tail节点开始(从最后的节点往前写)
        return tail.write(msg);
    }

    这里调用tail节点write方法, 这里我们应该能分析到, outbound事件, 是通过tail节点开始往上传播的, 带着这点猜想, 我们继往下看

     

    其实tail节点并没有重写write方法, 最终会调用其父类AbstractChannelHandlerContextwrite方法

    AbstractChannelHandlerContextwrite方法:

    public ChannelFuture write(Object msg) { 
        return write(msg, newPromise());
    }

    我们看到这里有个newPromise()这个方法, 这里是创建一个Promise对象, 有关Promise的相关知识我们会在以后的章节剖析

    我们继续跟write:

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        //代码省略
        write(msg, false, promise);
        return promise;
    }

    继续跟write:

    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //没有调flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    这里跟我们上一小节剖析过channelRead方法有点类似, 但是事件传输的方向有所不同, 这里findContextOutbound()是获取上一个标注outbound事件的HandlerContext

    跟到findContextOutbound中:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    这里的逻辑我们似曾相识, 跟我们上一小节的findContextInbound()方法有点像, 只是过程是反过来的

    在这里, 会找到当前context的上一个节点, 如果标注的事件不是outbound事件, 则继续往上找, 意思就是找到上一个标注outbound事件的节点

     

     

    回到write方法:

    AbstractChannelHandlerContext next = findContextOutbound();

    这里将找到节点赋值到next属性中

    因为我们之前分析的write事件是从tail节点传播的, 所以上一个节点就有可能是用户自定的handler所属的context

     

    然后判断是否为当前eventLoop线程, 如果是不是, 则封装成task异步执行, 如果不是, 则继续判断是否调用了flush方法, 因为我们这里没有调用, 所以会执行到next.invokeWrite(m, promise),

    我们继续跟invokeWrite:

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    这里会判断当前handler的状态是否是添加状态, 这里返回的是true, 将会走到invokeWrite0(msg, promise)这一步

    继续跟invokeWrite0:

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            //调用当前handler的wirte()方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

    这里的逻辑也似曾相识, 调用了当前节点包装的handlerwrite方法, 如果用户没有重写write方法, 则会交给其父类处理

    我们跟到ChannelOutboundHandlerAdapterwrite方法中看:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    这里调用了当前ctxwrite方法, 这种写法和我们小节开始的写法是相同的, 我们回顾一下:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //写法1
        ctx.channel().write("test data");
        //写法2
        ctx.write("test data");
    }

    我们跟到其write方法中, 这里走到的是AbstractChannelHandlerContext类的write方法:

    private void write(Object msg, boolean flush, ChannelPromise promise) { 
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                //没有调flush
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

    又是我们所熟悉逻辑, 找到当前节点的上一个标注事件为outbound事件的节点, 继续执行invokeWrite方法, 根据之前的剖析, 我们知道最终会执行到上一个handlerwrite方法中

    走到这里已经不难理解, ctx.channel().write("test data")其实是从tail节点开始传播写事件, ctx.write("test data")是从自身开始传播写事件

     

    所以, handler中如果重写了write方法要传递write事件, 一定采用ctx.write("test data")这种方式或者交给其父类处理处理, 而不能采用ctx.channel().write("test data")这种方式, 因为会造成每次事件传输到这里都会从tail节点重新传输, 导致不可预知的错误

     

     

    如果用代码中没有重写handlerwrite方法, 则事件会一直往上传输, 当传输完所有的outbound节点之后, 最后会走到head节点的wirte方法中

    我们跟到HeadContextwrite方法中:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    我们看到write事件最终会流向这里, 通过unsafe对象进行最终的写操作

     

    有关inbound事件和outbound事件的传输, 可通过下图进行说明:

    4-5-1

     

    上一节: 传播inbound事件

    下一节: 传播异常事件

     

  • 相关阅读:
    Python-selenium 下拉框定位
    Python3+Selenium3自动化测试-(五)
    Python3+Selenium3自动化测试-(四)
    Python3+Selenium3自动化测试-(三)
    Python3+Selenium3自动化测试-(二)
    Python3+Selenium3自动化测试-(一)
    python selenium 下载安装(一)
    Elasticsearch查询——布尔查询Bool Query
    centos7 重启网卡失败
    kibana 创建饼图
  • 原文地址:https://www.cnblogs.com/xiangnan6122/p/10204459.html
Copyright © 2011-2022 走看看