编码和解码,或者数据从一种特定协议的格式到另一种格式的转换。这些任务将由通常称为编解码器的组件来处理。Netty提供了多种组件,简化了为了支持广泛的协议而创建自定义的编解码器的过程。例如,如果你正在构建一个基于Netty的邮件服务器,那么你将会发现Netty对于编解码器的支持对于实现POP3、IMAP和SMTP协议来说是多么的宝贵。
1、什么是编解码器
每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器有编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。
如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列——它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。
2、解码器
——将字节解码为消息——ByteToMessageDecoder和ReplayingDecoder
——将一种消息类型解码为另一种——MessageToMessageDecoder
因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以知道Netty的解码器实现了ChannelInboundHandler也不会让你感到意外。
每当需要为ChannelPipeline中的下一个ChannelInboundHandler转换入站数据时会用到。此外,得益于ChannelPipeline的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑,这也是Netty是如何支持代码的模块化以及复用的例子。
3、抽象类ByteToMessageDecoder
将字节解码为消息是一项如此常见的任务,以至于Netty为他提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲。
下面举一个如何使用这个类的示例,假设你接收了一个包含简单int的字节流,每个int都需要被单独处理。在这种情况下,你需要从入站ByteBuf中读取每个int,并将它传递给ChannelPipeline中的下一个ChannelInboundHandler。为了解码这个字节流,你要扩展ByteToMessageDecoder类。(需要注意的是,原始类型int在被添加到List中时,会被自动装箱为Integer),如下设计图。 每次从入站ByteBuf中读取4字节,将其解码为一个Int,然后将它添加到一个List中。当没有更多的元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。
public class ToIntegerDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
//检查是否至少有4字节可读(一个int的字节长度)
if (in.readableBytes() >= 4){
//从入站ByteBuf中读取一个int,并将其添加到解码消息的List中
out.add(in.readInt());
}
}
}
虽然ByteToMessageDecoder使得可以很简单地实现这种模式,但是你可能会发现,在调用readint()方法前不得不验证所输入的ByteBuf是否具有足够的数据有点繁琐。
4、抽象类ReplayingDecoder
ReplayingDecoder扩展了ByteToMessageDecoder类,使得我们不必调用readableBytes()方法。它通过使用一个自定义的ByteBuf实现,ReplayingDecoderByteBuf,包装传入的ByteBuf实现了这一点,其将在内部执行该调用。
public abstract class ReplayingDecoder extends ByteToMessageDecoder
类型参数S指定了用于状态管理的类型,其中Void代表不需要状态管理。以下代码展示了基于ReplayingDecoder重新实现ToIntegerDecoder。
public class ToIntegerDecoder2 extends ReplayingDecoder<Void>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt());
}
}
和之前一样,从ByteBuf中提取的int将会被添加到List中,如果没有足够的字节可用,这个readInt()方法的实现将会抛出一个Error,其将在基类中被捕获并处理。当有更多的数据可供读取时,该decode()方法将会被再次调用。
请注意ReplayingDecoderByteBuf的下面这些方面:
——并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException
——ReplayingDecoder稍慢于ByteToMessageDecoder
——如果使用ByteToMessageDecoder不会引入太多的复杂性,那么请使用它;否则,请使用ReplayingDecoder
5、抽象类MessageToMessageDecoder
public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter
类型参数I指定了decode()方法的输入参数msg的类型,它是你必须实现的唯一方法。
在这个示例中,我们将编写一个IntegerToStringDecoder解码器来扩展MessageToMessageDecoder。它的decode()方法会把Integer参数转换为它的String表示,并将拥有下列签名:
public void decode( ChannelHandlerContext ctx, Integer msg , List out) throws Exception
和之前一样,解码的String将被添加到传出的List中,并转发给下一个ChannelInboundHandler
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
Integer msg, List<Object> out) throws Exception {
//将Integer消息转换为它的String表示,并将其添加到输出的List中
out.add(String.valueOf(msg));
}
}
6、TooLongFrameException类
由于Netty是一个异步框架,所以需要在字节可以解码之前在内存中缓冲他们,因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty提供了TooLongFrameException类,其将由解码器在帧超出指定的大小限制时抛出。
为了避免这种情况,你可以设置一个最大字节数的伐值,如果超过,则会导致抛出一个TooLongFrameException,如何处理该异常则完全取决于解码器的用户。某些协议(HTTP)可能允许你返回一个特殊的响应,而在其他的情况下,唯一的选择可能就是关闭对应的连接。
以下代码展示了ByteToMessageDecoder是如何使用TooLongFrameException来通知ChannelPipeline中的其他ChannelHandler发生了帧大小溢出的。需要注意的是,如果你正在使用一个可变帧大小的协议,那么这种保护措施将是尤其重要的。
public class SafeByteToMessageDecoder extends ByteToMessageDecoder{
private static final int MAX_FRAME_SIZE = 1024;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
//检查缓冲区中是否有超过MAX_FRAME_SIZE个字节
if (readable > MAX_FRAME_SIZE){
//跳过所有的可读字节,抛出TooLongFrameException并通知ChannelHandler
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
//DO something
}
}
7、抽象类MessageToByteEncoder
这个类只有一个方法,而解码器有两个。原因是解码器通常需要在Channel关闭之后产生最后一个消息(decodeLast()方法),这显然不适用于编码器的场景——在连接关闭之后仍然产生一个消息是毫无意义的。
ShortToByteEncoder,其接受一个Short类型的实例作为消息,将它编码为Short的原始类型值,并将它写入ByteBuf中,其将随后被转发给ChannelPipeline中的下一个CHannelOutboundHandler。每个传出的Short值都将会占用ByteBuf中的2字节。
public class ShortToByteEncoder extends MessageToByteEncoder<Short>{
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
Short msg, ByteBuf out) throws Exception {
//将Short写入ByteBuf中
out.writeShort(msg);
}
}
Netty提供了一些专门化的MessageToByteEncoder,你可以基于他们实现自己的编码器。WebSocket08FrameEncoder类提供了一个很好的实例。你可以在io.netty.handler.codec.http.websocket包中找到它。
8、抽象类MessageToMessageEncoder
我们将展示对于出站数据将如何从一种消息编码为另一种。MessageToMessageEncoder类的encoder()方法提供了这种能力。
以下代码,编码器将每个出站Integer的String表示添加到了该List中。
public class IntegerToStringEncoder extends MessageToMessageEncoder<Integer>{
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
Integer msg, List<Object> out) throws Exception {
//将Integer转换为String,并将其添加到List中
out.add(String.valueOf(msg));
}
}
9、抽象的编解码器类
在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理我们一直在学习的这两种类型的操作。
通过尽可能地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是Netty设计的一个基本原则。
10、抽象类ByteToMessageCodec
场景:我们需要将字节编码为某种形式的消息,可能是POJO,随后再次对它进行编码。ByteToMessageCodec将为我们处理好了这一切,因为它结合了ByteToMessageDecoder以及他的逆向MessageToByteEncoder。
任何的请求/响应协议都可以作为使用ByteToMessageCodec的理想选择,例如,在某个SMTP的实现中,编解码器将读取传入字节,并将它们解码为一个自定义的消息类型,如SmtpRequest。而在接收端,当一个响应被创建时,将会产生一个SmtpResponse,其将被编码回字节以便进行传输。
11、CombinedChannelDuplexHandler类
结合一个解码器和编码器可能会对可重用性造成影响。但是,有一种方法即能够避免这种惩罚,又不会牺牲将一个解码器和一个编码器作为一个单独的单元部署所带来的的便利性。CombinedChannelDuplexHandler提供了这个解决方案,其声明为:
public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
这个类充当了ChannelInboundHandler和ChannelOutboundHandler(该类的类型参数I和O)的容器。通过提供分别继承了解码器类和编码器类的类型,我们可以实现一个编解码器,而又不必直接扩展抽象的编解码器类。
首先,让我们研究代码中的ByteToCharDecoder。注意,该实现扩展了ByteToMessageDecoder,因为它要从ByteBuf中读取字符。
public class ByteToCharDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 2){
//将一个或者多个Character对象添加到传出的List中
out.add(in.readChar());
}
}
}
这里的decode()方法一次将从ByteBuf中提取2字节,并将它们作为char写入到List中,其将会被自动装箱为Character对象。
以下代码,包含了CharToByteEncoder,他能将Character转换回字节。这个类扩展了MessageToByteEncoder,因为它需要将char消息编码到ByteBuf中,这是通过直接写入ByteBuf做到的。
public class CharToByteEncoder extends MessageToByteEncoder<Character>{
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
Character msg, ByteBuf out) throws Exception {
//将Character解码为char,并将其写入到出站ByteBuf中
out.writeChar(msg);
}
}
既然我们有了解码器和编码器,我们将会结合它们来构建 一个编解码器。如以下代码所示。
在某些情况下,通过这种方式结合实现相对于使用编解码器类的方式来说可能更加的简单也更加的灵活。