本节主要讨论了 Netty 的数据处理组件 ChannelHandler。
一、Channel 生命周期
Channel 有个简单但强大的状态模型,下面是 Channel 的四个状态:
Channel 的正常生命周期如下图,当这些状态发生变化时,对应的事件将会生成。
二、ChannelHandler 生命周期
ChannelHandler 定义的生命周期如下,当 ChannelHandler 添加到 ChannelPipeline,或者从 ChannelPipeline 移除后,这些将会调用。
三、ChannelInbounderHandler
ChannelInboundHandler 的生命周期方法如下,当接收到数据或者与之关联的 Channel 状态改变时调用。
可以看到,这些方法与 Channel 的生命周期接近。
其中 channelRead 和 channelReadComplete 方法在读操作开始和完成时调用。
另外,channelWritabilityChanged 方法在 channel 写状态发生变化时调用。
四、ChannelOutboundHandler
ChannelOutboundHandler 提供了出站操作时调用的方法。
另外,它具有在请求时演示操作或者事件的能力。比如,当你在写数据到远程的过程中被意外暂停,你可以延时进行刷新操作,然后在迟些时候继续。
下面是提供的方法(继承自 ChannelHandler 未列出来):
五、资源管理
Netty 使用引用计数器来处理池化的 ByteBuf。所以当 ByteBuf 完全处理后,要确保引用计数器被调整。
当你覆盖了 channelRead 操作,在处理完消息之后,需要释放它,如下:
1 public class DiscardServerHandler extends ChannelInboundHandlerAdapter { 2 /** 3 * 收到数据时调用 4 */ 5 @Override 6 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 7 // 丢弃收到的数据 8 ((ByteBuf) msg).release(); 9 } 10 }
Netty 提供了一个特殊的称为 SimpleChannelInboundHander 的实现,该实现将在用户通过 channelRead0() 方法处理完数据之后,自动释放该消息。
当你在处理写操作,并丢弃消息时,你需要通知 ChannelPromise 数据已经被处理,如下:
1 public class DiscardOutbounderHandler extends ChannelOutboundHandlerAdapter { 2 @Override 3 public void write(ChannelHandlerContext ctx, 4 Object msg, ChannelPromise promise) throws Exception { 5 ReferenceCountUtil.release(msg); // 释放资源 6 promise.setSuccess(); // 通知 ChannelPromise 数据已经被处理 7 } 8 }
六、使用 ChannelHandler
下图展示了 ChannelPipeline,Channel,ChannelHandler 和 ChanelHandlerContext 的关系:
- Channel 绑定到 ChannelPipeline
- ChannelPipeline 绑定到包含 ChannelHandler 的 Channel
- ChannelHandler
- 当添加 ChannelHandler 到 ChannelPipeline 时,ChannelHandlerContext 被创建
如果要完成一个写操作,有以下两种方式:
第一种就是从 ChannelHandlerContext 获取到 Channel 的引用,执行 Channel 上的 write() 方法,如下:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 // 丢弃收到的数据 3 ((ByteBuf) msg).release(); 4 5 Channel channel = ctx.channel(); // 获取channel引用 6 // 通过channel写缓存 7 channel.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); 8 }
第二种是从 ChannelHandlerContext 获取到 ChannelPipeline 的引用,执行 ChannelPipeline 上的 write() 方法,如下:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 ChannelPipeline pipeline = ctx.pipeline(); // 获取pipeline引用 3 // 通过pipeline写缓存 4 pipeline.write(Unpooled.copiedBuffer("Netty", CharsetUtil.UTF_8)); 5 }
写操作流程如下:
- 事件传递给 ChannelPipeline 的第一个 ChannelHandler
- ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的下一个 ChannelHandler
- 与 2 类似
但是有些时候不希望总是从 ChannelPipeline 的第一个 ChannelHandler 开始事件,我们希望从一个特定的 ChannelHandler 开始处理。你必须引用于此 ChannelHandler 的前一个 ChannelHandler 关联的 ChannelHandlerContext,利用它调用与自身关联的 ChannelHandler 的下一个 ChannelHandler。如下:
1 ChannelHandlerContext ctx = context; // 获得 ChannelHandlerContext引用 2 // write()将会把缓冲区发送到下一个ChannelHandler 3 ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
流程如下:
- 直接从特定的 ChannelHandler 开始执行
- 事件发送到下一个 ChannelHandler
- 经过最后一个 ChannelHandler 后,事件从 ChannelPipeline 移除