zoukankan      html  css  js  c++  java
  • IOCP一:AcceptEx

    IOCP底层机理还没有透彻的理解,现将部分内容记录如下 2014.7.22 16:50 


    把完成端口理解为完成队列。

    投递的异步IO请求完成后会携带三参数返回。

    异步IO请求分为:连接、接收、发送,分别对应AcceptEx、WSARecv、WSASend。

    三参数:单句柄数据结构、单IO数据结构、传输字节数。

    用两种自定义结构:单句柄数据结构和单IO数据结构

    单句柄数据结构与特定socket关联,在该socket上完成的所有类型的所有异步请求完成后都会返回该结构。单句柄数据结构的故事是这样的:把socket关联到完成端口时允许带一个整数,过后在该socket上完成的所有异步请求完成后都会返回该整数。

    单IO数据机构与具体异步请求关联,每次投递请求时都要带一个单IO数据结构,请求A完成后会携带与A关联的单IO数据结构返回。

    还原故事发生的大体情节:用

    	HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    	if(hcp == NULL)
    	{
    		std::cout<<"Create Completion Port Failed : "<<GetLastError()<<std::endl;
    		return -1;
    	}
    创建完成端口,现在假设socket s上需要进行异步接收,用

    		CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0);

    将s关联到完成端口hcp,同时带一整数pphd。pphd为单句柄数据结构的指针,该结构用new在堆上分配。故传入的是单句柄数据结构指针。用

    			LPPER_IO_DATA ppiod = new PER_IO_DATA;
    			ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
    			ppiod->operationType = OP_RECV;
    			WSABUF databuf;
    			databuf.buf = ppiod->buf;
    			databuf.len = BUF_LEN;
    
    			DWORD dwRecv = 0;
    			DWORD dwFlags = 0;
    			WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
    在s上投递请求。所有的工作线程阻塞在完成端口等待请求完成的到来

    	DWORD dwNum = 0;
    	LPPER_HANDLE_DATA pphd;
    	LPPER_IO_DATA ppiod;
    	bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
    请求到来,唤醒一个工作线程,该线程从GetQueuedCompletionStatus函数返回,dwNum、pphd、ppiod三参数出参:

    dwNum被赋值本次IO传输字节数,

    pphd为指针,GetQueuedCompletionStatus中再取地址,是为指针地址,目的为指针赋值,所赋值为CreateIoCompletionPort((HANDLE)s, hcp, (DWORD)pphd, 0)传入的pphd,

    ppiod同pphd,被赋值WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL)中的ppiod的地址。


    自定义结构可以任意设置,唯一的要求就是单IO数据结构的第一个成员必须是OVERLAPPED。

    #include <WinSock2.h>
    #include <Windows.h>
    #include <iostream>
    #include <process.h>
    #include <string>
    #include <MSWSock.h>
    #include <set>
    
    #pragma comment(lib, "Ws2_32.lib")
    #pragma comment(lib, "Kernel32.lib")
    #pragma comment(lib, "Mswsock.lib")
    
    #define BUF_LEN 1024
    
    enum OperateType
    {
    	OP_RECV,
    	OP_SEND,
    	OP_ACCEPT,
    };
    
    typedef struct PER_HANDLE_DATA
    {
    	SOCKET s;			//记录是哪个socket上的请求
    	SOCKADDR_IN addr;	//记录该socket对应的客户端地址和端口
    }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
    
    typedef struct PER_IO_DATA
    {
    	OVERLAPPED overlapped;	//第一项必须为OVERLAPPED
    	SOCKET cs;				//记录客户端socket
    	char buf[BUF_LEN];		//发送:此buf存储待发送数据,接收:此buf存储到来的数据
    	int operationType;		//记录完成的请求类型:是接收?是发送? 还是连接?
    }PER_IO_DATA, *LPPER_IO_DATA;

    异步连接

    为什么监听连接用异步?

    网上很多IOCP模型:主线程循环accept等待连接的到来,然后将Client socket加入IOCP,同时在Client socket上投递WSARecv等待数据到来。其他worker线程GetQueuedCompletionStatus阻塞在IOCP上等待请求的完成。

    缺点:阻塞式accept效率不高,且接收连接单独占用一个线程,让原本的一个worker线程专门来等待连接,压榨了一个worker的潜能。

    异步连接让所有运行的线程均为worker线程,且MSDN说AcceptEx比accpet连接进行得更快,可以用少量的线程处理大量的Client连接

    整体过程:

            1.创建listenSocket,与本地地址绑定,开始监听

            2.将listenSocket添加到IOCP

            3.用AcceptEx在listenSocket上投递连接请求


    如何投递异步连接请求?

    bool PostAccept(SOCKET listenSocket)
    {
    	/************************************************
    	为即将到来的Client连接事先创建好Socket:
    	阻塞式连接中accept的返回值即为新进连接创建的Socket,
    	异步连接需要事先将此Socket备下,再行连接
    	**************************************************/
    	SOCKET cs = socket(AF_INET, SOCK_STREAM, 0);
    	if(INVALID_SOCKET == cs)
    	{
    		std::cout<<"Create Socket Failed : "<<GetLastError()<<std::endl;
    		return false;
    	}
    
    	/*每一个异步请求必须一个PER_IO_DATA结构*/
    	LPPER_IO_DATA ppiod = new PER_IO_DATA;
    	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));	//PER_IO_DATA在使用前必须清空OVERLAPPED成员
    	ppiod->operationType = OP_ACCEPT;	//待会从GetQueuedCompletionStatus返回,通过查看operationType就知道是接收完成?发送完成?还是连接完成
    	ppiod->cs = cs;	//连接完成后要在新进连接上投递异步发送或接收,所以要事先把Client Socket记录下来以备后续使用
    
    	/***********************
    	投递异步连接请求:
    	函数:AcceptEx
    	参数:
    	一参本地监听Socket
    	二参为即将到来的客人准备好的Socket
    	三参接收缓冲区:
    		一存客人发来的第一份数据、二存Server本地地址、三存Client远端地址
    		地址包括IP和端口,
    	四参定三参数据区长度,0表只连不接收、连接到来->请求完成,否则连接到来+任意长数据到来->请求完成
    	五参定三参本地地址区长度,至少sizeof(sockaddr_in) + 16
    	六参定三参远端地址区长度,至少sizeof(sockaddr_in) + 16
    	七八两参不用管
    	***************************************/
    	DWORD dwRecv;
    	int len = sizeof(sockaddr_in) + 16;
    	bool ret = AcceptEx(listenSocket, ppiod->cs, ppiod->buf, 0, len, len, &dwRecv, &ppiod->overlapped);
    	if(false == ret && ERROR_IO_PENDING != GetLastError())
    	{
    		std::cout<<"AcceptEx Failed : "<<GetLastError()<<std::endl;
    		return false;
    	}
    
    	return true;
    }


    异步接收

    bool PostRecv(SOCKET s)
    {
    	/*每一个异步请求必须一个PER_IO_DATA结构*/
    	LPPER_IO_DATA ppiod = new PER_IO_DATA;
    	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
    	ppiod->operationType = OP_RECV;	//请求类型是接收
    	memset(ppiod->buf, 0, BUF_LEN);	//清空接收缓存
    
    	WSABUF databuf;
    	databuf.buf = ppiod->buf;	//接收缓冲区首地址
    	databuf.len = BUF_LEN;		//接收缓冲区长度
    
    	/*********************** 
        投递异步接收请求: 
        函数:WSARecv 
        参数: 
        一参接收Socket 
        二参WSABUF指针,接收缓冲数组
        三参说二参数组元素个数、接收缓冲个数
        四五六七不用管
        ***************************************/ 
    	DWORD dwRecv = 0;
    	DWORD dwFlags = 0;
    	int ret = WSARecv(s, &databuf, 1, &dwRecv, &dwFlags, &ppiod->overlapped, NULL);
    	if(SOCKET_ERROR == ret && WSA_IO_PENDING != GetLastError())
    		return false;
    
    	return true;
    }

    异步发送

    bool PostSend(SOCKET s, const char *buf, int len)
    {
    	/*每一个异步请求必须一个PER_IO_DATA结构*/
    	LPPER_IO_DATA ppiod = new PER_IO_DATA;
    	ZeroMemory(&(ppiod->overlapped), sizeof(OVERLAPPED));
    	ppiod->operationType = OP_SEND;
    	memset(ppiod->buf, 0, BUF_LEN);
    	memcpy(ppiod->buf, buf, len);
    
    	WSABUF databuf;
    	databuf.buf = ppiod->buf;	//发送缓冲首地址
    	databuf.len = len;			//发送数据长度,定多少就发多少:多定多发,少定少发,不定不发
    
    	/*同WSARecv*/
    	DWORD dwRecv = 0;
    	DWORD dwFlags = 0;
    	WSASend(s, &databuf, 1, &dwRecv, dwFlags, &ppiod->overlapped, NULL);
    
    	return true;
    }


    处理完成的请求
    bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);

    全体worker线程循环调用GetQueuedCompletionStatus,阻塞在该函数调用中,等待从IOCP传来请求完成的通知。没有任何请求完成时,IOCP让worker沉睡;当请求到来时,IOCP唤醒最后入睡的worker线程起来执行处理

    GetQueuedCompletionStatus成功返回true,失败返回false。


    实验结果如下:

            1.连接到来, ret = true && dwNum = 0 && ppiod->operationType = OP_ACCEPT

            2.连接断开:

                    A.Client调用closesocket,ret = true && dwNum = 0

                    B.Client直接退出,ret = false && dwNum = 0

                    C.Client暴力中断,如断电,2014.7.23 18:49 用两台电脑测试结果表明对于暴力断电,IOCP无任何反应,故心跳检测    必不可少

            3.投递请求的线程退出,ret = false && dwNum = 0 && GetLastError() = 995(995:由于线程退出或应用程序请求,已放弃   I/O 操作)

    附加:Client关闭连接或直接退出,在对应socket上投递的所有请求均返回。假设在socket 1上投递了三个WSARecv, 当Client关闭连接时,会有三个连接断开返回,不要重复释放空间。


    用户控制退出

    PostQueuedCompletionStatus(hcp, -1, NULL, NULL);

    向IOCP投一个IO完成包,会有一个worker线程从GetQueuedCompletionStatus返回,同时它得到的三参数依次是dwNum = -1, pphd = NULL, ppiod = NULL,正是我们传入PostQueuedCompletionStatus的后三个参数。

    PostQueuedCompletionStatus(hcp, X, Y, Z),则返回的worker线程得到的三参数依次是dwNum = X, pphd = Y, ppiod = Z

    正常情况下dwNum只有两种取值:正整数和0,所以worker线程返回后通过查看dwNum == -1是否成立就可以判断是否到了结束的时间了。

    理想情况下,以上述格式每投递一个完成包就有一个worker退出,所以我们应该投递workerNum个完成包来使所有worker退出。在非理想情况下我们应该防止还有worker没有退出,完美方式

    void NetModule::Stop()
    {
    	/*std::set<HANDLE> setWorkers存储所有worker线程句柄*/
    	for(size_t i = 0; i < setWorkers.size(); i++)
    		PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
    
    	for(auto iter = setWorkers.begin(); iter != setWorkers.end();)
    	{
    		int ret = WaitForSingleObject(*iter, 100);
    		if(WAIT_OBJECT_0 == ret)
    			setWorkers.erase(iter++);
    		else
    		{
    			PostQueuedCompletionStatus(hcp, -1, NULL, NULL);
    			continue;
    		}
    	}
    }


    所以检查GetQueuedCompletionStatus的返回值的标准方式

    		bool ret = GetQueuedCompletionStatus(hcp, &dwNum, (LPDWORD)&pphd, (LPOVERLAPPED*)&ppiod, WSA_INFINITE);
    
    		//线程退出控制,没有释放申请的堆空间,还不完善
    		if(-1 == dwNum)
    		{
    			std::cout<<"Thread Exit"<<std::endl;
    			_endthreadex(0);
    			return 0;
    		}
    
    		int type = ppiod->operationType;
    		if(0 == dwNum && OP_ACCEPT != type)
    		{
    			//投递请求的线程退出
    			if(GetLastError() == 995)
    			{
    				std::cout<<"Thread Exit Cause Request Returned"<<std::endl;
    			}
    			//连接断开
    			else
    			{
    				std::cout<<"Peer Close The Connection"<<std::endl;
    
    				//在一个socket上投递多个WSARecv需要考虑连接被Client断开时所有异步WSARecv均返回不会重复delete PER_HANDLE_DATA
    				AutoLock lock(pphd->mutex);
    				if(pphd->flag == true)
    				{
    					closesocket(pphd->s);
    					delete pphd;
    					pphd->flag = false;
    				}
    			}
    
    			delete ppiod;
    			continue;
    		}
    
    		//错误发生
    		if(false == ret)
    		{
    			std::cout<<"An Error Occurs : "<<GetLastError()<<std::endl;
    			delete ppiod;
    			continue;
    		}



  • 相关阅读:
    $("").append无反应
    go 客户端、服务端
    go mysql insert变量到数据库
    .gvfs: Permission denied
    go笔记
    java socket通信笔记
    (转)linux中top命令下显示出的PRNIRESSHRS\%MEM TIME+都代表什么
    adb Android Debug Bridge 安卓调试桥
    一阶段冲刺(四)
    一阶段冲刺(三)
  • 原文地址:https://www.cnblogs.com/chaikefusibushiji/p/7475635.html
Copyright © 2011-2022 走看看