  • 关于netty的多个handler链式模式

    1. 老规矩, 引入我们喜闻乐见的maven依赖

    1 <dependency>
    2     <groupId>io.netty</groupId>
    3     <artifactId>netty-all</artifactId>
    4     <version>4.1.6.Final</version>
    5 </dependency>

    2. 服务端

      2.1: 服务端引导类:

     1 public class EchoServer {
     3     private int port;
     5     private EchoServer(int port) {
     6         this.port = port;
     7     }
     9     private void start() throws Exception {
    10         System.out.println("Echo Server Start");
    11         EventLoopGroup group = new NioEventLoopGroup();
    12         try {
    13             ServerBootstrap b = new ServerBootstrap();
    14             b.group(group)
    15                     .channel(NioServerSocketChannel.class)
    16                     .localAddress(new InetSocketAddress(port))
    17                     .childHandler(new ChannelInitializer<SocketChannel>() {
    18                         @Override
    19                         public void initChannel(SocketChannel ch) throws Exception {
    20                             ch.pipeline().addLast(new EchoOutboundHandler1());
    21                             ch.pipeline().addLast(new EchoOutboundHandler2());
    23                             ch.pipeline().addLast(new EchoInboundHandler1());
    24                             ch.pipeline().addLast(new EchoInboundHandler2());
    25                         }
    26                     });
    27             ChannelFuture f = b.bind().sync();
    28             System.out.println("Server Start Listen At: " + port);
    29             f.channel().closeFuture().sync();
    30         } finally {
    31             group.shutdownGracefully();
    32         }
    33     }
    35     public static void main(String[] args) throws Exception {
    36         int port;
    37         if (args.length > 0) {
    38             port = Integer.parseInt(args[0]);
    39         } else {
    40             port = 8080;
    41         }
    42         new EchoServer(port).start();
    43     }
    44 }

      2.2 EchoInboundHandler1

     1 public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter {
     3     @Override
     4     public void channelRead(ChannelHandlerContext ctx, Object msg) {
     5         // 读取msg中的数据
     6         ByteBuf result = (ByteBuf) msg;
     7         byte[] bytes = new byte[result.readableBytes()];
     8         result.readBytes(bytes);
     9         String resultStr = new String(bytes);
    10         System.out.println("Server Received: " + resultStr + ": Inbound 1 Is OK");
    12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
    13         result.writeBytes(resultStr.getBytes());
    14         ctx.fireChannelRead(result);
    15     }
    17 }

      2.3 EchoInboundHandler2

     1 public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter {
     3     @Override
     4     public void channelRead(ChannelHandlerContext ctx, Object msg) {
     5         // 读取msg中的数据
     6         ByteBuf result = (ByteBuf) msg;
     7         byte[] bytes = new byte[result.readableBytes()];
     8         result.readBytes(bytes);
     9         String resultStr = new String(bytes);
    10         System.out.println("Server Received: " + resultStr + ": Inbound 2 Is OK");
    12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
    13         result.writeBytes(resultStr.getBytes());
    14         ctx.write(result);
    15     }
    17 }

      2.4 EchoOutboundHandler2 

     1 public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter {
     3     @Override
     4     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     5         // 读取msg中的数据
     6         ByteBuf result = (ByteBuf) msg;
     7         byte[] bytes = new byte[result.readableBytes()];
     8         result.readBytes(bytes);
     9         String resultStr = new String(bytes);
    10         System.out.println("Server Received: " + resultStr + ": Outbound 2 Is OK");
    12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
    13         result.writeBytes(resultStr.getBytes());
    14         ctx.write(result);
    15     }
    16 }

      2.5 EchoOutboundHandler1 

     1 public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter {
     3     @Override
     4     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
     5         // 读取msg中的数据
     6         ByteBuf result = (ByteBuf) msg;
     7         byte[] bytes = new byte[result.readableBytes()];
     8         result.readBytes(bytes);
     9         String resultStr = new String(bytes);
    10         System.out.println("Server Received: " + resultStr + ": Outbound 1 Is OK");
    12         // 处理完msg中的数据后往msg中重新存放新的数据继续传递
    13         result.writeBytes(resultStr.getBytes());
    14         ctx.write(result);
    15         ctx.flush();
    16     }
    17 }

    3. 客户端

     1 public class EchoClient {
     3     private String host;
     4     private int port;
     6     private EchoClient(String host, int port) {
     7         this.host = host;
     8         this.port = port;
     9     }
    11     private void start() throws Exception {
    12         System.out.println("Echo Client Start");
    13         EventLoopGroup group = new NioEventLoopGroup();
    14         try {
    15             Bootstrap b = new Bootstrap();
    16             b.group(group)
    17                     .channel(NioSocketChannel.class)
    18                     .remoteAddress(new InetSocketAddress(host, port))
    19                     .handler(new ChannelInitializer<SocketChannel>() {
    20                         @Override
    21                         public void initChannel(SocketChannel ch) throws Exception {
    22                             ch.pipeline().addLast(new EchoClientHandler());
    23                         }
    24                     });
    25             ChannelFuture f = b.connect().sync();
    26             System.out.println("Server Client Listen IP: [" + host + ":" + port + "]");
    27             f.channel().closeFuture().sync();
    28         } finally {
    29             group.shutdownGracefully();
    30         }
    31     }
    33     public static void main(String[] args) throws Exception {
    34         String host = "";
    35         int port = 8080;
    36         int len = 2;
    37         if (args.length == len) {
    38             host = args[0];
    39             port = Integer.parseInt(args[1]);
    40         }
    41         new EchoClient(host, port).start();
    42     }
    44 }
     1 public class EchoClientHandler extends ChannelInboundHandlerAdapter {
     3     @Override
     4     public void channelActive(ChannelHandlerContext ctx) {
     5         ctx.writeAndFlush(Unpooled.copiedBuffer("Netty Rocks!", CharsetUtil.UTF_8));
     6     }
     8     @Override
     9     public void channelRead(ChannelHandlerContext ctx, Object msg) {
    10         // 读取msg中的数据
    11         ByteBuf result = (ByteBuf) msg;
    12         byte[] bytes = new byte[result.readableBytes()];
    13         result.readBytes(bytes);
    14         String resultStr = new String(bytes);
    15         System.out.println("Echo Client Received Is OK: " + resultStr);
    16         ctx.close();
    17     }
    19     @Override
    20     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    21         cause.printStackTrace();
    22         ctx.close();
    23     }
    24 }

    4. 结果:



    5. 注意事项:

    5.1. ChannelInboundHandler之间的传递, 通过调用 ctx.fireChannelRead(msg) 实现; 调用ctx.write(msg) 将传递到ChannelOutboundHandler

    5.2. ctx.write() 只是把数据放入出站缓冲区, 并不会立即发送; 需要再调用 flush() (或者直接使用 writeAndFlush()) 才会把数据真正写到对端

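    5.3. ChannelOutboundHandler 在注册的时候需要放在最后一个ChannelInboundHandler之前, 否则将无法传递到ChannelOutboundHandler; 原因是 ctx.write() 从当前 handler 所在位置开始, 沿 pipeline 向头部方向查找 ChannelOutboundHandler, 注册在它之后的 ChannelOutboundHandler 不会被经过 (如果改用 ctx.channel().write(), 则会从 pipeline 尾部开始, 经过所有的 ChannelOutboundHandler)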

