zoukankan      html  css  js  c++  java
  • ctx.close() 和 ctx.channel().close() 到底有何区别?

    我最近在项目中,遇到一个问题,ctx.close() 和 ctx.channel().close() 到底有何区别?

    即调用 ChannelHandlerContext#close() 和 Channel#close() 有何不同?

    从现象来看

    建议先看一下下面这篇文章:

    [翻译]Netty4中 Ctx.close 与 Ctx.channel.close 的区别 跳转 click here

    假如我们有这样一个 双向处理器 SomeHandler

    import io.netty.channel.*;
    
    @ChannelHandler.Sharable
    public class SomeHandler extends ChannelDuplexHandler {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println(ctx.name() + " channelRead: " + msg);
            String result = (String) msg;
            if (("ctx.close." + ctx.name()).equals(result)) {
                ctx.close();
            } else if (("ctx.channel.close." + ctx.name()).equals(result)) {
                ctx.channel().close();
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            ctx.fireChannelInactive();
        }
    
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
            System.out.println(ctx.name() + " close");
            ctx.close(promise);
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            System.out.println(ctx.name() + " write");
            ctx.write(String.format("[%s]%s", ctx.name(), msg), promise);
        }
    }
    

    假如,我们的服务器端构建的管道:

    ChannelPipeline p = ...;
    p.addLast("A", new SomeHandler());
    p.addLast("B", new SomeHandler());
    p.addLast("C", new SomeHandler());
    ...
    
    完整的 服务端代码 点击此处
    
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup boss = new NioEventLoopGroup(1);
            NioEventLoopGroup worker = new NioEventLoopGroup();
            StringDecoder stringDecoder = new StringDecoder();
            StringEncoder stringEncoder = new StringEncoder();
            SomeHandler aHandler = new SomeHandler();
            SomeHandler bHandler = new SomeHandler();
            SomeHandler cHandler = new SomeHandler();
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .channel(NioServerSocketChannel.class)
                    .group(boss, worker)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel channel) {
                            channel.pipeline()
                                    .addLast("decoder", stringDecoder)
                                    .addLast("encoder", stringEncoder)
                                    .addLast("A", aHandler)
                                    .addLast("B", bHandler)
                                    .addLast("C", cHandler);
                        }
                    });
            bootstrap.bind(8098).sync();
        }
    }
    

    如果,客户端 发送给服务端的数据是 ctx.close.B,输出日志将是:

    A channelRead: ctx.close.B
    B channelRead: ctx.close.B
    A close

    这里,你没有看到输出 B close ,但是不用惊讶,因为,你调用的是上下文 ctx.close() 方法,它是不会再去调用当前处理器对象的 close 方法的。

    如果,客户端 发送给服务端的数据是 channel.close.B,输出的日志将是:

    A channelRead: ctx.channel.close.B
    B channelRead: ctx.channel.close.B
    C close
    B close
    A close

    从源码来说

    ctx.close()

    通常,ctx 的类是 DefaultChannelHandlerContext,在调用 close 或者 writeAndFlush 这类出站方法时,最终会调用 AbstractChannelHandlerContext 的方法:

    abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
        @Override
        public ChannelFuture close() {
            return close(newPromise());
        }
        
        @Override
        public ChannelFuture close(final ChannelPromise promise) {
            if (isNotValidPromise(promise, false)) {
                // cancelled
                return promise;
            }
            
            // 这个方法寻找下一个能够处理 CLOSE 事件的出站处理器
            final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeClose(promise);
            } else {
                safeExecute(executor, new Runnable() {
                    @Override
                    public void run() {
                        next.invokeClose(promise);
                    }
                }, promise, null, false);
            }
    
            return promise;
        }
    
        @Override
        public ChannelFuture writeAndFlush(Object msg) {
            return writeAndFlush(msg, newPromise());
        }
    
        @Override
        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
            write(msg, true, promise);
            return promise;
        }
        
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            ObjectUtil.checkNotNull(msg, "msg");
            try {
                if (isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    // cancelled
                    return;
                }
            } catch (RuntimeException e) {
                ReferenceCountUtil.release(msg);
                throw e;
            }
            
            // 这个方法寻找下一个能够处理相应事件的处理器
            final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                    (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
                if (!safeExecute(executor, task, promise, m, !flush)) {
                    // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                    // and put it back in the Recycler for re-use later.
                    //
                    // See https://github.com/netty/netty/issues/8343.
                    task.cancel();
                }
            }
        }    
    
    
        private AbstractChannelHandlerContext findContextOutbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            EventExecutor currentExecutor = executor();
            do {
                // 向前寻找出站处理器
                ctx = ctx.prev;
            } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
            return ctx;
        }
    }
    

    注意哦,出站方法是向前寻找处理器。

    ctx.channel().close()

    通常,ctx.channel() 返回的类可以是 NioSocketChannel,在调用 close 或者 writeAndFlush 这类出站方法时,
    会调用 AbstractChannelclose 或者 writeAndFlush 方法,
    再接着就是调用 DefaultChannelPipelineclose 或者 writeAndFlush 方法:

    public class DefaultChannelPipeline implements ChannelPipeline {
        @Override
        public final ChannelFuture close(ChannelPromise promise) {
            return tail.close(promise);
        }
    
        @Override
        public final ChannelFuture writeAndFlush(Object msg) {
            return tail.writeAndFlush(msg);
        }
    }
    

    你会发现,都是从管道的 TailContext 开始调用,因此所有符合要求的出站处理器都将被执行。

    但是,你还要注意,处理器的写法

    public class SomeHandler extends ChannelOutboundHandlerAdapter {
        /**
         * 如果不覆写这个 close 方法,直接运行父类的方法,也是正常的。
         */
        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
            // 如果出站处理器中,不加下面这段话,可能会导致 Channel 无法被正常关闭!
            ctx.close(promise);
        }
        /**
         * 如果不覆写这个 write 方法,直接运行父类的方法,也是正常的。
         */
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            // 如果出站处理器中,不加下面这段话,可能会导致 Channel 无法正常发出消息!
            ctx.write(msg, promise);
        }
    }
    
  • 相关阅读:
    Flink1.9重大改进和新功能
    【2020】DBus,一个更能满足企业需求的大数据采集平台
    大数据运维:大数据平台+海量数据
    大数据运维尖刀班 | 集群_监控_CDH_Docker_K8S_两项目_腾讯云服务器
    离线数仓和实时数仓架构与设计
    【全集】IDEA入门到实战
    Mysql快速入门
    RabbitMQ安装
    消息队列MQ简介
    C#特性
  • 原文地址:https://www.cnblogs.com/kendoziyu/p/14787189.html
Copyright © 2011-2022 走看看