zoukankan      html  css  js  c++  java
  • 使用Newlife网络库管道模式解决数据粘包(二)

    上一篇我们讲了 如何创建一个基本的Newlife网络服务端 这边我们来讲一下如何解决粘包的问题

    在上一篇总我们注册了Newlife的管道处理器 ,我们来看看他是如何实现粘包处理的

    svr.Add<ReciveFilter>();//粘包处理管道

    首先看一下我们设备的上传数据协议

     

     设备上报的数据包头包含了固定的包头包尾,整个包的数据长度,设备编号。

     包头:板卡类型,帧类型 2个字节 0x01 0x70

     帧长度: 为两个字节 并且数据的字节序为  高字节在前 ,C#正常默认为低字节在前。

     设备号:15位的ASCII 字符串

     包尾: 两个字节 0x0D 0x0A  固定

     下面来解决粘包的问题

      Newlife网络库提供了几种常见的封包协议来解决粘包的问题,其中有一个 LengthFieldCodec解码器 这个解码器以长度字段作为头部 恰好符合我们的需求,我们就以这个解码器稍作改造来解决我们的粘包问题吧

      由于这个解码器是适用于 只包含包头和包体的数据结构,且长度为包体的长度,而我们的协议 是包含包头包体包尾,并且帧长度为整个包的长度,长度为高位在前的数据结构,所以我们需要对整个解码器稍微做一些改造来符合我们的数据结构 。

      我们来看下代码 其中

      

            #region 属性
            /// <summary>长度所在位置</summary>
            public Int32 Offset { get; set; }=2;
    
            /// <summary>长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2</summary>
            public Int32 Size { get; set; } = 2;
    
            /// <summary>过期时间,超过该时间后按废弃数据处理,默认500ms</summary>
            public Int32 Expire { get; set; } = 500;
            #endregion

    在我们的协议中可以看到 设置了数据包的长度位置,长度占据的字节数,下面我们来获取一下整个包的长度

    /// <summary>从数据流中获取整帧数据长度</summary>
            /// <param name="pk"></param>
            /// <param name="offset"></param>
            /// <param name="size"></param>
            /// <returns>数据帧长度(包含头部长度位)</returns>
            protected  Int32 GetLength(Packet pk, Int32 offset, Int32 size)
            {
                if (offset < 0) return pk.Total - pk.Offset;
                // 数据不够,连长度都读取不了
                if (offset >= pk.Total) return 0;
    
                // 读取大小
                var len = 0;
                switch (size)
                {
                    case 2:
                        var lenArry = pk.ReadBytes(offset, 2);
                        //高位在前,反转数组,获取长度
                        Array.Reverse(lenArry);
                        len = lenArry.ToUInt16();
                        break;
                    default:
                        throw new NotSupportedException();
                }
    
                // 判断后续数据是否足够
                if (len > pk.Total) return 0;
    
                return len;
            }

    获取长度后我们就可以从数据流中读取一个完整的包了

            /// <summary>解码</summary>
            /// <param name="context"></param>
            /// <param name="pk"></param>
            /// <returns></returns>
            protected override IList<Packet> Decode(IHandlerContext context, Packet pk)
            {
                var ss = context.Owner as IExtend;
                var mcp = ss["CodecItem"] as CodecItem;
                if (mcp == null) ss["CodecItem"] = mcp = new CodecItem();
    
                var pks = ParseNew(pk, mcp, 0, ms => GetLength(ms, Offset, Size), Expire);
    
                // 跳过头部长度
                var len = Offset + Math.Abs(Size);
                foreach (var item in pks)
                {
                    item.Set(item.Data, item.Offset + len, item.Count - len);
                    //item.SetSub(len, item.Count - len);
                }
    
                return pks;
            }
    
    
            #region 粘包处理
            /// <summary>分析数据流,得到一帧数据</summary>
            /// <param name="pk">待分析数据包</param>
            /// <param name="codec">参数</param>
            /// <param name="getLength">获取长度</param>
            /// <param name="expire">缓存有效期</param>
            /// <returns></returns>
            protected IList<Packet> ParseNew(Packet pk, CodecItem codec, int startIndex, Func<Packet, Int32> getLength, Int32 expire = 5000)
            {
                var _ms = codec.Stream;
                var nodata = _ms == null || _ms.Position < 0 || _ms.Position >= _ms.Length;
    
                var list = new List<Packet>();
                // 内部缓存没有数据,直接判断输入数据流是否刚好一帧数据,快速处理,绝大多数是这种场景
                if (nodata)
                {
                    if (pk == null) return list.ToArray();
    
                    var idx = 0;
                    while (idx < pk.Total)
                    {
                        //var pk2 = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                        var pk2 = pk.Slice(idx);
                        var len = getLength(pk2);
                        if (len <= 0 || len > pk2.Count) break;
    
                        pk2.Set(pk2.Data, startIndex, len);
                        //pk2.SetSub(0, len);
                        list.Add(pk2);
                        idx += len;
                    }
                    // 如果没有剩余,可以返回
                    if (idx == pk.Total) return list.ToArray();
    
                    // 剩下的
                    //pk = new Packet(pk.Data, pk.Offset + idx, pk.Total - idx);
                    pk = pk.Slice(idx);
                }
    
                if (_ms == null) codec.Stream = _ms = new MemoryStream();
    
                // 加锁,避免多线程冲突
                lock (_ms)
                {
                    // 超过该时间后按废弃数据处理
                    var now = TimerX.Now;
                    if (_ms.Length > _ms.Position && codec.Last.AddMilliseconds(expire) < now)
                    {
                        _ms.SetLength(0);
                        _ms.Position = 0;
                    }
                    codec.Last = now;
    
                    // 合并数据到最后面
                    if (pk != null && pk.Total > 0)
                    {
                        var p = _ms.Position;
                        _ms.Position = _ms.Length;
                        pk.WriteTo(_ms);
                        _ms.Position = p;
                    }
    
                    // 尝试解包
                    while (_ms.Position < _ms.Length)
                    {
                        //var pk2 = new Packet(_ms.GetBuffer(), (Int32)_ms.Position, (Int32)_ms.Length);
                        var pk2 = new Packet(_ms);
                        var len = getLength(pk2);
    
                        // 资源不足一包
                        if (len <= 0 || len > pk2.Total) break;
    
                        // 解包成功
                        pk2.Set(pk2.Data, startIndex, len);
                        //pk2.SetSub(0, len);
                        list.Add(pk2);
    
                        _ms.Seek(len, SeekOrigin.Current);
                    }
    
                    // 如果读完了数据,需要重置缓冲区
                    if (_ms.Position >= _ms.Length)
                    {
                        _ms.SetLength(0);
                        _ms.Position = 0;
                    }
    
                    return list;
                }
            }

       粘包处理管道完成后,就可以在Recive中去处理一个完整的数据包啦,我来解析一下这个状态的数据并且来保存设备连接  

      首先定义一个字典项用来保存设备的连接信息.设备号,连接的SessionId

      

       /// <summary>
            /// newLife连接保持 
            /// </summary>
            private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();



      由于我们的数据中 帧类型不同的请求中帧类型是不一样的 所以解析数据需要做区分处理 我们来或者状态上传信息中的设备号并且和连接关联

     private Dictionary<string, int> OnLineClients = new Dictionary<string, int>();
            private object _lock=new object();
            private void Recive(object sender, ReceivedEventArgs e)
            {
    
                INetSession session = (INetSession)sender;
                var pk = e.Message as Packet;
                if (pk.Count == 0)
                {
                    XTrace.WriteLine("数据包解析错误");
                    return;
    
                }
                try
                {
                    //数据包
                    var respBytes = pk.Data;
                    //获取帧类型
                    var dataTypeBytes = respBytes[1];
    
                    if (dataTypeBytes == 0x70)
                    {
                        //数值
                        byte[] deviceNoByte = new byte[15];
                        Buffer.BlockCopy(respBytes, 4, deviceNoByte, 0, 15); //从缓冲区里读取包头的字节
                        string deviceNo = Encoding.ASCII.GetString(deviceNoByte);
                        XTrace.WriteLine("设备编号:" + deviceNo);
    //保存连接信息
    SaveClientConnection(deviceNo, session.ID);
    //获取设备号后保存连接信息 } //支付宝 } catch (Exception ex) { XTrace.WriteLine(ex.Message); } } /// <summary> /// 保存在线信息 /// </summary> /// <param name="deviceNo"></param> /// <param name="sessionId"></param> private void SaveClientConnection(string deviceNo, int sessionId) { lock (_lock) { if (OnLineClients.ContainsKey(deviceNo)) { OnLineClients[deviceNo] = sessionId; } else { OnLineClients.Add(deviceNo,sessionId); } } }

    好了数据粘包问题解决啦同时保存了设备连接信息,下面来解决如何定时检查测试在线状态。

        

      

      

  • 相关阅读:
    redis实现分布式缓存
    redis持久化
    Redis五种数据类型
    Azure Digital Twins(1)-创建实例并设置角色
    Azure Digital Twins(2)- 在本地使用ADT Explorer 管理数字孪生
    Azure Digital Twins(3)- 数字孪生体和数字孪生图
    Azure + 5G + AI + IOT可以这么玩
    使用Azure Storage API 上传 文件解决微信小程序中上传图片的问题
    Azure入门(1)- Azure核心概念
    利用 Management Group 和Policy 控制Azure 指定资源的创建
  • 原文地址:https://www.cnblogs.com/yushuo/p/10286472.html
Copyright © 2011-2022 走看看