本文目的:测试数据在ChannelPipeline中的流经顺序及状态。
先看本文的测试代码:
AdditionalInBoundHandler:入站处理器,不做任何处理,只是在响应读事件时打印用来观察,并继续通过fireChannelRead传递读事件。
public class AdditionalInBoundHandler extends ChannelInboundHandlerAdapter { private String name; public AdditionalInBoundHandler(String name){ this.name = name; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ System.out.println("go through additional InBoundHandler[" + name + "]; msg type[" + msg.getClass() + "]"); ctx.fireChannelRead(msg); } }
BigIntegerDecoder:解码器,用来读取Bytebuf中的字节数据,解码成BigInter对象。
public class BigIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { System.out.println("BigIntegerDecoder"); // Wait until the length prefix is available. if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int length = in.readInt(); if (in.readableBytes() < length){ in.resetReaderIndex(); return; } byte[] data = new byte[length]; in.readBytes(data); out.add(new BigInteger(data)); }
NettyServerHandler:将读到的BigInteger加1并写出。
public class NettyServerHandler extends SimpleChannelInboundHandler<BigInteger> { @Override public void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception { System.out.println("NettyServerHandler"); ctx.writeAndFlush(msg.add(BigInteger.ONE)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
NumberEncoder:将BigInteger对象编码成字节数字,由ByteBuf写出。
public class NumberEncoder extends MessageToByteEncoder<Number> { @Override protected void encode(ChannelHandlerContext ctx, Number msg, ByteBuf out) { System.out.println("NumberEncoder"); // Convert to a BigInteger first for easier implementation. BigInteger v; if (msg instanceof BigInteger) { v = (BigInteger) msg; } else { v = new BigInteger(String.valueOf(msg)); } // Convert the number into a byte array. byte[] data = v.toByteArray(); int dataLength = data.length; // Write a message. out.writeInt(dataLength); // data length out.writeBytes(data); // data } }
AdditionalOutboundHandler:出站处理器,不对数据做任何处理,直接写出,只是简单增加了打印功能,方便测试。
public class AdditionalOutBoundHandler extends ChannelOutboundHandlerAdapter { private String name; public AdditionalOutBoundHandler(String name) { this.name = name; } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("go through additional outbound handler[" + name + "];msg type [" + msg.getClass() + "]"); ctx.write(msg, promise); } }
测试代码:channelHandlerTest。通过EmbeddedChannel进行测试。
public class ChannelHandlerTest { @Test public void channelHandlerTest(){ ByteBuf buf = Unpooled.buffer(); EmbeddedChannel embeddedChannel = new EmbeddedChannel(); ChannelPipeline pipeline = embeddedChannel.pipeline(); pipeline.addLast(new AdditionalInBoundHandler("in handler 1")); pipeline.addLast(new AdditionalInBoundHandler("in handler 2")); pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); pipeline.addLast(new AdditionalOutBoundHandler("out handler 1")); pipeline.addLast(new AdditionalInBoundHandler("in handler 3")); pipeline.addLast(new NettyServerHandler()); pipeline.addLast(new AdditionalInBoundHandler("in handler 4")); pipeline.addLast(new AdditionalOutBoundHandler("out handler 2")); byte[] bytes = new byte[]{0x01, 0x02, 0x03, 0x04}; BigInteger bi = new BigInteger(bytes); buf.writeInt(4); buf.writeBytes(bytes); //因为nettyServerHandler fire了写事件,因此channelpipeline尾部没数据可读 assertTrue(!embeddedChannel.writeInbound(buf)); ByteBuf readBuf = embeddedChannel.readOutbound(); int length = readBuf.readInt(); bytes = new byte[length]; readBuf.readBytes(bytes); System.out.println(new BigInteger(bytes)); } }
我们可以看到ChannelPipeline中的顺序是头到尾:InBound1(入) -> InBound2(入) -> BigIntegerDecoder(入) -> NumberEncoder(出) -> OutBound1(出) -> InBound3(入) -> NettyServerHandler(入) -> InBound4(入) -> OutBound2(出)。
执行测试代码,我们可以看到控制台中的输出是:
我们通过一张流程图片更清晰的看一下整个流经过程以及数据的状态:
图上的箭头很清楚的表示了数据流经的过程,[XXX则注释出了此时的数据对象。
入站数据沿着ChannelPipeline的头至尾依次寻找下一个可以处理对应消息的处理器。
出战数据则会沿着ChannelPipeline的尾至头依次寻找下一个可以处理对应消息的处理器。
比如为什么数据没有流经InBoundHandler4和OutBoundHandler2?
因为NettyServerHandler发出了一个写事件,因此数据从尾至头的方法找下一个可以处理写消息的处理器,即OutBoundHandler1,在顺延下去。