zoukankan      html  css  js  c++  java
  • netty源码解析(4.0)-20 ChannelHandler: 自己实现一个自定义协议的服务器和客户端

      本章不会直接分析Netty源码,而是通过使用Netty的能力实现一个自定义协议的服务器和客户端。通过这样的实践,可以更深刻地理解Netty的相关代码,同时可以了解,在设计实现自定义协议的过程中需要解决的一些关键问题。

      本周章涉及到的代码可以从github上下载: https://github.com/brandonlyg/tinytransport.git

    设计协议

      本章要设计的协议是基于TCP的应用层协议。在设计一个协议之前需要先回答以下几个问题:

    • 使用场景是什么?
    • 这个协议有哪些功能?
    • 性能上有什么要求?
    • 对网络带宽有什么要求?
    • 安全上有哪些要求?  

      接下来依次回答这些问题:

      

      使用场景

      在可信任的内部网络中,不同进程之间高速交换消息。

      功能

    • 在客户端和服务器进行消息交换。
    • 发送消息然后异步接收响应。
    • 客户端和服务器之间可以保持长连接。
    • 传输大量的数据。

      性能

      数据包的提取性能接近内存copy。

      

      扩展性

      可以通过扩展header字段,进而扩展协议的功能。

      带宽

      尽量少的冗余数据,占用尽量小的带宽。

      

      安全

      由于是在可信任的内网中交互消息,没有特别端安全性要求。

      这些问题的答案,就是整个协议的设计要求。下面就按照这些设计要求来设计一套完整的协议,具体类容包括以下两个部分:

    • 数据包的格式。
    • 客户端和服务器端消息的交互规则。

    数据包格式的设计

      设计自己的数据包格式之前,我们先来回顾以下LengthFieldBasedFrameDecoder能够处理的数据包格式: 

      | header | contentLength | conent | 

      这个类把header的设计留给了子类,现在我们的注意力只需要集中在header字段上即可。下面是header设计:

      | begin | version | cmd | contentType | compression | sequenceId | resCode | 

      整个数据包的格式就是:

      | begin | version | cmd | contentType | compression | sequenceId | resCode | contentLength | content |

      现在来看一下这个数据包能实现哪些设计要求。

      begin

      类型: 32位无符号整数(uint32),这字段是一个常量,用来准确第定位到数据包的开始位置,这样就能更准确地分离出数据包,进而保证了“客户端和服务器端进行消息交换”。它的设计还要平衡数据包提取性能和准确性。严格来说,数据包中只能有一个begin,形式化描述如下:

      1. 设一个数据包P的长度是L,P(i)表示数据包中任意一个Byte,begin=0XADEF4BC9(这个值可以任意选择,尽量不选择有意义的数字)。

      2. 设反序列化一个uint32的算法是ui=deserUint32(i), i>=0 && i < L。

      3. 必须满足: deserUint32(0) == begin, 且deserUint32(i) != begin, i > 0 && i < L。

      要在(1)(2)两个前提条件下满足第(3)点,需要设计一个转义符EC=0xFF, 对P中除begin以外的部分进行转义,转义算法是:

      如果deserUint32(i)==begin或P(i)==EC,  在P(i)前面插入EC。

      找到begin的算法是:

      如果deserUint32(i)==begin且P(i-1)!=EC。

      逆转义算法是:

      如果P(i)==EC, P(i+1)==EC或deserUint32(i+1)==begin,  删除P(i)。

      以上使用转义符的方案,虽然能够准确地找到begin,但算法复杂度是O(L),显然不能满足“接近内存copy"这个要求。但是如果不使用转义符,就可以达到这个性能要求。如果仔细计算一下begin重复的概率就会发现, 它的重复概率只有1/0x100000000,如果再结合length字段一起检查数据包的正确性,得到错误数据包的概率就会更低。不使用转义符,以极小的出错概率换取性能大幅提升是一笔合适的买卖。

      总的来说,begin可以满足两个设计要求: 消息交换,数据包的提取性能接近内存copy。

      

      version

      类型:uint8。协议的版本号,这个字段用来满足“扩展性”要求。每个version对应一种不同的header结构,换言之,知道了版本号,就知道怎样解析header。 

      cmd

      类型: uint8。这个字段用来定义不同数据包的功能。可以使用这个字段定义心跳数据包,使用心跳数据包让"服务器和客户端保持长连接"。此外业务层可使用这个字段定义自己需要的数据包。

      contentType

      类型: uint8。这个字段是content的类型。使用这个字段可以在content数据交给业务层之前,对他进行一下特殊的处理。用户可以定义自己的的消息类型。它可以加"消息交换"的能力。

      

      compression

      类型: uint8。 压缩算法。这个字段可以用来表示content使用的压缩算法。通过使用适当的压缩算法,压缩满足"传输大量数据"和"带宽"的要求。

      

      sequenceId

      类型: uint32。这个字段是数据包的唯一序列号。只需要保证在一个socket连接建立-断开周期内保证它的唯一性即可。使用这个ID,可以实现“发送消息然后异步接收响应”。

      

      resCode

      类型: uint8。响应数据包的状态码,用来在响应数据包中附带异常信息。  

      至此数据包的格式已经设计完毕。接下来设计必要的交互规则。

    协议交互规则设计

      使用心跳保持长连接

      cmd: PING(0x01), PONG(0x02)。客户端连接到服务器之后,每隔一段时间发送一个PING包,服务器端收到之后立即响应PONG包。服务器端在一个超时时间后没有收到PING就认为TCP连接不可用,主动端开。客户端在发送PING之后,经过一个超时时间后没有收到PONG就认为连接不可用,重新建立连接。

     

      消息的请求和响应

      cmd: REQUEST(0x10), RESPONSE(0x02)。客户端使用REQUEST包向服务器发送请求,服务使用RESPONSE包响应。请求和响应的sequenceId一致。

      

      推送消息

      cmd: PUSH(0x20)。使用PUSH向对方推送消息,不需要响应。

    代码分析

      这个轻量级的客户端和服务器框架在架构上分为4个部分:

    • 数据包: Frame, FrameDecoder, FrameEncoder, FrameGzipCodec。
    • 消息: FMessage, FrameToMessageDecoder, MessageToFrameEncode, FMessageHandler, FMessageTrait, FMTraits。
    • 客户端框架: TcpConnector, TcpClient。
    • 服务器端框架: TcpServer。

      由于前面已经详细讲解了设计原理,这里只重点分析一下关键代码。

      Frame

      Frame是数据包类型,它的主要功能是数据包的序列化(encode方法)和反序列化(decode)。

      序列化方法:

     1 /**
     2      * 把Frame对象编码成数据包
     3      * @param out
     4      */
     5     public void encode(ByteBuf out){
     6         out.writeInt(BEGIN);
     7         out.writeByte(header.getVersion());
     8         out.writeByte(header.getCmd().getValue());
     9         out.writeByte(header.getContentType());
    10         out.writeByte(header.getCompression());
    11         out.writeInt(header.getSequenceId());
    12         out.writeByte(header.getResCode());
    13 
    14         int contentLength = 0;
    15         if(null != content){
    16             contentLength = content.readableBytes();
    17         }
    18         if(contentLength > MAX_CONTENT_LENGTH){
    19             throw new TooLongFrameException("content too long. contentLength:"+contentLength);
    20         }
    21         out.writeShort(contentLength);
    22         if(null != content){
    23             out.writeBytes(content);
    24         }
    25     }

      6-12行,序列化header中除contentLength的其他字段。

      14-21行,序列化contentLength字段。

      22-24行,序列content。

      反序列化方法

     1 /**
     2      * 从数据包解码得到Frame
     3      * @param in 一个完整的数据包
     4      * @return Frame对象
     5      */
     6     public static Frame decode(ByteBuf in){
     7         if(in.readableBytes() < HEADER_LENGTH){
     8             throw new CorruptedFrameException("pack length less than header length("+HEADER_LENGTH+")");
     9         }
    10 
    11         //得到header
    12         Header header = new Header();
    13         in.readInt();
    14         header.setVersion(in.readByte());
    15         header.setCmd(Command.valueOf(in.readByte() & 0xFF));
    16         header.setContentType((byte)(in.readByte() & 0xFF));
    17         header.setCompression((byte)(in.readByte() & 0xFF));
    18         header.setSequenceId(in.readInt());
    19         header.setResCode((byte)(in.readByte() & 0xFF));
    20 
    21         //读出content
    22         int contentLength = in.readShort() & 0xFFFF;
    23         if(in.readableBytes() != contentLength){
    24             throw new CorruptedFrameException("content is not match."+in.readableBytes() + "-" + contentLength);
    25         }
    26 
    27         ByteBuf content = contentLength > 0 ? in.retainedSlice(in.readerIndex(), contentLength) : null;
    28         in.skipBytes(contentLength);
    29 
    30         //创建Frame对象
    31         Frame frame = new Frame();
    32         frame.setHeader(header);
    33         frame.setContent(content);
    34 
    35         if(null != content) content.release();
    36 
    37         return frame;
    38     }

      这段代码,注释已经比较清晰了,这里就不再多说。

      FrameDecoder

       这个类继承了LengthFieldBasedFrameDecoder,所以只需要很少的代码就可以从Byte流中分离出数据包。

     1     public FrameDecoder(){
     2         super(Frame.MAX_LENGTH, Frame.HEADER_LENGTH - 2, 2);
     3     }
     4 
     5     @Override
     6     protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
     7         //找到begin位置
     8         int start = in.readerIndex();
     9         int begin = in.getInt(start + 0);
    10         if(begin != Frame.BEGIN){
    11             dropFailedData(in);
    12         }
    13 
    14         //解码得到Frame对象
    15         ByteBuf dataPack = null;
    16         try{
    17             dataPack = (ByteBuf)super.decode(ctx, in);
    18             Frame frame = Frame.decode(dataPack);
    19             return frame;
    20         }finally {
    21             if(null != dataPack){
    22                 dataPack.release();
    23             }
    24         }
    25     }

      2行,设置了数据包的最大长度Frame.MAX_LENGTH, 数据包header除contentLength之外的长度Frame.HEADER_LENGTH-2, contentLength字段的长度。这样,只要正确地找到数据包的开始位置就能LengthFieldBasedFrameDecoder就能帮助我们把数据包提取出来。

      8-12行,确定数据包的开始位置。

      17-18行,提取数据包,并把数据包反序列化成Frame。

      FMessageTrait

      为了能够灵活地处理FMessage的content, 框架中定义了FMessageTrait接口,可以使用不同个FMessageTrait实现处理不同的content类型。

     1 /**
     2  * FMessage消息特征接口,根据不同的contentType进行Frame和FMessage之间的转换
     3  */
     4 public interface FMessageTrait {
     5 
     6     /**
     7      * 得到匹配的contentType
     8      * @return contentType的值
     9      */
    10     int getContentType();
    11 
    12     /**
    13      * 把FMessage转换成Frame
    14      * @param fmsg
    15      * @return
    16      * @throws EncoderException
    17      */
    18     Frame encode(FMessage fmsg) throws EncoderException;
    19 
    20     /**
    21      * 把Frame转换成FMessage
    22      * @param frame
    23      * @return
    24      * @throws DecoderException
    25      */
    26     FMessage decode(Frame frame) throws DecoderException;
    27 }

      FrameToMessageDecoder和MessageToFrameEncoder使用FMessageTrait进行FMessage和Frame之间的转换。

     1 /**
     2  * 把Frame转换成FMessage
     3  */
     4 @ChannelHandler.Sharable
     5 public class FrameToMessageDecoder extends MessageToMessageDecoder<Frame> {
     6 
     7     private Map<Integer, FMessageTrait> fmTraits = new HashMap<>();
     8 
     9 
    10     public void addFMessageTrait(FMessageTrait trait){
    11         fmTraits.put(trait.getContentType(), trait);
    12     }
    13 
    14     @Override
    15     protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
    16         int contentType = frame.getHeader().getContentType();
    17         FMessageTrait trait = fmTraits.get(contentType);
    18         if(null == trait){
    19             throw new EncoderException("can't find trait. contentType:"+contentType);
    20         }
    21 
    22         FMessage fmsg = trait.decode(frame);
    23         out.add(fmsg);
    24     }
    25 }

      10-12行,把FMessageTrait放入map中。构建contentType-FMessageTrait之间的映射。

      17行,从map中得到FMessageTrait。

      22行,使用FMessageTrait把Frame转换成FMessage。

      MessageToFrameEncoder的实现类似。不同的是在22处调用FMessageTrait的encode方法把FMessage转换成Frame。

      FMTraits中给出了几种常见的FMessageTrait实现:

    • FMTraitBytes:  处理byte array类型的content。
    • FMTraitString: 处理String类型的content。
    • FMTraitJson: 处理Json格式是content。
    • FMTraitProtobuf: 处理protobuf格式的content。

      他们都有一个共同的祖先AbstractFMTrait, 这个抽象类实现FMessageTrait的encode和decode方法,定义了两个抽象方法encodeContent和decodeContent,子类只需专注于content的处理就可以了。

      下面以FMTraitBytes为例,讲解一下FMessageTrait的具体实现。FMTraitBytes处理的FMessage类型要求conent是byte[]类型。

     1     public static final int BYTES = 0x01;
     2     public static final FMessageTrait FMTBytes = new FMTraitBytes();
     3     public static class FMTraitBytes extends AbstractFMTrait {
     4         protected int contentType;
     5 
     6         public FMTraitBytes(){
     7             this(BYTES);
     8         }
     9 
    10         public FMTraitBytes(int contentType){
    11             this.contentType = contentType;
    12         }
    13 
    14         @Override
    15         public int getContentType() {
    16             return contentType;
    17         }
    18 
    19         @Override
    20         protected ByteBuf encodeContent(FMessage fmsg) throws EncoderException{
    21             byte[] bytes = (byte[])fmsg.getContent();
    22 
    23             ByteBuf buf = null;
    24             if(null != bytes && bytes.length > 0){
    25                 buf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
    26                 buf.writeBytes(bytes);
    27             }
    28 
    29             return buf;
    30         }
    31 
    32         @Override
    33         protected Object decodeContent(Frame frame) throws DecoderException {
    34             ByteBuf buf = frame.getContent();
    35             byte[] bytes = null;
    36             if(null != buf && buf.readableBytes() > 0){
    37                 bytes = new byte[buf.readableBytes()];
    38                 buf.readBytes(bytes);
    39             }
    40 
    41             return bytes;
    42         }
    43     }

      6-17行,实现了contentType的设置和获取。

      21-29行,把FMessage的content转换成ByteBuf。

      34-42行, 发Frame的content转换成byte[]。

      FMessageHandler

      这是一个专门用来处理FMessage的ChannelInboundHandler。channelRead0方法负责把不同cmd的FMessage派发到专用方法处理,这些方法有:

    • onPing: 收到PING, 会自动响应一个PONG。
    • onPong: 收到PONG。
    • onRequest: 收到REQUEST。
    • onResponse: 收到RESPONSE。
    • onPush: 收到PUSH。

      客户端框架

      TcpConnector功能是发起连接,它的主要功能集中在以下三个方法中。

     1    public void addFMessageTrait(FMessageTrait trait){
     2         fmEncoder.addFMessageTrait(trait);
     3         fmDecoder.addFMessageTrait(trait);
     4     }
     5 
     6     public TcpClient connect(InetSocketAddress address) throws Exception{
     7         ChannelFuture future = bootstrap.connect(address);
     8         Channel channel = future.channel();
     9 
    10         TcpClient client = new TcpClient(channel, workerElg.next());
    11         channel.attr(TcpClient.CLIENT).set(client);
    12 
    13         future.sync();
    14 
    15         return client;
    16     }
    17 
    18    protected void doInitChannel(SocketChannel ch) throws Exception {
    19         ChannelPipeline pl = ch.pipeline();
    20 
    21         pl.addLast(H_FRAME_DECODER, new FrameDecoder());
    22         pl.addLast(H_FRAME_ENCODER, frameEncoder);
    23 
    24         pl.addLast(H_READ_TIMEOUT, new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
    25 
    26         pl.addLast(H_FM_DECODER, fmDecoder);
    27         pl.addLast(H_FM_ENCODER, fmEncoder);
    28 
    29         pl.addLast(H_FM_HANDLER, clientHandler);
    30     }

      addFMessageTrait设置FMessageTrait,开发者可以根据需要定制FMessage的处理能力,FMTraitBytes会默认添加。

      connect用来发起连接,创建TcpClient对象。

      doInitChannel初始化Channel, 开发者可以覆盖这个方法,定制channel的ChannelHandler。

      另外,TcpConnector内部实现了一个FMessageHandler的派生类ClientHandler。这个类的channelActive方法中启动一个定时任务定时发送PING。onResponse方法负责调用TcpClient的onResponse方法。

      TcpClient是客户端连接对象,它主要有两个方法:

      public boolean send(FMessage msg);

      public Promise<FMessage> send(FMessage msg, TimeUnit timeUnit, long timeout);

      第一个不处理响应。第二个可以异步数量响应。

      另外还有一个给TcpConnector使用的onResponse方法,用来触发第二个send返回Promise对象的回调。

      服务器端框架

      TcpServer是服务器端框架,它比较简单。开发者只需要覆盖doInitChannel,添加自己的ChannelHandler,就可以实现服务器端的定制。  

      

      

      

      

  • 相关阅读:
    linux指令备份
    jdk安装
    java-成员变量的属性与成员函数的覆盖
    Codeforces Round #384 (Div. 2) E
    Codeforces Round #384 (Div. 2) ABCD
    Codeforces Round #383 (Div. 2) D 分组背包
    ccpcfinal总结
    HDU 3966 & POJ 3237 & HYSBZ 2243 & HRBUST 2064 树链剖分
    HDU 5965 枚举模拟 + dp(?)
    Educational Codeforces Round 6 E dfs序+线段树
  • 原文地址:https://www.cnblogs.com/brandonli/p/11251519.html
Copyright © 2011-2022 走看看