zoukankan      html  css  js  c++  java
  • WinSock 完成端口模型

    之前写了关于Winsock的重叠IO模型,按理来说重叠IO模型与之前的模型相比,它的socket即是非阻塞的,也是异步的,它基本上性能非常高,但是它主要的缺点在于,即使我们使用历程来处理完成通知,但是我们知道历程它本身是在对应线程暂停,它借用当前线程的线程环境来执行完成通知,也就是说要执行完成通知就必须暂停当前线程的工作。这对工作线程来说也是一个不必要的性能浪费,这样我们自然就会想到,另外开辟一个线程来执行完成通知,而本来的线程就不需要暂停,而是一直执行它自身的任务。处于这个思想,WinSock提供了一个新的模型——完成端口模型。

    完成端口简介

    完成端口本质上是一个线程池的模型,它需要我们创建对应的线程放在那,当完成通知到来时,他会直接执行线程。在这5中模型中它的性能是最高的。
    在文件中我们也提到过完成端口,其实我们利用Linux上一切皆文件的思想来考虑这个问题就可以很方便的理解,既然我们需要异步的方式来读写网卡的信息,这与读写文件的方式类似,既然文件中存在完成端口模型,网络上存在也就不足为奇了。
    对于完成端口Windows没有引入新的API函数,而是仍然采用文件中一堆相关的函数。可以使用CreateIoCompletionPort来创建完成端口的句柄,该函数原型如下:

    HANDLE WINAPI CreateIoCompletionPort(
      __in      HANDLE FileHandle,
      __in_opt  HANDLE ExistingCompletionPort,
      __in      ULONG_PTR CompletionKey,
      __in      DWORD NumberOfConcurrentThreads
    );
    

    第一个参数是与完成端口绑定的文件句柄,如果我们要创建完成端口句柄,这个值必须传入INVALID_HANDLE_VALUE。如果是要将文件句柄与完成端口绑定,这个参数必须穿入一个支持完成端口的文件句柄。在Winsock中如果要绑定SOCKET到完成端口只需要将SOCKET强转为HANDLE。
    第二个参数是一个已知的完成端口句柄,如果是创建完成端口,这个参数填入NULL。
    第三个参数是一个LONG型的指针,它作为一个标志,由完成通知传入完成线程中,用来标识不同的完成通知。一般我们会定义一个扩展来OVERLAPPED结构来标识不同的完成通知,所以这个参数一般不用传入NULL。
    第四个参数是同时执行的线程数,如果是绑定文件句柄到完成端口,则这个参数填入0

    我们可以在对应的完成线程中调用GetQueuedCompletionStatus函数来获取完成通知,这个函数只有当有IO操作完成时才会返回,函数原型如下:

    BOOL WINAPI GetQueuedCompletionStatus(
      __in   HANDLE CompletionPort,
      __out  LPDWORD lpNumberOfBytes,
      __out  PULONG_PTR lpCompletionKey,
      __out  LPOVERLAPPED* lpOverlapped,
      __in   DWORD dwMilliseconds
    );
    
    

    它的第一个参数是一个完成端口的句柄。
    第二个参数表示当前有多少字节的数据完成IO操作。
    第三个参数是一个标记值,用来标识不同文件句柄对应的完成通知,它是通过 CreateIoCompletionPort 函数设置的那个标识。
    第四个参数是OVERLAPPED结构。
    第五个参数表示等待的时间,如果填入INFINITE则会一直等到有IO操作完成。

    完成端口的示例:

    下面是一个完成端口的示例

    typedef struct _tag_MY_OVERLAPPED
    {
    	OVERLAPPED m_overlapped;
    	SOCKET m_sClient;
    	long m_lEvent;
    	DWORD m_dwNumberOfBytesRecv;
    	DWORD m_dwFlags;
    	char *m_pszBuf;
    	LONG m_dwBufSize;
    }MY_OVERLAPPED, *LPMY_OVERLAPPED;
    
    unsigned int __stdcall IOCPThread(LPVOID lpParameter);
    
    #define BUFFER_SIZE 1024
    #define SERVER_PORT 6000
    
    int _tmain(int argc, TCHAR *argv)
    {
    	WSADATA wd = {0};
    	WSAStartup(MAKEWORD(2, 2), &wd);
    
    	SYSTEM_INFO si = {0};
    	GetSystemInfo(&si);
    
    	//创建完成端口对象
    	HANDLE hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, si.dwNumberOfProcessors);
    
    	//创建完成端口对应的线程对象
    	HANDLE *pThreadArray = (HANDLE *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, 2 * si.dwNumberOfProcessors);
    	for (int i = 0; i < 2 * si.dwNumberOfProcessors; i++)
    	{
    		pThreadArray[i] = (HANDLE)_beginthreadex(NULL, 0, IOCPThread, &hIocp, 0, NULL);
    	}
    
    	SOCKET SrvSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
    	SOCKADDR_IN SockAddr = {0};
    	SockAddr.sin_family = AF_INET;
    	SockAddr.sin_port = htons(SERVER_PORT);
    	SockAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    
    	bind(SrvSocket, (SOCKADDR*)&SockAddr, sizeof(SOCKADDR));
    	listen(SrvSocket, 5);
    
    	SOCKET sClient = accept(SrvSocket, NULL, NULL);
    	CreateIoCompletionPort((HANDLE)sClient, hIocp, NULL, 0);
    
    	WSABUF buf = {0};
    	buf.buf = (char*)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFFER_SIZE);
    	buf.len = BUFFER_SIZE;
    
    	MY_OVERLAPPED AcceptOverlapped = {0};
    	AcceptOverlapped.m_dwBufSize = BUFFER_SIZE;
    	AcceptOverlapped.m_lEvent = FD_READ;
    	AcceptOverlapped.m_pszBuf = buf.buf;
    	AcceptOverlapped.m_sClient = sClient;
    
    	WSARecv(sClient, &buf, 1, &AcceptOverlapped.m_dwNumberOfBytesRecv, &AcceptOverlapped.m_dwFlags, &AcceptOverlapped.m_overlapped, NULL);
    
    	while (TRUE)
    	{
    		int nVirtKey = GetAsyncKeyState(VK_ESCAPE); //用户按下退出键(ESC)
    		{
    			break;
    		}
    	}
    
    	for (int i = 0; i < si.dwNumberOfProcessors * 2; i++)
    	{
    		//向IOCP发送FD_CLOSE消息,以便对应线程退出
    		AcceptOverlapped.m_lEvent = FD_CLOSE;
    		PostQueuedCompletionStatus(hIocp, si.dwNumberOfProcessors * 2, 0, &AcceptOverlapped.m_overlapped);
    	}
    
    	WaitForMultipleObjects(2 * si.dwNumberOfProcessors, pThreadArray, TRUE, INFINITE);
    
    	for (int i = 0; i < si.dwNumberOfProcessors * 2; i++)
    	{
    		CloseHandle(pThreadArray[i]);
    	}
    
    	HeapFree(GetProcessHeap(), 0, buf.buf);
    
    	shutdown(sClient, SD_BOTH);
    	closesocket(sClient);
    
    	CloseHandle(hIocp);
    	WSACleanup();
    	return 0;
    }
    
    unsigned int __stdcall IOCPThread(LPVOID lpParameter)
    {
    	HANDLE hIocp = *(HANDLE*)lpParameter;
    	DWORD dwNumberOfBytes = 0;
    	MY_OVERLAPPED *lpOverlapped = NULL;
    	ULONG key = 0;
    	BOOL bLoop = TRUE;
    	while (bLoop)
    	{
    		BOOL bRet = GetQueuedCompletionStatus(hIocp, &dwNumberOfBytes, &key, (LPOVERLAPPED*)&lpOverlapped, INFINITE);
    		if (!bRet)
    		{
    			continue;
    		}
    
    		switch (lpOverlapped->m_lEvent)
    		{
    		case FD_CLOSE: //退出
    			{
    				bLoop = FALSE;
    				printf("线程[%08x]准备退出......
    ", GetCurrentThreadId());
    			}
    			break;
    		case FD_WRITE:
    			{
    				printf("数据发送完成......
    ");
    				shutdown(lpOverlapped->m_sClient, SD_BOTH);
    				closesocket(lpOverlapped->m_sClient);
    			}
    			break;
    
    		case FD_READ:
    			{
    				printf("client>%s", lpOverlapped->m_pszBuf);
    				lpOverlapped->m_lEvent = FD_WRITE;
    				WSABUF buf = {0};
    				buf.buf = lpOverlapped->m_pszBuf;
    				buf.len = dwNumberOfBytes;
    				lpOverlapped->m_dwFlags = 0;
    				WSASend(lpOverlapped->m_sClient, &buf, 1, &lpOverlapped->m_dwNumberOfBytesRecv, lpOverlapped->m_dwFlags, &lpOverlapped->m_overlapped, NULL);
    			}
    		}
    	}
    	return 0;
    }
    

    在上述代码中,首先定义了一个结构体用来保存额外的数据。在main函数中首先查询CPU的核数,然后创建这个数目2倍的线程。接着创建一个完成端口对象。然后进行SOCKET的创建、绑定、监听、接收连接的操作。当有连接进来的时候。创建对应的扩展结构并调用WSARecv投递一个接收操作。由于后面的收发操作都在对应的线程中操作,因此在主线程中只需要等待即可。当用户确定退出时。先调用PostQueuedCompletionStatus函数向完成线程中发送完成通知,并将网络事件设置为FD_CLOSE,表示让线程退出。在这里没有使用TerminateThread这种暴力的方式,而选择了一种让线程自动退出的温和的方式。接着进行资源的回收,最后退出。
    在线程中,我们首先在循环中调用 GetQueuedCompletionStatus函数来获取完成通知,当发生完成事件时,我们在switch中根据不同的额网络事件来处理,针对FD_CLOSE事件,直接退出线程。针对FD_READ事件,先打印客户端发送的信息,然后调用WSASend将信息原样返回,接着设置网络事件为FD_WRITE,以便断开与客户端的链接。

    几种模型的比较

    最后针对5种模型和两种socket工作模式来做一个归纳说明。

    1. 最先学习的是SOCKET的阻塞模式,它的效率最低,它会一直等待有客户端连接或者有数据发送过来才会返回。这就好像我们在等某个人的信,但是不知道这封信什么时候能送到,于是我们在自家门口的收信箱前一直等待,直到有信到来。
    2. 为了解决这个问题,提出了SOCKET的非阻塞模式,它不会等待连接或者收发数据的操作完成,当我们调用对应的accept或者send、recv时会立即返回,但是我们不知道它什么时候有数据要处理,如果针对每个socket都等待直到有数据到来,那么跟之前的阻塞模式相比没有任何改进,于是就有了socket模式,它会等待多个socket,只要其中有一个有数据就返回,并处理。用收信的模型类比的话,现在我们不用在邮箱前等待了。但是我们会每隔一段时间就去邮箱那看看,有没有信,有信就将它收回否则空手而归。
    3. 我们说select模型的最大问题在于不知道什么时候有待决的SOCKET,因此我们需要在循环中不停的等待。为了解决这个时机问题,又提出了WSAAsyncSelect模型和WSAEvent模型,它们主要用来解决调用对应函数的时机。用收信的例子类比就是现在我在邮箱上装了一个报警的按钮,只有有信,警报就会响,这个时候我们就去收信。而不用向之前那样每隔一段时间就去邮箱看看
    4. 我们说解决了时机的问题,但是调用send和recv对网卡进行读写操作仍然是同步的操作,CPU需要傻傻的等着数据从网卡读到内存或者从内存写到网卡上。因此又有了重叠IO的模型和一些列的新的API,向WSARecv和WSASend等等函数。这样就相当于当有信来的警报响起时,我们不需要自己去取信了,另外派了一个人帮我们拿信,这样我们的工作效率又提高了一些。节约了我们的时间
    5. 重叠IO也有它的问题,如果使用重叠IO的事件模型时,也需要在合适的时候等待,就好像我们虽然派了一个人来帮忙拿信,但是我们自己却需要停下手头上的工作,询问拿信的人回来了。而使用完成历程也存在自己的问题,因为它需要使用主线程的资源来执行历程,它需要主线程暂停下来,这样就可能出现两种情况:1)有通知事件到来,但是并没有进入可警告状态;2)进入可警告状态却没有客户端发送请求。这就相当于可能我们不停的等待但是拿信的那个人却没有回来,或者拿信的人回来了,我们却没有时间处理信件。
    6. 针对重叠IO的上述问题,提出了完成端口的解决方案,完成事件由对应的线程处理,而主线程只需要专注于它自己的工作就好了,这就相当于警报响了,我们知道信来了,直接派一个人去拿信,后面的我就不管了,而拿信的人把信拿回来的时候将信放好。当我们忙完之后去处理这封信。没忙完的话信就一直放在那,甚至让拿信的人处理这封信,这样就能更高效的集中注意力来处理眼前的工作。

  • 相关阅读:
    Unix 时间戳;时间戳获取和生成
    linux 显示文件全局路径
    云原生是什么?
    linux 终端输出有颜色字体
    git 建立本地分支和远程分支关系
    Linux 终端修改分辨率;Linux 系统添加新的分辨率方法;
    windows 下,如何查看文件夹被哪个程序占用(转)
    剑指Offer(第二版)面试题目分析与实现-解决面试题的思路
    WRF Model Wrf 模式 软件测试环境配置
    MPI 本地局域网运行多机配置,同时运行多个程序;
  • 原文地址:https://www.cnblogs.com/lanuage/p/9275710.html
Copyright © 2011-2022 走看看