zoukankan      html  css  js  c++  java
  • c#解决TCP“粘包”问题

    一:TCP粘包产生的原理

    1,TCP粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。出现粘包现象的原因是多方面的,它既可能由发送方造成,也可能由接收方造成。

    2,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据。若连续几次发送的数据都很少,通常TCP会根据优化算法把这些数据合成一包后一次发送出去,这样接收方就收到了粘包数据。接收方引起的粘包是由于接收方用户进程不及时接收数据,从而导致粘包现象。

    3,这是因为接收方先把收到的数据放在系统接收缓冲区,用户进程从该缓冲区取数据,若下一包数据到达时前一包数据尚未被用户进程取走,则下一包数据放到系统接收缓冲区时就接到前一包数据之后,而用户进程根据预先设定的缓冲区大小从系统接收缓冲区取数据,这样就一次取到了多包数据。

    二:组件化解决粘包

    通过第三方的组件处理这个问题比较方便和快捷,比如国产开源的HP-SOCKET等。

    HP-SOCKET官方地址

    HP-Socket 是一套通用的高性能 TCP/UDP/HTTP 通信框架,包含服务端组件、客户端组件和Agent组件,广泛适用于各种不同应用场景的 TCP/UDP/HTTP 通信系统,提供 C/C++、C#、Delphi、E(易语言)、Java、Python 等编程语言接口。HP-Socket 对通信层实现完全封装,应用程序不必关注通信层的任何细节;HP-Socket 提供基于事件通知模型的 API 接口,能非常简单高效地整合到新旧应用程序中。

    为了让使用者能方便快速地学习和使用 HP-Socket ,迅速掌握框架的设计思想和使用方法,特此精心制作了大量 Demo 示例(如:PUSH 模型示例、PULL 模型示例、PACK 模型示例、性能测试示例以及其它编程语言示例)。HP-Socket 目前支持 Windows 和 Linux 平台。

    HP-Socket 的设计充分注重功能、通用型、易用性与伸缩性:

    通用性

    • HP-Socket 的唯一职责就是接收和发送字节流,不参与应用程序的协议解析等工作。

    • HP-Socket 与应用程序通过接口进行交互,并完全解耦。任何应用只要实现了HP-Socket的接口规范都可以无缝整合 HP-Socket。

    易用性

    • 易用性对所有通用框架都是至关重要的,如果太难用还不如自己重头写一个来得方便。因此,HP-Socket 的接口设计得非常简单和统一。

    • HP-Socket 完全封装了所有底层通信细节,应用程序不必也不能干预底层通信操作。通信连接被抽象为Connection ID,Connection ID 作为连接的唯一标识提供给应用程序来处理不同的连接。

    • HP-Socket 提供 PUSH / PULL / PACK 等接收模型, 应用程序可以灵活选择以手工方式、 半自动方式或全自动方式处理封解包, PULL / PACK 接收模型在降低封解包处理复杂度的同时能大大减少出错几率。

    高性能

    • Server 组件:基于IOCP / EPOLL通信模型,并结合缓存池、私有堆等技术实现高效内存管理,支持超大规模、高并发通信场景。

    • Agent 组件:Agent组件实质上是Multi-Client组件,与Server组件采用相同的技术架构。一个Agent组件对象可同时建立和高效处理大规模Socket连接。

    • Client 组件:基于Event Select / POLL通信模型,每个组件对象创建一个通信线程并管理一个Socket连接,适用于小规模客户端场景。

    伸缩性

          应用程序能够根据不同的容量要求、通信规模和资源状况等现实场景调整 HP-Socket 的各项性能参数(如:工作线程的数量、缓存池的大小、发送模式和接收模式等),优化资源配置,在满足应用需求的同时不必过度浪费资源。

    三:解决原理及代码实现

    1,采用包头(固定长度,里面存着包体的长度,发送时动态获取)+包体的传输机制。如图

    HeaderSize 存放着包体的长度,其HeaderSize本身是定长4字节;

    一个完整的数据包(L)=HeaderSize+BodySize;

    2,分包算法

      其基本思路是首先将待处理的接收数据流即系统缓冲区数据(长度设为M)强行转换成预定的结构数据形式,并从中取出结构数据长度字段L,而后根据包头计算得到第一包数据长度。

           M=系统缓冲区大小;L=用户发送的数据包=HeaderSize+BodySize;

      1)若L<M,则表明数据流包含多包数据,从其头部截取若干个字节存入临时缓冲区,剩余部分数据依此继续循环处理,直至结束。

         

      2)若L=M,则表明数据流内容恰好是一完整结构数据(即用户自定义缓冲区等于系统接收缓冲区大小),直接将其存入临时缓冲区即可。

          

      3)若L>M,则表明数据流内容尚不够构成一完整结构数据,需留待与下一包数据合并后再行处理。

            

          4)下面是代码代码实现(HP-SOCKET框架的服务器端来接收数据)

    int headSize = 4;//包头长度 固定4
            byte[] surplusBuffer = null;//不完整的数据包,即用户自定义缓冲区
            /// <summary>
            /// 接收客户端发来的数据
            /// </summary>
            /// <param name="connId">每个客户的会话ID</param>
            /// <param name="bytes">缓冲区数据</param>
            /// <returns></returns>
            private HandleResult OnReceive(IntPtr connId, byte[] bytes) 
            {
                //bytes 为系统缓冲区数据
                //bytesRead为系统缓冲区长度
                int bytesRead = bytes.Length;
                if (bytesRead > 0)
                {
                    if (surplusBuffer == null)//判断是不是第一次接收,为空说是第一次
                        surplusBuffer = bytes;//把系统缓冲区数据放在自定义缓冲区里面
                    else
                        surplusBuffer = surplusBuffer.Concat(bytes).ToArray();//拼接上一次剩余的包
                    //已经完成读取每个数据包长度
                    int haveRead = 0;
                    //这里totalLen的长度有可能大于缓冲区大小的(因为 这里的surplusBuffer 是系统缓冲区+不完整的数据包)
                    int totalLen = surplusBuffer.Length;
                    while (haveRead <= totalLen)
                    {
                        //如果在N此拆解后剩余的数据包连一个包头的长度都不够
                        //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包
                        if (totalLen - haveRead < headSize)
                        {
                            byte[] byteSub = new byte[totalLen - haveRead];
                            //把剩下不够一个完整的数据包存起来
                            Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
                            surplusBuffer = byteSub;
                            totalLen = 0;
                            break;
                        }
                        //如果够了一个完整包,则读取包头的数据
                        byte[] headByte = new byte[headSize];
                        Buffer.BlockCopy(surplusBuffer, haveRead, headByte, 0, headSize);//从缓冲区里读取包头的字节
                        int bodySize = BitConverter.ToInt32(headByte, 0);//从包头里面分析出包体的长度
    
                        //这里的 haveRead=等于N个数据包的长度 从0开始;0,1,2,3....N
                        //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存
                        if (haveRead + headSize + bodySize > totalLen)
                        {
                            byte[] byteSub = new byte[totalLen - haveRead];
                            Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
                            surplusBuffer = byteSub;
                            break;
                        }
                        else
                        {
                            //挨个分解每个包,解析成实际文字
                            String strc = Encoding.UTF8.GetString(surplusBuffer, haveRead + headSize, bodySize);
                            //AddMsg(string.Format(" > [OnReceive] -> {0}", strc));
                            //依次累加当前的数据包的长度
                            haveRead = haveRead + headSize + bodySize;
                            if (headSize + bodySize == bytesRead)//如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0
                            {
                                surplusBuffer = null;//设置空 回到原始状态
                                totalLen = 0;//清0
                            }
                        }
                    }
                }
                return HandleResult.Ok;
            }

    值此完成拆包解析文字工作。但实际上还没完成,如果这段代码是客户端接收来自服务器的数据的话就没问题了。

    仔细看IntPtr connId 每个连接的会话ID

    private HandleResult OnReceive(IntPtr connId, byte[] bytes)

    {

    }

    但是服务器端还要分辨出 每个数据包是哪个会话产生的,因为服务器端是多线程,多用户的模式,第一个数据包和第二个可能来自不同会话的数据,所以上面的代码只适用于单会话模式。

    下面我要解决这个问题。

    采用c#安全的ConcurrentDictionary,具体参考 https://msdn.microsoft.com/zh-cn/library/dd287191(v=vs.110).aspx

    最新的代码

         //线程安全的字典
            ConcurrentDictionary<IntPtr, byte[]> dic = new ConcurrentDictionary<IntPtr, byte[]>();
            int headSize = 4;//包头长度 固定4
            /// <summary>
            /// 接收客户端发来的数据
            /// </summary>
            /// <param name="connId">每个客户的会话ID</param>
            /// <param name="bytes">缓冲区数据</param>
            /// <returns></returns>
            private HandleResult OnReceive(IntPtr connId, byte[] bytes) 
            {
                //bytes 为系统缓冲区数据
                //bytesRead为系统缓冲区长度
                int bytesRead = bytes.Length;
                if (bytesRead > 0)
                {
                    byte[] surplusBuffer = null;
                    if (dic.TryGetValue(connId, out surplusBuffer))
                    {
                        byte[] curBuffer = surplusBuffer.Concat(bytes).ToArray();//拼接上一次剩余的包
                        //更新会话ID 的最新字节
                        dic.TryUpdate(connId, curBuffer, surplusBuffer);
                        surplusBuffer = curBuffer;//同步
                    }
                    else
                    {
                        //添加会话ID的bytes
                        dic.TryAdd(connId, bytes);
                        surplusBuffer = bytes;//同步
                    }
    
                    //已经完成读取每个数据包长度
                    int haveRead = 0;
                    //这里totalLen的长度有可能大于缓冲区大小的(因为 这里的surplusBuffer 是系统缓冲区+不完整的数据包)
                    int totalLen = surplusBuffer.Length;
                    while (haveRead <= totalLen)
                    {
                        //如果在N此拆解后剩余的数据包连一个包头的长度都不够
                        //说明是上次读取N个完整数据包后,剩下的最后一个非完整的数据包
                        if (totalLen - haveRead < headSize)
                        {
                            byte[] byteSub = new byte[totalLen - haveRead];
                            //把剩下不够一个完整的数据包存起来
                            Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
                            dic.TryUpdate(connId, byteSub, surplusBuffer);
                            surplusBuffer = byteSub;
                            totalLen = 0;
                            break;
                        }
                        //如果够了一个完整包,则读取包头的数据
                        byte[] headByte = new byte[headSize];
                        Buffer.BlockCopy(surplusBuffer, haveRead, headByte, 0, headSize);//从缓冲区里读取包头的字节
                        int bodySize = BitConverter.ToInt32(headByte, 0);//从包头里面分析出包体的长度
    
                        //这里的 haveRead=等于N个数据包的长度 从0开始;0,1,2,3....N
                        //如果自定义缓冲区拆解N个包后的长度 大于 总长度,说最后一段数据不够一个完整的包了,拆出来保存
                        if (haveRead + headSize + bodySize > totalLen)
                        {
                            byte[] byteSub = new byte[totalLen - haveRead];
                            Buffer.BlockCopy(surplusBuffer, haveRead, byteSub, 0, totalLen - haveRead);
                            dic.TryUpdate(connId, byteSub, surplusBuffer);
                            surplusBuffer = byteSub;
                            break;
                        }
                        else
                        {
                            //挨个分解每个包,解析成实际文字
                            String strc = Encoding.UTF8.GetString(surplusBuffer, haveRead + headSize, bodySize);
                            AddMsg(string.Format(" > {0}[OnReceive] -> {1}", connId, strc));
                            //依次累加当前的数据包的长度
                            haveRead = haveRead + headSize + bodySize;
                            
                  //如果当前接收的数据包长度正好等于缓冲区长度,则待拼接的不规则数据长度归0
                  if (headSize + bodySize == bytesRead)
                            {
                                byte[] xbtye=null;
                                dic.TryRemove(connId, out xbtye);
                                surplusBuffer = null;//设置空 回到原始状态
                                totalLen = 0;//清0
                            }
                        }
                    }
                }
                return HandleResult.Ok;
            }

    这样就解决了,多客户端会话造成的接收混乱。至此所有工作完成。以上代码就是为了参考学习,如果实在不想这么麻烦。可以直接使用HP-SOCKET通信框架的PACK模型,里面自动实现了解决粘包的问题。 

  • 相关阅读:
    JID 2.0 RC4 发布,高性能的 Java 序列化库
    FBReaderJ 1.6.3 发布,Android 电子书阅读器
    Arquillian 1.0.3.Final 发布,单元测试框架
    JavaScript 的宏扩展 Sweet.js
    Hypertable 0.9.6.5 发布,分布式数据库
    JRuby 1.7.0 发布,默认使用 Ruby 1.9 模式
    httppp 1.4.0 发布,HTTP响应时间监控
    Redis 2.6.0 正式版发布,高性能K/V服务器
    OfficeFloor 2.5.0 发布,IoC 框架
    XWiki 4.3 首个里程碑发布
  • 原文地址:https://www.cnblogs.com/wangjun8868/p/7160661.html
Copyright © 2011-2022 走看看