zoukankan      html  css  js  c++  java
  • Netty4.0学习笔记系列之四:混合使用coder和handler

    Handler如何使用在前面的例子中已经有了示范,那么同样是扩展自ChannelHandler的Encoder和Decoder,与Handler混合后又是如何使用的?本文将通过一个实际的小例子来展示它们的用法。

    该例子模拟一个Server和Client,两者之间通过http协议进行通讯,在Server内部通过一个自定义的StringDecoder把httprequest转换成String。Server端处理完成后,通过StringEncoder把String转换成httpresponse,发送给客户端。具体的处理流程如图所示:

    其中红色框中的Decoder、Encoder及request都是Netty框架自带的,灰色框中的三个类是我自己实现的。

    Server端的类有:Server StringDecoder BusinessHandler StringEncoder四个类。

    1、Server 启动netty服务,并注册handler、coder,注意注册的顺序:

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import io.netty.bootstrap.ServerBootstrap;  
    4. import io.netty.channel.ChannelFuture;  
    5. import io.netty.channel.ChannelInitializer;  
    6. import io.netty.channel.ChannelOption;  
    7. import io.netty.channel.EventLoopGroup;  
    8. import io.netty.channel.nio.NioEventLoopGroup;  
    9. import io.netty.channel.socket.SocketChannel;  
    10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    11. import io.netty.handler.codec.http.HttpRequestDecoder;  
    12. import io.netty.handler.codec.http.HttpResponseEncoder;  
    13.   
    14. // 测试coder 和 handler 的混合使用  
    15. public class Server {  
    16.     public void start(int port) throws Exception {  
    17.         EventLoopGroup bossGroup = new NioEventLoopGroup();   
    18.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    19.         try {  
    20.             ServerBootstrap b = new ServerBootstrap();   
    21.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)   
    22.                     .childHandler(new ChannelInitializer<SocketChannel>() {   
    23.                                 @Override  
    24.                                 public void initChannel(SocketChannel ch) throws Exception {  
    25.                                     // 都属于ChannelOutboundHandler,逆序执行  
    26.                                     ch.pipeline().addLast(new HttpResponseEncoder());  
    27.                                     ch.pipeline().addLast(new StringEncoder());  
    28.                                       
    29.                                     // 都属于ChannelIntboundHandler,按照顺序执行  
    30.                                     ch.pipeline().addLast(new HttpRequestDecoder());  
    31.                                     ch.pipeline().addLast(new StringDecoder());  
    32.                                     ch.pipeline().addLast(new BusinessHandler());  
    33.                                 }  
    34.                             }).option(ChannelOption.SO_BACKLOG, 128)   
    35.                     .childOption(ChannelOption.SO_KEEPALIVE, true);   
    36.   
    37.             ChannelFuture f = b.bind(port).sync();   
    38.   
    39.             f.channel().closeFuture().sync();  
    40.         } finally {  
    41.             workerGroup.shutdownGracefully();  
    42.             bossGroup.shutdownGracefully();  
    43.         }  
    44.     }  
    45.   
    46.     public static void main(String[] args) throws Exception {  
    47.         Server server = new Server();  
    48.         server.start(8000);  
    49.     }  
    50. }  

    2、StringDecoder 把httpRequest转换成String,其中ByteBufToBytes是一个工具类,负责对ByteBuf中的数据进行读取

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.channel.ChannelInboundHandlerAdapter;  
    5. import io.netty.handler.codec.http.HttpContent;  
    6. import io.netty.handler.codec.http.HttpHeaders;  
    7. import io.netty.handler.codec.http.HttpRequest;  
    8.   
    9. import org.slf4j.Logger;  
    10. import org.slf4j.LoggerFactory;  
    11.   
    12. import com.guowl.utils.ByteBufToBytes;  
    13.   
    14. public class StringDecoder extends ChannelInboundHandlerAdapter {  
    15.     private static Logger   logger  = LoggerFactory.getLogger(StringDecoder.class);  
    16.     private ByteBufToBytes  reader;  
    17.   
    18.     @Override  
    19.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    20.         logger.info("StringDecoder : msg's type is " + msg.getClass());  
    21.         if (msg instanceof HttpRequest) {  
    22.             HttpRequest request = (HttpRequest) msg;  
    23.             reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(request));  
    24.         }  
    25.   
    26.         if (msg instanceof HttpContent) {  
    27.             HttpContent content = (HttpContent) msg;  
    28.             reader.reading(content.content());  
    29.   
    30.             if (reader.isEnd()) {  
    31.                 byte[] clientMsg = reader.readFull();  
    32.                 logger.info("StringDecoder : change httpcontent to string ");  
    33.                 ctx.fireChannelRead(new String(clientMsg));  
    34.             }  
    35.         }  
    36.     }  
    37.   
    38. }  

    3、BusinessHandler 具体处理业务的类,把客户端的请求打印出来,并向客户端发送信息

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.channel.ChannelInboundHandlerAdapter;  
    5.   
    6. import org.slf4j.Logger;  
    7. import org.slf4j.LoggerFactory;  
    8.   
    9. public class BusinessHandler extends ChannelInboundHandlerAdapter {  
    10.     private Logger  logger  = LoggerFactory.getLogger(BusinessHandler.class);  
    11.   
    12.     @Override  
    13.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    14.         String clientMsg = "client said : " + (String) msg;  
    15.         logger.info("BusinessHandler read msg from client :" + clientMsg);  
    16.         ctx.write("I am very OK!");  
    17.     }  
    18.   
    19.     @Override  
    20.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
    21.         ctx.flush();  
    22.     }  
    23. }  


    4、StringEncoder 把字符串转换成HttpResponse

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;  
    4. import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;  
    5. import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;  
    6. import static io.netty.handler.codec.http.HttpResponseStatus.OK;  
    7. import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;  
    8.   
    9. import org.slf4j.Logger;  
    10. import org.slf4j.LoggerFactory;  
    11.   
    12. import io.netty.buffer.Unpooled;  
    13. import io.netty.channel.ChannelHandlerContext;  
    14. import io.netty.channel.ChannelOutboundHandlerAdapter;  
    15. import io.netty.channel.ChannelPromise;  
    16. import io.netty.handler.codec.http.DefaultFullHttpResponse;  
    17. import io.netty.handler.codec.http.FullHttpResponse;  
    18. import io.netty.handler.codec.http.HttpHeaders.Values;  
    19.   
    20. // 把String转换成httpResponse  
    21. public class StringEncoder extends ChannelOutboundHandlerAdapter {  
    22.     private Logger  logger  = LoggerFactory.getLogger(StringEncoder.class);  
    23.   
    24.     @Override  
    25.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
    26.         logger.info("StringEncoder response to client.");  
    27.         String serverMsg = (String) msg;  
    28.   
    29.         FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(serverMsg  
    30.                 .getBytes()));  
    31.         response.headers().set(CONTENT_TYPE, "text/plain");  
    32.         response.headers().set(CONTENT_LENGTH, response.content().readableBytes());  
    33.         response.headers().set(CONNECTION, Values.KEEP_ALIVE);  
    34.         ctx.write(response);  
    35.         ctx.flush();  
    36.     }  
    37. }  

    Client端有两个类:Client ClientInitHandler
    1、Client 与Server端建立连接,并向Server端发送HttpRequest请求。

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import io.netty.bootstrap.Bootstrap;  
    4. import io.netty.buffer.Unpooled;  
    5. import io.netty.channel.ChannelFuture;  
    6. import io.netty.channel.ChannelInitializer;  
    7. import io.netty.channel.ChannelOption;  
    8. import io.netty.channel.EventLoopGroup;  
    9. import io.netty.channel.nio.NioEventLoopGroup;  
    10. import io.netty.channel.socket.SocketChannel;  
    11. import io.netty.channel.socket.nio.NioSocketChannel;  
    12. import io.netty.handler.codec.http.DefaultFullHttpRequest;  
    13. import io.netty.handler.codec.http.HttpHeaders;  
    14. import io.netty.handler.codec.http.HttpMethod;  
    15. import io.netty.handler.codec.http.HttpRequestEncoder;  
    16. import io.netty.handler.codec.http.HttpResponseDecoder;  
    17. import io.netty.handler.codec.http.HttpVersion;  
    18.   
    19. import java.net.URI;  
    20.   
    21. public class Client {  
    22.     public void connect(String host, int port) throws Exception {  
    23.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    24.   
    25.         try {  
    26.             Bootstrap b = new Bootstrap();   
    27.             b.group(workerGroup);   
    28.             b.channel(NioSocketChannel.class);   
    29.             b.option(ChannelOption.SO_KEEPALIVE, true);   
    30.             b.handler(new ChannelInitializer<SocketChannel>() {  
    31.                 @Override  
    32.                 public void initChannel(SocketChannel ch) throws Exception {  
    33.                     ch.pipeline().addLast(new HttpResponseDecoder());  
    34.                     ch.pipeline().addLast(new HttpRequestEncoder());  
    35.                     ch.pipeline().addLast(new ClientInitHandler());  
    36.                 }  
    37.             });  
    38.   
    39.             // Start the client.  
    40.             ChannelFuture f = b.connect(host, port).sync();  
    41.   
    42.             URI uri = new URI("http://127.0.0.1:8000");  
    43.             String msg = "Are you ok?";  
    44.             DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,  
    45.                     uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes()));  
    46.   
    47.             request.headers().set(HttpHeaders.Names.HOST, host);  
    48.             request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);  
    49.             request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());  
    50.             f.channel().write(request);  
    51.             f.channel().flush();  
    52.             f.channel().closeFuture().sync();  
    53.         } finally {  
    54.             workerGroup.shutdownGracefully();  
    55.         }  
    56.   
    57.     }  
    58.   
    59.     public static void main(String[] args) throws Exception {  
    60.         Client client = new Client();  
    61.         client.connect("127.0.0.1", 8000);  
    62.     }  
    63. }  

    2、ClientInitHandler 从Server端读取响应信息

    [java] view plain copy
    1. package com.guowl.testmulticoderandhandler;  
    2.   
    3. import io.netty.buffer.ByteBuf;  
    4. import io.netty.channel.ChannelHandlerContext;  
    5. import io.netty.channel.ChannelInboundHandlerAdapter;  
    6. import io.netty.handler.codec.http.HttpContent;  
    7. import io.netty.handler.codec.http.HttpHeaders;  
    8. import io.netty.handler.codec.http.HttpResponse;  
    9.   
    10. import com.guowl.utils.ByteBufToBytes;  
    11.   
    12. public class ClientInitHandler extends ChannelInboundHandlerAdapter {  
    13.     private ByteBufToBytes  reader;  
    14.   
    15.     @Override  
    16.     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
    17.         if (msg instanceof HttpResponse) {  
    18.             HttpResponse response = (HttpResponse) msg;  
    19.             if (HttpHeaders.isContentLengthSet(response)) {  
    20.                 reader = new ByteBufToBytes((int) HttpHeaders.getContentLength(response));  
    21.             }  
    22.         }  
    23.   
    24.         if (msg instanceof HttpContent) {  
    25.             HttpContent httpContent = (HttpContent) msg;  
    26.             ByteBuf content = httpContent.content();  
    27.             reader.reading(content);  
    28.             content.release();  
    29.   
    30.             if (reader.isEnd()) {  
    31.                 String resultStr = new String(reader.readFull());  
    32.                 System.out.println("Server said:" + resultStr);  
    33.             }  
    34.         }  
    35.     }  
    36.   
    37.     @Override  
    38.     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
    39.         ctx.close();  
    40.     }  
    41.   
    42. }  

    工具类:ByteBufToBytes 对ByteBuf的数据进行读取,支持流式读取(reading 和 readFull方法结合使用)
    ByteBufToBytes 

    [java] view plain copy
    1. package com.guowl.utils;  
    2.   
    3. import io.netty.buffer.ByteBuf;  
    4. import io.netty.buffer.Unpooled;  
    5.   
    6. public class ByteBufToBytes {  
    7.     private ByteBuf temp;  
    8.   
    9.     private boolean end = true;  
    10.   
    11.     public ByteBufToBytes() {}  
    12.   
    13.     public ByteBufToBytes(int length) {  
    14.         temp = Unpooled.buffer(length);  
    15.     }  
    16.   
    17.     public void reading(ByteBuf datas) {  
    18.         datas.readBytes(temp, datas.readableBytes());  
    19.         if (this.temp.writableBytes() != 0) {  
    20.             end = false;  
    21.         } else {  
    22.             end = true;  
    23.         }  
    24.     }  
    25.   
    26.     public boolean isEnd() {  
    27.         return end;  
    28.     }  
    29.   
    30.     public byte[] readFull() {  
    31.         if (end) {  
    32.             byte[] contentByte = new byte[this.temp.readableBytes()];  
    33.             this.temp.readBytes(contentByte);  
    34.             this.temp.release();  
    35.             return contentByte;  
    36.         } else {  
    37.             return null;  
    38.         }  
    39.     }  
    40.   
    41.     public byte[] read(ByteBuf datas) {  
    42.         byte[] bytes = new byte[datas.readableBytes()];  
    43.         datas.readBytes(bytes);  
    44.         return bytes;  
    45.     }  
    46. }  

    运行结果:

    2014-03-19 23:50:48 StringDecoder : msg's type is class io.netty.handler.codec.http.DefaultHttpRequest
    2014-03-19 23:50:48 StringDecoder : msg's type is class io.netty.handler.codec.http.DefaultLastHttpContent
    2014-03-19 23:50:48 StringDecoder : change httpcontent to string 
    2014-03-19 23:50:48 BusinessHandler read msg from client :client said : Are you ok?
    2014-03-19 23:50:48 StringEncoder response to client.

    可以看到执行顺序为:StringDecoder BusinessHandler StringEncoder ,其它的都是Netty自身的,没有打印。

    通过该实例证明,Encoder、Decoder的本质也是Handler,它们的执行顺序、使用方法与Handler保持一致。

    执行顺序是:Encoder 先注册的后执行,与OutboundHandler一致;Decoder是先注册的先执行,与InboundHandler一致。

  • 相关阅读:
    动手动脑
    编写一个程序,用户输入两个数,求出其加减乘除,并用消息框显示计算结果
    实验报告
    《大道至简第二章读后感》
    《大道至简》第一章读后感
    CentOS 6.x 播放 mp3 音乐 —— 成功
    CentOS下通过rdesktop连接Windows远程桌面
    Linux之文件系统的简单操作
    Linux之档案管理
    如何判断raid1中哪块硬盘损坏?
  • 原文地址:https://www.cnblogs.com/zeroone/p/8490935.html
Copyright © 2011-2022 走看看