zoukankan      html  css  js  c++  java
  • 【转】C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装

    http://blog.csdn.net/sqldebug_fan/article/details/17557341

    1、SocketAsyncEventArgs介绍

    SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步。使用此类执行异步套接字操作的模式包含以下步骤:
    1.分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
    2.将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。
    3.调用适当的套接字方法 (xxxAsync) 以启动异步操作。
    4.如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
    5.如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
    6.将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。

    2、SocketAsyncEventArgs封装

    使用SocketAsyncEventArgs之前需要先建立一个Socket监听对象,使用如下代码:

    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public void Start(IPEndPoint localEndPoint)  
    2. {  
    3.     listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  
    4.     listenSocket.Bind(localEndPoint);  
    5.     listenSocket.Listen(m_numConnections);  
    6.     Program.Logger.InfoFormat("Start listen socket {0} success", localEndPoint.ToString());  
    7.     //for (int i = 0; i < 64; i++) //不能循环投递多次AcceptAsync,会造成只接收8000连接后不接收连接了  
    8.     StartAccept(null);  
    9.     m_daemonThread = new DaemonThread(this);  
    10. }  
    然后开始接受连接,SocketAsyncEventArgs有连接时会通过Completed事件通知外面,所以接受连接的代码如下:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public void StartAccept(SocketAsyncEventArgs acceptEventArgs)  
    2. {  
    3.     if (acceptEventArgs == null)  
    4.     {  
    5.         acceptEventArgs = new SocketAsyncEventArgs();  
    6.         acceptEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);  
    7.     }  
    8.     else  
    9.     {  
    10.         acceptEventArgs.AcceptSocket = null; //释放上次绑定的Socket,等待下一个Socket连接  
    11.     }  
    12.   
    13.     m_maxNumberAcceptedClients.WaitOne(); //获取信号量  
    14.     bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);  
    15.     if (!willRaiseEvent)  
    16.     {  
    17.         ProcessAccept(acceptEventArgs);  
    18.     }  
    19. }  
    接受连接响应事件代码:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)  
    2. {  
    3.     try  
    4.     {  
    5.         ProcessAccept(acceptEventArgs);  
    6.     }  
    7.     catch (Exception E)  
    8.     {  
    9.         Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", acceptEventArgs.AcceptSocket, E.Message);  
    10.         Program.Logger.Error(E.StackTrace);    
    11.     }              
    12. }  
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)  
    2. {  
    3.     Program.Logger.InfoFormat("Client connection accepted. Local Address: {0}, Remote Address: {1}",  
    4.         acceptEventArgs.AcceptSocket.LocalEndPoint, acceptEventArgs.AcceptSocket.RemoteEndPoint);  
    5.   
    6.     AsyncSocketUserToken userToken = m_asyncSocketUserTokenPool.Pop();  
    7.     m_asyncSocketUserTokenList.Add(userToken); //添加到正在连接列表  
    8.     userToken.ConnectSocket = acceptEventArgs.AcceptSocket;  
    9.     userToken.ConnectDateTime = DateTime.Now;  
    10.   
    11.     try  
    12.     {  
    13.         bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求  
    14.         if (!willRaiseEvent)  
    15.         {  
    16.             lock (userToken)  
    17.             {  
    18.                 ProcessReceive(userToken.ReceiveEventArgs);  
    19.             }  
    20.         }                      
    21.     }  
    22.     catch (Exception E)  
    23.     {  
    24.         Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", userToken.ConnectSocket, E.Message);  
    25.         Program.Logger.Error(E.StackTrace);                  
    26.     }              
    27.   
    28.     StartAccept(acceptEventArgs); //把当前异步事件释放,等待下次连接  
    29. }  
    接受连接后,从当前Socket缓冲池AsyncSocketUserTokenPool中获取一个用户对象AsyncSocketUserToken,AsyncSocketUserToken包含一个接收异步事件m_receiveEventArgs,一个发送异步事件m_sendEventArgs,接收数据缓冲区m_receiveBuffer,发送数据缓冲区m_sendBuffer,协议逻辑调用对象m_asyncSocketInvokeElement,建立服务对象后,需要实现接收和发送的事件响应函数:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs)  
    2. {  
    3.     AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken;  
    4.     userToken.ActiveDateTime = DateTime.Now;  
    5.     try  
    6.     {                  
    7.         lock (userToken)  
    8.         {  
    9.             if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive)  
    10.                 ProcessReceive(asyncEventArgs);  
    11.             else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send)  
    12.                 ProcessSend(asyncEventArgs);  
    13.             else  
    14.                 throw new ArgumentException("The last operation completed on the socket was not a receive or send");  
    15.         }     
    16.     }  
    17.     catch (Exception E)  
    18.     {  
    19.         Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message);  
    20.         Program.Logger.Error(E.StackTrace);  
    21.     }                       
    22. }  
    在Completed事件中需要处理发送和接收的具体逻辑代码,其中接收的逻辑实现如下:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs)  
    2. {  
    3.     AsyncSocketUserToken userToken = receiveEventArgs.UserToken as AsyncSocketUserToken;  
    4.     if (userToken.ConnectSocket == null)  
    5.         return;  
    6.     userToken.ActiveDateTime = DateTime.Now;  
    7.     if (userToken.ReceiveEventArgs.BytesTransferred > 0 && userToken.ReceiveEventArgs.SocketError == SocketError.Success)  
    8.     {  
    9.         int offset = userToken.ReceiveEventArgs.Offset;  
    10.         int count = userToken.ReceiveEventArgs.BytesTransferred;  
    11.         if ((userToken.AsyncSocketInvokeElement == null) & (userToken.ConnectSocket != null)) //存在Socket对象,并且没有绑定协议对象,则进行协议对象绑定  
    12.         {  
    13.             BuildingSocketInvokeElement(userToken);  
    14.             offset = offset + 1;  
    15.             count = count - 1;  
    16.         }  
    17.         if (userToken.AsyncSocketInvokeElement == null) //如果没有解析对象,提示非法连接并关闭连接  
    18.         {  
    19.             Program.Logger.WarnFormat("Illegal client connection. Local Address: {0}, Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,   
    20.                 userToken.ConnectSocket.RemoteEndPoint);  
    21.             CloseClientSocket(userToken);  
    22.         }  
    23.         else  
    24.         {  
    25.             if (count > 0) //处理接收数据  
    26.             {  
    27.                 if (!userToken.AsyncSocketInvokeElement.ProcessReceive(userToken.ReceiveEventArgs.Buffer, offset, count))  
    28.                 { //如果处理数据返回失败,则断开连接  
    29.                     CloseClientSocket(userToken);  
    30.                 }  
    31.                 else //否则投递下次介绍数据请求  
    32.                 {  
    33.                     bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求  
    34.                     if (!willRaiseEvent)  
    35.                         ProcessReceive(userToken.ReceiveEventArgs);  
    36.                 }  
    37.             }  
    38.             else  
    39.             {  
    40.                 bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求  
    41.                 if (!willRaiseEvent)  
    42.                     ProcessReceive(userToken.ReceiveEventArgs);  
    43.             }  
    44.         }  
    45.     }  
    46.     else  
    47.     {  
    48.         CloseClientSocket(userToken);  
    49.     }  
    50. }  
    由于我们制定的协议第一个字节是协议标识,因此在接收到第一个字节的时候需要绑定协议解析对象,具体代码实现如下:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private void BuildingSocketInvokeElement(AsyncSocketUserToken userToken)  
    2. {  
    3.     byte flag = userToken.ReceiveEventArgs.Buffer[userToken.ReceiveEventArgs.Offset];  
    4.     if (flag == (byte)SocketFlag.Upload)  
    5.         userToken.AsyncSocketInvokeElement = new UploadSocketProtocol(this, userToken);  
    6.     else if (flag == (byte)SocketFlag.Download)  
    7.         userToken.AsyncSocketInvokeElement = new DownloadSocketProtocol(this, userToken);  
    8.     else if (flag == (byte)SocketFlag.RemoteStream)  
    9.         userToken.AsyncSocketInvokeElement = new RemoteStreamSocketProtocol(this, userToken);  
    10.     else if (flag == (byte)SocketFlag.Throughput)  
    11.         userToken.AsyncSocketInvokeElement = new ThroughputSocketProtocol(this, userToken);  
    12.     else if (flag == (byte)SocketFlag.Control)  
    13.         userToken.AsyncSocketInvokeElement = new ControlSocketProtocol(this, userToken);  
    14.     else if (flag == (byte)SocketFlag.LogOutput)  
    15.         userToken.AsyncSocketInvokeElement = new LogOutputSocketProtocol(this, userToken);  
    16.     if (userToken.AsyncSocketInvokeElement != null)  
    17.     {  
    18.         Program.Logger.InfoFormat("Building socket invoke element {0}.Local Address: {1}, Remote Address: {2}",  
    19.             userToken.AsyncSocketInvokeElement, userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint);  
    20.     }   
    21. }  
    发送响应函数实现需要注意,我们是把发送数据放到一个列表中,当上一个发送事件完成响应Completed事件,这时我们需要检测发送队列中是否存在未发送的数据,如果存在则继续发送。
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)  
    2. {  
    3.     AsyncSocketUserToken userToken = sendEventArgs.UserToken as AsyncSocketUserToken;  
    4.     if (userToken.AsyncSocketInvokeElement == null)  
    5.         return false;  
    6.     userToken.ActiveDateTime = DateTime.Now;  
    7.     if (sendEventArgs.SocketError == SocketError.Success)  
    8.         return userToken.AsyncSocketInvokeElement.SendCompleted(); //调用子类回调函数  
    9.     else  
    10.     {  
    11.         CloseClientSocket(userToken);  
    12.         return false;  
    13.     }  
    14. }  
    SendCompleted用于回调下次需要发送的数据,具体实现过程如下:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public virtual bool SendCompleted()  
    2. {  
    3.     m_activeDT = DateTime.UtcNow;  
    4.     m_sendAsync = false;  
    5.     AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;  
    6.     asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包  
    7.     int offset = 0;  
    8.     int count = 0;  
    9.     if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count))  
    10.     {  
    11.         m_sendAsync = true;  
    12.         return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,  
    13.             asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count);  
    14.     }  
    15.     else  
    16.         return SendCallback();  
    17. }  
    18.   
    19. //发送回调函数,用于连续下发数据  
    20. public virtual bool SendCallback()  
    21. {  
    22.     return true;  
    23. }  
    当一个SocketAsyncEventArgs断开后,我们需要断开对应的Socket连接,并释放对应资源,具体实现函数如下:
    [csharp] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public void CloseClientSocket(AsyncSocketUserToken userToken)  
    2. {  
    3.     if (userToken.ConnectSocket == null)  
    4.         return;  
    5.     string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,  
    6.         userToken.ConnectSocket.RemoteEndPoint);  
    7.     Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo);  
    8.     try  
    9.     {  
    10.         userToken.ConnectSocket.Shutdown(SocketShutdown.Both);  
    11.     }  
    12.     catch (Exception E)   
    13.     {  
    14.         Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message);  
    15.     }  
    16.     userToken.ConnectSocket.Close();  
    17.     userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源  
    18.   
    19.     m_maxNumberAcceptedClients.Release();  
    20.     m_asyncSocketUserTokenPool.Push(userToken);  
    21.     m_asyncSocketUserTokenList.Remove(userToken);  
    22. }  

    3、SocketAsyncEventArgs封装和MSDN的不同点

    MSDN在http://msdn.microsoft.com/zh-cn/library/system.NET.sockets.socketasynceventargs(v=vs.110).aspx实现了示例代码,并实现了初步的池化处理,我们是在它的基础上扩展实现了接收数据缓冲,发送数据队列,并把发送SocketAsyncEventArgs和接收SocketAsyncEventArgs分开,并实现了协议解析单元,这样做的好处是方便后续逻辑实现文件的上传,下载和日志输出。

    DEMO下载地址:http://download.csdn.Net/detail/sqldebug_fan/7467745
    免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

  • 相关阅读:
    A+B for Input-Output Practice (VIII)
    A+B for Input-Output Practice (VI)
    A+B for Input-Output Practice (VII)
    A+B for Input-Output Practice (IV)
    1.1.4 A+B for Input-Output Practice (V)
    1.1.3 A+B for Input-Output Practice (III)
    基础练习 龟兔赛跑预测
    基础练习 回形取数
    Python实用黑科技——以某个字段进行分组
    Python黑科技神奇去除马赛克
  • 原文地址:https://www.cnblogs.com/mimime/p/6139687.html
Copyright © 2011-2022 走看看