zoukankan      html  css  js  c++  java
  • iocp-socket 服务(借鉴别人的,根据自己的需要改的)未完待续

    #pragma once
    #include <WinSock2.h>
    #include <MSWSock.h>
    #include <Windows.h>
    #pragma comment(lib,"ws2_32.lib")
    
    // Payload bytes that follow every PER_IO_BUFFER header (AllocBuffer always
    // allocates sizeof(PER_IO_BUFFER) + BUFFER_SIZE).
    #define BUFFER_SIZE (1024*8) // 8KB
    // Capacity of the per-connection accumulation area PER_HANDLE_DATA::readBytes.
    #define BUFFER_SIZE_DATA (3*BUFFER_SIZE )
    
    // Event codes handed to the PNOTIFYPROC callback as its DWORD argument.
    #define NOTIFY_MSG_ACCEPT	0xa1
    #define NOTIFY_MSG_CONNECT	0xa2
    #define NOTIFY_MSG_DISCONNECT 0xa3
    #define NOTIFY_MSG_READ		0xa4
    // NOTE(review): NOTIFY_MSG_WRITE is not raised anywhere in the code visible here.
    #define NOTIFY_MSG_WRITE	0xa5
    
    
    // One outstanding overlapped operation.  The worker thread recovers this
    // header from the LPOVERLAPPED it dequeues via CONTAINING_RECORD(..., ol).
    // The payload area lives directly behind the header (lpBuffer == this + 1).
    struct PER_IO_BUFFER
    {
    	WSAOVERLAPPED ol;			// overlapped block passed to AcceptEx/ConnectEx/WSASend/WSARecv
    	SOCKET		sClient ;		// socket created for AcceptEx (set only by PostAccept)
    	LPBYTE		lpBuffer;		// pointer to the payload area
    	DWORD		dwBufferSize;	// usable size of the payload area in bytes
    	DWORD		dwTrans	;		// bytes transferred; filled in by ProcessIoRead
    	BYTE		opType;			// operation type, one of the OP_* values below
    #define OP_ACCEPT	6
    #define OP_CONNECT	7
    #define OP_WRITE	8
    #define OP_READ		9
    	PER_IO_BUFFER* 	pNext;		// link used while the buffer sits on the free list
    
    };
    
    // Per-socket context; its address is used as the completion key when the
    // socket is associated with the completion port (see AddSocketToIocp).
    struct PER_HANDLE_DATA
    {
    	SOCKET s;							// the socket this context describes
    	SOCKADDR_IN saddr;					// address stored by ProcessIoAccept / ProcessIoConnect
    	BOOL bConnect;						// TRUE when the context was created by PostConnect (outgoing connection)
    	BYTE readBytes[BUFFER_SIZE_DATA];	// received bytes appended by ProcessIoRead
    	DWORD dwBufferOffSet;				// number of bytes currently stored in readBytes
    	HANDLE m_hWriteComplete ;			// manual-reset event created signalled in AllocContext; waited on by Send
    	//DWORD dwBufferSize;
    	PER_HANDLE_DATA* pNext;				// link used while the context sits on the free list
    };
    
    // Callback invoked by Ciocp::NotifyMsg: (connection context, I/O buffer of
    // the completed operation, one of the NOTIFY_MSG_* codes).
    typedef void (__stdcall* PNOTIFYPROC)(PER_HANDLE_DATA*,PER_IO_BUFFER* ,DWORD );
    // I/O completion port based TCP endpoint.  Start() runs it as a listening
    // server (AcceptEx), Connect() runs it as a client (ConnectEx).  Completed
    // operations are dispatched on worker threads and reported to the user
    // through the PNOTIFYPROC callback.
    class Ciocp
    {
    private:
    	HANDLE	m_hIocp;			// completion port handle
    	SOCKET	m_sListen;			// listening socket (server mode)
    	SOCKET	m_sConnect;			// outgoing socket (client mode)
    	DWORD	m_dwProt;			// listening port (host byte order)
    	DWORD	m_dwMaxConns;		// passed to listen() as the backlog
    	DWORD	m_dwMaxFreeBuffers;	// cap on the number of cached free I/O buffers
    	DWORD	m_dwMaxFreeContexts;// cap on the number of cached free contexts
    	DWORD	m_dwInitOp;			// number of AcceptEx operations posted by Start()
    	ULONGLONG m_ulWriteBytes;	// total bytes written (reported by GetStatisticsData)
    	ULONGLONG m_ulReadBytes;	// total bytes read (reported by GetStatisticsData)
    
    	DWORD m_dwWorkThreadCount ;	// target number of worker threads (2 x processors, set in the constructor)
    	DWORD m_dwCurWorkThreadCount ;	// number of worker threads currently counted as running
    	CRITICAL_SECTION m_csWorkLock ;	// taken by DecCurWorkCount around the running-thread counter
    	CRITICAL_SECTION m_csIoLock ;	// held by HandleIoOp for the whole dispatch of one completion
    
    	CRITICAL_SECTION m_csBuffersListLock;		// guards m_pFreeBuffersList / m_dwBuffersCount
    	CRITICAL_SECTION m_csContextsListLock;		// guards m_pFreeContextsList / m_dwContextsCount
    	DWORD			m_dwBuffersCount;			// number of buffers on the free list
    	DWORD			m_dwContextsCount;			// number of contexts on the free list
    
    	PER_IO_BUFFER* m_pFreeBuffersList;		// singly linked list of recycled I/O buffers
    	PER_HANDLE_DATA* m_pFreeContextsList;	// singly linked list of recycled contexts
    
    	SOCKADDR_IN		m_siRemoteAddr ;		// peer address used by PostConnect (filled in by Connect)
    	bool			m_bStarted ;			// worker threads keep looping while this is true
    
    	// Winsock extension entry points resolved at run time via WSAIoctl.
    	LPFN_ACCEPTEX				m_lpfnAcceptEx ;
    	LPFN_GETACCEPTEXSOCKADDRS	m_lpfnGetAcceptExSockAddrs;
    	LPFN_CONNECTEX				m_lpfnConnectEx ;
    	PNOTIFYPROC					m_pNotifyProc;	// user callback
    
    	// Post one overlapped operation of the named kind.
    	BOOL PostAccept();
    	BOOL PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize);
    	BOOL PostRead(PER_HANDLE_DATA* pContext);
    	BOOL PostConnect();
    	// Worker thread entry point; lpParam is the Ciocp instance.
    	static unsigned int WINAPI _WorkerThreadProc(LPVOID lpParam);
    	// Free-list backed allocation / recycling of buffers and contexts.
    	PER_IO_BUFFER* AllocBuffer(DWORD dwSize = BUFFER_SIZE);
    	PER_HANDLE_DATA* AllocContext(SOCKET s);
    	void ReleaseContext(PER_HANDLE_DATA* pContext);
    	void ReleaseIoBuffer(PER_IO_BUFFER* pBuffer);
    	void CreateWokerThreads();
    	void CreateIocp();
    	// Associates s with the completion port, using pContext as the completion key.
    	void AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext=0);
    	void DecCurWorkCount();
    	// Dispatches a successfully completed operation to the ProcessIo* handler for its opType.
    	void HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans);
    	void NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg);
    	void ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
    	BOOL SetKeepAlive(SOCKET s);
    	void ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
    	void ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
    public:
    	Ciocp(void);
    	~Ciocp(void);
    	// Server mode: bind/listen on dwPort, start the workers and post dwInitOp accepts.
    	BOOL Start(PNOTIFYPROC pNotifyProc, DWORD dwPort=8080,DWORD dwMaxConns= 2000,DWORD dwMaxFreeBuffers = 100,DWORD dwMaxFreeContexts =100 ,DWORD dwInitOp = 5 );
    	// Stops the workers, closes the sockets and frees the cached buffers/contexts.
    	void Shutdown();
    	// Client mode: start the workers and post a ConnectEx to lpstrIp:dwPort.
    	// NOTE(review): a string literal default for a non-const LPSTR is rejected by
    	// conforming C++11 compilers; LPCSTR would be the portable type.
    	BOOL Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp = "127.0.0.1",DWORD dwPort=443 );
    	void GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite);
    	// Waits on the context's write event, then posts an overlapped send of lpBuffer.
    	void Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize);
    	// NOTE(review): declared public unlike the other ProcessIo* handlers; it is
    	// only called from HandleIoOp in the code visible here.
    	void ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans);
    
    };
    

      

    #include "StdAfx.h"
    #include "iocp.h"
    #include <process.h>
    #include <mstcpip.h>
    
    
    // Puts every member into a known idle state, creates the four critical
    // sections and initialises Winsock 2.2.  No socket, thread or completion
    // port is created here; that happens in Start() / Connect().
    Ciocp::Ciocp(void)
    {
    	m_hIocp = INVALID_HANDLE_VALUE ;
    	m_sListen = INVALID_SOCKET ;
    	m_sConnect = INVALID_SOCKET ;

    	// Defaults; Start() overwrites them with its arguments.
    	m_dwProt = 0;
    	m_dwInitOp = 5;
    	m_dwMaxConns = 2000;
    	m_dwMaxFreeContexts = 100;
    	m_dwMaxFreeBuffers = 100 ;

    	m_ulWriteBytes = 0;
    	m_ulReadBytes = 0;

    	// Target worker count: two threads per logical processor.
    	SYSTEM_INFO sys_info ;
    	GetSystemInfo(&sys_info);
    	m_dwWorkThreadCount = sys_info.dwNumberOfProcessors * 2 ;
    	m_dwCurWorkThreadCount = 0;

    	InitializeCriticalSection(&m_csBuffersListLock);
    	InitializeCriticalSection(&m_csContextsListLock);
    	InitializeCriticalSection(&m_csWorkLock);
    	InitializeCriticalSection(&m_csIoLock);

    	m_dwBuffersCount = 0;
    	m_dwContextsCount = 0;
    	m_pFreeBuffersList = NULL ;
    	m_pFreeContextsList = NULL ;

    	m_lpfnAcceptEx = NULL ;
    	m_lpfnConnectEx = NULL ;
    	m_lpfnGetAcceptExSockAddrs = NULL ;

    	// Previously left uninitialised: NotifyMsg tests m_pNotifyProc against
    	// NULL, and PostConnect passes m_siRemoteAddr to ConnectEx.
    	m_pNotifyProc = NULL ;
    	ZeroMemory(&m_siRemoteAddr,sizeof(m_siRemoteAddr));
    	m_bStarted = false ;

    	WSADATA wsaData ;
    	if (WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
    	{
    		// A constructor cannot report failure; every later socket call will fail.
    		OutputDebugString(L"WSAStartup failed \n");
    	}
    }
    
    
    // Tears the object down in dependency order: stop the workers and close the
    // sockets (Shutdown), close the completion port, destroy the locks, and
    // finally balance the WSAStartup call made by the constructor.
    Ciocp::~Ciocp(void)
    {
    	// Worker threads dereference this object and take the critical sections
    	// below, so they must be gone before anything is destroyed.  Shutdown()
    	// also closes m_sListen / m_sConnect and frees the cached lists.
    	Shutdown();

    	if (m_hIocp != NULL && m_hIocp != INVALID_HANDLE_VALUE)
    	{
    		CloseHandle(m_hIocp);
    		m_hIocp = INVALID_HANDLE_VALUE ;
    	}

    	DeleteCriticalSection(&m_csBuffersListLock);
    	DeleteCriticalSection(&m_csContextsListLock);
    	DeleteCriticalSection(&m_csWorkLock);
    	DeleteCriticalSection(&m_csIoLock);

    	WSACleanup();
    }
    
    // Starts the object in server mode.
    //   pNotifyProc        callback that receives NOTIFY_MSG_* events
    //   dwPort             TCP port to listen on (host byte order)
    //   dwMaxConns         backlog passed to listen()
    //   dwMaxFreeBuffers   cap on cached free I/O buffers
    //   dwMaxFreeContexts  cap on cached free contexts
    //   dwInitOp           number of AcceptEx operations to post up front
    // Returns TRUE once the listener is bound, the extension functions are
    // resolved and the workers are started; FALSE (with the listening socket
    // closed again) if any of those steps fails.
    BOOL Ciocp::Start(PNOTIFYPROC pNotifyProc, DWORD dwPort/*=8080*/,DWORD dwMaxConns/*= 2000*/,DWORD dwMaxFreeBuffers /*= 100*/,DWORD dwMaxFreeContexts /*=100 */,DWORD dwInitOp /*= 5 */)
    {
    	m_pNotifyProc = pNotifyProc ;
    	m_dwProt = dwPort;
    	m_dwMaxConns = dwMaxConns;
    	m_dwMaxFreeBuffers = dwMaxFreeBuffers;
    	m_dwMaxFreeContexts = dwMaxFreeContexts;
    	m_dwInitOp = dwInitOp ;

    	m_sListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    	if (m_sListen == INVALID_SOCKET)
    	{
    		return FALSE ;
    	}

    	SOCKADDR_IN saddr ;
    	ZeroMemory(&saddr,sizeof(saddr));
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons((u_short)m_dwProt);			// host -> network order
    	saddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY) ;

    	BOOL bReady = (bind(m_sListen,(SOCKADDR*)&saddr,sizeof(saddr)) != SOCKET_ERROR)
    		&& (listen(m_sListen,(int)m_dwMaxConns) != SOCKET_ERROR);

    	if (bReady)
    	{
    		CreateIocp();

    		// AcceptEx / GetAcceptExSockaddrs must be resolved at run time.
    		DWORD dwBytes = 0;
    		GUID guidAcceptEx = WSAID_ACCEPTEX ;
    		GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ;
    		bReady = (WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidAcceptEx,sizeof(guidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwBytes,NULL,NULL) != SOCKET_ERROR)
    			&& (WSAIoctl(m_sListen,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidGetAcceptExSockAddrs,sizeof(guidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwBytes,NULL,NULL) != SOCKET_ERROR);
    	}

    	if (!bReady)
    	{
    		closesocket(m_sListen);
    		m_sListen = INVALID_SOCKET ;
    		m_bStarted = false ;
    		return FALSE ;
    	}

    	AddSocketToIocp(m_sListen);

    	// The workers loop on m_bStarted, so raise it before they are created.
    	m_bStarted = true ;
    	CreateWokerThreads();

    	// Keep m_dwInitOp accepts outstanding; each completed accept re-posts one.
    	for (DWORD i = 0; i < m_dwInitOp; i++)
    	{
    		if (!PostAccept())
    		{
    			OutputDebugString(L"Start: PostAccept failed \n");
    		}
    	}

    	return TRUE ;
    }
    
    // Posts one overlapped AcceptEx on the listening socket.  The I/O buffer
    // carries the pre-created client socket (sClient) and receives the address
    // block written by AcceptEx.  Returns FALSE, with the socket and buffer
    // released again, when the operation could not be queued.
    BOOL Ciocp::PostAccept()
    {
    	if (m_lpfnAcceptEx == NULL)
    	{
    		return FALSE ;
    	}

    	PER_IO_BUFFER* pBuffer = AllocBuffer(BUFFER_SIZE);
    	if (pBuffer == NULL)
    	{
    		return FALSE ;
    	}
    	pBuffer->opType = OP_ACCEPT ;

    	pBuffer->sClient = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    	if (pBuffer->sClient == INVALID_SOCKET)
    	{
    		ReleaseIoBuffer(pBuffer);
    		return FALSE ;
    	}

    	// dwReceiveDataLength is 0: complete as soon as a client connects instead
    	// of waiting for its first bytes.  Only the two address blocks are written.
    	DWORD dwBytes = 0;
    	const DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;
    	BOOL b = m_lpfnAcceptEx(m_sListen,pBuffer->sClient,
    		pBuffer->lpBuffer,0
    		,dwAddrSize,dwAddrSize,&dwBytes,&pBuffer->ol);
    	if (!b && WSAGetLastError() != WSA_IO_PENDING)
    	{
    		// No completion will ever arrive for this buffer: clean up here.
    		closesocket(pBuffer->sClient);
    		ReleaseIoBuffer(pBuffer);
    		return FALSE ;
    	}

    	return TRUE;
    }
    
    // Posts an overlapped WSASend of dwSize bytes from lpBuffer on pContext->s.
    // The data is sent straight from the caller's memory (it is not copied into
    // the I/O buffer), so lpBuffer must stay valid until the write completes.
    // Returns FALSE if the send could not be queued.
    BOOL Ciocp::PostWrite(PER_HANDLE_DATA* pContext, LPBYTE lpBuffer, DWORD dwSize)
    {
    	if (pContext == NULL || lpBuffer == NULL)
    	{
    		return FALSE ;
    	}

    	// The I/O buffer is only used as the carrier of the OVERLAPPED + opType.
    	PER_IO_BUFFER* pBuffer = AllocBuffer();
    	if (pBuffer == NULL)
    	{
    		return FALSE ;
    	}
    	pBuffer->opType = OP_WRITE;

    	DWORD dwBytes = 0;
    	DWORD dwFlags = 0;
    	WSABUF buf;
    	buf.buf = (char*)lpBuffer;
    	buf.len = dwSize ;
    	if (::WSASend(pContext->s, &buf, 1, &dwBytes, dwFlags, &pBuffer->ol, NULL) != NO_ERROR
    		&& ::WSAGetLastError() != WSA_IO_PENDING)
    	{
    		// Nothing was queued, so no completion will recycle the buffer for us.
    		ReleaseIoBuffer(pBuffer);
    		return FALSE;
    	}

    	return TRUE ;
    }
    
    // Posts an overlapped WSARecv on pContext->s into a freshly allocated I/O
    // buffer.  The buffer is handed back to the free list by the completion
    // handler, or here if the receive could not be queued.
    BOOL Ciocp::PostRead(PER_HANDLE_DATA* pContext)
    {
    	if (pContext == NULL)
    	{
    		return FALSE ;
    	}

    	PER_IO_BUFFER* pBuffer = AllocBuffer();
    	if (pBuffer == NULL)
    	{
    		return FALSE ;
    	}
    	pBuffer->opType = OP_READ;

    	DWORD dwBytes = 0;
    	DWORD dwFlags = 0;
    	WSABUF buf;
    	buf.buf = (char*)pBuffer->lpBuffer;
    	buf.len = pBuffer->dwBufferSize ;
    	if (::WSARecv(pContext->s, &buf, 1, &dwBytes, &dwFlags, &pBuffer->ol, NULL) != NO_ERROR
    		&& ::WSAGetLastError() != WSA_IO_PENDING)
    	{
    		// Nothing was queued, so no completion will recycle the buffer for us.
    		ReleaseIoBuffer(pBuffer);
    		return FALSE;
    	}

    	return TRUE ;
    }
    
    // (Re)creates m_sConnect, binds it to an ephemeral local port (ConnectEx
    // requires a bound socket) and posts an overlapped ConnectEx to
    // m_siRemoteAddr.  The first 5 bytes of "hello kid" are sent with the
    // connect, so a successful completion reports a non-zero transfer count,
    // which ProcessIoConnect relies on.
    // Returns FALSE, with every resource acquired here released again, when
    // the connect could not be queued.
    // NOTE(review): this is called from worker threads as well as from
    // Connect(), and m_sConnect is not protected by a lock of its own.
    BOOL Ciocp::PostConnect()
    {
    	if (m_sConnect != INVALID_SOCKET)
    	{
    		closesocket(m_sConnect);
    		m_sConnect = INVALID_SOCKET ;
    	}
    	m_sConnect = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
    	if (m_sConnect == INVALID_SOCKET)
    	{
    		return FALSE ;
    	}

    	SOCKADDR_IN saddr ;
    	ZeroMemory(&saddr,sizeof(saddr));
    	saddr.sin_family = AF_INET;
    	saddr.sin_port = htons(0);
    	saddr.sin_addr.s_addr = htonl(ADDR_ANY);
    	if (bind(m_sConnect,(SOCKADDR*)&saddr,sizeof(saddr)) == SOCKET_ERROR)
    	{
    		closesocket(m_sConnect);
    		m_sConnect = INVALID_SOCKET ;
    		m_bStarted = false ;
    		return FALSE ;
    	}

    	// Resolve ConnectEx once; it is only available through WSAIoctl.
    	if (m_lpfnConnectEx == NULL )
    	{
    		GUID guidConnectex = WSAID_CONNECTEX ;
    		DWORD dwBytes = 0;
    		if (WSAIoctl(m_sConnect,SIO_GET_EXTENSION_FUNCTION_POINTER,&guidConnectex,sizeof(guidConnectex),&m_lpfnConnectEx,sizeof(m_lpfnConnectEx),&dwBytes,NULL,NULL) == SOCKET_ERROR)
    		{
    			m_lpfnConnectEx = NULL ;
    			closesocket(m_sConnect);
    			m_sConnect = INVALID_SOCKET ;
    			return FALSE ;
    		}
    	}

    	PER_IO_BUFFER* pBuffer = AllocBuffer(BUFFER_SIZE);
    	PER_HANDLE_DATA* pContext = AllocContext(m_sConnect);
    	if (pBuffer == NULL || pContext == NULL)
    	{
    		if (pBuffer != NULL)
    		{
    			ReleaseIoBuffer(pBuffer);
    		}
    		if (pContext != NULL)
    		{
    			ReleaseContext(pContext);
    		}
    		closesocket(m_sConnect);
    		m_sConnect = INVALID_SOCKET ;
    		return FALSE ;
    	}

    	pContext->bConnect = TRUE ;
    	AddSocketToIocp(m_sConnect,pContext);

    	DWORD dwSend = 0;
    	pBuffer->opType = OP_CONNECT ;
    	strcpy((LPSTR)pBuffer->lpBuffer,"hello kid");
    	pBuffer->dwBufferSize = 5;
    	BOOL b = m_lpfnConnectEx(m_sConnect,(SOCKADDR*)&m_siRemoteAddr,sizeof(m_siRemoteAddr),
    		pBuffer->lpBuffer,pBuffer->dwBufferSize,&dwSend,&pBuffer->ol);

    	if (!b && ::WSAGetLastError() != WSA_IO_PENDING)
    	{
    		// Nothing was queued: no completion will clean these up for us.
    		ReleaseIoBuffer(pBuffer);
    		ReleaseContext(pContext);
    		closesocket(m_sConnect);
    		m_sConnect = INVALID_SOCKET ;
    		return FALSE;
    	}

    	return TRUE;
    }
    
    // Returns an I/O buffer whose payload area is dwSize bytes (at most
    // BUFFER_SIZE), taken from the free list when possible and from the process
    // heap otherwise.  Header and payload are zeroed in both cases (HeapAlloc
    // uses HEAP_ZERO_MEMORY, ReleaseIoBuffer wipes recycled buffers).
    // Returns NULL when dwSize is too large or the heap allocation fails.
    PER_IO_BUFFER* Ciocp::AllocBuffer(DWORD dwSize)
    {
    	// The literal had been split across two source lines; restored as an escape.
    	OutputDebugString(L"Buffer ++ \n ");
    	PER_IO_BUFFER* pBuffer = NULL;
    	if (dwSize > BUFFER_SIZE)
    	{
    		return NULL ;
    	}

    	EnterCriticalSection(&m_csBuffersListLock);
    	if (m_pFreeBuffersList == NULL )
    	{
    		// Free list empty: allocate header + full-size payload in one block.
    		pBuffer = (PER_IO_BUFFER*) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_IO_BUFFER)+ BUFFER_SIZE );
    	}else
    	{
    		// Pop the head of the free list.
    		pBuffer = m_pFreeBuffersList ;
    		m_pFreeBuffersList = m_pFreeBuffersList->pNext ;
    		pBuffer->pNext = 0;
    		m_dwBuffersCount -- ;
    	}
    	LeaveCriticalSection(&m_csBuffersListLock);

    	if (pBuffer != NULL )
    	{
    		pBuffer->dwBufferSize = dwSize ;
    		// The payload starts immediately behind the header.
    		pBuffer->lpBuffer = (LPBYTE) (pBuffer+1 );
    	}
    	return pBuffer ;
    }
    
    // Worker thread body.  lpParam is the owning Ciocp.  Dequeues completion
    // packets until m_bStarted is cleared or a packet without an OVERLAPPED
    // arrives (Shutdown posts such packets with a transfer count of -1).
    // Successful completions go to HandleIoOp; failed ones are cleaned up here.
    unsigned int WINAPI Ciocp::_WorkerThreadProc(LPVOID lpParam)
    {
    	// Kept from the original; presumably gives Start()/Connect() time to
    	// finish their setup before the first dequeue - TODO confirm.
    	Sleep(1000);
    	Ciocp* pThis = (Ciocp*)lpParam ;

    	while (pThis->m_bStarted )
    	{
    		DWORD dwTrans = 0;
    		ULONG_PTR ulKey = 0;		// pointer-sized: the key is a PER_HANDLE_DATA*
    		LPOVERLAPPED lpol = NULL;
    		BOOL bOk = GetQueuedCompletionStatus(pThis->m_hIocp,&dwTrans,&ulKey,&lpol,WSA_INFINITE);

    		// No OVERLAPPED means no I/O buffer: either the shutdown packet or a
    		// failure of the port itself.  Leave in both cases instead of feeding
    		// a NULL pointer to CONTAINING_RECORD.
    		if (dwTrans == (DWORD)-1 || lpol == NULL)
    		{
    			break;
    		}

    		PER_IO_BUFFER* pBuffer = CONTAINING_RECORD(lpol,PER_IO_BUFFER,ol);
    		PER_HANDLE_DATA* pContext = (PER_HANDLE_DATA*)ulKey ;
    		if (!bOk)
    		{
    			if (pBuffer->opType == OP_CONNECT)
    			{
    				DWORD dwError = WSAGetLastError();
    				if (dwError != ERROR_IO_PENDING)
    				{
    					// Connect attempt failed: start a new one, then recycle
    					// the context that belonged to the failed attempt.
    					pThis->PostConnect();
    					pThis->ReleaseContext(pContext);
    				}else
    				{
    					OutputDebugString(L"Connect Pending .... ");
    				}
    			}
    			else if (pBuffer->opType == OP_ACCEPT && pBuffer->sClient != INVALID_SOCKET)
    			{
    				// The socket pre-created for this AcceptEx was never handed to
    				// a context; close it so it is not leaked.
    				closesocket(pBuffer->sClient);
    			}

    			pThis->ReleaseIoBuffer(pBuffer);
    		}else
    		{
    			pThis->HandleIoOp(pContext,pBuffer,dwTrans);
    		}
    	}

    	pThis->DecCurWorkCount();
    	return 0;
    }
    
    void Ciocp::CreateWokerThreads()
    {
    	
    	for (int i= 0;i< m_dwWorkThreadCount-m_dwCurWorkThreadCount ;i++)
    	{
    		unsigned threadid;
    		_beginthreadex(NULL,0,_WorkerThreadProc,this,0,&threadid);
    		m_dwCurWorkThreadCount ++ ;
    	}
    }
    
    void Ciocp::CreateIocp()
    {
    	if (m_hIocp == INVALID_HANDLE_VALUE)
    	{
    		m_hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);
    	}
    }
    
    // Associates socket s with the completion port.  pContext becomes the
    // completion key that the worker thread gets back for every operation on s
    // (NULL for the listening socket).
    void Ciocp::AddSocketToIocp(SOCKET s, PER_HANDLE_DATA* pContext/*=0*/)
    {
    	// The key parameter is pointer-sized (ULONG_PTR); casting through DWORD
    	// would cut the pointer in half on 64-bit builds.
    	if (CreateIoCompletionPort((HANDLE)s,m_hIocp,(ULONG_PTR)pContext,0) == NULL)
    	{
    		OutputDebugString(L"AddSocketToIocp failed \n");
    	}
    }
    
    
    
    // Starts the object in client mode: remembers the peer address, makes sure
    // the completion port and worker threads exist and posts the first
    // ConnectEx.
    //   pNotifyProc  callback that receives NOTIFY_MSG_* events
    //   lpstrIp      dotted-decimal IPv4 address of the peer
    //   dwPort       TCP port of the peer (host byte order)
    // Returns FALSE when lpstrIp is NULL or not a valid dotted-decimal address,
    // or when the connect could not be queued.
    BOOL Ciocp::Connect(PNOTIFYPROC pNotifyProc, LPSTR lpstrIp /*= "127.0.0.1"*/,DWORD dwPort/*=443 */)
    {
    	if (lpstrIp == NULL)
    	{
    		return FALSE ;
    	}
    	// inet_addr signals a malformed address with INADDR_NONE.
    	const unsigned long ulAddr = inet_addr(lpstrIp);
    	if (ulAddr == INADDR_NONE)
    	{
    		return FALSE ;
    	}

    	m_pNotifyProc = pNotifyProc ;

    	ZeroMemory(&m_siRemoteAddr,sizeof(m_siRemoteAddr));
    	m_siRemoteAddr.sin_family = AF_INET;
    	m_siRemoteAddr.sin_port = htons((u_short)dwPort);
    	m_siRemoteAddr.sin_addr.S_un.S_addr = ulAddr ;

    	CreateIocp();

    	// The workers loop on m_bStarted, so raise it before they are created.
    	m_bStarted = true ;
    	CreateWokerThreads();

    	return PostConnect();
    }
    
    // Returns a zeroed context for socket s, taken from the free list when
    // possible and from the process heap otherwise.  A manual-reset event,
    // initially signalled, is created for m_hWriteComplete.
    // Returns NULL when the heap allocation fails.
    PER_HANDLE_DATA* Ciocp::AllocContext(SOCKET s)
    {
    	PER_HANDLE_DATA *pContext = NULL ;
    	// The literal had been split across two source lines; restored as an escape.
    	OutputDebugString(L"Context ++ \n");

    	EnterCriticalSection(&m_csContextsListLock);
    	if (m_pFreeContextsList == NULL )
    	{
    		pContext = (PER_HANDLE_DATA*)HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sizeof(PER_HANDLE_DATA));
    	}else
    	{
    		// Pop the head of the free list (ReleaseContext zeroed it already).
    		pContext = m_pFreeContextsList ;
    		m_pFreeContextsList = pContext->pNext ;
    		pContext->pNext = 0;
    		m_dwContextsCount -- ;
    	}
    	LeaveCriticalSection(&m_csContextsListLock);

    	if (pContext != NULL)
    	{
    		pContext->s = s ;
    		pContext->m_hWriteComplete = CreateEvent(NULL,TRUE,TRUE,NULL);
    	}
    	return pContext ;
    }
    
    void Ciocp::DecCurWorkCount()
    {
    	EnterCriticalSection(&m_csWorkLock);
    	m_dwCurWorkThreadCount -- ;
    	LeaveCriticalSection(&m_csWorkLock);
    }
    
    
    
    // Routes one successfully completed operation to the handler for its
    // opType.  m_csIoLock is held for the whole call, so completions are
    // processed one at a time across all worker threads.  Each handler releases
    // pBuffer itself.
    void Ciocp::HandleIoOp(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer,DWORD dwTrans)
    {
    	EnterCriticalSection(&m_csIoLock);

    	// The commented-out trace lines had been split across two source lines,
    	// which left a stray `");` as live code; they are whole comments again.
    	switch(pBuffer->opType)
    	{
    	case OP_ACCEPT:
    		//OutputDebugString(L"Accept \n");
    		ProcessIoAccept(pContext,pBuffer,dwTrans);
    		break;
    	case OP_CONNECT:
    		ProcessIoConnect(pContext,pBuffer,dwTrans);
    		break;
    	case OP_WRITE:
    		//OutputDebugString(L"Write \n");
    		ProcessIoWrite(pContext,pBuffer,dwTrans);
    		break;
    	case OP_READ:
    		//OutputDebugString(L"Read \n");
    		ProcessIoRead(pContext,pBuffer,dwTrans);
    		break;
    	default:
    		OutputDebugString(L"HandleIoOp Default... \n");
    		break;
    	}

    	LeaveCriticalSection(&m_csIoLock);
    }
    
    // Handles a completed WSARecv.
    //  - dwTrans == 0: the peer closed the connection.
    //  - the new bytes would not fit into pContext->readBytes: the connection
    //    is dropped.
    //  In both cases the callback gets NOTIFY_MSG_DISCONNECT, the socket is
    //  closed (outgoing connections are re-established through PostConnect,
    //  which closes the old socket itself) and the context is recycled.
    //  - otherwise the bytes are appended to readBytes, the callback gets
    //    NOTIFY_MSG_READ and the next read is posted.
    // pBuffer is released on every path.
    // NOTE(review): dwBufferOffSet only grows here; presumably the callback is
    // expected to consume readBytes and reset the offset - confirm with callers.
    void Ciocp::ProcessIoRead(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
    {
    	pBuffer->dwTrans = dwTrans ;

    	const bool bPeerClosed = (dwTrans == 0);
    	const bool bOverflow = !bPeerClosed
    		&& (pContext->dwBufferOffSet + dwTrans > BUFFER_SIZE_DATA);

    	if (bPeerClosed || bOverflow)
    	{
    		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

    		if (pContext->bConnect)
    		{
    			// PostConnect closes m_sConnect (== pContext->s) before it
    			// creates the replacement socket, so it is not closed here too.
    			PostConnect();
    		}else
    		{
    			// Accepted sockets are owned by their context only.
    			closesocket(pContext->s);
    		}
    		ReleaseContext(pContext);
    	}else
    	{
    		// Updated under m_csIoLock, which HandleIoOp holds around this call.
    		m_ulReadBytes += dwTrans ;

    		memcpy(pContext->readBytes+pContext->dwBufferOffSet,pBuffer->lpBuffer,dwTrans);
    		pContext->dwBufferOffSet += dwTrans ;

    		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_READ);
    		if (!PostRead(pContext))
    		{
    			OutputDebugString(L"ProcessIoRead: PostRead failed \n");
    		}
    	}

    	ReleaseIoBuffer(pBuffer);
    }
    
    // Forwards an event (one of the NOTIFY_MSG_* codes) to the user callback.
    // Nothing is called, and a debug message is emitted instead, when no
    // callback is registered or the stored pointer does not look callable.
    void Ciocp::NotifyMsg(PER_HANDLE_DATA* pContext,PER_IO_BUFFER* pBuffer, DWORD dwMsg)
    {
    	if (m_pNotifyProc == NULL )
    	{
    		OutputDebugString(L"NotifyMsg m_pNotifyProc is NUll \n");
    		return ;
    	}

    	// NOTE(review): IsBadCodePtr is documented as obsolete; kept because the
    	// existing diagnostics depend on it.
    	if(!IsBadCodePtr((FARPROC)m_pNotifyProc))
    	{
    		m_pNotifyProc(pContext,pBuffer,dwMsg);
    	}else
    	{
    		OutputDebugString(L" m_pNotifyProc is badcodeptr \n");
    	}
    }
    
    // Retires a context: closes its write event, wipes it and either caches it
    // on the free list (at most m_dwMaxFreeContexts entries) or returns it to
    // the heap.  The socket itself is NOT closed here.  NULL is ignored.
    void Ciocp::ReleaseContext(PER_HANDLE_DATA* pContext)
    {
    	if (pContext == NULL)
    	{
    		return ;
    	}

    	// The literal had been split across two source lines; restored as an escape.
    	OutputDebugString(L"Context -- \n");

    	EnterCriticalSection(&m_csContextsListLock);

    	if (pContext->m_hWriteComplete != NULL)
    	{
    		CloseHandle(pContext->m_hWriteComplete);
    	}
    	ZeroMemory(pContext,sizeof(PER_HANDLE_DATA));

    	// ">=" so the list never holds more than the configured maximum
    	// (the previous ">" allowed one extra entry).
    	if (m_dwContextsCount >= m_dwMaxFreeContexts)
    	{
    		HeapFree(GetProcessHeap(),0,pContext);
    	}else
    	{
    		pContext->pNext = m_pFreeContextsList;
    		m_pFreeContextsList = pContext ;
    		m_dwContextsCount ++ ;
    	}

    	LeaveCriticalSection(&m_csContextsListLock);
    }
    
    // Retires an I/O buffer: wipes it and either caches it on the free list (at
    // most m_dwMaxFreeBuffers entries) or returns it to the heap.  NULL is
    // ignored.
    // Assumes pBuffer came from AllocBuffer, i.e. is sizeof(PER_IO_BUFFER) +
    // BUFFER_SIZE bytes long.
    void Ciocp::ReleaseIoBuffer(PER_IO_BUFFER* pBuffer)
    {
    	if (pBuffer == NULL)
    	{
    		return ;
    	}

    	// The literal had been split across two source lines; restored as an escape.
    	OutputDebugString(L"Buffer -- \n");

    	EnterCriticalSection(&m_csBuffersListLock);

    	// Wipe the whole allocation, not just dwBufferSize bytes: PostConnect
    	// shrinks dwBufferSize to 5, which used to leave stale payload behind in
    	// a buffer that AllocBuffer hands out as zeroed.
    	ZeroMemory(pBuffer,sizeof(PER_IO_BUFFER)+BUFFER_SIZE);

    	// ">=" so the list never holds more than the configured maximum.
    	if (m_dwBuffersCount >= m_dwMaxFreeBuffers)
    	{
    		HeapFree(GetProcessHeap(),0,pBuffer);
    	}else
    	{
    		pBuffer->pNext = m_pFreeBuffersList ;
    		m_pFreeBuffersList = pBuffer ;
    		m_dwBuffersCount ++ ;
    	}

    	LeaveCriticalSection(&m_csBuffersListLock);
    }
    
    // Handles a completed ConnectEx.  PostConnect sends 5 bytes together with
    // the connect, so dwTrans == 0 is treated as "not connected": the callback
    // gets NOTIFY_MSG_DISCONNECT and the context is recycled.  Otherwise the
    // socket is finalised, keep-alive is enabled, the callback gets
    // NOTIFY_MSG_CONNECT and the first read is posted.
    void Ciocp::ProcessIoConnect(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
    {
    	if (dwTrans == 0)
    	{
    		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);
    		ReleaseContext(pContext);
    	}else
    	{
    		// A socket connected with ConnectEx stays in a "default" state until
    		// SO_UPDATE_CONNECT_CONTEXT is set; getsockname and friends are only
    		// reliable afterwards.
    		setsockopt(pContext->s,SOL_SOCKET,SO_UPDATE_CONNECT_CONTEXT,NULL,0);

    		// Stores the LOCAL address of the connection in saddr (getsockname).
    		int nAddrLen = sizeof(SOCKADDR_IN);
    		getsockname(pContext->s,(SOCKADDR*)&pContext->saddr,&nAddrLen);

    		SetKeepAlive(pContext->s);
    		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_CONNECT);
    		//OutputDebugString(L"Connect \n");
    		PostRead(pContext);
    	}
    	ReleaseIoBuffer(pBuffer);
    }
    
    void Ciocp::Shutdown()
    {
    	m_bStarted = false ;
    
    	if (m_sListen != INVALID_SOCKET )
    	{
    		closesocket(m_sListen);
    		m_sListen = INVALID_SOCKET ;
    
    	}
    	if (m_sConnect != INVALID_SOCKET)
    	{
    		closesocket(m_sConnect);
    		m_sConnect = INVALID_SOCKET ;
    	}
    
    	while (m_dwCurWorkThreadCount > 0)
    	{
    		::PostQueuedCompletionStatus(m_hIocp, -1, 0, NULL);  
    		Sleep(100);
    	}
    
    	PER_IO_BUFFER* pBuffer = m_pFreeBuffersList ;
    	while(pBuffer)
    	{
    		m_pFreeBuffersList = pBuffer->pNext ;
    		HeapFree(GetProcessHeap(),0,pBuffer);
    		pBuffer = m_pFreeBuffersList ;
    	}
    	m_dwBuffersCount = 0;
    	PER_HANDLE_DATA* pContext = m_pFreeContextsList ;
    	while(pContext)
    	{
    		m_pFreeContextsList = pContext->pNext ;
    		HeapFree(GetProcessHeap(),0,pContext);
    		pContext = m_pFreeContextsList ;
    	}
    	m_dwContextsCount = 0 ;
    	
    	
    }
    
    // Handles a completed AcceptEx: creates a context for the accepted socket
    // (pBuffer->sClient), records the peer address, enables keep-alive,
    // associates the socket with the completion port, tells the callback
    // (NOTIFY_MSG_ACCEPT), posts the first read and re-posts an accept so the
    // number of outstanding accepts stays constant.
    // pContext (the listening socket's key) and dwTrans are unused.
    void Ciocp::ProcessIoAccept(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
    {
    	const DWORD dwAddrSize = sizeof(SOCKADDR_IN)+16 ;

    	PER_HANDLE_DATA* pContext1 = AllocContext(pBuffer->sClient);
    	if (pContext1 == NULL)
    	{
    		// Out of memory: drop this client but keep accepting.
    		closesocket(pBuffer->sClient);
    		PostAccept();
    		ReleaseIoBuffer(pBuffer);
    		return ;
    	}

    	// Let the accepted socket inherit the listener's properties; required
    	// before calls such as getpeername/shutdown behave normally on it.
    	setsockopt(pBuffer->sClient,SOL_SOCKET,SO_UPDATE_ACCEPT_CONTEXT,(char*)&m_sListen,sizeof(m_sListen));

    	// The receive-data length must equal the value given to AcceptEx, which
    	// PostAccept passes as 0; any other value makes the address offsets wrong.
    	LPSOCKADDR lpLocalAddr = NULL ;
    	LPSOCKADDR lpRemoteAddr = NULL ;
    	int nLocalAddr = 0;
    	int nRemoteAddr = 0;
    	m_lpfnGetAcceptExSockAddrs(pBuffer->lpBuffer,0,dwAddrSize,dwAddrSize,&lpLocalAddr,&nLocalAddr,&lpRemoteAddr,&nRemoteAddr);
    	if (lpRemoteAddr != NULL && nRemoteAddr > 0)
    	{
    		size_t cbCopy = (size_t)nRemoteAddr ;
    		if (cbCopy > sizeof(pContext1->saddr))
    		{
    			cbCopy = sizeof(pContext1->saddr);
    		}
    		memcpy(&(pContext1->saddr),lpRemoteAddr,cbCopy);
    	}

    	SetKeepAlive(pBuffer->sClient);

    	AddSocketToIocp(pBuffer->sClient,pContext1);

    	NotifyMsg(pContext1,pBuffer,NOTIFY_MSG_ACCEPT);

    	PostRead(pContext1);

    	PostAccept();

    	ReleaseIoBuffer(pBuffer);
    }
    
    // Turns TCP keep-alive on for socket s: first the plain SO_KEEPALIVE option,
    // then the probe timing via SIO_KEEPALIVE_VALS (first probe after 5000 ms of
    // idle time, 1000 ms between probes).  Returns TRUE only if both calls
    // succeed.
    BOOL Ciocp::SetKeepAlive(SOCKET s)
    {
    	BOOL bEnable = TRUE;
    	if (::setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, (char*)&bEnable, sizeof(bEnable)) == SOCKET_ERROR)
    	{
    		return false ;
    	}

    	tcp_keepalive kaRequested = {0};
    	tcp_keepalive kaResult = {0};
    	kaRequested.onoff = TRUE ;
    	kaRequested.keepalivetime = 5000;
    	kaRequested.keepaliveinterval = 1000;

    	unsigned long cbReturned = 0 ;
    	const int rc = WSAIoctl(s,SIO_KEEPALIVE_VALS,&kaRequested,sizeof(kaRequested),&kaResult,sizeof(kaResult),&cbReturned,NULL,NULL);

    	return (rc == SOCKET_ERROR) ? FALSE : TRUE ;
    }
    
    // Copies the running byte totals to the caller.  Either pointer may be NULL
    // if the caller is not interested in that value.
    // NOTE(review): the counters are read without a lock; on 32-bit builds a
    // 64-bit read is not atomic, so a value may be momentarily inconsistent.
    void Ciocp::GetStatisticsData(ULONGLONG* pulRead,ULONGLONG* pulWrite)
    {
    	if (pulWrite != NULL)
    	{
    		*pulWrite = m_ulWriteBytes ;
    	}
    	if (pulRead != NULL)
    	{
    		*pulRead = m_ulReadBytes ;
    	}
    }
    
    // Queues dwSize bytes from lpBuffer for sending on the connection described
    // by pContext.  The data is not copied: lpBuffer must stay valid until the
    // write has completed.
    // Empty or NULL requests are ignored.  A zero-length send would complete
    // with a transfer count of 0, which ProcessIoWrite interprets as a
    // disconnect and answers by tearing the connection down.
    // NOTE(review): m_hWriteComplete is created signalled and never reset in the
    // code visible here, so the wait below does not actually serialise writers.
    void Ciocp::Send(PER_HANDLE_DATA* pContext,LPBYTE lpBuffer,DWORD dwSize)
    {
    	if (pContext == NULL || lpBuffer == NULL || dwSize == 0)
    	{
    		return ;
    	}

    	WaitForSingleObject(pContext->m_hWriteComplete,INFINITE);
    	PostWrite(pContext,lpBuffer,dwSize);
    }
    
    // Handles a completed WSASend.  A transfer count of 0 is treated as a lost
    // connection: the callback gets NOTIFY_MSG_DISCONNECT, outgoing connections
    // are re-established and the context is recycled.  Otherwise the bytes are
    // added to the write total and the context's write event is signalled.
    // pBuffer is released on every path.
    // NOTE(review): if a read completion reports the same disconnect, the
    // context is released twice; nothing here guards against that.
    void Ciocp::ProcessIoWrite(PER_HANDLE_DATA* pContext, PER_IO_BUFFER* pBuffer, DWORD dwTrans)
    {
    	if (dwTrans == 0)
    	{
    		NotifyMsg(pContext,pBuffer,NOTIFY_MSG_DISCONNECT);

    		if (pContext->bConnect) // outgoing connection: try again
    		{
    			PostConnect();
    		}
    		ReleaseContext(pContext);
    	}
    	else
    	{
    		// Updated under m_csIoLock, which HandleIoOp holds around this call.
    		// The counter was declared and reported but never maintained.
    		m_ulWriteBytes += dwTrans ;
    		SetEvent(pContext->m_hWriteComplete);
    	}

    	ReleaseIoBuffer(pBuffer);
    }
    

      

     IOCP - UDP

    //NOTE
    // 
    //This code taken from Mr. Bob Quinn's article 
    //titled 'Internet Multicasting'
    //published in Dr. Dobb's Journal dated Oct 1997
    
    //I have modified the original code to illustrate 
    //the use of I/O completion ports with UDP.
    
    //If you have any comments email me : shapall@hotmail.com
    
    #include "StdAfx.h"
    #include <winsock2.h>
    #include <ws2tcpip.h>
    #include "Stdio.h"
    
    #define BUFSIZE 1024 //max size of incoming data buffer
    #define MAXADDRSTR 16 // room for a dotted-decimal IPv4 string plus terminator
    
    // Multicast group and UDP port the sample joins / listens on by default.
    #define DEFAULT_GROUP_ADDRESS "239.254.1.2"
    #define DEFAULT_PORT 7125 
    
    LONG nCount = 0;                // datagrams handled so far (bumped with InterlockedIncrement)
    HANDLE g_hCompletionPort;       // completion port shared by all worker threads
    DWORD WINAPI WorkerThread( LPVOID WorkContext );
    
    BOOL HandleIncomingData( UCHAR* pBuf);
    BOOL CreateNetConnections( VOID );
    BOOL CreateWorkers( UINT );
    void InitWinsock2();
    void UnInitWinsock2();
    
    HANDLE g_hReadEvent;            // event placed in the OVERLAPPED of every read (created in main)
    SOCKET g_hSocket;               // the UDP socket (created in CreateNetConnections)
    UCHAR achInBuf [BUFSIZE];       // target of the first read posted by CreateNetConnections
    char achMCAddr[MAXADDRSTR] = DEFAULT_GROUP_ADDRESS;
    u_short nPort = DEFAULT_PORT;
    
    
    OVERLAPPED Overlapped;          // OVERLAPPED of the first read posted by CreateNetConnections
    
    //-----------------------------------------------------------------
    void InitWinsock2()
    {
        WSADATA data;
        WORD version; 
        int ret = 0;  
    
        version = (MAKEWORD(2, 2)); 
        ret = WSAStartup(version, &data); 
        if (ret != 0) 
        {  
            ret = WSAGetLastError(); 
            
            if (ret == WSANOTINITIALISED) 
            {  
                printf("not initialised"); 
            }
        }
    }
    
    //-----------------------------------------------------------------
    void UnInitWinsock2()
    { 
        WSACleanup();
    }
    
    //-----------------------------------------------------------------
    // Creates the UDP socket, binds it to nPort, joins the multicast group in
    // achMCAddr, creates the completion port, associates the socket with it and
    // posts the first overlapped read into achInBuf.
    // Returns FALSE when the socket, the bind, the completion port or the first
    // read fails.  A failure to set SO_REUSEADDR or to join the group is only
    // reported.
    BOOL CreateNetConnections (void)
    {
        DWORD nbytes = 0;
        BOOL b;
        BOOL fFlag = TRUE;
        int nRet = 0;

        SOCKADDR_IN stLclAddr;
        struct ip_mreq stMreq; // Multicast interface structure

        // Get a datagram socket
        g_hSocket = socket(AF_INET, SOCK_DGRAM,0);
        if (g_hSocket == INVALID_SOCKET)
        {
            printf ("socket() failed, Err: %d\n", WSAGetLastError());
            return FALSE;
        }

        nRet = setsockopt(g_hSocket,SOL_SOCKET,
                   SO_REUSEADDR, (char *)&fFlag, sizeof(fFlag));
        if (nRet == SOCKET_ERROR)
        {
            printf ("setsockopt() SO_REUSEADDR failed, Err: %d\n",WSAGetLastError());
        }

        // Name the socket (assign the local port number to receive on)
        memset(&stLclAddr,0,sizeof(stLclAddr));
        stLclAddr.sin_family = AF_INET;
        stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        stLclAddr.sin_port = htons(nPort);

        nRet = bind(g_hSocket,(struct sockaddr*) &stLclAddr,sizeof(stLclAddr));
        if (nRet == SOCKET_ERROR)
        {
            // Without a local port nothing can be received; give up here
            // instead of failing later in ReadFile.
            printf ("bind() port: %d failed, Err: %d\n",
                                    nPort,WSAGetLastError());
            closesocket(g_hSocket);
            g_hSocket = INVALID_SOCKET;
            return FALSE;
        }

        // Join the multicast group so we can receive from it
        stMreq.imr_multiaddr.s_addr = inet_addr(achMCAddr);
        stMreq.imr_interface.s_addr = INADDR_ANY;
        nRet = setsockopt(g_hSocket,IPPROTO_IP,
                   IP_ADD_MEMBERSHIP,(char *)&stMreq,sizeof(stMreq));
        if (nRet == SOCKET_ERROR)
        {
            printf("setsockopt() IP_ADD_MEMBERSHIP address %s failed, Err: %d\n",achMCAddr,
                                 WSAGetLastError());
        }

        //
        // The last argument (3) is the number of threads the port lets run
        // concurrently.  It should be tuned to the workload; CPU bound
        // handlers benefit most from keeping it close to the processor count.
        //
        g_hCompletionPort = CreateIoCompletionPort (INVALID_HANDLE_VALUE,
                                                                  NULL,0,3);
        if (!g_hCompletionPort)
        {
            fprintf (stdout, "g_hCompletionPort Create Failed\n");
            closesocket(g_hSocket);
            g_hSocket = INVALID_SOCKET;
            return FALSE;
        }

        // Associate this socket with the completion port.  The socket value is
        // the completion key; the key is pointer-sized, so do not cast through
        // DWORD.
        if (!CreateIoCompletionPort((HANDLE)g_hSocket,g_hCompletionPort,
                                                 (ULONG_PTR)g_hSocket,3))
        {
            fprintf (stdout, "g_hCompletionPort Create Failed\n");
            closesocket(g_hSocket);
            g_hSocket = INVALID_SOCKET;
            return FALSE;
        }

        //
        // Start off an asynchronous read on the socket.
        //
        Overlapped.hEvent = g_hReadEvent;
        Overlapped.Internal = 0;
        Overlapped.InternalHigh = 0;
        Overlapped.Offset = 0;
        Overlapped.OffsetHigh = 0;
        b = ReadFile ((HANDLE)g_hSocket,&achInBuf,
                   sizeof(achInBuf),&nbytes,&Overlapped);

        if (!b && GetLastError () != ERROR_IO_PENDING)
        {
            fprintf (stdout, "ReadFile Failed\n");
            return FALSE;
        }

        return TRUE;
    }
    //-----------------------------------------------------------------
    
    // Starts dwNumberOfWorkers threads running WorkerThread.  The thread
    // handles are closed right away because nothing waits on them.
    // Returns FALSE as soon as one thread cannot be created (threads started
    // before that keep running).
    BOOL CreateWorkers (UINT dwNumberOfWorkers)
    {
        for (UINT i = 0; i < dwNumberOfWorkers; i++)
        {
            DWORD ThreadId = 0;
            HANDLE ThreadHandle = CreateThread (NULL,0,
                      WorkerThread,NULL,0,&ThreadId);
            if (!ThreadHandle)
            {
                fprintf (stdout, "Create Worker Thread Failed\n");
                return FALSE;
            }

            CloseHandle (ThreadHandle);
        }
        return TRUE;
    }
    
    //-----------------------------------------------------------------
    // Worker thread of the UDP sample.  Waits on the completion port; for every
    // successful completion it issues another read on the socket named by the
    // completion key, waits for that read if it is pending, and passes the data
    // to HandleIncomingData.  The thread ends (returning 1) when the wait on
    // the port fails.
    // NOTE(review): the bytes delivered by the completion that woke the thread
    // are not processed here, only the bytes of the follow-up read; behaviour
    // kept as in the original sample.
    DWORD WINAPI WorkerThread (LPVOID WorkContext)
    {
        ULONG_PTR nSocket = 0;      // completion key (pointer-sized), holds the socket
        BOOL b;
        LPOVERLAPPED lpo = NULL;
        DWORD nBytesRead = 0;
        DWORD nBytesToBeRead = 0;
        UCHAR ReadBuffer[BUFSIZE];

        memset(ReadBuffer,0,sizeof(ReadBuffer));
        for (;;)
        {
            b = GetQueuedCompletionStatus(g_hCompletionPort,
                        &nBytesToBeRead,&nSocket,&lpo,INFINITE);
            if (b || lpo)
            {
                if (b)
                {
                    // Read as many bytes as the completed operation moved, but
                    // never more than the local buffer can hold.
                    if (nBytesToBeRead > sizeof(ReadBuffer))
                    {
                        nBytesToBeRead = sizeof(ReadBuffer);
                    }

                    // Every field must be defined before the structure is
                    // handed to the system.
                    OVERLAPPED ol;
                    memset(&ol,0,sizeof(ol));
                    ol.hEvent = g_hReadEvent;

                    b = ReadFile ((HANDLE)nSocket,&ReadBuffer,
                                    nBytesToBeRead,&nBytesRead,&ol);
                    if (!b )
                    {
                        DWORD dwErrCode = GetLastError();
                        if( dwErrCode != ERROR_IO_PENDING )
                        {
                            // something has gone wrong here...
                            printf("Something has gone wrong:Error code - %d\n",dwErrCode );

                            LPVOID lpMsgBuf = NULL;
                            if (FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                                   FORMAT_MESSAGE_FROM_SYSTEM |
                                   FORMAT_MESSAGE_IGNORE_INSERTS,
                                   NULL, dwErrCode,
                                   MAKELANGID(LANG_NEUTRAL,
                                        SUBLANG_DEFAULT),// Default language
                                   (LPTSTR) &lpMsgBuf, 0, NULL) != 0 && lpMsgBuf != NULL)
                            {
                                OutputDebugString((LPCTSTR)lpMsgBuf);
                                //Free the buffer.
                                LocalFree(lpMsgBuf );
                            }
                        }
                        else
                        {
                            // I had to do this for my UDP sample
                            //Never did for my TCP servers
                            WaitForSingleObject(ol.hEvent,INFINITE);

                            HandleIncomingData(ReadBuffer);
                        }
                    }
                    else
                    {
                        HandleIncomingData(ReadBuffer);
                    }
                    continue;
                }
                else
                {
                    fprintf (stdout, "WorkThread Wait Failed\n");
                    //exit (1);
                }
            }
            return 1;
        }
    }
    //-----------------------------------------------------------------
    // Interprets the start of pBuf as a SYSTEMTIME, prints it together with a
    // running datagram number and then clears the buffer so stale data is never
    // shown twice.
    //   pBuf  buffer of at least BUFSIZE bytes (both callers pass a
    //         BUFSIZE-byte array)
    // Returns FALSE for a NULL buffer, TRUE otherwise.
    BOOL HandleIncomingData( UCHAR* pBuf)
    {
        if (pBuf == NULL)
        {
            return FALSE;
        }

        // Use the value returned by the increment: reading nCount again could
        // pick up another thread's update.
        const LONG nSeq = InterlockedIncrement(&nCount);

        const SYSTEMTIME *lpstSysTime = (const SYSTEMTIME *)(pBuf);
        printf("[%ld]UTC Time %02d:%02d:%02d:%03d on %02d-%02d-%d \n",nSeq,
                    lpstSysTime->wHour, lpstSysTime->wMinute,
                    lpstSysTime->wSecond, lpstSysTime->wMilliseconds,
                    lpstSysTime->wMonth, lpstSysTime->wDay, lpstSysTime->wYear);

        // Clear the buffer itself.  The previous code passed &pBuf and so wrote
        // BUFSIZE zero bytes over the parameter's own stack slot.
        memset(pBuf,0,BUFSIZE);

        return TRUE;
    }
    //-----------------------------------------------------------------
    main () 
    { 
        //You can modify your program to take some arguments for port number 
        //and multicast group address here 
        
        printf("
    ***************************************
    "); 
        printf("Group IP address: %s
    ",achMCAddr); 
        printf("Port number : %d
    ",nPort); 
        printf("
    ***************************************
    "); 
    
        //Initialize winsock 2 
        InitWinsock2(); 
    
        //We want to keep the main thread running 
        HANDLE hWait2Exit = CreateEvent(NULL,FALSE,TRUE,"MCLIENT"); 
        ResetEvent(hWait2Exit ); 
    
        //This OVERLAPPED event 
        g_hReadEvent = CreateEvent(NULL,TRUE,TRUE,NULL); 
    
        // 
        // try to get timing more accurate... Avoid context 
        // switch that could occur when threads are released 
        // 
    
        SetThreadPriority (GetCurrentThread (), THREAD_PRIORITY_TIME_CRITICAL); 
        if (!CreateNetConnections ()) 
        { 
            printf("Error condition @ CreateNetConnections , exiting
    "); 
            return 1; 
        } 
    
        if (!CreateWorkers (5)) 
        { 
            printf("Error condition @CreateWorkers, exiting
    "); 
            return 1; 
        } 
        
        WaitForSingleObject(hWait2Exit,INFINITE); 
        UnInitWinsock2(); 
        return 1; 
    }

    转 实现UDP IOCP心得-zt

     http://www.cnblogs.com/BeginGame/archive/2011/09/18/2180241.html

    签名档: 从事网络安全和编程的我,很希望能找到志同道合的朋友交流。 欢迎cn博客的好友拍砖,留言。
  • 相关阅读:
    [SCOI2012]滑雪与时间胶囊
    [SCOI2012]喵星球上的点名
    SDOI2012 Round1 day2 拯救小云公主(dis)解题报告
    SDOI2012 Round1 day2 象棋(chess)解题报告
    SDOI2012 Round1 day2 集合(set)解题报告
    [Sdoi2014]数数[数位dp+AC自动机]
    [NOI2013]快餐店
    java 日期的格式化 输入/输出
    elastic search 查询语句
    elasticsearch数据迁移——elasticsearch-dump使用
  • 原文地址:https://www.cnblogs.com/M4ster/p/my_socket_iocp.html
Copyright © 2011-2022 走看看