zoukankan      html  css  js  c++  java
  • 高效的TCP数据拆包器


    高效的TCP数据拆包器 接收器,每秒拆1KB的包达到30万以上

        /// 数据同步协义数据接收器
        /// </summary>
        /// <remarks>
        /// 主要功能有
        /// 1.将一个TCPSocket的所有数据所有接收
        /// 2.解析协义
        /// 3.解析完毕后的协义调用 Handler通知外部处理
        /// 4.定义一个协义解析线程不停的解析协义
        /// </remarks>
        public class TCPReceiver : IDisposable
        {
            #region 构造函数
            /// <summary>
            /// 数据同步协义数据接收器 实例
            /// </summary>
            public TCPReceiver()
            {
    
    
            }
    
            /// <summary>
            /// 数据同步协义数据接收器 实例
            /// </summary>
            /// <param name="protocolhead">协议头</param>
            /// <param name="protocolfoot">协议尾</param>
            public TCPReceiver(byte[] protocolhead, byte[] protocolfoot = null)
            {
                //邦定包头,与包体
                PackageHead = protocolhead;
                PackageFoot = protocolfoot;
            }
    
            #endregion
    
            /// <summary>
            /// 最大单个协义体数据长度,默认10MB
            /// </summary>
            private int maxProtocolBinary = 1024 * 1024 * 10;
            /// <summary>
            /// 最大单个协义体数据长度
            /// </summary>
            public int MaxProtocolBinary
            {
                get { return maxProtocolBinary; }
                set { maxProtocolBinary = value; }
            }
    
            /// <summary>
            /// 是否正在执行
            /// </summary>
            public bool IsRuning { get; set; }
    
            private Task task = null;
            /// <summary>
            /// 当前处理解析协义的线程
            /// </summary>
            public Task PraseProtocolTask
            {
                get { return task; }
            }
    
            /// <summary>
            /// 接收数据处理事件
            /// </summary>
            public Action<byte[], Socket> ProtocolReceivedHandler
            {
                get;
                set;
            }
    
            /// <summary>
            /// 是从哪个节点接收的数据
            /// </summary>
            public Socket Handler
            {
                get;
                set;
            }
    
            #region 接收数据加入到队列
            /// <summary>
            /// 接收数据处理集合,默认开放1MB的空间
            /// </summary>
            // protected System.Collections.Generic.Queue<byte> byteQueue = new Queue<byte>(1024 * 1024);
    
            /// <summary>
            /// 默认开放500空间,100万次单纯加入用时95毫秒
            /// </summary>
            private Queue<byte[]> receiveByteArrayQueue = new Queue<byte[]>(500);
            /// <summary>
            /// 接入队列处理器
            /// </summary>
            protected Queue<byte[]> ReceiveByteArrayQueue
            {
                get { return receiveByteArrayQueue; }
            }
    
    #if DEBUG
            //private int cuount = 1;
    #endif
    
            /// <summary>
            /// 接收数据
            /// </summary>
            public void Receive(byte[] buff)
            {
    #if DEBUG
                //严重影响性能,会变慢1117倍
                // Console.WriteLine(buff.ToHex());
                //Console.WriteLine(buff.ByteArray2HexString());
                // Console.WriteLine("-----"+cuount++);
    #endif
    
                lock (receiveByteArrayQueue)
                {
                    //加入对像数据
                    receiveByteArrayQueue.Enqueue(buff);
                }
            }
            #endregion
    
            #region 线程控制
    
    
    
            /// <summary>
            /// 停止解析协义
            /// </summary>
            public void StopParseProtocol()
            {
                IsRuning = false;
                //throw new NotImplementedException("请编写代码,在线程停止后须要将缓存队列中的数据所有处理完毕");
                //在线程停止后须要将缓存队列中的数据所有处理完毕
                for (; receiveByteArrayQueue.Count > 0; )
                {
                    //处理数据
                    ProcessBytes();
                }
            }
            #endregion
    
    
            #region 解析协义数据
            /// <summary>
            /// 分包用包头
            /// </summary>
            private byte[] packageHead = new byte[] { 0x7e };//0x7e
    
    
            /// <summary>
            /// 分包用包头
            /// </summary>
            public byte[] PackageHead
            {
                get { return packageHead; }
                set
                {
                    if (value != null)
                    {
                        packageHead = value;
                    }
                }
            }
            /// <summary>
            /// 分包用包尾
            /// </summary>
            private byte[] packageFoot = new byte[] { 0x7e };
            /// <summary>
            /// 分包用包尾
            /// </summary>
            public byte[] PackageFoot
            {
                get { return packageFoot; }
                set
                {
                    if (value != null)
                    {
                        packageFoot = value;
    
                    }
                }
            }
    
    
    
    
            /// <summary>
            /// 用于处理数据协义的功能
            /// </summary>
            List<byte> bytes = new List<byte>();
    
            /// <summary>
            /// 默认开 3MB的数据接收缓冲区,假设超过3MB则数据会挂掉
            /// </summary>
            //private byte[] ByteBuff = null;
    
            /// <summary>
            /// 协义数据实体队列,已经进行拆包后的协义数据
            /// </summary>
            private Queue<byte[]> ProtocolEntityQueue = new Queue<byte[]>(500);
    
            /// <summary>
            /// 找到分包用包头
            /// </summary>
            bool FindPackageHead = false;
            /// <summary>
            /// 找包头的当着序号
            /// </summary>
            int findHeadindex = 0;
            /// <summary>
            /// 找包尾
            /// </summary>
            int findFootIndex = 0;
    
            /// <summary>
            /// 解析协义方法
            /// 之所以先所有放到一个query里是进行高速的接收
            /// 
            /// </summary>
            public void PraseProtocol()
            {
                IsRuning = true;
                while (IsRuning)
                {
                    ProcessBytes();
                }
            }
            /// <summary>
            /// 处理队列中的数据删除包头,包尾巴
            /// </summary>
            public void ProcessBytes()
            {
                byte[] arr = null;
                //開始解析数据
                //1.取出数据
                lock (receiveByteArrayQueue)
                {
                    if (receiveByteArrayQueue.Count > 0)
                    {
                        arr = receiveByteArrayQueue.Dequeue();
                    }
                }
                if (arr != null)
                {
                    //锁处理
                    lock (bytes)
                    {
                        //此协义数据中的协义数据索引
                        // List<int> ints = new List<int>();
    
                        //2.将数据进行包查找
                        //開始从队列中取数据
                        for (int k = 0; k < arr.Length; k++)
                        {
                            //队列有数据
                            byte b = arr[k];
                            //假设超过最大接收字节数
                            if (maxProtocolBinary <= bytes.Count)
                            {
                                bytes.Clear();
                            }
                            //加入到对像集合
                            bytes.Add(b);
                            //3.从集合的前面開始取数据.找包头,进行拆包
                            #region 找包头
                            //等于包数据
                            if (packageHead.Length > 0 && b == packageHead[findHeadindex] && !FindPackageHead)
                            {
    
                                //包头找完
                                if (findHeadindex == packageHead.Length - 1)
                                {
    
                                    //ints.Add(k);
                                    System.Threading.Interlocked.Exchange(ref findHeadindex, 0);
                                    if (!FindPackageHead)
                                    {
                                        FindPackageHead = true;
                                    }
                                    //这里取一个完整包
                                    byte[] byteFarm = bytes.Take(bytes.Count - packageHead.Length).ToArray();
                                    //假设是有效的数据
                                    if (byteFarm.Length > packageHead.Length)
                                    {
                                        lock (ProtocolEntityQueue)
                                        {
                                            ProtocolEntityQueue.Enqueue(byteFarm);
                                        }
                                        //開始从 bytes 中移除数据
                                        bytes.Clear();
                                        //加入包头
                                        bytes.AddRange(packageHead);
                                    }
                                    //包头找完则找下一字节
                                    continue;
                                }
                                else
                                {
                                    System.Threading.Interlocked.Increment(ref findHeadindex);
                                }
                            }
                            else
                            {
                                System.Threading.Interlocked.Exchange(ref findHeadindex, 0);
                                //findHeadindex = 0;
                                if (!FindPackageHead && packageHead.Length == 0)
                                {
                                    FindPackageHead = true;
                                }
                            }
                            #endregion
    
                            #region 找包尾
    
                            if (packageFoot != null && packageFoot.Length > 0 && FindPackageHead)
                            {
                                if (b == packageFoot[findFootIndex])
                                {
                                    //包尾找完
                                    if (findFootIndex == packageFoot.Length - 1)
                                    {
                                        //删除包尾字节,可能会包括包头字节
                                          //byte[] byteFarm = bytes.Take(bytes.Count - packageFoot.Length).ToArray();
                                        byte[] byteFarm = bytes.ToArray();
                                        //跳过包头字节,包尾字节
                                        //byte[] byteFarm = bytes.Skip(packageHead.Length).Take(bytes.Count - (packageFoot.Length + packageHead.Length)).ToArray();
                                        //假设是有效的数据
                                        if (byteFarm.Length >= packageFoot.Length)
                                        {
                                            lock (ProtocolEntityQueue)
                                            {
                                                ProtocolEntityQueue.Enqueue(byteFarm);
                                            }
                                            //開始从 bytes 中移除数据
                                            bytes.Clear();
                                        }
                                        FindPackageHead = false;
                                        //包尾找完则找下一字节
                                        continue;
                                    }
                                    else
                                    {
                                        System.Threading.Interlocked.Increment(ref findFootIndex);
                                    }
                                }
                                else
                                {
                                    System.Threading.Interlocked.Exchange(ref findFootIndex, 0);
                                    //findFootIndex = 0;
    
                                }
                            }
    
                            #endregion
    
                        }
                    }
                    //4.又一次组成一个byte[] 进行数据解析
                    lock (ProtocolEntityQueue)
                    {
                        if (ProtocolEntityQueue.Count > 0)
                        {
                            //循环所有接收到的数据包
                            for (; ProtocolEntityQueue.Count > 0; )
                            {
    
                                //取取删除尾巴的的数据
    
    
                                //解析协议数据
                                byte[] bytearr = ProtocolEntityQueue.Dequeue();
    
    
                                //数据要大于分包的长度
                                if (bytearr.Length >= packageFoot.Length && bytearr.Length >= packageHead.Length)
                                {
                                    ProtocolReceivedHandler.Invoke(bytearr, Handler);
                                }
                            }
                        }
                    }
                }
                else
                {
                    //停止执行
                    IsRuning = false;
                    //System.Threading.Thread.Sleep(5);
                }
            }
    
            #endregion
    
            /// <summary>
            /// 析构方法
            /// </summary>
            public void Dispose()
            {
                StopParseProtocol();
            }
    
    
        }


    用法 

    TCPReceiver   rece = new  TCPReceiver();

    //将接收到的数据增加处理

    rece .Receive(buff);


    另起一个线程进行处理

    while(true)

    {

    rece .PraseProtocol();

    }


  • 相关阅读:
    阅读提问
    阅读笔记
    结对需求分析
    分工
    对软件工程课程的期望
    JAVAWEB-Spring Boot学习
    团队编程-项目作业6-程序维护
    团队-吃货之家-项目总结
    团队编程项目作业5-小组评分
    安装Vue.js之Node.js,NMP环境搭建
  • 原文地址:https://www.cnblogs.com/blfbuaa/p/6951663.html
Copyright © 2011-2022 走看看