zoukankan      html  css  js  c++  java
  • Netty之解决TCP粘包拆包(自定义协议)

    1、什么是粘包/拆包

           一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘包呢?主要原因是TCP是以流的方式来处理数据,再加上网络上MTU的往往小于在应用处理的消息数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的存在。处理粘包的唯一方法就是制定应用层的数据通讯协议,通过协议来规范现有接收的数据是否满足消息数据的需要。

    2、解决办法

         2.1、消息定长,报文大小固定长度,不够空格补全,发送和接收方遵循相同的约定,这样即使粘包了通过接收方编程实现获取定长报文也能区分。

         2.2、包尾添加特殊分隔符,例如每条报文结束都添加回车换行符(例如FTP协议)或者指定特殊字符作为报文分隔符,接收方通过特殊分隔符切分报文区分。

         2.3、将消息分为消息头和消息体,消息头中包含表示信息的总长度(或者消息体长度)的字段

    3、自定义协议,来实现TCP的粘包/拆包问题

          3.0  自定义协议,开始标记           

                  

          3.1  自定义协议的介绍

                 

          3.2  自定义协议的类的封装

                 

          3.3  自定义协议的编码器

                 

          3.4  自定义协议的解码器

              

    4、协议相关的实现

          4.1  协议的封装

    [java] view plain copy
     
     print?
    1. import java.util.Arrays;  
    2.   
    3. /** 
    4.  * <pre> 
    5.  * 自己定义的协议 
    6.  *  数据包格式 
    7.  * +——----——+——-----——+——----——+ 
    8.  * |协议开始标志|  长度             |   数据       | 
    9.  * +——----——+——-----——+——----——+ 
    10.  * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 
    11.  * 2.传输数据的长度contentLength,int类型 
    12.  * 3.要传输的数据 
    13.  * </pre> 
    14.  */  
    15. public class SmartCarProtocol {  
    16.     /** 
    17.      * 消息的开头的信息标志 
    18.      */  
    19.     private int head_data = ConstantValue.HEAD_DATA;  
    20.     /** 
    21.      * 消息的长度 
    22.      */  
    23.     private int contentLength;  
    24.     /** 
    25.      * 消息的内容 
    26.      */  
    27.     private byte[] content;  
    28.   
    29.     /** 
    30.      * 用于初始化,SmartCarProtocol 
    31.      *  
    32.      * @param contentLength 
    33.      *            协议里面,消息数据的长度 
    34.      * @param content 
    35.      *            协议里面,消息的数据 
    36.      */  
    37.     public SmartCarProtocol(int contentLength, byte[] content) {  
    38.         this.contentLength = contentLength;  
    39.         this.content = content;  
    40.     }  
    41.   
    42.     public int getHead_data() {  
    43.         return head_data;  
    44.     }  
    45.   
    46.     public int getContentLength() {  
    47.         return contentLength;  
    48.     }  
    49.   
    50.     public void setContentLength(int contentLength) {  
    51.         this.contentLength = contentLength;  
    52.     }  
    53.   
    54.     public byte[] getContent() {  
    55.         return content;  
    56.     }  
    57.   
    58.     public void setContent(byte[] content) {  
    59.         this.content = content;  
    60.     }  
    61.   
    62.     @Override  
    63.     public String toString() {  
    64.         return "SmartCarProtocol [head_data=" + head_data + ", contentLength="  
    65.                 + contentLength + ", content=" + Arrays.toString(content) + "]";  
    66.     }  
    67.   
    68. }  

          4.2  协议的编码器

    [java] view plain copy
     
     print?
    1. /** 
    2.  * <pre> 
    3.  * 自己定义的协议 
    4.  *  数据包格式 
    5.  * +——----——+——-----——+——----——+ 
    6.  * |协议开始标志|  长度             |   数据       | 
    7.  * +——----——+——-----——+——----——+ 
    8.  * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 
    9.  * 2.传输数据的长度contentLength,int类型 
    10.  * 3.要传输的数据 
    11.  * </pre> 
    12.  */  
    13. public class SmartCarEncoder extends MessageToByteEncoder<SmartCarProtocol> {  
    14.   
    15.     @Override  
    16.     protected void encode(ChannelHandlerContext tcx, SmartCarProtocol msg,  
    17.             ByteBuf out) throws Exception {  
    18.         // 写入消息SmartCar的具体内容  
    19.         // 1.写入消息的开头的信息标志(int类型)  
    20.         out.writeInt(msg.getHead_data());  
    21.         // 2.写入消息的长度(int 类型)  
    22.         out.writeInt(msg.getContentLength());  
    23.         // 3.写入消息的内容(byte[]类型)  
    24.         out.writeBytes(msg.getContent());  
    25.     }  
    26. }  

          4.3  协议的解码器

    [java] view plain copy
     
     print?
    1. import java.util.List;  
    2. import io.netty.buffer.ByteBuf;  
    3. import io.netty.channel.ChannelHandlerContext;  
    4. import io.netty.handler.codec.ByteToMessageDecoder;  
    5.   
    6. /** 
    7.  * <pre> 
    8.  * 自己定义的协议 
    9.  *  数据包格式 
    10.  * +——----——+——-----——+——----——+ 
    11.  * |协议开始标志|  长度             |   数据       | 
    12.  * +——----——+——-----——+——----——+ 
    13.  * 1.协议开始标志head_data,为int类型的数据,16进制表示为0X76 
    14.  * 2.传输数据的长度contentLength,int类型 
    15.  * 3.要传输的数据,长度不应该超过2048,防止socket流的攻击 
    16.  * </pre> 
    17.  */  
    18. public class SmartCarDecoder extends ByteToMessageDecoder {  
    19.   
    20.     /** 
    21.      * <pre> 
    22.      * 协议开始的标准head_data,int类型,占据4个字节. 
    23.      * 表示数据的长度contentLength,int类型,占据4个字节. 
    24.      * </pre> 
    25.      */  
    26.     public final int BASE_LENGTH = 4 + 4;  
    27.   
    28.     @Override  
    29.     protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,  
    30.             List<Object> out) throws Exception {  
    31.         // 可读长度必须大于基本长度  
    32.         if (buffer.readableBytes() >= BASE_LENGTH) {  
    33.             // 防止socket字节流攻击  
    34.             // 防止,客户端传来的数据过大  
    35.             // 因为,太大的数据,是不合理的  
    36.             if (buffer.readableBytes() > 2048) {  
    37.                 buffer.skipBytes(buffer.readableBytes());  
    38.             }  
    39.   
    40.             // 记录包头开始的index  
    41.             int beginReader;  
    42.   
    43.             while (true) {  
    44.                 // 获取包头开始的index  
    45.                 beginReader = buffer.readerIndex();  
    46.                 // 标记包头开始的index  
    47.                 buffer.markReaderIndex();  
    48.                 // 读到了协议的开始标志,结束while循环  
    49.                 if (buffer.readInt() == ConstantValue.HEAD_DATA) {  
    50.                     break;  
    51.                 }  
    52.   
    53.                 // 未读到包头,略过一个字节  
    54.                 // 每次略过,一个字节,去读取,包头信息的开始标记  
    55.                 buffer.resetReaderIndex();  
    56.                 buffer.readByte();  
    57.   
    58.                 // 当略过,一个字节之后,  
    59.                 // 数据包的长度,又变得不满足  
    60.                 // 此时,应该结束。等待后面的数据到达  
    61.                 if (buffer.readableBytes() < BASE_LENGTH) {  
    62.                     return;  
    63.                 }  
    64.             }  
    65.   
    66.             // 消息的长度  
    67.   
    68.             int length = buffer.readInt();  
    69.             // 判断请求数据包数据是否到齐  
    70.             if (buffer.readableBytes() < length) {  
    71.                 // 还原读指针  
    72.                 buffer.readerIndex(beginReader);  
    73.                 return;  
    74.             }  
    75.   
    76.             // 读取data数据  
    77.             byte[] data = new byte[length];  
    78.             buffer.readBytes(data);  
    79.   
    80.             SmartCarProtocol protocol = new SmartCarProtocol(data.length, data);  
    81.             out.add(protocol);  
    82.         }  
    83.     }  
    84.   
    85. }  

          4.4  服务端加入协议的编/解码器

                

          4.5  客户端加入协议的编/解码器

              

    5、服务端的实现

    [java] view plain copy
     
     print?
    1. import io.netty.bootstrap.ServerBootstrap;  
    2. import io.netty.channel.ChannelFuture;  
    3. import io.netty.channel.ChannelInitializer;  
    4. import io.netty.channel.ChannelOption;  
    5. import io.netty.channel.EventLoopGroup;  
    6. import io.netty.channel.nio.NioEventLoopGroup;  
    7. import io.netty.channel.socket.SocketChannel;  
    8. import io.netty.channel.socket.nio.NioServerSocketChannel;  
    9. import io.netty.handler.logging.LogLevel;  
    10. import io.netty.handler.logging.LoggingHandler;  
    11.   
    12. public class Server {  
    13.   
    14.     public Server() {  
    15.     }  
    16.   
    17.     public void bind(int port) throws Exception {  
    18.         // 配置NIO线程组  
    19.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
    20.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    21.         try {  
    22.             // 服务器辅助启动类配置  
    23.             ServerBootstrap b = new ServerBootstrap();  
    24.             b.group(bossGroup, workerGroup)  
    25.                     .channel(NioServerSocketChannel.class)  
    26.                     .handler(new LoggingHandler(LogLevel.INFO))  
    27.                     .childHandler(new ChildChannelHandler())//  
    28.                     .option(ChannelOption.SO_BACKLOG, 1024) // 设置tcp缓冲区 // (5)  
    29.                     .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)  
    30.             // 绑定端口 同步等待绑定成功  
    31.             ChannelFuture f = b.bind(port).sync(); // (7)  
    32.             // 等到服务端监听端口关闭  
    33.             f.channel().closeFuture().sync();  
    34.         } finally {  
    35.             // 优雅释放线程资源  
    36.             workerGroup.shutdownGracefully();  
    37.             bossGroup.shutdownGracefully();  
    38.         }  
    39.     }  
    40.   
    41.     /** 
    42.      * 网络事件处理器 
    43.      */  
    44.     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
    45.         @Override  
    46.         protected void initChannel(SocketChannel ch) throws Exception {  
    47.             // 添加自定义协议的编解码工具  
    48.             ch.pipeline().addLast(new SmartCarEncoder());  
    49.             ch.pipeline().addLast(new SmartCarDecoder());  
    50.             // 处理网络IO  
    51.             ch.pipeline().addLast(new ServerHandler());  
    52.         }  
    53.     }  
    54.   
    55.     public static void main(String[] args) throws Exception {  
    56.         new Server().bind(9999);  
    57.     }  
    58. }  

    6、服务端Handler的实现

    [java] view plain copy
     
     print?
    1. import io.netty.channel.ChannelHandlerAdapter;  
    2. import io.netty.channel.ChannelHandlerContext;  
    3.   
    4. public class ServerHandler extends ChannelHandlerAdapter {  
    5.     // 用于获取客户端发送的信息  
    6.     @Override  
    7.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
    8.             throws Exception {  
    9.         // 用于获取客户端发来的数据信息  
    10.         SmartCarProtocol body = (SmartCarProtocol) msg;  
    11.         System.out.println("Server接受的客户端的信息 :" + body.toString());  
    12.   
    13.         // 会写数据给客户端  
    14.         String str = "Hi I am Server ...";  
    15.         SmartCarProtocol response = new SmartCarProtocol(str.getBytes().length,  
    16.                 str.getBytes());  
    17.         // 当服务端完成写操作后,关闭与客户端的连接  
    18.         ctx.writeAndFlush(response);  
    19.         // .addListener(ChannelFutureListener.CLOSE);  
    20.   
    21.         // 当有写操作时,不需要手动释放msg的引用  
    22.         // 当只有读操作时,才需要手动释放msg的引用  
    23.     }  
    24.   
    25.     @Override  
    26.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
    27.             throws Exception {  
    28.         // cause.printStackTrace();  
    29.         ctx.close();  
    30.     }  
    31. }  

    7、客户端的实现

    [java] view plain copy
     
     print?
    1. import io.netty.bootstrap.Bootstrap;  
    2. import io.netty.channel.ChannelFuture;  
    3. import io.netty.channel.ChannelInitializer;  
    4. import io.netty.channel.ChannelOption;  
    5. import io.netty.channel.EventLoopGroup;  
    6. import io.netty.channel.nio.NioEventLoopGroup;  
    7. import io.netty.channel.socket.SocketChannel;  
    8. import io.netty.channel.socket.nio.NioSocketChannel;  
    9.   
    10. public class Client {  
    11.   
    12.     /** 
    13.      * 连接服务器 
    14.      *  
    15.      * @param port 
    16.      * @param host 
    17.      * @throws Exception 
    18.      */  
    19.     public void connect(int port, String host) throws Exception {  
    20.         // 配置客户端NIO线程组  
    21.         EventLoopGroup group = new NioEventLoopGroup();  
    22.         try {  
    23.             // 客户端辅助启动类 对客户端配置  
    24.             Bootstrap b = new Bootstrap();  
    25.             b.group(group)//  
    26.                     .channel(NioSocketChannel.class)//  
    27.                     .option(ChannelOption.TCP_NODELAY, true)//  
    28.                     .handler(new MyChannelHandler());//  
    29.             // 异步链接服务器 同步等待链接成功  
    30.             ChannelFuture f = b.connect(host, port).sync();  
    31.   
    32.             // 等待链接关闭  
    33.             f.channel().closeFuture().sync();  
    34.   
    35.         } finally {  
    36.             group.shutdownGracefully();  
    37.             System.out.println("客户端优雅的释放了线程资源...");  
    38.         }  
    39.   
    40.     }  
    41.   
    42.     /** 
    43.      * 网络事件处理器 
    44.      */  
    45.     private class MyChannelHandler extends ChannelInitializer<SocketChannel> {  
    46.         @Override  
    47.         protected void initChannel(SocketChannel ch) throws Exception {  
    48.             // 添加自定义协议的编解码工具  
    49.             ch.pipeline().addLast(new SmartCarEncoder());  
    50.             ch.pipeline().addLast(new SmartCarDecoder());  
    51.             // 处理网络IO  
    52.             ch.pipeline().addLast(new ClientHandler());  
    53.         }  
    54.   
    55.     }  
    56.   
    57.     public static void main(String[] args) throws Exception {  
    58.         new Client().connect(9999, "127.0.0.1");  
    59.   
    60.     }  
    61.   
    62. }  

    8、客户端Handler的实现

    [java] view plain copy
     
     print?
    1. import io.netty.channel.ChannelHandlerAdapter;  
    2. import io.netty.channel.ChannelHandlerContext;  
    3. import io.netty.util.ReferenceCountUtil;  
    4.   
    5. //用于读取客户端发来的信息  
    6. public class ClientHandler extends ChannelHandlerAdapter {  
    7.   
    8.     // 客户端与服务端,连接成功的售后  
    9.     @Override  
    10.     public void channelActive(ChannelHandlerContext ctx) throws Exception {  
    11.         // 发送SmartCar协议的消息  
    12.         // 要发送的信息  
    13.         String data = "I am client ...";  
    14.         // 获得要发送信息的字节数组  
    15.         byte[] content = data.getBytes();  
    16.         // 要发送信息的长度  
    17.         int contentLength = content.length;  
    18.   
    19.         SmartCarProtocol protocol = new SmartCarProtocol(contentLength, content);  
    20.   
    21.         ctx.writeAndFlush(protocol);  
    22.     }  
    23.   
    24.     // 只是读数据,没有写数据的话  
    25.     // 需要自己手动的释放的消息  
    26.     @Override  
    27.     public void channelRead(ChannelHandlerContext ctx, Object msg)  
    28.             throws Exception {  
    29.         try {  
    30.             // 用于获取客户端发来的数据信息  
    31.             SmartCarProtocol body = (SmartCarProtocol) msg;  
    32.             System.out.println("Client接受的客户端的信息 :" + body.toString());  
    33.   
    34.         } finally {  
    35.             ReferenceCountUtil.release(msg);  
    36.         }  
    37.     }  
    38.   
    39.     @Override  
    40.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
    41.             throws Exception {  
    42.         ctx.close();  
    43.     }  
    44.   
    45. }  

    9、参考的博客地址

    [java] view plain copy
     
     print?
    1. http://www.cnblogs.com/whthomas/p/netty-custom-protocol.html  
    2. http://www.cnblogs.com/fanguangdexiaoyuer/p/6131042.html  
  • 相关阅读:
    Java jni字符串转换
    Python读取PE文件(exe/dll)中的时间戳
    深度学习word embedding猜测性别初探
    闪存内容汇编(截止20170405)
    如何自动化安装字体(命令行批量)
    如何分析进程的内存占用问题
    Python print报ascii编码异常的靠谱解决办法
    Linux界面自动化测试框架不完全汇总
    Qt实现同步(阻塞式)http get等网络访问操作
    基于第三方开源库的OPC服务器开发指南(4)——后记:与另一个开源库opc workshop库相关的问题
  • 原文地址:https://www.cnblogs.com/cnsanshao/p/10955034.html
Copyright © 2011-2022 走看看