zoukankan      html  css  js  c++  java
  • IOCP小记

    IOCP事实上是一个Tread pool,但是它运行于内核,所以相对于用户态的select效率应该高了一点点。

    它的工作过程一般是这样的:

    0.CreateIoCompletionPort创建一个完成端口

    1.事先开好N个线程,这线程的个数一般为处理器个数的二倍+2,线程的参数传完成端口的句柄

    2.服务器有一个listen线程,当它从已完成三次握手队列中取出一个临时accept套接字时

    3.accept绑定到0步创建的完成端口

    4.accept使用WSARecv读取客户端发送的数据,并放入缓冲区,使用OVERLAPPED开头的自定义数据结构,WSARecv接收客户端的数据,并把数据放入缓冲区,让工作线程(事先开好的N个线程)来取。注意dwFlags通常设为0,否则会出错

    5.工作线程通过GetQueuedCompletionStatus查询,从缓冲区取出数据并加以处理

    基本代码:

    BOOL CIMServerThread::InitServer()
    {
    	WSADATA wsadata;
    	if (0 != WSAStartup(WINSOCK_VERSION, &wsadata))
    	{
    		return FALSE;
    	}
    
    	SYSTEM_INFO syminfo;
    	GetSystemInfo(&syminfo);
    
    
    	// 确定创建多少个IOCP处理线程
    	m_dwThreads = syminfo.dwNumberOfProcessors*2;
    	m_phThread = new HANDLE[m_dwThreads];
    
    	return TRUE;
    }
    
    BOOL CIMServerThread::Run()
    {
    	InitServer();
    
       // 创建监听线程
       if (NULL == m_hListenThread)
       {
    	   m_hListenThread = CreateThread(NULL, 0, ListenThread, 
    		   NULL, 0, NULL);
       }
    
    	return TRUE;
    }
    
    DWORD WINAPI CIMServerThread::ListenThread(LPVOID lp)
    {
    	HANDLE hCompletionPort
        = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    	if (NULL == hCompletionPort)
    	{
    		return TRUE;
    	}
    
    	// 为每个CPU创建两个工作线程
    	for (DWORD i=0; i<m_dwThreads; i++)
    	{
    		m_phThread[i] = CreateThread(NULL, 0, WorkThread, hCompletionPort, 
    			0, NULL);
    	}
    
    	// WSASocket可以使用WinSock特有功能,比如重叠IO,用dwflags指定。
    	SOCKET listenSock = WSASocket(AF_INET, SOCK_STREAM, 0, 
    		NULL, 0, WSA_FLAG_OVERLAPPED);
    	if (INVALID_SOCKET == listenSock)
    	{
    		return TRUE;
    	}
    
    	SOCKADDR_IN sin;
    	sin.sin_family = AF_INET;
    	sin.sin_port = htons(PORT);
    	sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    	if (SOCKET_ERROR == bind(listenSock, (sockaddr*)&sin, sizeof(sin)))
    	{
    		return TRUE;
    	}
    
    	if (SOCKET_ERROR == listen(listenSock, 5))
    	{
    		return TRUE;
    	}
    
    	// 接受连接,并分配给完成端口
    	SOCKET acceptSock;
    	LPPER_HANDLE_DATA PerHandleDate = NULL;
    	DWORD RecvBytes = 0;
    	DWORD Flags = 0;
    	while (TRUE)
    	{
    		acceptSock = WSAAccept(listenSock, NULL, NULL, NULL, 0);
    		if (INVALID_SOCKET == acceptSock)
    		{
    			return TRUE;
    		}
    
    		PerHandleDate = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HEANDLE_DATA));
            if (NULL == PerHandleDate)
            {
    			return TRUE;
            }
    
    		// 对临时socket启动突发断线检测
    		BOOL bKeepAlive = TRUE;
    		int nRet = setsockopt(acceptSock, 
    			SOL_SOCKET,
    			SO_KEEPALIVE,
    			(char*)&bKeepAlive,
    			sizeof(bKeepAlive));
    		if (SOCKET_ERROR == nRet)
    		{
    			return TRUE;
    		}
    
    		tcp_keepalive alive_in;
    		tcp_keepalive alive_out;
    		alive_in.keepalivetime = 5000;
    		alive_in.keepaliveinterval = 5000;
    		alive_in.onoff = TRUE;
    
    		unsigned long ulBytesReturn = 0;
    		nRet = WSAIoctl(acceptSock, SIO_KEEPALIVE_VALS,
    			&alive_in, sizeof(alive_in),
    			&alive_out, sizeof(alive_out),
    			&ulBytesReturn, NULL, NULL);
    		if (SOCKET_ERROR == nRet)
    		{
    			return TRUE;
    		}
    
    		//
    		PerHandleDate->Sock = acceptSock;
    		if (NULL == CreateIoCompletionPort((HANDLE)acceptSock, hCompletionPort,
    			(ULONG_PTR)PerHandleDate, 0))
    		{
    			return TRUE;
    		}
    
    		LPPER_IO_OPERATION_DATA PerIoData = 
    			(LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, 
    			sizeof(PER_IO_OPERATION_DATA));
    		if (NULL == PerIoData)
    		{
    			return TRUE;
    		}
    
    		memset(&(PerIoData->Overlapped), 0, sizeof(OVERLAPPED));
    		PerIoData->DataBuf.len = DATA_BUFSIZE;
    		PerIoData->DataBuf.buf = PerIoData->Buffer;
    		PerIoData->hWnd = AfxGetApp()->m_pMainWnd->GetSafeHwnd();
    		PerIoData->send = 1;
    
    		Flags = 0;
    		if (SOCKET_ERROR == WSARecv(acceptSock, &(PerIoData->DataBuf), 1, 
    			&RecvBytes, &Flags, &(PerIoData->Overlapped), NULL))
    		{
    
    			if (ERROR_IO_PENDING != WSAGetLastError())
    			{
    				return TRUE;
    			}
    		}
    	}
    
    	return TRUE;
    }
    
    DWORD WINAPI CIMServerThread::WorkThread(LPVOID lp)
    {
    	while (TRUE)
    	{
    		HANDLE hCompletionPort = (HANDLE)lp;
    		DWORD BytesTransferred = 0;
    		LPPER_HANDLE_DATA PerHanleData;
    		LPPER_IO_OPERATION_DATA PerIoData;
    		HWND hWnd = NULL;
    		SOCKET socket = INVALID_SOCKET;
    		if (0 == GetQueuedCompletionStatus(hCompletionPort,
    			&BytesTransferred, 
    			(LPDWORD)&PerHanleData,
    			(LPOVERLAPPED*)&PerIoData,
    			INFINITE))//为0表示失败
    		{
    			hWnd = PerIoData->hWnd;
    		    socket = PerHanleData->Sock;
    
    			FreeAndNull(PerIoData);
    			FreeAndNull(PerHanleData);
    
    			// 获取错误代码
    			int nLastErr = WSAGetLastError();
    
    			// 错误处理
    			switch (nLastErr)
    			{
    			case ERROR_NETNAME_DELETED:
    			case ERROR_OPERATION_ABORTED:
    				{
    					::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
    					closesocket(socket);
    				}
    				break;
    
    			default:// 发生严重错误
    				{
    					return 0;
    				}
    				break;
    			}
    
    			// 继续工作
    			continue;
    		}
    
    		// 下面表示成功的从一个完成端口出列一个完成包
    		hWnd = PerIoData->hWnd;
    		socket = PerHanleData->Sock;
    
    		// 客户正常退出程序,会接收到0字节
    		if (0 == BytesTransferred)
    		{
    			FreeAndNull(PerHanleData);
    			FreeAndNull(PerIoData);
    
    			::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
    			closesocket(socket);
    
    			// 继续工作
    			continue;
    		}
    
    		// 数据包过大
    		if (1024*2<BytesTransferred)
    		{
    			FreeAndNull(PerHanleData);
    			FreeAndNull(PerIoData);
    
    			::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
    			closesocket(socket);
    
    			// 继续工作
    			continue;
    		}
    
            // 开始正确处理数据包 ---------------------------------
    		switch (PerIoData->send)
    		{
    
    		case 1:
    			{
    
    			}
    			break;
    
    		case 2:
    			{
    
    			}
    			break;
    
    		default:
    			break;
    		}
    	}// while(TRUE)
    
    
    	return TRUE;
    }


     

  • 相关阅读:
    leetCode算法题(一)
    node学习笔记1(http模块)
    mySql常用命令总结
    django学习遇到的问题解决方式
    WebSSH2 界面ssh(转载)
    垃圾回收策略学习总结
    5种数组扁平化方法
    冒泡、插入、选择、归并、快速排序
    手写ES6数组方法
    HTTP options预请求
  • 原文地址:https://www.cnblogs.com/hgy413/p/3693500.html
Copyright © 2011-2022 走看看