一 处理基于流的传输(Scoket Buffer的一点附加说明)
对于例如TCP/IP这种基于流的传输协议实现,接收到的数据会被存储在socket的接受缓冲区内。不幸的是,基于流的传输不是一个包队列而是一个字节队列。在一个开放的系统中,这意味着即使我们发送了两条消息分别包含在两个数据包里,接收方不会当作两条消息来对待,而是将其放在同一个字节独列中。因此,传输不能保证收到的消息与发送的消息一致。
对于时间客户端的例子,一个32位的int数据量非常小,一般不会被分片(链路层限制一个package大小一般为1500字节),但是问题是它确实有可能被分成多片,分片的概率随着网络的繁忙而增加。最简单的解决办法就是增加一个内部的累加缓冲,等累计满4个字节时再向上提交数据。
package io.netty.example.time; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { private ByteBuf buf; @Override public void handlerAdded(ChannelHandlerContext ctx) { buf = ctx.alloc().buffer(4); // (1) } @Override public void handlerRemoved(ChannelHandlerContext ctx) { buf.release(); // (1) buf = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; buf.writeBytes(m); // (2) m.release(); if (buf.readableBytes() >= 4) { // (3) long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L; System.out.println(new Date(currentTimeMillis)); ctx.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
(1)ChannelHandler有两个存活期方法:handlerAdded()和handlerRemoved(),这两个方法允许我们自己构造一个初始化任务或结束任务。
(2)首先,所有的接受的数据先放到累计缓存里。
(3)然后,handler必须检查是否有了足够的data,如在本例中须足够4个字节,然后执行实际的业务逻辑。若数据不足,当更多的数据到达时,netty会再次执行channelReade()方法直到累计到4个字节。
第二个解决办法
当字段便多时,第一种解决方案会变得非常复杂且不可维护,可以通过向ChannelPipeline中增加多个ChannelHandler的方法,将一个大的ChannelHandler分解成多个模块来降低应用的复杂性。例如,
package io.netty.example.time; public class TimeDecoder extends ByteToMessageDecoder { // (1) @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) if (in.readableBytes() < 4) { return; // (3) } out.add(in.readBytes(4)); // (4) } }
(1)ByteToMessageDecoder是一ChannelInboundHandler的实现类,可以非常容易的处理分片问题。
(2)当新的数据到达时,ByteToMessageDecoder讲数据存储在一个内在的累积buffer中,调用decode()方法进行处理
(3)decode()根据接收到的字节大小进行判定,若满4个字节则增加一个对象到list。
(4)如果decode()增加了个一个out的对象,意味着decoder编码成功。ByteToMessageDecoder会丢弃累积buffer中已经读过的部分。若out.add(null),decoder即停止。
由于ChannlePipeline中增加了一个handler,因此我们必须修改ChannelInitializer为:
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); } });
如果你是个冒险者,你可能会尝试ReplayingDecoder。该handler进一步简化了decoder。
public class TimeDecoder extends ReplayingDecoder<VoidEnum> { @Override protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out, VoidEnum state) { out.add(in.readBytes(4)); } }
另外关于decode的例子可以参考一下两个包
io.netty.example.factorial
for a binary protocol, andio.netty.example.telnet
for a text line-based protocol.
利用POJO代替ByteBuf
目前我们看到的例子均是使用ByteBuf作为协议消息的数据结构。在这一节,我们将使用POJO来代替ByteBuf来改善Time协议的客户端和服务器。
使用POJO的有点是十分明显的,通过分离出解析ByteBuf中数据的代码,handler会变得更加可维护和重用。在Time的客户端和服务器的例子中,我们只读32字节的integer且这并不是一个主要的直接应用ByteBuf的案例。然而,你会发现当你实现一个真正的协议时,做这样的分离是十分必要的。
首先,我们先定义一个新类型,UnixTime
package io.netty.example.time; import java.util.Date; public class UnixTime { private final int value; public UnixTime() { this((int) (System.currentTimeMillis() / 1000L + 2208988800L)); } public UnixTime(int value) { this.value = value; } public int value() { return value; } @Override public String toString() { return new Date((value() - 2208988800L) * 1000L).toString(); } }
我们重新编写TimeDecoder来禅师一个UnixTIme。
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { if (in.readableBytes() < 4) { return; } out.add(new UnixTime(in.readInt())); }
然后我们更新decoder,TimeClientHandler就不再使用ByteBuf了
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { UnixTime m = (UnixTime) msg; System.out.println(m); ctx.close(); }
是不是非常的简洁优雅。Server端同样如此。首先更新ServerHandler
@Override public void channelActive(ChannelHandlerContext ctx) { ChannelFuture f = ctx.writeAndFlush(new UnixTime()); f.addListener(ChannelFutureListener.CLOSE); }
现在需要编写解码部分,encoder是ChannelOutbountHandler的实现类,将UnixTIme转化成下层的ByteBuf,编写encoder要比编写decoder简单的多,因为此时不必考虑tcp包分片的问题。
package io.netty.example.time; public class TimeEncoder extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { UnixTime m = (UnixTime) msg; ByteBuf encoded = ctx.alloc().buffer(4); encoded.writeInt(m.value()); ctx.write(encoded, promise); // (1) } }
(1)这一行中有些十分重要的内容
首先,我们将原始的ChannelPromis找原来的样子传输,以保证Netty在写入链路时能够正确的标记成功或失败。
其次,并不调用ctx.flush().handler有一个默认的flush方法,若想每次写都flush则须:ctx.write(encoded,false,promise);或ctx.writeAndFlush(encode,promise);