上节介绍的是discard协议,即不给客户端返回消息。本节主要说下,echo协议,即服务端收到消息后原样返回给客户端。
为了实现此需求,只需要在DiscardServerHandler中重写channelRead()方法,即可。如下:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("接收并原样返回:{}",in.toString(io.netty.util.CharsetUtil.UTF_8)); ctx.write(msg);//(1) ctx.flush();//(2) }
1、ChannelHandlerContext对象提供了很多方法,如:
ctx.read();
ctx.write();
ctx.flust();
ctx.channel();
ctx.pipeline();
ctx.alloc();
ctx.close();
ctx.connect();
ctx.disconnect();
注意,我们这里没有release 接收到的消息,不像discard例子那样。因为,当消息写到物理网络中后,netty才会在自动release。
在channelRead方法内的最后,加入in.release();即
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("接收并原样返回:{}",in.toString(io.netty.util.CharsetUtil.UTF_8)); ctx.write(msg);//(1) ctx.flush();//(2) in.release(); }
则报异常:io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
因此,不能在server端的channelRead方法内加入in.release()。
当ctx.flush(msg)后,可以在channelReadComplete()方法中release。
2、ctx.write(object)并不会将消息写到物理网络中,而是在内部buffer中的,需要使用ctx.flush()来写入到物理网络中。当然,也可以使用ctx.writeAndFlush(msg)代替。
-----telnet测试:
客户端:
[root@cent7-zuoys ~]# telnet 10.134.253.10 8080
Trying 10.134.253.10...
Connected to 10.134.253.10.
Escape character is '^]'.
遥远2
遥远2
服务端:16:22:12.636 [nioEventLoopGroup-3-1] 接收并原样返回:遥远2
-----client测试:
@Slf4j public class EchoClientHandler extends SimpleChannelInboundHandler<Object> { private ByteBuf content; private ChannelHandlerContext ctx; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; //content = ctx.alloc().directBuffer(EchoClient.SIZE).writeZero(EchoClient.SIZE); content = ctx.alloc().directBuffer(EchoClient.SIZE).writeBytes("遥远2".getBytes(CharsetUtil.UTF_8)); //发送以上消息 generatTraffic(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { content.release(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; log.info("接收消息:{}",in.toString(io.netty.util.CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //*************************自定义方法 private void generatTraffic() { //flush the outbound buffer to the socket. //once flushed,generate the same amount of traffic again. ByteBuf buf = content.retainedDuplicate(); ctx.writeAndFlush(buf).addListener(trafficGenerator); //Console.log((char)buf.readByte()); log.info("发送消息:{}",buf.toString(io.netty.util.CharsetUtil.UTF_8)); } private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info("success!"); } else { future.cause().printStackTrace(); future.channel().close(); } } }; }
红字部分,为修改的。在channelRead0方法中,之前是空的,现在是因为,服务端返回消息了,需要在此处接收。
运行结果:
客户端:
16:50:42.443 [nioEventLoopGroup-2-1] success!
16:50:42.446 [nioEventLoopGroup-2-1] 发送消息:遥远2
16:50:42.461 [nioEventLoopGroup-2-1] 接收消息:遥远2
服务端:16:50:42.456 [nioEventLoopGroup-3-1] 接收并原样返回:遥远2
SimpleChannelInboundHandler 与 ChannelInboundHandlerAdapter区别:
服务端使用ChannelInboundHandlerAdapter,客户端使用SimpleChannelInboundHandler 。
因为ChannelInboundHandlerAdapter,在channelRead()内不能release。而SimpleChannelInboundHandler 自动release。