zoukankan      html  css  js  c++  java
  • 【网络编程】之十二、wsaeventselect+线程池 服务器实现

    #include<WinSock2.h>
    #include<iostream>
    using namespace std;
    
    
    #pragma comment(lib, "WS2_32.lib")
    
    typedef struct _SOCKET_OBJ{
    	SOCKET s;
    	HANDLE event;
    	sockaddr_in addrRemote;
    	_SOCKET_OBJ *pNext;
    }SOCKET_OBJ, *PSOCKET_OBJ;
    
    typedef struct _THREAD_OBJ{
    	HANDLE events[WSA_MAXIMUM_WAIT_EVENTS];
    	int nSocketCount;
    	PSOCKET_OBJ pSocketHeader;
    	PSOCKET_OBJ pSocketTail;
    	CRITICAL_SECTION cs;
    	_THREAD_OBJ *pNext;
    }THREAD_OBJ, *PTHREAD_OBJ;
    
    PTHREAD_OBJ g_pThreadList;
    CRITICAL_SECTION g_cs;
    
    LONG g_nTatolConnections;
    LONG g_nCurrentConnections;
    
    DWORD WINAPI ServerThread(LPVOID lpParam);
    //******************************************************************************//
    PSOCKET_OBJ GetSocketObj(SOCKET s)
    {
    	PSOCKET_OBJ pSocket = (PSOCKET_OBJ)::GlobalAlloc(GPTR, sizeof(SOCKET_OBJ));
    	if(pSocket != NULL)
    	{
    		pSocket->s = s;
    		pSocket->event = ::WSACreateEvent();
    	}
    
    	return pSocket;
    }
    
    void FreeSocketObj(PSOCKET_OBJ pSocket)
    {
    	::CloseHandle(pSocket->event);
    	if(pSocket->s != INVALID_SOCKET)
    	{
    		closesocket(pSocket->s);
    	}
    	::GlobalFree(pSocket);
    }
    
    //*************************************************************************//
    
    PTHREAD_OBJ GetThreadObj()
    {
    	PTHREAD_OBJ pThread = (PTHREAD_OBJ)::GlobalAlloc(GPTR, sizeof(THREAD_OBJ));
    	if(pThread != NULL)
    	{
    		::InitializeCriticalSection(&pThread->cs);
    
    		pThread->events[0] = ::WSACreateEvent();
    
    		::EnterCriticalSection(&g_cs);
    		pThread->pNext = g_pThreadList;
    		g_pThreadList = pThread;
    		::LeaveCriticalSection(&g_cs);
    	}
    
    	return pThread;
    }
    
    void FreeThreadObj(PTHREAD_OBJ pThread)
    {
    	::EnterCriticalSection(&g_cs);
    	PTHREAD_OBJ p = g_pThreadList;
    	if(p == pThread)
    	{
    		g_pThreadList = p->pNext;
    	}
    	else
    	{
    		while(p != NULL && p->pNext != pThread)
    		{
    			p = p->pNext;
    		}
    		if(p != NULL)
    		{
    			p->pNext = pThread->pNext;
    		}
    	}
    	::LeaveCriticalSection(&g_cs);
    
    	::CloseHandle(pThread->events[0]);
    	::DeleteCriticalSection(&pThread->cs);
    	::GlobalFree(pThread);
    }
    
    
    void RebulidArray(PTHREAD_OBJ pThread)
    {
    	::EnterCriticalSection(&pThread->cs);
    	PSOCKET_OBJ pSocket = pThread->pSocketHeader;
    	int n = 1;
    	while(pSocket != NULL)
    	{
    		pThread->events[n++] = pSocket->event;
    		pSocket = pSocket->pNext;
    	}
    
    	::LeaveCriticalSection(&pThread->cs);
    }
    
    //********************************************************************//
    
    BOOL insertSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
    {
    	BOOL bRet = FALSE;
    	::EnterCriticalSection(&pThread->cs);
    	if(pThread->nSocketCount < WSA_MAXIMUM_WAIT_EVENTS - 1)
    	{
    		if(pThread->pSocketHeader == NULL)
    		{
    			pThread->pSocketHeader = pThread->pSocketTail = pSocket;
    		}
    		else
    		{
    			pThread->pSocketTail->pNext = pSocket;
    			pThread->pSocketTail = pSocket;
    		}
    		pThread->nSocketCount++;
    		bRet = TRUE;
    	}
    
    	::LeaveCriticalSection(&pThread->cs);
    
    	if(bRet)
    	{
    		::InterlockedIncrement(&g_nTatolConnections);
    		::InterlockedIncrement(&g_nCurrentConnections);
    	}
    	return bRet;
    }
    
    void AssignToFreeThread(PSOCKET_OBJ pSocket)
    {
    	pSocket->pNext = NULL;
    	::EnterCriticalSection(&g_cs);
    	PTHREAD_OBJ pThread = g_pThreadList;
    
    	while(pThread != NULL)
    	{
    		if(insertSocketObj(pThread, pSocket))
    			break;
    		pThread = pThread->pNext;
    	}
    
    
    	if(pThread == NULL)
    	{
    		pThread = GetThreadObj();
    		insertSocketObj(pThread, pSocket);
    		::CreateThread(NULL, 0, ServerThread, pThread, 0, NULL);
    
    	}
    
    	::LeaveCriticalSection(&g_cs);
    
    	::WSASetEvent(pThread->events[0]);
    }
    
    
    void RemoveSocketObj(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
    {
    	::EnterCriticalSection(&pThread->cs);
    
    	PSOCKET_OBJ pTest = pThread->pSocketHeader;
    	if(pTest == pSocket)
    	{
    		if(pThread->pSocketHeader == pThread->pSocketTail)
    			pThread->pSocketTail = pThread->pSocketHeader = pTest->pNext;
    		else
    			pThread->pSocketHeader = pTest->pNext;
    	}
    	else
    	{
    		while(pTest != NULL && pTest->pNext != pSocket)
    			pTest = pTest->pNext;
    		if(pTest != NULL)
    		{
    			if(pThread->pSocketTail == pSocket)
    				pThread->pSocketTail = pTest;
    			pTest->pNext = pSocket->pNext;
    		}
    	}
    
    	pThread->nSocketCount--;
    
    	::LeaveCriticalSection(&pThread->cs);
    
    
    	::WSASetEvent(pThread->events[0]);
    	::InterlockedDecrement(&g_nCurrentConnections);
    }
    
    //********************************************************************//
    PSOCKET_OBJ FindSocketObj(PTHREAD_OBJ pThread, int nIndex)
    {
    	PSOCKET_OBJ pSocket = pThread->pSocketHeader;
    	while(--nIndex)
    	{
    		if(pSocket == NULL)
    			return NULL;
    		pSocket = pSocket->pNext;
    	}
    	return pSocket;
    }
    
    BOOL HandleIO(PTHREAD_OBJ pThread, PSOCKET_OBJ pSocket)
    {
    	WSANETWORKEVENTS event;
    	::WSAEnumNetworkEvents(pSocket->s, pSocket->event, &event);
    	do{
    		if(event.lNetworkEvents & FD_READ)
    		{
    			if(event.iErrorCode[FD_READ_BIT] == 0)
    			{
    				char szText[256];
    				int nRecv = ::recv(pSocket->s, szText, strlen(szText), 0);
    				if(nRecv > 0)
    				{
    					szText[nRecv] = '';
    					cout << "接收到数据:" << szText << endl;
    				}
    			}
    			else
    				break;
    		}
    		else
    		{
    			if(event.lNetworkEvents &FD_CLOSE)
    			{
    				break;
    			}
    			else
    			{
    				if(event.lNetworkEvents & FD_WRITE)
    				{
    					if(event.iErrorCode[FD_READ_BIT] == 0)
    					{
    						char szText[256];
    						int nRecv = ::recv(pSocket->s, szText, strlen(szText), 0);
    						if(nRecv > 0)
    						{
    							szText[nRecv] = '';
    							cout << "接收到数据:" << szText << endl;
    						}
    					}
    					else
    						break;
    				}
    			}
    		}
    		return TRUE;
    
    	}while(FALSE);
    
    
    	RemoveSocketObj(pThread, pSocket);
    	FreeSocketObj(pSocket);
    	return FALSE;
    }
    
    
    DWORD WINAPI ServerThread(LPVOID lpParam)
    {
    	PTHREAD_OBJ pThread = (PTHREAD_OBJ)lpParam;
    	while(TRUE)
    	{
    		int nIndex = ::WSAWaitForMultipleEvents(pThread->nSocketCount + 1, pThread->events, FALSE, WSA_INFINITE, FALSE);
    		nIndex = nIndex - WSA_WAIT_EVENT_0;
    
    		for(int i = nIndex; i < pThread->nSocketCount + 1; ++i)
    		{
    			nIndex = ::WSAWaitForMultipleEvents(1, &pThread->events[i], TRUE, 1000, FALSE);
    			if(nIndex == WSA_WAIT_FAILED || nIndex == WSA_WAIT_TIMEOUT)
    			{
    				continue;
    			}
    			else
    			{
    				if(i == 0)
    				{
    					RebulidArray(pThread);
    
    					if(pThread->nSocketCount == 0)
    					{
    						FreeThreadObj(pThread);
    						return 0;
    					}
    					::WSAResetEvent(pThread->events[0]);
    				}
    				else
    				{
    					PSOCKET_OBJ pSocket = (PSOCKET_OBJ)FindSocketObj(pThread, i);
    					if(pSocket != NULL)
    					{
    						if(!HandleIO(pThread, pSocket))
    							RebulidArray(pThread);
    					}
    					else
    						cout << "Unable to find socket object" << endl;
    				}
    			}
    		}
    	}
    	return 0;
    }
    
    
    //******************************************************************************//
    
    int main(void)
    {
    
    	WSADATA wsaData;  
    	WORD sockVersion = MAKEWORD(2,0);//指定版本号  
    	::WSAStartup(sockVersion, &wsaData);//载入winsock的dll  
    	//创建套接字基于TCP  
    	SOCKET sListen = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  
    	if(sListen == INVALID_SOCKET)  
    	{  
    		printf("error");  
    		::WSACleanup();//清理,释放资源  
    		return 0;  
    	}  
    
    	sockaddr_in sin;  
    	sin.sin_family = AF_INET;  
    	sin.sin_port = htons(8888);//端口号8888  
    	sin.sin_addr.S_un.S_addr = INADDR_ANY;//地址全是0,也就是所有的地址  
    	//绑定socket  
    	if(::bind(sListen, (LPSOCKADDR)&sin, sizeof(sin)) == SOCKET_ERROR)  
    	{  
    		printf("error");  
    		::WSACleanup();//清理释放资源  
    		return 0;  
    	}  
    	//监听socket  
    	if(::listen(sListen, 2) == SOCKET_ERROR)  
    	{  
    		printf("error");  
    		::WSACleanup();//释放资源  
    		return 0;  
    	}  
    
    	WSAEVENT event = ::WSACreateEvent();
    	::WSAEventSelect(sListen, event, FD_ACCEPT | FD_CLOSE);
    	::InitializeCriticalSection(&g_cs);
    
    	//处理请求
    	while(TRUE)
    	{
    		int nRet = ::WaitForSingleObject(event, 5 * 1000);
    		if(nRet == WAIT_FAILED)
    		{
    			cout << "failed waitforsingleobject" << endl;
    			break;
    		}
    		else if(nRet == WSA_WAIT_TIMEOUT)
    		{
    			cout << endl;
    			cout << " tatolconnections:" << g_nTatolConnections << endl;
    			cout << " currentconnections: " << g_nCurrentConnections << endl;
    			continue;
    		}
    		else
    		{
    			::ResetEvent(event);   //新连接
    
    			while(TRUE)
    			{
    				sockaddr_in si;
    				int nLen = sizeof(si);
    				SOCKET sNew = ::accept(sListen, (sockaddr*)&si, &nLen);
    				if(sNew == SOCKET_ERROR)
    					break;
    				PSOCKET_OBJ pSocket = GetSocketObj(sNew);
    				pSocket->addrRemote = si;
    				::WSAEventSelect(pSocket->s, pSocket->event, FD_READ | FD_CLOSE | FD_WRITE);
    				AssignToFreeThread(pSocket);
    			}
    		}
    	}
    
    	::DeleteCriticalSection(&g_cs);
    	::WSACleanup();
    	return 0;
    }

  • 相关阅读:
    计算机硬件基础
    元类
    内置函数
    单例模式的三种实现方式
    字符编码
    odoo权限
    odoo api介绍
    odoo 二次开发小记不定时更新
    js与jQuery区别
    Cookie, LocalStorage 与 SessionStorage说明
  • 原文地址:https://www.cnblogs.com/java20130723/p/3211385.html
Copyright © 2011-2022 走看看