一、DiscardClientHandler
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class DiscardClientHandler extends SimpleChannelInboundHandler<Object> { private ByteBuf content; private ChannelHandlerContext ctx; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {//(1) this.ctx = ctx; content = ctx.alloc().directBuffer(DiscardClient.SIZE).writeZero(DiscardClient.SIZE); //content = ctx.alloc().directBuffer(DiscardClient.SIZE).writeBytes("1".getBytes(CharsetUtil.UTF_8)); //发送以上消息 generatTraffic(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { content.release(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {//(2) //Server is supposed to send nothing,but if it sends somethings,discard it. } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } //*************************自定义方法 private void generatTraffic() { //flush the outbound buffer to the socket. //once flushed,generate the same amount of traffic again. ByteBuf buf = content.retainedDuplicate(); ctx.writeAndFlush(buf).addListener(trafficGenerator); //Console.log((char)buf.readByte()); log.info("{}",(char)buf.getByte(0)); } private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() {//(3) @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { generatTraffic(); } else { future.cause().printStackTrace(); future.channel().close(); } } }; }
1、发送消息
2、接收服务器返回的消息。由于服务端没有返回消息,所以此处忽略。
3、消息发送完成后,根据发送结果进行处理:如果成功,则继续发送消息;否则,输出异常信息并关闭 channel。
二、DiscardClient
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import lombok.extern.slf4j.Slf4j; @Slf4j public class DiscardClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host","127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port","8080")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(),HOST,PORT)); } p.addLast(new DiscardClientHandler()); } }); //make the connection attempt. ChannelFuture f = b.connect(HOST,PORT).sync(); //wait until the connection is closed. f.channel().closeFuture().sync(); log.info("connection is closed"); } finally { group.shutdownGracefully(); } } }
运行结果:
服务端:
客户端: