#pragma once #include <WinSock2.h> #include <MSWSock.h> #include <Windows.h> #pragma comment(lib,"ws2_32.lib") #define BUFFER_SIZE (1024*8) // 8KB #define BUFFER_SIZE_DATA (3*BUFFER_SIZE ) #define NOTIFY_MSG_ACCEPT 0xa1 #define NOTIFY_MSG_CONNECT 0xa2 #define NOTIFY_MSG_DISCONNECT 0xa3 #define NOTIFY_MSG_READ 0xa4 #define NOTIFY_MSG_WRITE 0xa5 struct PER_IO_BUFFER { WSAOVERLAPPED ol; SOCKET sClient ; //the socket of client use by AcceptEx LPBYTE lpBuffer; // the pointer of buffer DWORD dwBufferSize; // the size of buffer DWORD dwTrans ; // the size of io-trans BYTE opType; // opteion type #define OP_ACCEPT 6 #define OP_CONNECT 7 #define OP_WRITE 8 #define OP_READ 9 PER_IO_BUFFER* pNext; // next buffer }; struct PER_HANDLE_DATA { SOCKET s; SOCKADDR_IN saddr; BOOL bConnect; BYTE readBytes[BUFFER_SIZE_DATA]; DWORD dwBufferOffSet; HANDLE m_hWriteComplete ; //DWORD dwBufferSize; PER_HANDLE_DATA* pNext; }; typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD ); class Ciocp { private: HANDLE m_hIocp; // iocp handle SOCKET m_sListen; // the socket of listen SOCKET m_sConnect; // the socket of connect DWORD m_dwProt; // the port of listen DWORD m_dwMaxConns; // the max count of connectios DWORD m_dwMaxFreeBuffers; // the max count of freebuffers DWORD m_dwMaxFreeContexts;// the max count of freecontexts DWORD m_dwInitOp; // the count of init-op ULONGLONG m_ulWriteBytes; // the total of write-bytes ULONGLONG m_ulReadBytes; // the total of read-bytes DWORD m_dwWorkThreadCount ; // the count of worker thread DWORD m_dwCurWorkThreadCount ; CRITICAL_SECTION m_csWorkLock ; CRITICAL_SECTION m_csIoLock ; CRITICAL_SECTION m_csBuffersListLock; // the cs-lock of free-buffers-list CRITICAL_SECTION m_csContextsListLock; // the cs-lock of free-count DWORD m_dwBuffersCount; // the count of cur buffers DWORD m_dwContextsCount; // the count of cur contexts PER_IO_BUFFER* m_pFreeBuffersList; // the list of free-buffers PER_HANDLE_DATA* m_pFreeContextsList; // the list of free-contexts SOCKADDR_IN m_siRemoteAddr ; bool m_bStarted ; // the status of socket server LPFN_ACCEPTEX m_lpfnAcceptEx ; LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs; LPFN_CONNECTEX m_lpfnConnectEx ; PNOTIFYPROC m_pNotifyProc; BOOL PostAccept(); BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize); BOOL PostRead(PER_HANDLE_DATA* pContext); BOOL PostConnect(); static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam); PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE); PER_HANDLE_DATA* AllocContext(SOCKET s); void ReleaseContext(PER_HANDLE_DATA* pContext); void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer); void CreateWokerThreads(); void CreateIocp(); void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0); void DecCurWorkCount(); void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans); void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg); void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans); BOOL SetKeepAlive(SOCKET s); void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans); void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans); public: Ciocp(void); ~Ciocp(void); BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 ); void Shutdown(); BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 ); void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite); void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize); void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans); };
#include "StdAfx.h" #include "iocp.h" #include <process.h> #include <mstcpip.h> Ciocp::Ciocp(void) { m_hIocp = INVALID_HANDLE_VALUE ; m_sListen = INVALID_SOCKET ; m_sConnect = INVALID_SOCKET ; m_dwInitOp =5; m_dwMaxConns = 2000; m_dwMaxFreeContexts = 100; m_dwMaxFreeBuffers = 100 ; m_ulWriteBytes =0; m_ulReadBytes = 0; // get max worker count SYSTEM_INFO sys_info ; GetSystemInfo(&sys_info); m_dwWorkThreadCount = sys_info.dwNumberOfProcessors *2 ; m_dwCurWorkThreadCount = 0; InitializeCriticalSection(& m_csBuffersListLock); InitializeCriticalSection(& m_csContextsListLock); InitializeCriticalSection(& m_csWorkLock); InitializeCriticalSection((& m_csIoLock)); m_dwBuffersCount = 0; m_dwContextsCount = 0; m_pFreeBuffersList = NULL ; m_pFreeContextsList = NULL ; m_lpfnAcceptEx = NULL ; m_lpfnConnectEx = NULL ; m_lpfnGetAcceptExSockAddrs = NULL ; m_bStarted = false ; m_dwProt = 0; WSADATA wsaData ; WORD sockVerSion = MAKEWORD(2,2); WSAStartup(sockVerSion,&wsaData); } Ciocp::~Ciocp(void) { DeleteCriticalSection(& m_csBuffersListLock); DeleteCriticalSection(& m_csContextsListLock); DeleteCriticalSection(& m_csWorkLock); DeleteCriticalSection(& m_csIoLock); if (m_sListen != INVALID_SOCKET) { closesocket(m_sListen); } if (m_sConnect != INVALID_SOCKET) { closesocket(m_sConnect); } } BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */) { m_pNotifyProc = pNotifyProc ; m_dwProt = dwPort; m_dwMaxConns = dwMaxConns; m_dwMaxFreeBuffers = dwMaxFreeBuffers; m_dwMaxFreeContexts = dwMaxFreeContexts; m_dwInitOp = dwInitOp ; m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); SOCKADDR_IN saddr ; saddr.sin_family = AF_INET; saddr.sin_port = ntohs(m_dwProt); saddr.sin_addr.S_un.S_addr = INADDR_ANY ; m_bStarted = true ; if (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR) { m_bStarted = false ; return FALSE ; } listen(m_sListen,m_dwMaxConns); CreateIocp(); GUID guidAcceptEx = WSAID_ACCEPTEX ; DWORD dwBytes ; WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL); GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ; WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL); AddSocketToIocp(m_sListen); //create worker CreateWokerThreads(); // post accept for(int i=0;i < m_dwInitOp;i++) { PostAccept(); } } BOOL Ciocp::PostAccept() { // set io type PER_IO_BUFFER* pBuffer = NULL ; pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE); pBuffer->opType = OP_ACCEPT ; // post io DWORD dwBytes ; DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ; pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); // BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient, // pBuffer->lpBuffer,pBuffer->dwBufferSize-dwAddrSize*2 // ,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol); BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient, pBuffer->lpBuffer,0 ,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol); if (!b && WSAGetLastError() != WSA_IO_PENDING) { return FALSE ; } return TRUE; } BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize) { PER_IO_BUFFER* pBuffer = AllocBuffer(); pBuffer->opType = OP_WRITE; // post i/o DWORD dwBytes; DWORD dwFlags = 0; WSABUF buf; buf.buf = (char*)lpBuffer; buf.len = dwSize ; if(::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR) { if(::WSAGetLastError() != WSA_IO_PENDING) { return FALSE; } } return TRUE ; } BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext) { PER_IO_BUFFER* pBuffer = AllocBuffer(); pBuffer->opType = OP_READ; // post i/o DWORD dwBytes; DWORD dwFlags = 0; WSABUF buf; buf.buf = (char*)pBuffer->lpBuffer; buf.len = pBuffer->dwBufferSize ; if(::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR) { if(::WSAGetLastError() != WSA_IO_PENDING) { return FALSE; } } return TRUE ; } BOOL Ciocp::PostConnect() { PER_IO_BUFFER* pBuffer = NULL ; pBuffer = (PER_IO_BUFFER*)AllocBuffer(BUFFER_SIZE); SOCKADDR_IN saddr ; saddr.sin_family = AF_INET; saddr.sin_port = htons(0); saddr.sin_addr.s_addr = htonl(ADDR_ANY); if (m_sConnect != INVALID_SOCKET) { closesocket(m_sConnect); m_sConnect = INVALID_SOCKET ; } m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if (bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR) { m_bStarted = false ; return FALSE ; } PER_HANDLE_DATA *pContext = AllocContext(m_sConnect); pContext->bConnect = TRUE ; AddSocketToIocp(m_sConnect,pContext); //create worker if (m_lpfnConnectEx == NULL ) { GUID guidConnectex = WSAID_CONNECTEX ; DWORD dwBytes ; WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL); } DWORD dwSend = 0; pBuffer->opType = OP_CONNECT ; strcpy((LPSTR)pBuffer->lpBuffer,"hello kid"); pBuffer->dwBufferSize = 5; bool b = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr), pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,& pBuffer->ol); if(!b && ::WSAGetLastError() != WSA_IO_PENDING) { return FALSE; } return TRUE; } PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize) { OutputDebugString(L"Buffer ++ "); PER_IO_BUFFER* pBuffer = NULL; if (dwSize > BUFFER_SIZE) { return NULL ; } EnterCriticalSection(&m_csBuffersListLock); if (m_pFreeBuffersList == NULL ) { // 2) HeapAlloc buffer pBuffer = (PER_IO_BUFFER*) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE ); }else { // 1) check pFreeBuffersList pBuffer = m_pFreeBuffersList ; m_pFreeBuffersList = m_pFreeBuffersList->pNext ; pBuffer->pNext = 0; m_dwBuffersCount -- ; } LeaveCriticalSection(&m_csBuffersListLock); // if (pBuffer!= NULL ) { pBuffer->dwBufferSize = dwSize ; pBuffer->lpBuffer =(LPBYTE) (pBuffer+1 ); } return pBuffer ; } unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam) { Sleep(1000); Ciocp* pThis = (Ciocp*)lpParam ; DWORD dwTrans = 0; DWORD dwKey = 0; LPOVERLAPPED lpol; PER_IO_BUFFER* pBuffer = NULL ; while (pThis->m_bStarted ) { BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&dwKey,&lpol,WSA_INFINITE); if (dwTrans == -1 ) { pThis->DecCurWorkCount(); _endthreadex(0); return 0; } pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol); if (!bOk) { if (pBuffer->opType == OP_CONNECT) { DWORD dwError = WSAGetLastError(); if (dwError != ERROR_IO_PENDING) { pThis->PostConnect(); pThis->ReleaseContext((PER_HANDLE_DATA*)dwKey); }else { OutputDebugString(L"Connect Pending .... "); } }// (pBuffer->opType == OP_CONNECT) pThis->ReleaseIoBuffer(pBuffer); }else //(!bOk) { pThis->HandleIoOp((PER_HANDLE_DATA*)dwKey,pBuffer,dwTrans); } } pThis->DecCurWorkCount(); return WSAGetLastError(); } void Ciocp::CreateWokerThreads() { for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++) { unsigned threadid; _beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid); m_dwCurWorkThreadCount ++ ; } } void Ciocp::CreateIocp() { if (m_hIocp == INVALID_HANDLE_VALUE) { m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0); } } void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/) { CreateIoCompletionPort((HANDLE)s,m_hIocp,(DWORD)pContext,0); } BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */) { m_pNotifyProc = pNotifyProc ; m_siRemoteAddr.sin_family = AF_INET; m_siRemoteAddr.sin_port = htons(dwPort); m_siRemoteAddr.sin_addr.S_un.S_addr = inet_addr(lpstrIp); //listen(m_sListen,m_dwMaxConns); CreateIocp(); m_bStarted = true ; CreateWokerThreads(); // post accept return PostConnect(); } PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s) { PER_HANDLE_DATA *pContext = NULL ; OutputDebugString(L"Context ++ "); EnterCriticalSection(&m_csContextsListLock); if (m_pFreeContextsList== NULL ) { pContext = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA)); }else { pContext = m_pFreeContextsList ; m_pFreeContextsList = pContext->pNext ; pContext->pNext = 0; m_dwContextsCount -- ; } LeaveCriticalSection(& m_csContextsListLock); if (pContext != NULL) { pContext->s = s ; pContext->m_hWriteComplete = CreateEvent(NULL,true,TRUE,NULL); } return pContext ; } void Ciocp::DecCurWorkCount() { EnterCriticalSection(&m_csWorkLock); m_dwCurWorkThreadCount -- ; LeaveCriticalSection(&m_csWorkLock); } void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans) { EnterCriticalSection(&m_csIoLock); switch(pBuffer->opType) { case OP_ACCEPT: //OutputDebugString(L"Accept "); ProcessIoAccept(pContext,pBuffer,dwTrans); break; case OP_CONNECT: ProcessIoConnect(pContext,pBuffer,dwTrans); break; case OP_WRITE: //OutputDebugString(L"Write "); ProcessIoWrite(pContext,pBuffer,dwTrans); break; case OP_READ: //OutputDebugString(L"Read "); ProcessIoRead(pContext,pBuffer,dwTrans); break; default: OutputDebugString(L"HandleIoOp Default... "); break; } // release pBuffer //ReleaseIoBuffer(pBuffer); LeaveCriticalSection(&m_csIoLock); } void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans) { if (dwTrans ==0 ) // read error { NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT); if (pContext->bConnect) // is connect { PostConnect(); }else { } ReleaseContext(pContext); }else { // read ok notify main thread pBuffer->dwTrans = dwTrans ; if (pContext->dwBufferOffSet+pBuffer->dwTrans > BUFFER_SIZE_DATA ) { closesocket(pContext->s); if (pContext->bConnect) { PostConnect(); } return ; } memcpy(pContext->readBytes+pContext->dwBufferOffSet,pBuffer->lpBuffer,pBuffer->dwTrans); pContext->dwBufferOffSet+= pBuffer->dwTrans ; NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ); PostRead(pContext); } ReleaseIoBuffer(pBuffer); } void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg) { if (m_pNotifyProc == NULL ) { OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll "); } if(!IsBadCodePtr((FARPROC)m_pNotifyProc)) { m_pNotifyProc(pContext,pBuffer,dwMsg); }else { OutputDebugString(L" m_pNotifyProc is badcodeptr "); } } void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext) { OutputDebugString(L"Context -- "); EnterCriticalSection(& m_csContextsListLock); CloseHandle(pContext->m_hWriteComplete); ZeroMemory(pContext,sizeof(PER_HANDLE_DATA)); if (m_dwContextsCount > m_dwMaxFreeContexts) { HeapFree(GetProcessHeap(),0,pContext); LeaveCriticalSection(& m_csContextsListLock); return ; }else { pContext->pNext = m_pFreeContextsList; m_pFreeContextsList = pContext ; m_dwContextsCount ++ ; } LeaveCriticalSection(& m_csContextsListLock); } void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer) { OutputDebugString(L"Buffer -- "); EnterCriticalSection(& m_csBuffersListLock); ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+pBuffer->dwBufferSize); if (m_dwBuffersCount > m_dwMaxFreeBuffers) { HeapFree(GetProcessHeap(),0,pBuffer); LeaveCriticalSection(& m_csBuffersListLock); return ; }else { pBuffer->pNext = m_pFreeBuffersList ; m_pFreeBuffersList =pBuffer ; m_dwBuffersCount ++ ; } LeaveCriticalSection(& m_csBuffersListLock); } void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans) { if (dwTrans == 0) { NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT); ReleaseContext(pContext); }else { int nAddrLen = sizeof(SOCKADDR_IN); getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen); SetKeepAlive(pContext->s); NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT); //OutputDebugString(L"Connect "); PostRead(pContext); } ReleaseIoBuffer(pBuffer); } void Ciocp::Shutdown() { m_bStarted = false ; if (m_sListen != INVALID_SOCKET ) { closesocket(m_sListen); m_sListen = INVALID_SOCKET ; } if (m_sConnect != INVALID_SOCKET) { closesocket(m_sConnect); m_sConnect = INVALID_SOCKET ; } while (m_dwCurWorkThreadCount > 0) { ::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL); Sleep(100); } PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ; while(pBuffer) { m_pFreeBuffersList = pBuffer->pNext ; HeapFree(GetProcessHeap(),0,pBuffer); pBuffer = m_pFreeBuffersList ; } m_dwBuffersCount = 0; PER_HANDLE_DATA* pContext = m_pFreeContextsList ; while(pContext) { m_pFreeContextsList = pContext->pNext ; HeapFree(GetProcessHeap(),0,pContext); pContext = m_pFreeContextsList ; } m_dwContextsCount = 0 ; } void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans) { LPSOCKADDR lpLocalAddr,lpRemoteAddr; int nLocalAddr,nRemoteAddr; DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ; PER_HANDLE_DATA* pContext1 = AllocContext(pBuffer->sClient); m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,pBuffer->dwBufferSize- 2*dwAddrSize,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr); memcpy(& (pContext1->saddr),lpRemoteAddr,nRemoteAddr); SetKeepAlive(pBuffer->sClient); AddSocketToIocp(pBuffer->sClient,pContext1); NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT); PostRead(pContext1); PostAccept(); ReleaseIoBuffer(pBuffer); } BOOL Ciocp::SetKeepAlive(SOCKET s) { BOOL bKeepAlive = TRUE; int nRet = ::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bKeepAlive, sizeof(bKeepAlive)); if (nRet == SOCKET_ERROR) { return false ; }else { tcp_keepalive alive_in = {0}; tcp_keepalive alive_out = {0}; alive_in.keepalivetime = 5000; alive_in.keepaliveinterval = 1000; alive_in.onoff = TRUE ; unsigned long ulBytesReturn = 0 ; nRet = WSAIoctl(s,SIO_KEEPALIVE_VALS,&alive_in,sizeof(alive_in),&alive_out,sizeof(alive_out),&ulBytesReturn,NULL,NULL); if (nRet == SOCKET_ERROR) { return FALSE ; } } return TRUE ; } void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite) { *pulWrite = m_ulWriteBytes ; *pulRead = m_ulReadBytes ; } void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize) { WaitForSingleObject(pContext->m_hWriteComplete,INFINITE); PostWrite(pContext,lpBuffer,dwSize); } void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans) { if (dwTrans == 0) { NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT); if (pContext->bConnect) // is connect { PostConnect(); }else { } ReleaseContext(pContext); } else { SetEvent(pContext->m_hWriteComplete); } ReleaseIoBuffer(pBuffer); }
IOCP - UDP
//NOTE // //This code taken from Mr. Bob Quinn's article //titled 'Internet Multicasting' //published in Dr. Dobb's Journal dated Oct 1997 //I have modified the original code to illustrate //the use I/O completion ports with UDP. //If you have any comments email me : shapall@hotmail.com #include "StdAfx.h" #include <winsock2.h> #include <ws2tcpip.h> #include "Stdio.h" #define BUFSIZE 1024 //max size of incoming data buffer #define MAXADDRSTR 16 #define DEFAULT_GROUP_ADDRESS "239.254.1.2" #define DEFAULT_PORT 7125 LONG nCount = 0; HANDLE g_hCompletionPort; DWORD WINAPI WorkerThread( LPVOID WorkContext ); BOOL HandleIncomingData( UCHAR* pBuf); BOOL CreateNetConnections( VOID ); BOOL CreateWorkers( UINT ); void InitWinsock2(); void UnInitWinsock2(); HANDLE g_hReadEvent; SOCKET g_hSocket; UCHAR achInBuf [BUFSIZE]; char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS; u_short nPort = DEFAULT_PORT; OVERLAPPED Overlapped; //----------------------------------------------------------------- void InitWinsock2() { WSADATA data; WORD version; int ret = 0; version = (MAKEWORD(2, 2)); ret = WSAStartup(version, &data); if (ret != 0) { ret = WSAGetLastError(); if (ret == WSANOTINITIALISED) { printf("not initialised"); } } } //----------------------------------------------------------------- void UnInitWinsock2() { WSACleanup(); } //----------------------------------------------------------------- BOOL CreateNetConnections (void) { DWORD nbytes; BOOL b; BOOL fFlag = TRUE; int nRet=0; SOCKADDR_IN stLclAddr; struct ip_mreq stMreq; // Multicast interface structure // Get a datagram socket g_hSocket = socket(AF_INET, SOCK_DGRAM,0); if (g_hSocket == INVALID_SOCKET) { printf ("socket() failed, Err: %d ", WSAGetLastError()); return FALSE; } nRet = setsockopt(g_hSocket,SOL_SOCKET, SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag)); if (nRet == SOCKET_ERROR) { printf ("setsockopt() SO_REUSEADDR failed, Err: %d ",WSAGetLastError()); } // Name the socket (assign the local port number to receive on) stLclAddr.sin_family = AF_INET; stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); stLclAddr.sin_port = htons(nPort); nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr)); if (nRet == SOCKET_ERROR) { printf ("bind() port: %d failed, Err: %d ", nPort,WSAGetLastError()); } // Join the multicast group so we can receive from it stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr); stMreq.imr_interface.s_addr = INADDR_ANY; nRet = setsockopt(g_hSocket,IPPROTO_IP, IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq)); if (nRet == SOCKET_ERROR) { printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed, Err: %d ",achMCAddr, WSAGetLastError()); } // //note the 10 says how many concurrent cpu bound threads to allow thru //this should be tunable based on the requests. CPU bound requests will // really really honor this. // g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE, NULL,0,3); if (!g_hCompletionPort) { fprintf (stdout, "g_hCompletionPort Create Failed "); return FALSE; } //Associate this socket to this I/O completion port CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort, (DWORD)g_hSocket,3); // // Start off an asynchronous read on the socket. // Overlapped.hEvent = g_hReadEvent; Overlapped.Internal = 0; Overlapped.InternalHigh = 0; Overlapped.Offset = 0; Overlapped.OffsetHigh = 0; b = ReadFile ((HANDLE)g_hSocket,&achInBuf, sizeof(achInBuf),&nbytes,&Overlapped); if (!b && GetLastError () != ERROR_IO_PENDING) { fprintf (stdout, "ReadFile Failed "); return FALSE; } return TRUE; } //----------------------------------------------------------------- BOOL CreateWorkers (UINT dwNumberOfWorkers) { DWORD ThreadId; HANDLE ThreadHandle; DWORD i; for (i = 0; i < dwNumberOfWorkers; i++) { ThreadHandle = CreateThread (NULL,0, WorkerThread,NULL,0,&ThreadId); if (!ThreadHandle) { fprintf (stdout, "Create Worker Thread Failed "); return FALSE; } CloseHandle (ThreadHandle); } return TRUE; } //----------------------------------------------------------------- DWORD WINAPI WorkerThread (LPVOID WorkContext) { DWORD nSocket; BOOL b; OVERLAPPED ovl; LPOVERLAPPED lpo=&ovl; DWORD nBytesRead=0; DWORD nBytesToBeRead; UCHAR ReadBuffer[BUFSIZE]; LPVOID lpMsgBuf; memset(&ReadBuffer,0,BUFSIZE); for (;;) { b = GetQueuedCompletionStatus(g_hCompletionPort, &nBytesToBeRead,&nSocket,&lpo,INFINITE); if (b || lpo) { if (b) { // // Determine how long a response was desired by the client. // OVERLAPPED ol; ol.hEvent = g_hReadEvent; ol.Offset = 0; ol.OffsetHigh = 0; b = ReadFile ((HANDLE)nSocket,&ReadBuffer, nBytesToBeRead,&nBytesRead,&ol); if (!b ) { DWORD dwErrCode = GetLastError(); if( dwErrCode != ERROR_IO_PENDING ) { // something has gone wrong here... printf("Something has gone wrong:Error code - %d ",dwErrCode ); FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, dwErrCode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),// Default language (LPTSTR) &lpMsgBuf, 0, NULL); OutputDebugString((LPCTSTR)lpMsgBuf); //Free the buffer. LocalFree(lpMsgBuf ); } else if( dwErrCode == ERROR_IO_PENDING ) { // I had to do this for my UDP sample //Never did for my TCP servers WaitForSingleObject(ol.hEvent,INFINITE); HandleIncomingData(ReadBuffer); } } else { HandleIncomingData(ReadBuffer); } continue; } else { fprintf (stdout, "WorkThread Wait Failed "); //exit (1); } } return 1; } } //----------------------------------------------------------------- BOOL HandleIncomingData( UCHAR* pBuf) { InterlockedIncrement(&nCount); SYSTEMTIME *lpstSysTime; lpstSysTime = (SYSTEMTIME *)(pBuf); printf("[%d]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d ",nCount, lpstSysTime->wHour, lpstSysTime->wMinute, lpstSysTime->wSecond, lpstSysTime->wMilliseconds, lpstSysTime->wMonth, lpstSysTime->wDay, lpstSysTime->wYear); memset(&pBuf,0,BUFSIZE); //just making sure that i am not showing stale data return TRUE; } //----------------------------------------------------------------- main () { //You can modify your program to take some arguments for port number //and multicast group address here printf(" *************************************** "); printf("Group IP address: %s ",achMCAddr); printf("Port number : %d ",nPort); printf(" *************************************** "); //Initialize winsock 2 InitWinsock2(); //We want to keep the main thread running HANDLE hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT"); ResetEvent(hWait2Exit ); //This OVERLAPPED event g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL); // // try to get timing more accurate... Avoid context // switch that could occur when threads are released // SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL); if (!CreateNetConnections ()) { printf("Error condition @ CreateNetConnections , exiting "); return 1; } if (!CreateWorkers (5)) { printf("Error condition @CreateWorkers, exiting "); return 1; } WaitForSingleObject(hWait2Exit,INFINITE); UnInitWinsock2(); return 1; }
转 实现UDP IOCP心得-zt
http://www.cnblogs.com/BeginGame/archive/2011/09/18/2180241.html