zoukankan      html  css  js  c++  java
  • C#高性能Socket服务器SocketAsyncEventArgs的实现(IOCP)

    网址:http://blog.csdn.net/zhujunxxxxx/article/details/43573879

    引言

    我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面
    微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
    NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。
    但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。
     
    代码下载:http://download.csdn.net/detail/zhujunxxxxx/8431289 这里的代码优化了的

    目标

    在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,
     
    效果如下
     
     

    代码

     
    首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全
     
    [csharp] view plain copy
     
    1. using System;  
    2. using System.Collections.Generic;  
    3. using System.Linq;  
    4. using System.Text;  
    5. using System.Net.Sockets;  
    6. using System.Net;  
    7. using System.Threading;  
    8.   
    9. namespace ServerTest  
    10. {  
    11.     /// <summary>  
    12.     /// IOCP SOCKET服务器  
    13.     /// </summary>  
    14.     public class IOCPServer : IDisposable  
    15.     {  
    16.         const int opsToPreAlloc = 2;  
    17.         #region Fields  
    18.         /// <summary>  
    19.         /// 服务器程序允许的最大客户端连接数  
    20.         /// </summary>  
    21.         private int _maxClient;  
    22.   
    23.         /// <summary>  
    24.         /// 监听Socket,用于接受客户端的连接请求  
    25.         /// </summary>  
    26.         private Socket _serverSock;  
    27.   
    28.         /// <summary>  
    29.         /// 当前的连接的客户端数  
    30.         /// </summary>  
    31.         private int _clientCount;  
    32.   
    33.         /// <summary>  
    34.         /// 用于每个I/O Socket操作的缓冲区大小  
    35.         /// </summary>  
    36.         private int _bufferSize = 1024;  
    37.   
    38.         /// <summary>  
    39.         /// 信号量  
    40.         /// </summary>  
    41.         Semaphore _maxAcceptedClients;  
    42.   
    43.         /// <summary>  
    44.         /// 缓冲区管理  
    45.         /// </summary>  
    46.         BufferManager _bufferManager;  
    47.   
    48.         /// <summary>  
    49.         /// 对象池  
    50.         /// </summary>  
    51.         SocketAsyncEventArgsPool _objectPool;  
    52.   
    53.         private bool disposed = false;  
    54.  
    55.         #endregion  
    56.  
    57.         #region Properties  
    58.   
    59.         /// <summary>  
    60.         /// 服务器是否正在运行  
    61.         /// </summary>  
    62.         public bool IsRunning { get; private set; }  
    63.         /// <summary>  
    64.         /// 监听的IP地址  
    65.         /// </summary>  
    66.         public IPAddress Address { get; private set; }  
    67.         /// <summary>  
    68.         /// 监听的端口  
    69.         /// </summary>  
    70.         public int Port { get; private set; }  
    71.         /// <summary>  
    72.         /// 通信使用的编码  
    73.         /// </summary>  
    74.         public Encoding Encoding { get; set; }  
    75.  
    76.         #endregion  
    77.  
    78.         #region Ctors  
    79.   
    80.         /// <summary>  
    81.         /// 异步IOCP SOCKET服务器  
    82.         /// </summary>  
    83.         /// <param name="listenPort">监听的端口</param>  
    84.         /// <param name="maxClient">最大的客户端数量</param>  
    85.         public IOCPServer(int listenPort,int maxClient)  
    86.             : this(IPAddress.Any, listenPort, maxClient)  
    87.         {  
    88.         }  
    89.   
    90.         /// <summary>  
    91.         /// 异步Socket TCP服务器  
    92.         /// </summary>  
    93.         /// <param name="localEP">监听的终结点</param>  
    94.         /// <param name="maxClient">最大客户端数量</param>  
    95.         public IOCPServer(IPEndPoint localEP, int maxClient)  
    96.             : this(localEP.Address, localEP.Port,maxClient)  
    97.         {  
    98.         }  
    99.   
    100.         /// <summary>  
    101.         /// 异步Socket TCP服务器  
    102.         /// </summary>  
    103.         /// <param name="localIPAddress">监听的IP地址</param>  
    104.         /// <param name="listenPort">监听的端口</param>  
    105.         /// <param name="maxClient">最大客户端数量</param>  
    106.         public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)  
    107.         {  
    108.             this.Address = localIPAddress;  
    109.             this.Port = listenPort;  
    110.             this.Encoding = Encoding.Default;  
    111.   
    112.             _maxClient = maxClient;  
    113.   
    114.             _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
    115.   
    116.             _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);  
    117.   
    118.             _objectPool = new SocketAsyncEventArgsPool(_maxClient);  
    119.   
    120.             _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);   
    121.         }  
    122.  
    123.         #endregion  
    124.  
    125.  
    126.         #region 初始化  
    127.   
    128.         /// <summary>  
    129.         /// 初始化函数  
    130.         /// </summary>  
    131.         public void Init()  
    132.         {  
    133.             // Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   
    134.             // against memory fragmentation  
    135.             _bufferManager.InitBuffer();  
    136.   
    137.             // preallocate pool of SocketAsyncEventArgs objects  
    138.             SocketAsyncEventArgs readWriteEventArg;  
    139.   
    140.             for (int i = 0; i < _maxClient; i++)  
    141.             {  
    142.                 //Pre-allocate a set of reusable SocketAsyncEventArgs  
    143.                 readWriteEventArg = new SocketAsyncEventArgs();  
    144.                 readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);  
    145.                 readWriteEventArg.UserToken = null;  
    146.   
    147.                 // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  
    148.                 _bufferManager.SetBuffer(readWriteEventArg);  
    149.   
    150.                 // add SocketAsyncEventArg to the pool  
    151.                 _objectPool.Push(readWriteEventArg);  
    152.             }  
    153.   
    154.         }  
    155.  
    156.         #endregion  
    157.  
    158.         #region Start  
    159.         /// <summary>  
    160.         /// 启动  
    161.         /// </summary>  
    162.         public void Start()  
    163.         {  
    164.             if (!IsRunning)  
    165.             {  
    166.                 Init();  
    167.                 IsRunning = true;  
    168.                 IPEndPoint localEndPoint = new IPEndPoint(Address, Port);  
    169.                 // 创建监听socket  
    170.                 _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
    171.                 //_serverSock.ReceiveBufferSize = _bufferSize;  
    172.                 //_serverSock.SendBufferSize = _bufferSize;  
    173.                 if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)  
    174.                 {  
    175.                     // 配置监听socket为 dual-mode (IPv4 & IPv6)   
    176.                     // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,  
    177.                     _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);  
    178.                     _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));  
    179.                 }  
    180.                 else  
    181.                 {  
    182.                     _serverSock.Bind(localEndPoint);  
    183.                 }  
    184.                 // 开始监听  
    185.                 _serverSock.Listen(this._maxClient);  
    186.                 // 在监听Socket上投递一个接受请求。  
    187.                 StartAccept(null);  
    188.             }  
    189.         }  
    190.         #endregion  
    191.  
    192.         #region Stop  
    193.   
    194.         /// <summary>  
    195.         /// 停止服务  
    196.         /// </summary>  
    197.         public void Stop()  
    198.         {  
    199.             if (IsRunning)  
    200.             {  
    201.                 IsRunning = false;  
    202.                 _serverSock.Close();  
    203.                 //TODO 关闭对所有客户端的连接  
    204.   
    205.             }  
    206.         }  
    207.  
    208.         #endregion  
    209.  
    210.  
    211.         #region Accept  
    212.   
    213.         /// <summary>  
    214.         /// 从客户端开始接受一个连接操作  
    215.         /// </summary>  
    216.         private void StartAccept(SocketAsyncEventArgs asyniar)  
    217.         {  
    218.             if (asyniar == null)  
    219.             {  
    220.                 asyniar = new SocketAsyncEventArgs();  
    221.                 asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);  
    222.             }  
    223.             else  
    224.             {  
    225.                 //socket must be cleared since the context object is being reused  
    226.                 asyniar.AcceptSocket = null;  
    227.             }  
    228.             _maxAcceptedClients.WaitOne();  
    229.             if (!_serverSock.AcceptAsync(asyniar))  
    230.             {  
    231.                 ProcessAccept(asyniar);  
    232.                 //如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件  
    233.                 //此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法  
    234.             }  
    235.         }  
    236.   
    237.         /// <summary>  
    238.         /// accept 操作完成时回调函数  
    239.         /// </summary>  
    240.         /// <param name="sender">Object who raised the event.</param>  
    241.         /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>  
    242.         private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)  
    243.         {  
    244.             ProcessAccept(e);  
    245.         }  
    246.   
    247.         /// <summary>  
    248.         /// 监听Socket接受处理  
    249.         /// </summary>  
    250.         /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>  
    251.         private void ProcessAccept(SocketAsyncEventArgs e)  
    252.         {  
    253.             if (e.SocketError == SocketError.Success)  
    254.             {  
    255.                 Socket s = e.AcceptSocket;//和客户端关联的socket  
    256.                 if (s.Connected)  
    257.                 {  
    258.                     try  
    259.                     {  
    260.                           
    261.                         Interlocked.Increment(ref _clientCount);//原子操作加1  
    262.                         SocketAsyncEventArgs asyniar = _objectPool.Pop();  
    263.                         asyniar.UserToken = s;  
    264.   
    265.                         Log4Debug(String.Format("客户 {0} 连入, 共有 {1} 个连接。", s.RemoteEndPoint.ToString(), _clientCount));  
    266.                           
    267.                         if (!s.ReceiveAsync(asyniar))//投递接收请求  
    268.                         {  
    269.                             ProcessReceive(asyniar);  
    270.                         }  
    271.                     }  
    272.                     catch (SocketException ex)  
    273.                     {  
    274.                         Log4Debug(String.Format("接收客户 {0} 数据出错, 异常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));  
    275.                         //TODO 异常处理  
    276.                     }  
    277.                     //投递下一个接受请求  
    278.                     StartAccept(e);  
    279.                 }  
    280.             }  
    281.         }  
    282.  
    283.         #endregion  
    284.  
    285.         #region 发送数据  
    286.   
    287.         /// <summary>  
    288.         /// 异步的发送数据  
    289.         /// </summary>  
    290.         /// <param name="e"></param>  
    291.         /// <param name="data"></param>  
    292.         public void Send(SocketAsyncEventArgs e, byte[] data)  
    293.         {  
    294.             if (e.SocketError == SocketError.Success)  
    295.             {  
    296.                 Socket s = e.AcceptSocket;//和客户端关联的socket  
    297.                 if (s.Connected)  
    298.                 {  
    299.                     Array.Copy(data, 0, e.Buffer, 0, data.Length);//设置发送数据  
    300.   
    301.                     //e.SetBuffer(data, 0, data.Length); //设置发送数据  
    302.                     if (!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
    303.                     {  
    304.                         // 同步发送时处理发送完成事件  
    305.                         ProcessSend(e);  
    306.                     }  
    307.                     else  
    308.                     {  
    309.                         CloseClientSocket(e);  
    310.                     }  
    311.                 }  
    312.             }  
    313.         }  
    314.   
    315.         /// <summary>  
    316.         /// 同步的使用socket发送数据  
    317.         /// </summary>  
    318.         /// <param name="socket"></param>  
    319.         /// <param name="buffer"></param>  
    320.         /// <param name="offset"></param>  
    321.         /// <param name="size"></param>  
    322.         /// <param name="timeout"></param>  
    323.         public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)  
    324.         {  
    325.             socket.SendTimeout = 0;  
    326.             int startTickCount = Environment.TickCount;  
    327.             int sent = 0; // how many bytes is already sent  
    328.             do  
    329.             {  
    330.                 if (Environment.TickCount > startTickCount + timeout)  
    331.                 {  
    332.                     //throw new Exception("Timeout.");  
    333.                 }  
    334.                 try  
    335.                 {  
    336.                     sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);  
    337.                 }  
    338.                 catch (SocketException ex)  
    339.                 {  
    340.                     if (ex.SocketErrorCode == SocketError.WouldBlock ||  
    341.                     ex.SocketErrorCode == SocketError.IOPending ||  
    342.                     ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)  
    343.                     {  
    344.                         // socket buffer is probably full, wait and try again  
    345.                         Thread.Sleep(30);  
    346.                     }  
    347.                     else  
    348.                     {  
    349.                         throw ex; // any serious error occurr  
    350.                     }  
    351.                 }  
    352.             } while (sent < size);  
    353.         }  
    354.   
    355.   
    356.         /// <summary>  
    357.         /// 发送完成时处理函数  
    358.         /// </summary>  
    359.         /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>  
    360.         private void ProcessSend(SocketAsyncEventArgs e)  
    361.         {  
    362.             if (e.SocketError == SocketError.Success)  
    363.             {  
    364.                 Socket s = (Socket)e.UserToken;  
    365.   
    366.                 //TODO  
    367.             }  
    368.             else  
    369.             {  
    370.                 CloseClientSocket(e);  
    371.             }  
    372.         }  
    373.  
    374.         #endregion  
    375.  
    376.         #region 接收数据  
    377.   
    378.   
    379.         /// <summary>  
    380.         ///接收完成时处理函数  
    381.         /// </summary>  
    382.         /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>  
    383.         private void ProcessReceive(SocketAsyncEventArgs e)  
    384.         {  
    385.             if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  
    386.             {  
    387.                 // 检查远程主机是否关闭连接  
    388.                 if (e.BytesTransferred > 0)  
    389.                 {  
    390.                     Socket s = (Socket)e.UserToken;  
    391.                     //判断所有需接收的数据是否已经完成  
    392.                     if (s.Available == 0)  
    393.                     {  
    394.                         //从侦听者获取接收到的消息。   
    395.                         //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);  
    396.                         //echo the data received back to the client  
    397.                         //e.SetBuffer(e.Offset, e.BytesTransferred);  
    398.   
    399.                         byte[] data = new byte[e.BytesTransferred];  
    400.                         Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用  
    401.   
    402.                         string info=Encoding.Default.GetString(data);  
    403.                         Log4Debug(String.Format("收到 {0} 数据为 {1}",s.RemoteEndPoint.ToString(),info));  
    404.                         //TODO 处理数据  
    405.   
    406.                         //增加服务器接收的总字节数。  
    407.                     }  
    408.   
    409.                     if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
    410.                     {  
    411.                         //同步接收时处理接收完成事件  
    412.                         ProcessReceive(e);  
    413.                     }  
    414.                 }  
    415.             }  
    416.             else  
    417.             {  
    418.                 CloseClientSocket(e);  
    419.             }  
    420.         }  
    421.  
    422.         #endregion  
    423.  
    424.         #region 回调函数  
    425.   
    426.         /// <summary>  
    427.         /// 当Socket上的发送或接收请求被完成时,调用此函数  
    428.         /// </summary>  
    429.         /// <param name="sender">激发事件的对象</param>  
    430.         /// <param name="e">与发送或接收完成操作相关联的SocketAsyncEventArg对象</param>  
    431.         private void OnIOCompleted(object sender, SocketAsyncEventArgs e)  
    432.         {  
    433.             // Determine which type of operation just completed and call the associated handler.  
    434.             switch (e.LastOperation)  
    435.             {  
    436.                 case SocketAsyncOperation.Accept:  
    437.                     ProcessAccept(e);  
    438.                     break;  
    439.                 case SocketAsyncOperation.Receive:  
    440.                     ProcessReceive(e);  
    441.                     break;  
    442.                 default:  
    443.                     throw new ArgumentException("The last operation completed on the socket was not a receive or send");  
    444.             }  
    445.         }  
    446.  
    447.         #endregion  
    448.  
    449.         #region Close  
    450.         /// <summary>  
    451.         /// 关闭socket连接  
    452.         /// </summary>  
    453.         /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param>  
    454.         private void CloseClientSocket(SocketAsyncEventArgs e)  
    455.         {  
    456.             Log4Debug(String.Format("客户 {0} 断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));  
    457.             Socket s = e.UserToken as Socket;  
    458.             CloseClientSocket(s, e);  
    459.         }  
    460.   
    461.         /// <summary>  
    462.         /// 关闭socket连接  
    463.         /// </summary>  
    464.         /// <param name="s"></param>  
    465.         /// <param name="e"></param>  
    466.         private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)  
    467.         {  
    468.             try  
    469.             {  
    470.                 s.Shutdown(SocketShutdown.Send);  
    471.             }  
    472.             catch (Exception)  
    473.             {  
    474.                 // Throw if client has closed, so it is not necessary to catch.  
    475.             }  
    476.             finally  
    477.             {  
    478.                 s.Close();  
    479.             }  
    480.             Interlocked.Decrement(ref _clientCount);  
    481.             _maxAcceptedClients.Release();  
    482.             _objectPool.Push(e);//SocketAsyncEventArg 对象被释放,压入可重用队列。  
    483.         }  
    484.         #endregion  
    485.  
    486.         #region Dispose  
    487.         /// <summary>  
    488.         /// Performs application-defined tasks associated with freeing,   
    489.         /// releasing, or resetting unmanaged resources.  
    490.         /// </summary>  
    491.         public void Dispose()  
    492.         {  
    493.             Dispose(true);  
    494.             GC.SuppressFinalize(this);  
    495.         }  
    496.   
    497.         /// <summary>  
    498.         /// Releases unmanaged and - optionally - managed resources  
    499.         /// </summary>  
    500.         /// <param name="disposing"><c>true</c> to release   
    501.         /// both managed and unmanaged resources; <c>false</c>   
    502.         /// to release only unmanaged resources.</param>  
    503.         protected virtual void Dispose(bool disposing)  
    504.         {  
    505.             if (!this.disposed)  
    506.             {  
    507.                 if (disposing)  
    508.                 {  
    509.                     try  
    510.                     {  
    511.                         Stop();  
    512.                         if (_serverSock != null)  
    513.                         {  
    514.                             _serverSock = null;  
    515.                         }  
    516.                     }  
    517.                     catch (SocketException ex)  
    518.                     {  
    519.                         //TODO 事件  
    520.                     }  
    521.                 }  
    522.                 disposed = true;  
    523.             }  
    524.         }  
    525.         #endregion  
    526.   
    527.         public void Log4Debug(string msg)  
    528.         {  
    529.             Console.WriteLine("notice:"+msg);  
    530.         }  
    531.   
    532.     }  
    533. }  

    BufferManager.cs 这个类是缓存管理类,是采用MSDN上面的例子一样的 地址: https://msdn.microsoft.com/zh-cn/library/bb517542.aspx
     
    SocketAsyncEventArgsPool.cs 这个类也是来自MSDN的 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
     
    需要的话自己到MSDN网站上去取,我就不贴出来了
     

    服务器端

    [csharp] view plain copy
     
    1. static void Main(string[] args)  
    2.         {  
    3.   
    4.             IOCPServer server = new IOCPServer(8088, 1024);  
    5.             server.Start();  
    6.             Console.WriteLine("服务器已启动....");  
    7.             System.Console.ReadLine();  
    8.         }  


    客户端

     
    客户端代码也是很简单
    [csharp] view plain copy
     
    1. static void Main(string[] args)  
    2.         {  
    3.             IPAddress remote=IPAddress.Parse("192.168.3.4");  
    4.             client c = new client(8088,remote);  
    5.   
    6.             c.connect();  
    7.             Console.WriteLine("服务器连接成功!");  
    8.             while (true)  
    9.             {  
    10.                 Console.Write("send>");  
    11.                 string msg=Console.ReadLine();  
    12.                 if (msg == "exit")  
    13.                     break;  
    14.                 c.send(msg);  
    15.             }  
    16.             c.disconnect();  
    17.             Console.ReadLine();  
    18.         }  
     
    client.cs
    [csharp] view plain copy
     
    1. public class client  
    2.     {  
    3.   
    4.         public TcpClient _client;  
    5.   
    6.         public int port;  
    7.   
    8.         public IPAddress remote;  
    9.   
    10.         public client(int port,IPAddress remote)  
    11.         {  
    12.   
    13.             this.port = port;  
    14.             this.remote = remote;  
    15.         }  
    16.   
    17.         public void connect()  
    18.         {  
    19.             this._client=new TcpClient();  
    20.             _client.Connect(remote, port);  
    21.         }  
    22.         public void disconnect()  
    23.         {  
    24.             _client.Close();  
    25.         }  
    26.         public void send(string msg)  
    27.         {  
    28.             byte[] data=Encoding.Default.GetBytes(msg);  
    29.             _client.GetStream().Write(data, 0, data.Length);  
    30.         }  
    31.     }  


    IOCPClient类,使用SocketAsyncEventArgs类建立一个Socket客户端。虽然MSDN说这个类特别设计给网络服务器应用,但也没有限制在客户端代码中使用APM。下面给出了IOCPClient类的样例代码:
    [csharp] view plain copy
     
    1. public class IOCPClient  
    2.    {  
    3.        /// <summary>  
    4.        /// 连接服务器的socket  
    5.        /// </summary>  
    6.        private Socket _clientSock;  
    7.   
    8.        /// <summary>  
    9.        /// 用于服务器执行的互斥同步对象  
    10.        /// </summary>  
    11.        private static Mutex mutex = new Mutex();  
    12.        /// <summary>  
    13.        /// Socket连接标志  
    14.        /// </summary>  
    15.        private Boolean _connected = false;  
    16.   
    17.        private const int ReceiveOperation = 1, SendOperation = 0;  
    18.   
    19.        private static AutoResetEvent[]  
    20.                 autoSendReceiveEvents = new AutoResetEvent[]  
    21.         {  
    22.             new AutoResetEvent(false),  
    23.             new AutoResetEvent(false)  
    24.         };  
    25.   
    26.        /// <summary>  
    27.        /// 服务器监听端点  
    28.        /// </summary>  
    29.        private IPEndPoint _remoteEndPoint;  
    30.   
    31.        public IOCPClient(IPEndPoint local,IPEndPoint remote)  
    32.        {  
    33.            _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp);  
    34.            _remoteEndPoint = remote;  
    35.        }  
    36.  
    37.        #region 连接服务器  
    38.   
    39.        /// <summary>  
    40.        /// 连接远程服务器  
    41.        /// </summary>  
    42.        public void Connect()  
    43.        {  
    44.            SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();  
    45.   
    46.            connectArgs.UserToken = _clientSock;  
    47.            connectArgs.RemoteEndPoint = _remoteEndPoint;  
    48.            connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected);  
    49.            mutex.WaitOne();  
    50.            if (!_clientSock.ConnectAsync(connectArgs))//异步连接  
    51.            {  
    52.                ProcessConnected(connectArgs);  
    53.            }  
    54.              
    55.        }  
    56.        /// <summary>  
    57.        /// 连接上的事件  
    58.        /// </summary>  
    59.        /// <param name="sender"></param>  
    60.        /// <param name="e"></param>  
    61.        void OnConnected(object sender, SocketAsyncEventArgs e)  
    62.        {  
    63.            mutex.ReleaseMutex();  
    64.            //设置Socket已连接标志。   
    65.            _connected = (e.SocketError == SocketError.Success);  
    66.        }  
    67.        /// <summary>  
    68.        /// 处理连接服务器  
    69.        /// </summary>  
    70.        /// <param name="e"></param>  
    71.        private void ProcessConnected(SocketAsyncEventArgs e)  
    72.        {  
    73.            //TODO  
    74.        }  
    75.  
    76.        #endregion  
    77.  
    78.        #region 发送消息  
    79.        /// <summary>  
    80.        /// 向服务器发送消息  
    81.        /// </summary>  
    82.        /// <param name="data"></param>  
    83.        public void Send(byte[] data)  
    84.        {  
    85.            SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs();  
    86.            asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete);  
    87.            asyniar.SetBuffer(data, 0, data.Length);  
    88.            asyniar.UserToken = _clientSock;  
    89.            asyniar.RemoteEndPoint = _remoteEndPoint;  
    90.            autoSendReceiveEvents[SendOperation].WaitOne();  
    91.            if (!_clientSock.SendAsync(asyniar))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
    92.            {  
    93.                // 同步发送时处理发送完成事件  
    94.                ProcessSend(asyniar);  
    95.            }  
    96.        }  
    97.   
    98.        /// <summary>  
    99.        /// 发送操作的回调方法  
    100.        /// </summary>  
    101.        /// <param name="sender"></param>  
    102.        /// <param name="e"></param>  
    103.        private void OnSendComplete(object sender, SocketAsyncEventArgs e)  
    104.        {  
    105.            //发出发送完成信号。   
    106.            autoSendReceiveEvents[SendOperation].Set();  
    107.            ProcessSend(e);  
    108.        }  
    109.   
    110.        /// <summary>  
    111.        /// 发送完成时处理函数  
    112.        /// </summary>  
    113.        /// <param name="e">与发送完成操作相关联的SocketAsyncEventArg对象</param>  
    114.        private void ProcessSend(SocketAsyncEventArgs e)  
    115.        {  
    116.            //TODO  
    117.        }  
    118.        #endregion  
    119.  
    120.        #region 接收消息  
    121.        /// <summary>  
    122.        /// 开始监听服务端数据  
    123.        /// </summary>  
    124.        /// <param name="e"></param>  
    125.        public void StartRecive(SocketAsyncEventArgs e)  
    126.        {  
    127.            //准备接收。   
    128.            Socket s = e.UserToken as Socket;  
    129.            byte[] receiveBuffer = new byte[255];  
    130.            e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);  
    131.            e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete);  
    132.            autoSendReceiveEvents[ReceiveOperation].WaitOne();  
    133.            if (!s.ReceiveAsync(e))  
    134.            {  
    135.                ProcessReceive(e);  
    136.            }  
    137.        }  
    138.   
    139.        /// <summary>  
    140.        /// 接收操作的回调方法  
    141.        /// </summary>  
    142.        /// <param name="sender"></param>  
    143.        /// <param name="e"></param>  
    144.        private void OnReceiveComplete(object sender, SocketAsyncEventArgs e)  
    145.        {  
    146.            //发出接收完成信号。   
    147.            autoSendReceiveEvents[ReceiveOperation].Set();  
    148.            ProcessReceive(e);  
    149.        }  
    150.   
    151.        /// <summary>  
    152.        ///接收完成时处理函数  
    153.        /// </summary>  
    154.        /// <param name="e">与接收完成操作相关联的SocketAsyncEventArg对象</param>  
    155.        private void ProcessReceive(SocketAsyncEventArgs e)  
    156.        {  
    157.            if (e.SocketError == SocketError.Success)  
    158.            {  
    159.                // 检查远程主机是否关闭连接  
    160.                if (e.BytesTransferred > 0)  
    161.                {  
    162.                    Socket s = (Socket)e.UserToken;  
    163.                    //判断所有需接收的数据是否已经完成  
    164.                    if (s.Available == 0)  
    165.                    {  
    166.                        byte[] data = new byte[e.BytesTransferred];  
    167.                        Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用  
    168.   
    169.                        //TODO 处理数据  
    170.                    }  
    171.   
    172.                    if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  
    173.                    {  
    174.                        //同步接收时处理接收完成事件  
    175.                        ProcessReceive(e);  
    176.                    }  
    177.                }  
    178.            }  
    179.        }  
    180.  
    181.        #endregion  
    182.   
    183.   
    184.        public void Close()  
    185.        {  
    186.            _clientSock.Disconnect(false);  
    187.        }  
    188.   
    189.        /// <summary>  
    190.        /// 失败时关闭Socket,根据SocketError抛出异常。  
    191.        /// </summary>  
    192.        /// <param name="e"></param>  
    193.   
    194.        private void ProcessError(SocketAsyncEventArgs e)  
    195.        {  
    196.            Socket s = e.UserToken as Socket;  
    197.            if (s.Connected)  
    198.            {  
    199.                //关闭与客户端关联的Socket  
    200.                try  
    201.                {  
    202.                    s.Shutdown(SocketShutdown.Both);  
    203.                }  
    204.                catch (Exception)  
    205.                {  
    206.                    //如果客户端处理已经关闭,抛出异常   
    207.                }  
    208.                finally  
    209.                {  
    210.                    if (s.Connected)  
    211.                    {  
    212.                        s.Close();  
    213.                    }  
    214.                }  
    215.            }  
    216.            //抛出SocketException   
    217.            throw new SocketException((Int32)e.SocketError);  
    218.        }  
    219.   
    220.   
    221.        /// <summary>  
    222.        /// 释放SocketClient实例  
    223.        /// </summary>  
    224.        public void Dispose()  
    225.        {  
    226.            mutex.Close();  
    227.            autoSendReceiveEvents[SendOperation].Close();  
    228.            autoSendReceiveEvents[ReceiveOperation].Close();  
    229.            if (_clientSock.Connected)  
    230.            {  
    231.                _clientSock.Close();  
    232.            }  
    233.        }  
    234.   
    235.    }  
    这个类我没有测试,但是理论上是没问题的。
  • 相关阅读:
    [leetcode-136-Single Number]
    [leetcode-2-Add Two Numbers]
    [leetcode-150-Evaluate Reverse Polish Notation]
    [leetcode-1-Two Sum]
    DataTable数据分页
    ToList和ToDataTable(其中也有反射的知识)
    c#解决Nullable类型的转换 (包含DataContract的序列化和反序列化以及 该例子应用在反射属性setvalue的时候有用)
    ADO.NET DataTable的复制(clone)
    OracleBulkCopy
    LIst和table的转换
  • 原文地址:https://www.cnblogs.com/zxtceq/p/7764980.html
Copyright © 2011-2022 走看看