IOCP事实上是一个Tread pool,但是它运行于内核,所以相对于用户态的select效率应该高了一点点。
它的工作过程一般是这样的:
0.CreateIoCompletionPort创建一个完成端口
1.事先开好N个线程,这线程的个数一般为处理器个数的二倍+2,线程的参数传完成端口的句柄
2.服务器有一个listen线程,当它从已完成三次握手队列中取出一个临时accept套接字时
3.accept绑定到0步创建的完成端口
4.accept使用WSARecv读取客户端发送的数据,并放入缓冲区,使用OVERLAPPED开头的自定义数据结构,WSARecv接收客户端的数据,并把数据放入缓冲区,让工作线程(事先开好的N个线程)来取。注意dwFlags通常设为0,否则会出错
5.工作线程通过GetQueuedCompletionStatus查询,从缓冲区取出数据并加以处理
基本代码:
BOOL CIMServerThread::InitServer()
{
WSADATA wsadata;
if (0 != WSAStartup(WINSOCK_VERSION, &wsadata))
{
return FALSE;
}
SYSTEM_INFO syminfo;
GetSystemInfo(&syminfo);
// 确定创建多少个IOCP处理线程
m_dwThreads = syminfo.dwNumberOfProcessors*2;
m_phThread = new HANDLE[m_dwThreads];
return TRUE;
}
BOOL CIMServerThread::Run()
{
InitServer();
// 创建监听线程
if (NULL == m_hListenThread)
{
m_hListenThread = CreateThread(NULL, 0, ListenThread,
NULL, 0, NULL);
}
return TRUE;
}
DWORD WINAPI CIMServerThread::ListenThread(LPVOID lp)
{
HANDLE hCompletionPort
= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (NULL == hCompletionPort)
{
return TRUE;
}
// 为每个CPU创建两个工作线程
for (DWORD i=0; i<m_dwThreads; i++)
{
m_phThread[i] = CreateThread(NULL, 0, WorkThread, hCompletionPort,
0, NULL);
}
// WSASocket可以使用WinSock特有功能,比如重叠IO,用dwflags指定。
SOCKET listenSock = WSASocket(AF_INET, SOCK_STREAM, 0,
NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == listenSock)
{
return TRUE;
}
SOCKADDR_IN sin;
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
if (SOCKET_ERROR == bind(listenSock, (sockaddr*)&sin, sizeof(sin)))
{
return TRUE;
}
if (SOCKET_ERROR == listen(listenSock, 5))
{
return TRUE;
}
// 接受连接,并分配给完成端口
SOCKET acceptSock;
LPPER_HANDLE_DATA PerHandleDate = NULL;
DWORD RecvBytes = 0;
DWORD Flags = 0;
while (TRUE)
{
acceptSock = WSAAccept(listenSock, NULL, NULL, NULL, 0);
if (INVALID_SOCKET == acceptSock)
{
return TRUE;
}
PerHandleDate = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HEANDLE_DATA));
if (NULL == PerHandleDate)
{
return TRUE;
}
// 对临时socket启动突发断线检测
BOOL bKeepAlive = TRUE;
int nRet = setsockopt(acceptSock,
SOL_SOCKET,
SO_KEEPALIVE,
(char*)&bKeepAlive,
sizeof(bKeepAlive));
if (SOCKET_ERROR == nRet)
{
return TRUE;
}
tcp_keepalive alive_in;
tcp_keepalive alive_out;
alive_in.keepalivetime = 5000;
alive_in.keepaliveinterval = 5000;
alive_in.onoff = TRUE;
unsigned long ulBytesReturn = 0;
nRet = WSAIoctl(acceptSock, SIO_KEEPALIVE_VALS,
&alive_in, sizeof(alive_in),
&alive_out, sizeof(alive_out),
&ulBytesReturn, NULL, NULL);
if (SOCKET_ERROR == nRet)
{
return TRUE;
}
//
PerHandleDate->Sock = acceptSock;
if (NULL == CreateIoCompletionPort((HANDLE)acceptSock, hCompletionPort,
(ULONG_PTR)PerHandleDate, 0))
{
return TRUE;
}
LPPER_IO_OPERATION_DATA PerIoData =
(LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR,
sizeof(PER_IO_OPERATION_DATA));
if (NULL == PerIoData)
{
return TRUE;
}
memset(&(PerIoData->Overlapped), 0, sizeof(OVERLAPPED));
PerIoData->DataBuf.len = DATA_BUFSIZE;
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->hWnd = AfxGetApp()->m_pMainWnd->GetSafeHwnd();
PerIoData->send = 1;
Flags = 0;
if (SOCKET_ERROR == WSARecv(acceptSock, &(PerIoData->DataBuf), 1,
&RecvBytes, &Flags, &(PerIoData->Overlapped), NULL))
{
if (ERROR_IO_PENDING != WSAGetLastError())
{
return TRUE;
}
}
}
return TRUE;
}
DWORD WINAPI CIMServerThread::WorkThread(LPVOID lp)
{
while (TRUE)
{
HANDLE hCompletionPort = (HANDLE)lp;
DWORD BytesTransferred = 0;
LPPER_HANDLE_DATA PerHanleData;
LPPER_IO_OPERATION_DATA PerIoData;
HWND hWnd = NULL;
SOCKET socket = INVALID_SOCKET;
if (0 == GetQueuedCompletionStatus(hCompletionPort,
&BytesTransferred,
(LPDWORD)&PerHanleData,
(LPOVERLAPPED*)&PerIoData,
INFINITE))//为0表示失败
{
hWnd = PerIoData->hWnd;
socket = PerHanleData->Sock;
FreeAndNull(PerIoData);
FreeAndNull(PerHanleData);
// 获取错误代码
int nLastErr = WSAGetLastError();
// 错误处理
switch (nLastErr)
{
case ERROR_NETNAME_DELETED:
case ERROR_OPERATION_ABORTED:
{
::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
closesocket(socket);
}
break;
default:// 发生严重错误
{
return 0;
}
break;
}
// 继续工作
continue;
}
// 下面表示成功的从一个完成端口出列一个完成包
hWnd = PerIoData->hWnd;
socket = PerHanleData->Sock;
// 客户正常退出程序,会接收到0字节
if (0 == BytesTransferred)
{
FreeAndNull(PerHanleData);
FreeAndNull(PerIoData);
::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
closesocket(socket);
// 继续工作
continue;
}
// 数据包过大
if (1024*2<BytesTransferred)
{
FreeAndNull(PerHanleData);
FreeAndNull(PerIoData);
::SendMessage(hWnd, IM_S_LOGOFF, NULL, socket);
closesocket(socket);
// 继续工作
continue;
}
// 开始正确处理数据包 ---------------------------------
switch (PerIoData->send)
{
case 1:
{
}
break;
case 2:
{
}
break;
default:
break;
}
}// while(TRUE)
return TRUE;
}