zoukankan      html  css  js  c++  java
  • 事件和异常的传播 · 农场主的黑科技.

    inBound事件的传播

    • 何为inBound事件以及ChannelInboundHandler
    • ChannelRead事件的传播
      ChannelRead是典型的inbound事件,以他为例了解inbound事件的传播
    • SimpleInBoundHandler处理器

    何为inBound事件以及ChannelInboundHandler

    1546771977032

    ChannelHandler的继承关系

    • ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter.
      用户代码中常见.平时自定义channelHandler时都会继承与他们

    • ChannelHandler,所有处理器的抽象

    • ChannelHandlerAdapter
      ChannelHandler的默认实现
    • ChannelInboundHandler,ChannelOutboundHandler
      在ChannelHandler的基础上,自定义功能

    ChannelHandler定义了哪些功能

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public interface  {

    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    //handler被pipeline删除时的回调
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    //出现异常时的回调

    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    //一个注解,是否可以被多个pipeline添加
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
    // no value
    }
    }

    ChannelInboundHandler在ChannelHandler的基础上扩展了什么

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public interface ChannelInboundHandler extends  {
    //回调,Handler注册到nioEventLoop的selector上时
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    //channel的激活或失效时的回调
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    //channel读到数据或接收到连接时的回调
    //对于服务端而言是连接,对于客户端channel则是bytebuf的数据
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    //trigger一些用户自定义的事件
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    //可写状态发生了改变
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    //出现异常时的回调
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    }

    这次由ChannelRead为例,看一下inbound事件是如何传播的.

    ChannelRead事件的传播

    这里创建3个自定义的InboundHandler测试ChannelRead事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("InBoundHandlerA: " + msg);
    ctx.fireChannelRead(msg);
    }
    }
    public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("InBoundHandlerB: " + msg);//打印
    ctx.fireChannelRead(msg);//继续传播
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    //channel被激活时取到pipeline,激活channelRead事件
    ctx.channel().pipeline().fireChannelRead("hello world");
    }
    }
    public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("InBoundHandlerC: " + msg);
    ctx.fireChannelRead(msg);
    }
    }

    服务端启动代码添加childHandler的部分为

    1
    2
    3
    4
    5
    6
    7
    8
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(new InBoundHandlerA());
    ch.pipeline().addLast(new InBoundHandlerB());
    ch.pipeline().addLast(new InBoundHandlerC());
    }
    });

    此时启动后在控制台执行telnet 127.0.0.1 8888,它的后台输出结果为

    1
    2
    3
    InBoundHandlerA: hello world
    InBoundHandlerB: hello world
    InBoundHandlerC: hello world

    如果把添加childHandler的部分改为下面的顺序

    1
    2
    3
    ch.pipeline().addLast(new InBoundHandlerA());
    ch.pipeline().addLast(new InBoundHandlerC());
    ch.pipeline().addLast(new InBoundHandlerB());

    那么输出是

    1
    2
    3
    InBoundHandlerA: hello world
    InBoundHandlerC: hello world
    InBoundHandlerB: hello world

    可以推测出,inbound和添加顺序相关.在InboundHandlerBchannelActive()中打个断点.看一下它的fireChannelRead("hello world")的逻辑.会看到它会调用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
    //head节点
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
    }
    ---
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {//true
    //此时的next为head
    next.invokeChannelRead(m);
    } else {
    executor.execute(new Runnable() {
    @Override
    public void run() {
    next.invokeChannelRead(m);
    }
    });
    }
    }

    也就是说,当遇到channelRead事件时它会从调用head的invokeChannelRead.继续看

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
    //调用head的channelRead()
    ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } else {
    fireChannelRead(msg);
    }
    }
    ---
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //也是调用head的fireChannelRead
    // 这里head会把fireChannelRead原封不动的进行传播
    ctx.fireChannelRead(msg);
    }

    这个fireChannelRead是io.netty.channel.AbstractChannelHandlerContext#fireChannelRead.它会调用findContextInbound()来获取下一个inboundHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
        @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
    //获取下一个inboundHandler
    invokeChannelRead(findContextInbound(), msg);
    return this;
    }
    ---
    private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
    ctx = ctx.next;
    } while (!ctx.inbound);//寻找下一个inbound节点
    return ctx;
    }

    获取到下一个inboundHandler,也就是InboundHandlerA后它会调用io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead,这个和之前head调用的是完全相同的方法,他回去调用当前inboundHandler,也就是InboundHandlerA的readChannel方法,它就是我们在用户代码中定义的部分.

    1
    2
    3
    4
    5
    6
    7
    8
    public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    System.out.println("InBoundHandlerA: " + msg);
    ctx.fireChannelRead(msg);//继续往下传播
    }
    }

    然后InBoundHandlerAchannelRead()又会调用下一个InBoundHandlerBchannelRead(),它又会接着调用InBoundHandlerCchannelRead()

    此时回顾一下InBoundHandlerB中的逻辑,会发现.pipeline中inBound的传播方式为:

    1
    2
    ctx.fireChannelRead(msg);	//从当前节点继续往下传播
    ctx.channel().pipeline().fireChannelRead("hello world");//从head节点开始往下传播

    当传播到最后一个节点(C)时,它会传播到最后的Tail节点,调用它的channelRead().之前说过tial是用来做一些收尾工作,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //事件一直传播到tail,
    //tail会认为没有被处理,这个方法会打印一个logger.debug.来提醒开发者
    onUnhandledInboundMessage(msg);
    }
    ---
    protected void onUnhandledInboundMessage(Object msg) {
    try {
    logger.debug(
    "Discarded inbound message {} that reached at the tail of the pipeline. " +
    "Please check your pipeline configuration.", msg);
    } finally {
    //如果时bytebuf就进行释放
    ReferenceCountUtil.release(msg);
    }
    }

    SimpleInBoundHandler处理器

    以下面的自定义Handler为例,看一下它的使用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //如果没有把这里的ByteBuf传播到tail,那么tail节点就不会帮你释放这段ByteBuf
    //通常这种情况下你需要手动进行释放,
    //而SimpleChannelInboundHandler会帮你自动释放他们
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf password) throws Exception {
    if (paas(password)) {
    ctx.pipeline().remove(this);
    } else {
    ctx.close();
    }
    }

    private boolean paas(ByteBuf password) {
    return false;
    }
    }

    看一下SimpleChannelInboundHandlerchannelRead()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
    //如果是inbound
    if (acceptInboundMessage(msg)) {
    //转为bytebuf
    I imsg = (I) msg;
    //抽象方法
    //继承SimpleChannelInboundHandler时你可以做一些你想干的,并且不用考虑释放
    channelRead0(ctx, imsg);
    } else {
    release = false;
    ctx.fireChannelRead(msg);
    }
    } finally {
    if (autoRelease && release) {
    //自动释放
    ReferenceCountUtil.release(msg);
    }
    }
    }

    也就是说使用SimpleChannelInboundHandler时不需要管channelRead()而是通过channelRead0()来做原本在channelRead()中的逻辑.因为SimpleChannelInboundHandler的channelRead()中定义了从执行channelRead0()直到释放的过程,所以当channelRead0()被执行后它会自动帮你去释放

    outBound事件的传播

    • 何为outBound事件以及ChannelOutBoundHandler
    • write()事件的传播
      典型的outBound事件

    何为outBound事件以及ChannelOutBoundHandler

    ChannelOutboundHandler在ChannelHandler的基础上扩展了什么

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public interface ChannelOutboundHandler extends  {
    //端口绑定,服务端启动时的那个
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect(
    ChannelHandlerContext ctx, SocketAddress remoteAddress,
    SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;


    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;


    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;


    void flush(ChannelHandlerContext ctx) throws Exception;
    }

    相对于InBounder,outBound更像是用户主动发起的操作.而InBounder更类似于事件触发

    write()事件的传播

    这里创建3个自定义的OutboundHandler测试write事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    System.out.println("OutBoundHandlerA: " + msg);
    ctx.write(msg, promise);
    }
    }
    ---
    public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    System.out.println("OutBoundHandlerB: " + msg);
    ctx.write(msg, promise);
    }


    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) {
    //定时器,模拟服务端处理完成数据后给客户端的响应
    ctx.executor().schedule(() -> {
    ctx.channel().write("hello world");
    }, 3, TimeUnit.SECONDS);
    }
    }
    大专栏  事件和异常的传播 · 农场主的黑科技.pan class="line">---
    public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    System.out.println("OutBoundHandlerC: " + msg);
    ctx.write(msg, promise);
    }
    }

    服务端创建代码

    1
    2
    3
    4
    5
    6
    7
    8
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(new OutBoundHandlerA());
    ch.pipeline().addLast(new OutBoundHandlerB());
    ch.pipeline().addLast(new OutBoundHandlerC());
    }
    });

    输出为以下,和添加顺序正好相反.也就是说outBound的添加顺序和pipeline中的传播顺序是相反的

    1
    2
    3
    OutBoundHandlerC: hello world
    OutBoundHandlerB: hello world
    OutBoundHandlerA: hello world

    下面看看write()的源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
        @Override
    public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
    }
    ---
    @Override
    public final ChannelFuture write(Object msg) {
    //注意,这里会调用tail的write
    return tail.write(msg);
    }
    ---
    @Override
    public ChannelFuture write(Object msg) {
    //newPromise()创建一个空的回调
    return write(msg, newPromise());
    }

    这个pipeline的write()实际会调用tail的write,这里的newPromise()是一个回调

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
        @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    write(msg, false, promise);//flash=false
    return promise;
    }
    ---
    private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    //..
    }

    通过findContextOutbound()找到下一个传播对象,看一下它的源码

    1
    2
    3
    4
    5
    6
    7
    private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
    ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
    }

    和inbound的时候相似,通过while找前面那个outbound节点,此时会返回最后添加的C节点,继续看这个write()的逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
        private void write(Object msg, boolean flush, ChannelPromise promise) {
    //找到下一个节点
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {//true
    if (flush) {//参数传的false
    next.invokeWriteAndFlush(m, promise);
    } else {
    //最终调用这个
    next.invokeWrite(m, promise);
    }
    } else {
    //..
    }

    也就是说write()会调用下一个outbound节点的invokeWrite()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
        private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {//true
    invokeWrite0(msg, promise);//调用这个
    } else {
    write(msg, promise);
    }
    }
    ---
    private void invokeWrite0(Object msg, ChannelPromise promise) {
    //拿到ChannelOutboundHandler(节点C)后,调用它的write()方法
    ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    }

    也就是会调用用户代码自定义的write()

    1
    2
    3
    4
    5
    6
    7
    8
    public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    System.out.println("OutBoundHandlerC: " + msg);
    ctx.write(msg, promise); //继续往下传播
    }
    }

    这个write会从当前节点调用io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise),寻找下一个传播对象,并调用它的write

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
        @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    write(msg, false, promise);
    return promise;
    }
    ---
    private void write(Object msg, boolean flush, ChannelPromise promise) {
    //这时返回的就是节点B
    AbstractChannelHandlerContext next = findContextOutbound();
    //..
    }

    后面节点B又会通过相同逻辑调用A.A节点又会调用相同逻辑,但此时返回的next是head节点.也就是head节点的wtite()方法.

    1
    2
    3
    4
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){
    unsafe.write(msg, promise);
    }

    总结一下就是

    1
    2
    ctx.channel().write("hello world");	//从tail开始传播
    ctx.write(msg, promise); //从当前节点开始往下传播

    异常的传播

    • 异常的触发链
    • 异常处理的最佳实践

    异常的触发链

    在服务端启动代码中添加6个Handler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(new InBoundHandlerA());
    ch.pipeline().addLast(new InBoundHandlerB());
    ch.pipeline().addLast(new InBoundHandlerC());
    ch.pipeline().addLast(new OutBoundHandlerA());
    ch.pipeline().addLast(new OutBoundHandlerB());
    ch.pipeline().addLast(new OutBoundHandlerC());
    }
    });

    InBoundHandlerB改为以下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //显示的抛出异常,模拟读数据的过程中抛出异常
    throw new BusinessException("from InBoundHandlerB");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
    ctx.channel().pipeline().fireChannelRead("hello world");
    }
    }

    InBoundHandlerA,InBoundHandlerC,OutBoundHandlerA,OutBoundHandlerB,OutBoundHandlerC中加入以下逻辑

    1
    2
    3
    4
    5
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InBoundHandlerA.exceptionCaught()");//类名
    ctx.fireExceptionCaught(cause);
    }

    通过telnet写入数据,此时会抛出异常

    1
    2
    $ telnet 127.0.0.1 8888
    $ send ayt

    服务端的输出为

    1
    2
    3
    4
    5
    6
    InBoundHandlerB.exceptionCaught()
    InBoundHandlerC.exceptionCaught()
    OutBoundHandlerA.exceptionCaught()
    OutBoundHandlerB.exceptionCaught()
    OutBoundHandlerC.exceptionCaught()
    //..以下异常信息

    可以看出inBound的输出顺序和inbound事件的传播顺序很相似,但outBound的异常传播顺序和之前的outBound事件传播顺序是相反的.

    跟踪源码看一下,抛出异常的InBoundHandlerB#channelRead的调用顺序是怎样的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
    try {
    //调用InBoundHandlerB#channelRead
    ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) { //会捕获到异常
    notifyHandlerException(t);
    }
    } else {
    fireChannelRead(msg);
    }
    }

    跟踪抛出异常时会执行的notifyHandlerException(t)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
        private void notifyHandlerException(Throwable cause) {
    invokeExceptionCaught(cause);
    }
    ---
    private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
    try {
    //它会调用InBoundHandlerB#exceptionCaught
    handler().exceptionCaught(this, cause);
    } catch (Throwable error) {
    }
    } else {
    fireExceptionCaught(cause);
    }
    }

    也就是说在InBoundHandlerB#channelRead抛出异常时它会回调InBoundHandlerB#exceptionCaught,

    它里面通过ctx.fireExceptionCaught(cause);把异常事件继续进行传播.接着看

    1
    2
    3
    4
    5
    6
    7
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    //直接拿到当前节点的next节点进行传播,next和节点的添加顺序相关
    //当前节点为InBoundHandlerB,那么它的next是C
    invokeExceptionCaught(next, cause);
    return this;
    }

    InBoundHandlerB会调用C的异常捕获

    1
    2
    3
    4
    5
    6
    7
    8
      static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    next.invokeExceptionCaught(cause);//这个
    } else {
    //...
    }
    }

    接下来就和InB调用同一个方法,执行C的exceptionCaught()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
        private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
    try {
    handler().exceptionCaught(this, cause);
    } catch (Throwable error) {
    }
    } else {
    fireExceptionCaught(cause);
    }
    }
    ---
    //用户代码InBoundHandlerC
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InBoundHandlerC.exceptionCaught()");
    ctx.fireExceptionCaught(cause);
    }

    InBoundHandlerC的下一个节点则是OutBoundHandlerA,再下一个则是OutBoundHandlerB

    1546822515412

    那么OutBoundHandlerC调用ctx.fireExceptionCaught(cause);继续往下传播异常时会传播到哪?Tail节点的exceptionCaught()

    Tail节点的exceptionCaught():

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    onUnhandledInboundException(cause);
    }
    ---
    protected void onUnhandledInboundException(Throwable cause) {
    try {
    logger.warn(
    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
    "It usually means the last handler in the pipeline did not handle the exception.",
    cause);
    } finally {
    ReferenceCountUtil.release(cause);
    }
    }

    提醒用户代码异常尚未被处理.上面的示例程序中它会打印

    1
    2
    3
    4
    5
    6
    7
    8
    Jan 07, 2019 10:00:35 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
    WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    com.imooc.netty.ch6.BusinessException: from InBoundHandlerB
    at com.imooc.netty.ch6.InBoundHandlerB.channelRead(InBoundHandlerB.java:12)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
    //...

    以上就是异常的触发链.可以发现它的异常触发和是inBound还是outBound没有关系,只会每次调用next触发下一个节点,也就是说和Handler的添加顺序有关.

    那么如果没有在用户代码中重载exceptionCaught(),它的父类会做哪些事?

    1
    2
    3
    4
    5
       //io.netty.channel.ChannelHandlerAdapter#exceptionCaught
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.fireExceptionCaught(cause);
    }

    就是直接把异常往下传播

    异常处理的最佳实践

    项目中常用的异常处理方式:在每一条channel的最后给它添加一个终极的Exception处理器,如:

    1
    2
    3
    4
    5
    6
    7
    8
    public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (cause instanceof BusinessException){
    System.out.println("BusinessException");
    }
    }
    }

    在pipeline的最后添加ExceptionCaughtHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
    ch.pipeline().addLast(new InBoundHandlerA());
    ch.pipeline().addLast(new InBoundHandlerB());
    ch.pipeline().addLast(new InBoundHandlerC());
    ch.pipeline().addLast(new OutBoundHandlerA());
    ch.pipeline().addLast(new OutBoundHandlerB());
    ch.pipeline().addLast(new OutBoundHandlerC());
    //终极的Exception处理器
    ch.pipeline().addLast(new ExceptionCaughtHandler());
    }
    });

    此时的输出为以下,说明异常在最后被处理了

    1
    2
    3
    4
    5
    6
    InBoundHandlerB.exceptionCaught()
    InBoundHandlerC.exceptionCaught()
    OutBoundHandlerA.exceptionCaught()
    OutBoundHandlerB.exceptionCaught()
    OutBoundHandlerC.exceptionCaught()
    BusinessException

    总结

    • netty如何判断ChannelHandler类型的?
      添加一个节点时,pipeline会通过一个instanceof来判断是inbound类型还是outbound类型,然后用布尔变量来标示

    • 对于ChannelHandler的添加应该遵循什么样的顺序?
      inBound事件的传播顺序和inBoundHandler的添加顺序相同
      outBound事件的传播顺序和outBoundHandler的添加顺序相反

    • 用户手动触发事件传播,不同的触发方式有什么样的区别?

      • inbound事件,如

        1
        2
        ctx.fireChannelRead(msg);	//从当前节点继续往下传播
        ctx.channel().pipeline().fireChannelRead("hello world");//从head节点开始往下传播
      • outbound事件,如

        1
        2
        ctx.write(msg, promise);	//从当前节点开始往下传播
        ctx.channel().write("hello world"); //从tail开始传播
  • 相关阅读:
    [时间篇TIME]Learn with whole Life 一生的学习
    使用IIS内置压缩功能,增加网站访问速度
    CSS中A链接样式的 "爱恨"原则
    爱情六十三课,定个开放日
    给爱子的信
    在IE中使用高级CSS3选择器
    爱情六十七课,下台阶的学问
    网上常用免费WebServices集合
    美国狗证上的10句话
    爱情六十五课,情爱无智者
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12366658.html
Copyright © 2011-2022 走看看