zoukankan      html  css  js  c++  java
  • Netty ChannelPipeline组件作用

    ChannelPipeline是ChannelHandler的容器,类似于Servlet的Filter过滤器,它负责处理和拦截 inbound(入站) 或者 outbound(出站) 的事件和操作;(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权);

    每一个新创建的 Channel(也可以说是每创建一个新的连接) 都将会被分配一个新的 ChannelPipeline,这项关联是永久性的; Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的;在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预;

    //把一个业务处理类(handler)添加到链中的第一个位置
    ChannelPipeline addFirst(ChannelHandler... handlers);
    
    //把一个业务处理类(handler)添加到链中的最后一个位置
    ChannelPipeline addLast(ChannelHandler... handlers);
    

      

    事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler,处理;随后, 通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler;

    ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互;保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象;即ChannelHandlerContext中包含一个具体的事件处理器 ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和channel的信息,方便对 ChannelHandler进行调用;

      

    通过 ChannelHandlerContext 触发的操作的事件流,ChannelHandler可以通知其所属的ChannelPipeline 中 的下一 个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline;

    虽然被调用的 Channel ChannelPipeline 上的 write()方法将一直传播事件通过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的;类似下面这张图;

    对于客户端和服务端,入站和出站是相对的;以客户端应用程序为例,如果事件的传播方向是从客户端到服务端的(发送请求到服务端),那么对于客户端为出站的(即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理),对于服务端是入站的;

    Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者的事件传播会经过整个ChannelPipeline,而ChannelHandlerContext就只会在后续的Handler里面传播;

    ChannelInboundHandler之间的传递,主要通过调用ChannelHandlerContext里面的FireXXX()方法来实现下个Handler的调用;

    关于有多个入站出站ChannelHandler的执行顺序,测试如下;

    • 服务端

      服务端启动类添加Handler

    serverBootstrap.group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG,1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new InboundHandler1());
                ch.pipeline().addLast(new InboundHandler2());
                ch.pipeline().addLast(new OutboundHandler1());
                ch.pipeline().addLast(new OutboundHandler2());
            }
        });

      OutboundHandler1

    public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
        private final static Logger log = LoggerFactory.getLogger(OutboundHandler1.class);
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    
            ByteBuf data = (ByteBuf) msg;
            log.info("|OutboundHandler1 write : "+data.toString(CharsetUtil.UTF_8));
            ctx.write(Unpooled.copiedBuffer("OutboundHandler1 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
            ctx.flush();
        }
    }
    

      OutboundHandler2

    public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
        private final static Logger log = LoggerFactory.getLogger(OutboundHandler2.class);
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    
            ByteBuf data = (ByteBuf) msg;
            log.info("|OutboundHandler2 write : "+data.toString(CharsetUtil.UTF_8));
            ctx.write(Unpooled.copiedBuffer("OutboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
            ctx.flush();
        }
    }
    

      InboundHandler1

    public class InboundHandler1 extends ChannelInboundHandlerAdapter {
        private final static Logger log = LoggerFactory.getLogger(InboundHandler1.class);
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            ByteBuf data = (ByteBuf) msg;
            log.info("|InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
    
            // 执行下一个InboundHandler
            ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

      InboundHandler2

    public class InboundHandler2 extends ChannelInboundHandlerAdapter {
        private final static Logger log = LoggerFactory.getLogger(InboundHandler2.class);
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            ByteBuf data = (ByteBuf) msg;
            log.info("|InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8));
            ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    • 客户端

      客户端启动类

    bootstrap.group(group)
    	.channel(NioSocketChannel.class)
    	.remoteAddress(new InetSocketAddress(host, port))
    	.option(ChannelOption.TCP_NODELAY,true)
    	.handler(new ChannelInitializer<SocketChannel>() {
    		@Override
    		protected void initChannel(SocketChannel ch) throws Exception {
    			ch.pipeline().addLast(new EchoClientHandler());
    		}
    	});
    

      

      EchoClientHandler

    public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        private final static Logger logger = LoggerFactory.getLogger(EchoServerHandler.class);
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    
            logger.info("Client received: " + msg.toString(CharsetUtil.UTF_8));
    
        }
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("Active");
            ctx.writeAndFlush(Unpooled.copiedBuffer("echo test...",CharsetUtil.UTF_8));
        }
    
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            logger.info("EchoClientHandler channelReadComplete");
            ctx.close();
        }
    
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

      

    执行结果如下:

    两个OutboundHandler并没有执行,很奇怪,翻看源码,发现是InboundHandler的使用不对; 

    pipeline上添加handler最终会往执行下面的流程;

    io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)

    io.netty.channel.DefaultChannelPipeline#addLast0

    最终pipeline上的实例如下:

    如下图

     

    • 使用pipeline或channel发送数据

     将上面的InboundHandler2的channelRead方法中的发送数据方式改成如下方式:

    ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
    

      

    当在InboundHandler执行write方法,执行流程如下;

    调用链路如下:

    io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object)
    
     ->io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)
    
      ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)
    
       ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise)
    
        ->io.netty.channel.AbstractChannelHandlerContext#findContextOutbound
    

      

    io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)

    findContextOutbound会从tail节点往前找OutboundHandler,如下图;

    最终的执行结果如下:

    服务端:

    客户端:

    • 使用ctx发送数据的方式

    而InboundHandler2的channelRead方法的发送数据是下面这种方式:

    ctx.writeAndFlush(xxx);
    

    与pipeline或channel的发送不同的是在执行 io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise) 前会执行io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object);

    此时ctx为InboundHandler2;

     

    此时会从ctx往前找OutboundHandler,从pipeline上看ctx往前找是没有的,如下图;

     如果不想改变InboundHandler2的channelRead方法,可以将服务端的handler添加顺序更改就可以了,如下;

    serverBootstrap.group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG,1024)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
    		    ch.pipeline().addLast(new OutboundHandler1());
                ch.pipeline().addLast(new OutboundHandler2());
                ch.pipeline().addLast(new InboundHandler1());
                ch.pipeline().addLast(new InboundHandler2());
            }
        });
    

    执行效果如下:

    服务端

    客户端:

    最终handler间的事件传播方向,也就是执行方向,InboundHandler之间传递数据,通过ctx.fireChannelRead(msg)InboundHandler顺序执行,OutboundHandler逆序执行,如下;

    传播的数据也会像包装一样,InboundHandler2会将InboundHandler1的数据打包,传递到OutboundHandler2,再打包,在传递到OutboundHandler1;感觉这个有点像StringBuilder的append方法;

     
  • 相关阅读:
    01.Markdown学习
    微信小程序开发基础
    如何在本地搭建微信小程序服务器
    Golang | 报错
    Golang | 扩展
    Golang | 基础
    Golang | 基础
    Golang | 基础
    Chrome——书签同步码云
    Rustlings_structs
  • 原文地址:https://www.cnblogs.com/coder-zyc/p/14404454.html
Copyright © 2011-2022 走看看