zoukankan      html  css  js  c++  java
  • .Net 下通过缓存提高TCP传输速度

        .Net 提供了一个NetworkStream 用于TCP 的读写,实际使用时发现直接操作效率很低,哪怕把TCP 的发送缓存和接受缓存设置很大也没有太大提高。后来在对 NetworkStream 读写前设置了缓存,性能一下子提高了很多。

        从实际测试结果看设置自己的写缓存,对性能的提升最为显著。我分析了一下,其原因很可能是在向NetworkStream 序列化对象时,序列化程序调用了大量的Write 方法向NetworkStream写入数据,每次向NetworkStream写入数据,数据被首先写入TCP的发送缓存,并且由调用线程设置一个信号通知.Net framework 内部的TCP线程发送缓冲区中已经有数据,TCP线程被激活并读取发送缓冲区中的数据,组包并向网卡写入数据。频繁的调用 NetworkStream.Write 写入小块数据将导致调用线程和TCP线程反复切换,并大量触发网卡中断,导致发送效率低下。如果我们在发送前将数据缓存并按较大的数据块发送给TCP线程,则大大减少线程切换和网卡中断数量,从而大大提高传输效率。

        问题到这里还没有结束,我们发送的对象往往较大,如果我们将发送对象全部序列化到buffer中再发送,那么势必占用大量内存,实际上我们无法忍受这种对内存无限制申请的行为,试想一个1G大小的对象,我们在发送前为它另外再开辟1个G的内存来缓存,对于系统来说简直是无法忍受。由于我们用.net 发送数据,我们在发送时需要将对象序列化到流中,而不能像 C/C++那样直接通过指针来读取数据(当然你也可以用unsafe代码,但这种方式会带来其他问题,而且并不为大家所推荐),所以我们需要开发一个专门用 TCP 发送缓存的流来处理读写前的缓存。为此我开发了一个 TcpCacheStream 类,这个类被用在读写 NetworkStream 前先进行缓存。

        调用方法很简单

       

    • 发送过程
    object msg;
    //初始化 msg 过程省略
    System.Net.Sockets.NetworkStream _ClientStream;
    //初始化 _ClientStream 过程省略
     
    //创建TcpCacheStream 
    TcpCacheStream tcpStream = new TcpCacheStream(_ClientStream);
     
    //二进制序列化 msg 对象到 TcpCacheStream 
    IFormatter formatter = new BinaryFormatter();
    formatter.Serialize(tcpStream, msg);
     
    //将缓存中最后一包的数据发送出去
    tcpStream.Flush();

      

    • 接收过程
     
    System.Net.Sockets.NetworkStream _ClientStream;
    //初始化 _ClientStream 过程省略
     
    //创建TcpCacheStream 
    TcpCacheStream tcpStream = new TcpCacheStream(_ClientStream);
     
    //从 TcpCacheStream 二进制反序列化
    IFormatter formatter = new BinaryFormatter();
    objcet result = formatter.Deserialize(tcpStream);

    TcpCacheStream 类为调用者封装了缓存的过程,这个缓存过程实际并不复杂,发送时数据先写入TcpCacheStream的buf中,当buf满后才向NetworkStream 写入数据,否则只缓存。由于最后一包不能保证正好填满buf,我们在写入数据后一定要调用 Flush 方法,将所有数据都发送出去。接收的过程反过来,如果buf中没有数据,就先将数据读入到buf中,然后再COPY给调用者,如果已经有数据则直接COPY给调用者。

    TcpCacheStream 的代码如下:

        [Serializable]
        public class TcpCacheStream : Stream
        {
            #region Private fields
            const int BUF_SIZE = 4096;
            private byte[] _Buf = new byte[BUF_SIZE];
     
            private MemoryStream _CacheStream = new MemoryStream(BUF_SIZE);
            private NetworkStream _NetworkStream;
     
            private int _BufLen = 0;
     
            #endregion
     
            #region Private properties
     
            private MemoryStream CacheStream
            {
                get
                {
                    return _CacheStream;
                }
            }
     
            #endregion
     
     
            #region Public properties
     
            /// <summary>
            /// get or set the Network Stream
            /// </summary>
            public NetworkStream NetworkStream
            {
                get
                {
                    return _NetworkStream;
                }
            }
     
            #endregion
     
     
            public TcpCacheStream(NetworkStream networkStream)
            {
                _NetworkStream = networkStream;
            }
     
            #region Implement stream class
     
            public override bool CanRead
            {
                get
                {
                    return true;
                }
            }
     
            public override bool CanSeek
            {
                get
                {
                    return false;
                }
            }
     
            public override bool CanWrite
            {
                get
                {
                    return true;
                }
            }
     
            public override void Flush()
            {
                NetworkStream.Write(_Buf, 0, _BufLen);
                NetworkStream.Flush();
            }
     
            public override long Length
            {
                get
                {
                    throw new Exception("This stream can not seek!");
                }
            }
     
            public override long Position
            {
                get
                {
                    throw new Exception("This stream can not seek!");
                }
     
                set
                {
                    throw new Exception("This stream can not seek!");
                }
            }
     
            public override int Read(byte[] buffer, int offset, int count)
            {
                int len = 0;
     
                //If cache is not empty, read from cache
                if (CacheStream.Length > CacheStream.Position)
                {
                    len = CacheStream.Read(buffer, offset, count);
                    return len;
                }
     
                //Fill cache
                len = NetworkStream.Read(_Buf, 0, BUF_SIZE);
     
                if (len == 0)
                {
                    return 0;
                }
     
                CacheStream.Position = 0;
                CacheStream.Write(_Buf, 0, len);
                CacheStream.SetLength(len);
                CacheStream.Position = 0;
     
                len = CacheStream.Read(buffer, offset, count);
     
                return len;
            }
     
            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new Exception("This stream can not seek!");
            }
     
            public override void SetLength(long value)
            {
                throw new Exception("This stream can not seek!");
            }
     
            public override void Write(byte[] buffer, int offset, int count)
            {
                if (offset + count > buffer.Length)
                {
                    throw new ArgumentException("Count + offset large then buffer.Length");
                }
     
                int remain = count - (BUF_SIZE - _BufLen);
     
                if (remain < 0)
                {
                    Array.Copy(buffer, offset, _Buf, _BufLen, count);
                    _BufLen = BUF_SIZE + remain;
                }
                else
                {
                    Array.Copy(buffer, offset, _Buf, _BufLen, BUF_SIZE - _BufLen);
                    NetworkStream.Write(_Buf, 0, _Buf.Length);
     
                    Array.Copy(buffer, offset + BUF_SIZE - _BufLen, _Buf, 0, remain);
     
                    _BufLen = remain;
                }
            }
     
            #endregion
        }
  • 相关阅读:
    第3次作业卷积神经网络
    SpringCloud Sidecar 整合.Net WebApi
    redefinition of class解决
    DP学习笔记
    NOIP2018 Day2毒瘤题目
    NOIP Day1总结
    关于DP
    这次的PION的总结
    NOIP模拟赛D2T1自己的解题思路
    ABAP Editor自动完成功能
  • 原文地址:https://www.cnblogs.com/eaglet/p/1595887.html
Copyright © 2011-2022 走看看