ChannelPipeline是ChannelHandler的容器,类似于Servlet的Filter过滤器,它负责处理和拦截 inbound(入站) 或者 outbound(出站) 的事件和操作;(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,拦截穿过 Channel 的输入输出 event, ChannelPipeline 实现了拦截器的一种高级形式,使得用户可以对事件的处理以及ChannelHanler之间交互获得完全的控制权);
每一个新创建的 Channel(也可以说是每创建一个新的连接) 都将会被分配一个新的 ChannelPipeline,这项关联是永久性的; Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的;在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预;
//把一个业务处理类(handler)添加到链中的第一个位置 ChannelPipeline addFirst(ChannelHandler... handlers); //把一个业务处理类(handler)添加到链中的最后一个位置 ChannelPipeline addLast(ChannelHandler... handlers);
事件将会被 ChannelInboundHandler 或者 ChannelOutboundHandler,处理;随后, 通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler;
ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler 交互;保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象;即ChannelHandlerContext中包含一个具体的事件处理器 ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和channel的信息,方便对 ChannelHandler进行调用;
通过 ChannelHandlerContext 触发的操作的事件流,ChannelHandler可以通知其所属的ChannelPipeline 中 的下一 个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline;
虽然被调用的 Channel 或 ChannelPipeline 上的 write()方法将一直传播事件通过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的;类似下面这张图;
对于客户端和服务端,入站和出站是相对的;以客户端应用程序为例,如果事件的传播方向是从客户端到服务端的(发送请求到服务端),那么对于客户端为出站的(即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理),对于服务端是入站的;
Channel、ChannelPipeline、ChannelHandlerContext 都可以调用此方法,前两者的事件传播会经过整个ChannelPipeline,而ChannelHandlerContext就只会在后续的Handler里面传播;
ChannelInboundHandler之间的传递,主要通过调用ChannelHandlerContext里面的FireXXX()方法来实现下个Handler的调用;
关于有多个入站出站ChannelHandler的执行顺序,测试如下;
- 服务端
服务端启动类添加Handler
serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new InboundHandler2()); ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); } });
OutboundHandler1
public class OutboundHandler1 extends ChannelOutboundHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(OutboundHandler1.class); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("|OutboundHandler1 write : "+data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutboundHandler1 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
OutboundHandler2
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(OutboundHandler2.class); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("|OutboundHandler2 write : "+data.toString(CharsetUtil.UTF_8)); ctx.write(Unpooled.copiedBuffer("OutboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.flush(); } }
InboundHandler1
public class InboundHandler1 extends ChannelInboundHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(InboundHandler1.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("|InboundHandler1 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); // 执行下一个InboundHandler ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
InboundHandler2
public class InboundHandler2 extends ChannelInboundHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(InboundHandler2.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info("|InboundHandler2 channelRead 服务端收到数据:" + data.toString(CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
- 客户端
客户端启动类
bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, port)) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } });
EchoClientHandler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private final static Logger logger = LoggerFactory.getLogger(EchoServerHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { logger.info("Client received: " + msg.toString(CharsetUtil.UTF_8)); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("Active"); ctx.writeAndFlush(Unpooled.copiedBuffer("echo test...",CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { logger.info("EchoClientHandler channelReadComplete"); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
执行结果如下:
两个OutboundHandler并没有执行,很奇怪,翻看源码,发现是InboundHandler的使用不对;
pipeline上添加handler最终会往执行下面的流程;
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
io.netty.channel.DefaultChannelPipeline#addLast0
最终pipeline上的实例如下:
如下图
-
使用pipeline或channel发送数据
将上面的InboundHandler2的channelRead方法中的发送数据方式改成如下方式:
ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8)); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("|InboundHandler2 "+data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
当在InboundHandler执行write方法,执行流程如下;
调用链路如下:
io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object) ->io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object) ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object) ->io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise) ->io.netty.channel.AbstractChannelHandlerContext#findContextOutbound
io.netty.channel.DefaultChannelPipeline#writeAndFlush(java.lang.Object)
findContextOutbound会从tail节点往前找OutboundHandler,如下图;
最终的执行结果如下:
服务端:
客户端:
-
使用ctx发送数据的方式
而InboundHandler2的channelRead方法的发送数据是下面这种方式:
ctx.writeAndFlush(xxx);
与pipeline或channel的发送不同的是在执行 io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise) 前会执行io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object);
此时ctx为InboundHandler2;
此时会从ctx往前找OutboundHandler,从pipeline上看ctx往前找是没有的,如下图;
如果不想改变InboundHandler2的channelRead方法,可以将服务端的handler添加顺序更改就可以了,如下;
serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new OutboundHandler1()); ch.pipeline().addLast(new OutboundHandler2()); ch.pipeline().addLast(new InboundHandler1()); ch.pipeline().addLast(new InboundHandler2()); } });
执行效果如下:
服务端
客户端:
最终handler间的事件传播方向,也就是执行方向,InboundHandler之间传递数据,通过ctx.fireChannelRead(msg),InboundHandler顺序执行,OutboundHandler逆序执行,如下;
传播的数据也会像包装一样,InboundHandler2会将InboundHandler1的数据打包,传递到OutboundHandler2,再打包,在传递到OutboundHandler1;感觉这个有点像StringBuilder的append方法;