zoukankan      html  css  js  c++  java
  • .Net 下通过缓存提高TCP传输速度

        .Net 提供了一个NetworkStream 用于TCP 的读写,实际使用时发现直接操作效率很低,哪怕把TCP 的发送缓存和接受缓存设置很大也没有太大提高。后来在对 NetworkStream 读写前设置了缓存,性能一下子提高了很多。

        从实际测试结果看设置自己的写缓存,对性能的提升最为显著。我分析了一下,其原因很可能是在向NetworkStream 序列化对象时,序列化程序调用了大量的Write 方法向NetworkStream写入数据,每次向NetworkStream写入数据,数据被首先写入TCP的发送缓存,并且由调用线程设置一个信号通知.Net framework 内部的TCP线程发送缓冲区中已经有数据,TCP线程被激活并读取发送缓冲区中的数据,组包并向网卡写入数据。频繁的调用 NetworkStream.Write 写入小块数据将导致调用线程和TCP线程反复切换,并大量触发网卡中断,导致发送效率低下。如果我们在发送前将数据缓存并按较大的数据块发送给TCP线程,则大大减少线程切换和网卡中断数量,从而大大提高传输效率。

        问题到这里还没有结束,我们发送的对象往往较大,如果我们将发送对象全部序列化到buffer中再发送,那么势必占用大量内存,实际上我们无法忍受这种对内存无限制申请的行为,试想一个1G大小的对象,我们在发送前为它另外再开辟1个G的内存来缓存,对于系统来说简直是无法忍受。由于我们用.net 发送数据,我们在发送时需要将对象序列化到流中,而不能像 C/C++那样直接通过指针来读取数据(当然你也可以用unsafe代码,但这种方式会带来其他问题,而且并不为大家所推荐),所以我们需要开发一个专门用 TCP 发送缓存的流来处理读写前的缓存。为此我开发了一个 TcpCacheStream 类,这个类被用在读写 NetworkStream 前先进行缓存。

        调用方法很简单

       

    • 发送过程
    object msg;
    //初始化 msg 过程省略
    System.Net.Sockets.NetworkStream _ClientStream;
    //初始化 _ClientStream 过程省略
     
    //创建TcpCacheStream 
    TcpCacheStream tcpStream = new TcpCacheStream(_ClientStream);
     
    //二进制序列化 msg 对象到 TcpCacheStream 
    IFormatter formatter = new BinaryFormatter();
    formatter.Serialize(tcpStream, msg);
     
    //将缓存中最后一包的数据发送出去
    tcpStream.Flush();

      

    • 接收过程
     
    System.Net.Sockets.NetworkStream _ClientStream;
    //初始化 _ClientStream 过程省略
     
    //创建TcpCacheStream 
    TcpCacheStream tcpStream = new TcpCacheStream(_ClientStream);
     
    //从 TcpCacheStream 二进制反序列化
    IFormatter formatter = new BinaryFormatter();
    objcet result = formatter.Deserialize(tcpStream);

    TcpCacheStream 类为调用者封装了缓存的过程,这个缓存过程实际并不复杂,发送时数据先写入TcpCacheStream的buf中,当buf满后才向NetworkStream 写入数据,否则只缓存。由于最后一包不能保证正好填满buf,我们在写入数据后一定要调用 Flush 方法,将所有数据都发送出去。接收的过程反过来,如果buf中没有数据,就先将数据读入到buf中,然后再COPY给调用者,如果已经有数据则直接COPY给调用者。

    TcpCacheStream 的代码如下:

        [Serializable]
        public class TcpCacheStream : Stream
        {
            #region Private fields
            const int BUF_SIZE = 4096;
            private byte[] _Buf = new byte[BUF_SIZE];
     
            private MemoryStream _CacheStream = new MemoryStream(BUF_SIZE);
            private NetworkStream _NetworkStream;
     
            private int _BufLen = 0;
     
            #endregion
     
            #region Private properties
     
            private MemoryStream CacheStream
            {
                get
                {
                    return _CacheStream;
                }
            }
     
            #endregion
     
     
            #region Public properties
     
            /// <summary>
            /// get or set the Network Stream
            /// </summary>
            public NetworkStream NetworkStream
            {
                get
                {
                    return _NetworkStream;
                }
            }
     
            #endregion
     
     
            public TcpCacheStream(NetworkStream networkStream)
            {
                _NetworkStream = networkStream;
            }
     
            #region Implement stream class
     
            public override bool CanRead
            {
                get
                {
                    return true;
                }
            }
     
            public override bool CanSeek
            {
                get
                {
                    return false;
                }
            }
     
            public override bool CanWrite
            {
                get
                {
                    return true;
                }
            }
     
            public override void Flush()
            {
                NetworkStream.Write(_Buf, 0, _BufLen);
                NetworkStream.Flush();
            }
     
            public override long Length
            {
                get
                {
                    throw new Exception("This stream can not seek!");
                }
            }
     
            public override long Position
            {
                get
                {
                    throw new Exception("This stream can not seek!");
                }
     
                set
                {
                    throw new Exception("This stream can not seek!");
                }
            }
     
            public override int Read(byte[] buffer, int offset, int count)
            {
                int len = 0;
     
                //If cache is not empty, read from cache
                if (CacheStream.Length > CacheStream.Position)
                {
                    len = CacheStream.Read(buffer, offset, count);
                    return len;
                }
     
                //Fill cache
                len = NetworkStream.Read(_Buf, 0, BUF_SIZE);
     
                if (len == 0)
                {
                    return 0;
                }
     
                CacheStream.Position = 0;
                CacheStream.Write(_Buf, 0, len);
                CacheStream.SetLength(len);
                CacheStream.Position = 0;
     
                len = CacheStream.Read(buffer, offset, count);
     
                return len;
            }
     
            public override long Seek(long offset, SeekOrigin origin)
            {
                throw new Exception("This stream can not seek!");
            }
     
            public override void SetLength(long value)
            {
                throw new Exception("This stream can not seek!");
            }
     
            public override void Write(byte[] buffer, int offset, int count)
            {
                if (offset + count > buffer.Length)
                {
                    throw new ArgumentException("Count + offset large then buffer.Length");
                }
     
                int remain = count - (BUF_SIZE - _BufLen);
     
                if (remain < 0)
                {
                    Array.Copy(buffer, offset, _Buf, _BufLen, count);
                    _BufLen = BUF_SIZE + remain;
                }
                else
                {
                    Array.Copy(buffer, offset, _Buf, _BufLen, BUF_SIZE - _BufLen);
                    NetworkStream.Write(_Buf, 0, _Buf.Length);
     
                    Array.Copy(buffer, offset + BUF_SIZE - _BufLen, _Buf, 0, remain);
     
                    _BufLen = remain;
                }
            }
     
            #endregion
        }
  • 相关阅读:
    【校招面试 之 C/C++】第23题 C++ STL(五)之Set
    Cannot create an instance of OLE DB provider “OraOLEDB.Oracle” for linked server "xxxxxxx".
    Redhat Linux安装JDK 1.7
    ORA-10635: Invalid segment or tablespace type
    Symantec Backup Exec 2012 Agent for Linux 卸载
    Symantec Backup Exec 2012 Agent For Linux安装
    You must use the Role Management Tool to install or configure Microsoft .NET Framework 3.5 SP1
    YourSQLDba介绍
    PL/SQL重新编译包无反应
    MS SQL 监控数据/日志文件增长
  • 原文地址:https://www.cnblogs.com/eaglet/p/1595887.html
Copyright © 2011-2022 走看看