本文参考
本篇文章是对《Netty In Action》一书第十一章"预置的ChannelHandler和编解码器"的学习摘记,主要内容为通过 SSL/TLS 保护 Netty 应用程序、构建基于 Netty 的 HTTP/HTTPS 和websocket应用程序、处理空闲的连接和超时、解码基于分隔符的协议和基于长度的协议、写大型数据
本篇先摘记后三块内容 —— 处理空闲的连接和超时、解码基于分隔符的协议和基于长度的协议、写大型数据
空闲的连接和超时
Netty提供了IdleStateHandler用于检测空闲时间和ReadTimeoutHandler、WriteTimeoutHandler超时检测,他们的描述如下
其中,IdleStateHandler可用于心跳机制,下面代码示例了如何使用通常的发送心跳消息到远程节点的方法,如果在 60 秒之内没有接收或者发送任何的数据, 我们将如何得到通知;如果没有响应,则连接会被关闭
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
//(1) IdleStateHandler 将在被触发时发送一个IdleStateEvent 事件
new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
//将一个 HeartbeatHandler 添加到ChannelPipeline中
pipeline.addLast(new HeartbeatHandler());
}
//实现 userEventTriggered() 方法以发送心跳消息
public static final class HeartbeatHandler
extends ChannelInboundHandlerAdapter {
//发送到远程节点的心跳消息
private static final ByteBuf HEARTBEAT_SEQUENCE =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
"HEARTBEAT", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
//(2) 发送心跳消息,并在发送失败时关闭该连接
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
//不是 IdleStateEvent 事件,所以将它传递给下一个 ChannelInboundHandler
super.userEventTriggered(ctx, evt);
}
}
}
}
此处新建IdleStateHandler实例时的各个构造方法参数分别为0,0,60,TimeUnit.SECONDS,分别代表禁用readerIdleTime(单独设置读闲置时间参数),禁用writerIdleTime(单独设置写闲置时间参数),统一设置allIdleTime为60(统一设置读闲置时间和写闲置时间),时间单位为秒,并且不考虑计算写闲置时间时的字节消耗
observeOutput – whether or not the consumption of bytes should be taken into consideration when assessing write idleness. The default is false.
readerIdleTime – an IdleStateEvent whose state is IdleState.READER_IDLE will be triggered when no read was performed for the specified period of time. Specify 0 to disable.
writerIdleTime – an IdleStateEvent whose state is IdleState.WRITER_IDLE will be triggered when no write was performed for the specified period of time. Specify 0 to disable.
allIdleTime – an IdleStateEvent whose state is IdleState.ALL_IDLE will be triggered when neither read nor write was performed for the specified period of time. Specify 0 to disable.
unit – the TimeUnit of readerIdleTime, writeIdleTime, and allIdleTime
如果连接超过60 秒没有接收或者发送任何的数据,那么IdleStateHandler 将会使用一个 IdleStateEvent事件来调用fireUserEventTriggered()方法,交由下一个ChannelHandler来处理
HeartbeatHandler实现 了userEventTriggered()方法,如果这个方法检测到IdleStateEvent 事件,它将会发送心 跳消息,并且添加一个将在发送操作失败时关闭该连接的ChannelFutureListener.CLOSE_ON_FAILURE
A ChannelFutureListener that closes the Channel when the operation ended up with a failure or cancellation rather than a success.
基于分隔符的协议
基于分隔符的(delimited)消息协议使用定义的字符来标记消息或消息段(通常被称为帧)的开头或者结尾。RFC文档定义的许多协议(如SMTP、POP3、IMAP以及Telnet)都是采用这种方式
下图为LineBasedFrameDecoder的处理方式示意图
下面是一个简单的代码示例,我们只需要将它的实例添加到ChannelPipline即可
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//该 LineBasedFrameDecoder 将提取的帧转发给下一个 ChannelInboundHandler
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
//添加 FrameHandler 以接收帧
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler
extends SimpleChannelInboundHandler<ByteBuf> {
@Override
//传入了单个帧的内容
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
// Do something with the data extracted from the frame
}
}
}
创建LineBasedFrameDecoder实例时,设置了它的最大帧长度maxLength,若超过最大帧长,会抛出TooLongFrameException异常
下面展示一个更加详细的例子
Cmd —— 将帧(命令)的内容存储在 ByteBuf 中,一个 ByteBuf 用于名称,另一个用于参数
CmdDecoder —— 从被重写了的 decode()方法中获取一系列的帧,每个帧通过 分隔解码,再从每个帧的内容构建一个Cmd的实例
CmdHandler —— 从CmdDecoder获取解码的Cmd对象,并对它进行一些处理
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
private static final byte SPACE = (byte)' ';
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//添加 CmdDecoder 以提取 Cmd 对象,并将它转发给下一个 ChannelInboundHandler
pipeline.addLast(new CmdDecoder(64 * 1024));
//添加 CmdHandler 以接收和处理 Cmd 对象
pipeline.addLast(new CmdHandler());
}
//Cmd POJO
public static final class Cmd {
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf name() {
return name;
}
public ByteBuf args() {
return args;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer)
throws Exception {
//从 ByteBuf 中提取由行尾符序列分隔的帧
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
if (frame == null) {
//如果输入中没有帧,则返回 null
return null;
}
//查找第一个空格字符的索引。前面是命令名称,接着是参数
int index = frame.indexOf(frame.readerIndex(),
frame.writerIndex(), SPACE);
//使用包含有命令名称和参数的切片创建新的 Cmd 对象
return new Cmd(frame.slice(frame.readerIndex(), index),
frame.slice(index + 1, frame.writerIndex()));
}
}
public static final class CmdHandler
extends SimpleChannelInboundHandler<Cmd> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Cmd msg)
throws Exception {
// Do something with the command
//处理传经 ChannelPipeline 的 Cmd 对象
}
}
}
基于长度的协议
基于长度的协议可以解码固定长度的帧也可以解码不是固定长度的帧,此时需要从头部字段确定帧的长度,然后从数据流中提取指定的字节数
下图是针对两种实现的解码器
FixedLengthFrameDecoder的实现过程示意图如下
LengthFieldBasedFrameDecoder的实现过程示意图如下
下面是LengthFieldBasedFrameDecoder的一个代码示例
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(
//使用 LengthFieldBasedFrameDecoder 解码帧起始的前 8 个字节中的消息
new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
//添加 FrameHandler 以处理每个帧
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler
extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
// Do something with the frame
//处理帧的数据
}
}
}
多次写大型数据
由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟
当需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗,其中较为关键的是 interface ChunkedInput<B>的实现,其中类型参数 B 是 readChunk()方法返回的类型
Netty 预置了该接口的 4 个实现,如下所示,每个都代表了一个将由ChunkedWriteHandler处理的不定长度的数据流
当仅需要对文件内容进行直接传输,而不需要应用程序对数据的任何处理时,可以采用下面“一次写大型数据”中的零拷贝特性
一次写大型数据
零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll (只支持Linux)传输时才可使用的特性。它使你可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,其在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。特别地,它对于实现了数据加密或者压缩的文件系统是不可用的——只能传输文件的原始内容。不过,传输已被加密的文件则不是问题
这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在 Netty 的核心中,所以应用程序所有需要做的就是使用一个FileRegion接口的实现,其在 Netty的 API 文档中的定义是:"通过支持零拷贝的文件传输的Channel来发送的文件区域"
A region of a file that is sent via a Channel which supports zero-copy file transfer
public class FileRegionWriteHandler extends ChannelInboundHandlerAdapter {
private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
private static final File FILE_FROM_SOMEWHERE = new File("");
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
File file = FILE_FROM_SOMEWHERE; //get reference from somewhere
Channel channel = CHANNEL_FROM_SOMEWHERE; //get reference from somewhere
//...
//创建一个FileInputStream
FileInputStream in = new FileInputStream(file);
//以该文件的完整长度创建一个新的 DefaultFileRegion
FileRegion region = new DefaultFileRegion(
in.getChannel(), 0, file.length());
//发送该 DefaultFileRegion,并注册一个 ChannelFutureListener
channel.writeAndFlush(region).addListener( new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
//处理失败
Throwable cause = future.cause();
// Do something
}
}
});
}
}