I/O完成端口原理见上一篇(可点击这里)
10.5.4.4 利用I/O完成端口实现Socket通信
(1)Accept和AcceptEx流程的比较
①采用accept方式的流程示意图如下(普通的阻塞函数)
②采用AcceptEx方式的流程示意图如下(可工作在阻塞或非阻塞方式)、
【注意】:AcceptEx与Accept的不同
①AcceptEx是在客户端连入之前,就把客户端的Socket建立好了。也就是说,AcceptEx是先建立的Socket,然后才发出的AcceptEx调用。即在进行客户端的通信之前,无论是否有客户端连入,Socket都是提前建立好了;而不需要像accept是在客户端连入了之后,再临时去花费时间建立Socket。
②相比accept只能阻塞方式建立一个连接的入口,对于大量的并发客户端来讲,入口实在是有点挤;而AcceptEx可以同时在完成端口上投递多个请求,这样有客户端连入的时候,就非常优雅而且从容不迫的处理连入请求了。
③AcceptEx还有一个非常体贴的优点,就是在投递AcceptEx的时候,还可以顺便在AcceptEx的同时,收取客户端发来的第一组数据。这个是同时进行的,即在收到AcceptEx完成的通知的时候,就已经把这第一组数据接完毕了;但是这也意味着,如果客户端只是连入但是不发送数据的话,我们就不会收到这个AcceptEx完成的通知。
④Accept函数需要启动新的线程来连接客户端,而AcceptEx不需要!
(2)AcceptEx函数
参数 |
描述 |
sListenSocket |
用来监听的Socket |
sAcceptSocket |
用于接受连接的socket,这个就是那个事先建好的,等客户端连接进来直接把这个Socket拿给它用的那个,是AcceptEx高性能的关键所在。 |
lpOutputBuffer |
接收缓冲区。这也是AcceptEx比较有特色的地方,既然AcceptEx不是普通的accept函数,该缓冲区包含了三个信息:一是客户端发来的第一组数据,二是server的地址,三是client地址 |
dwReceiveDataLength |
参数lpOutputBuffer中用于存放数据的空间大小。 如果此参数=0,则Accept时将不会待数据到来,而直接返回,如果此参数不为0,那么一定得等接收到数据了才会返回…… 所以通常当需要Accept接收数据时,就需要将该参数设成为:sizeof(lpOutputBuffer) - 2*(sizeof sockaddr_in +16),也就是说总长度减去两个地址空间的长度就是 |
dwLocalAddressLength |
存放本地址地址信息的空间大小 |
dwRemoteAddressLength |
存放本远端地址信息的空间大小 |
lpdwBytesReceived |
一般没用,只用在同步时。 |
lpOverlapped |
重叠I/O所要用到的重叠结构 |
备注: ①这是个扩展版本的函数,需要用WSAloctl()函数获取该函数的函数指针。 ②此外,当客户端连接以后,可以通过GetAcceptExSockaddrs函数来获取地址信息,但该函数也要用WSAloctl函数来获取其指针。 |
(3)关于完成端口通知的次序问题
调用GetQueuedCompletionStatus 获取I/O完成端口请求的时候,肯定是用先入先出的方式来进行的。但是,唤醒那些调用了GetQueuedCompletionStatus的线程是以后进先出的方式来进行的。如果出现了一个已经完成的I/O项,那么是最后一个调用GetQueuedCompletionStatus的线程会被唤醒。平常这个次序倒是不重要,但是在对数据包顺序有要求的时候。同时并发的线程可能不止一个,但他们处理I/O完成项的速度可能不一样。这样会导致读取到的数据的顺序可能跟收送过来的数据的顺序是不一样的。比如传送大块数据的时候,是需要注意下这个先后次序的。微软之所以这么做,那当然是有道理的,这样如果反复只有一个I/O操作而不是多个操作完成的话,内核就只需要唤醒同一个线程就可以了,而不需要轮着唤醒多个线程,节约了资源,而且可以把其他长时间睡眠的线程换出内存,提到资源利用率。
(4)文件传输问题
如果各位需要使用完成端口来传送文件的话,这里有个非常需要注意的地方。因为发送文件的做法,一般都会是先打开一个文件,然后不断的循环调用ReadFile读取一块之后,然后再调用WSASend ()去发发送。但是ReadFile的时候,是需要操作系统通过磁盘的驱动程序,到实际的物理硬盘上去读取文件的,这就会使得操作系统从用户态转换到内核态去调用驱动程序,然后再把读取的结果返回至用户态;同样的道理,WSARecv也会涉及到从用户态到内核态切换的问题 --- 这样就使得我们不得不频繁的在用户态到内核态之间转换,效率低下……
而一个非常好的解决方案是使用微软提供的扩展函数TransmitFile来传输文件,因为只需要传递给TransmitFile一个文件的句柄和需要传输的字节数,程序就会整个切换至内核态,无论是读取数据还是发送文件,都是直接在内核态中执行的,直到文件传输完毕才会返回至用户态给主进程发送通知,效率明显提高了许多。
(5)重叠结构数据释放的问题
既然使用的是异步通讯的方式,就得要习惯一点,就是我们投递出去的完成请求,不知道什么时候我们才能收到操作完成的通知,而在这段等待通知的时间,我们就得要千万注意得保证我们投递请求的时候所使用的变量在此期间都得是有效的。
例如发送WSARecv请求时候所使用的Overlapped变量,因为在操作完成的时候,这个结构里面会保存很多很重要的数据,对于设备驱动程序来讲,指示保存着我们这个Overlapped变量的指针,而在操作完成之后,驱动程序会将Buffer的指针、已经传输的字节数、错误码等等信息都写入到我们传递给它的那个Overlapped指针中去。如果我们已经不小心把Overlapped释放了,或者是又交给别的操作使用了的话,谁知道驱动程序会把这些东西写到哪里去呢?岂不是很崩溃?
【利用IOCP实现Socket通信程序】
效果图——可以打开任务管理器,查看CPU占用率!(比较低)
//服务器端
//main.c
/************************************************************************* Moudule: main.cpp Notices: Copyright(c) 2015 浅墨浓香 *************************************************************************/ #include "../../CommonFiles/CmnHdr.h" #include "resource.h" #include "IOCPModel.h" #include <tchar.h> ////////////////////////////////////////////////////////////////////////// TCHAR szApp[] = TEXT("服务端器"); CIOCPModel g_IocpModel; ////////////////////////////////////////////////////////////////////////// //向列表示中加入信息 inline void AddInformation(HWND hwndLv, PCTSTR pszInfo) { LVITEM lv; memset(&lv, 0, sizeof(lv)); lv.mask = LVIF_TEXT | LVIF_PARAM; #ifdef UNICODE lv.pszText = (PWSTR)pszInfo; #else USES_CONVERSION; lv.pszText = A2W((PSTR)pszInfo); #endif lv.lParam =(LPARAM)ListView_GetItemCount(hwndLv); ListView_InsertItem(hwndLv, &lv); } ////////////////////////////////////////////////////////////////////////// //初始化ListView void InitListCtrl(HWND hwndLv){ ListView_SetExtendedListViewStyle(hwndLv, LVS_EX_FULLROWSELECT | LVS_EX_GRIDLINES); LVCOLUMN lvCol; //设置第1列为Information lvCol.mask = LVCF_FMT | LVCF_ORDER | LVCF_TEXT | LVCF_WIDTH; lvCol.fmt = LVCFMT_CENTER; lvCol.cx = 490; lvCol.pszText = TEXT("信息"); lvCol.cchTextMax = 4; lvCol.iOrder = 0; ListView_InsertColumn(hwndLv, 0, &lvCol); } ////////////////////////////////////////////////////////////////////////// BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){ g_IocpModel.LoadSocketLib(); //设置本机IP Static_SetText(GetDlgItem(hwnd, IDC_SERVERIP), g_IocpModel.GetLocalIP()); //设置默认端口 SetDlgItemInt(hwnd,IDC_PORT, DEFAULT_PORT,FALSE); //初始化列表 InitListCtrl(GetDlgItem(hwnd, IDC_LIST_INFO)); //绑定主界面(为了方便在界面中显示信息 ) g_IocpModel.SetMainDlg(hwnd); return TRUE; } ////////////////////////////////////////////////////////////////////////// void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){ switch (id) { case IDCANCEL: g_IocpModel.Stop(); EndDialog(hwnd, id); break; case IDOK: if (FALSE == g_IocpModel.Start()){ MessageBox(hwnd, TEXT("服务器启动失败!"), szApp, MB_OK | MB_ICONERROR); return; } EnableWindow(GetDlgItem(hwnd, IDOK), FALSE); EnableWindow(GetDlgItem(hwnd, IDC_STOP), TRUE); break; case IDC_STOP: g_IocpModel.Stop(); EnableWindow(GetDlgItem(hwnd, IDC_STOP), FALSE); EnableWindow(GetDlgItem(hwnd, IDOK), TRUE); break; } } ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){ switch (uMsg) { chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand); } return FALSE; } ////////////////////////////////////////////////////////////////////////// int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR pszCmdLine, int){ DialogBox(hInstExe, MAKEINTRESOURCE(IDD_IOCPSERVER), NULL, Dlg_Proc); return 0; }
//IOCPModel.h
/************************************************************************ ************************************************************************/ #pragma once #include "../../CommonFiles/CmnHdr.h" #include <vector> #include <atlconv.h> //ATL库提供了更简便的字符集转换函数 #include <assert.h> using namespace std; #pragma comment(lib,"ws2_32.lib") ////////////////////////////////////////////////////////////////////////// #define DEFAULT_IP TEXT("127.0.0.1") #define DEFAULT_PORT 12345 #define MAX_BUFFER_LEN 8192 //缓冲区大小8K ////////////////////////////////////////////////////////////////////////// //向列表示中加入信息 extern inline void AddInformation(HWND hwndLv, PCTSTR pszInfo); ////////////////////////////////////////////////////////////////////////// //在完成端口中上投递的I/O操作类型(完成键) typedef enum _tag_OPERATION_TYPE { ACCEPT_POSTED, //Accept操作的标志 SEND_POSTED, //发送操作的标志 RECV_POSTED, //接收操作的标志 NULL_POSTED //用于初始化,无意义 }OPERATION_TYPE; ////////////////////////////////////////////////////////////////////////// //PER_IO_DATA(单I/O数据)封装了OVERLAPPED结构体 //单IO数据,封装了OVERLAPPED结构体,代表一次I/O重叠操作,一般与每次的操作WSARecv、 //WSASend等相对应,即进行一次读或写操作; typedef struct _PER_IO_CONTEXT { OVERLAPPED m_Overlapped; //每个异步调用Socket的函数都有一个这结构体,表示 //对该Socket的某种操作 SOCKET m_sockAccpet; // WSABUF m_wsaBuf; //WSA类型的缓冲区,用于给重叠操作传参数 char m_szBuffer[MAX_BUFFER_LEN]; OPERATION_TYPE m_OpType; //标识网络操作的类型(对应上面的枚举) //初始化 _PER_IO_CONTEXT(){ ZeroMemory(&m_Overlapped, sizeof(m_Overlapped)); ZeroMemory(m_szBuffer, MAX_BUFFER_LEN); m_wsaBuf.buf = m_szBuffer; m_wsaBuf.len = MAX_BUFFER_LEN; m_OpType = NULL_POSTED; } //释放Socket ~_PER_IO_CONTEXT(){ if (m_sockAccpet!=INVALID_SOCKET){ closesocket(m_sockAccpet); m_sockAccpet = INVALID_SOCKET; } } //重置缓冲区内容 void ResetBuffer(){ ZeroMemory(m_szBuffer, MAX_BUFFER_LEN); } }PER_IO_CONTEXT,*PPER_IO_CONTEXT; ////////////////////////////////////////////////////////////////////////// //完成键,传递的数据被称为单句柄结构PER_HANDLE_DATA,一般是与每个socket句柄对应 typedef struct _PER_SOCKET_CONTEXT { SOCKET m_Socket; SOCKADDR_IN m_ClientAddr; //客户端地址 vector<PPER_IO_CONTEXT> m_arrayIoContext; //初始化 _PER_SOCKET_CONTEXT() { m_Socket = INVALID_SOCKET; memset(&m_ClientAddr, 0, sizeof(m_ClientAddr)); } //释放资源 ~_PER_SOCKET_CONTEXT(){ if (m_Socket!=INVALID_SOCKET){ closesocket(m_Socket); m_Socket = INVALID_SOCKET; } //释放掉所有的IO上下文数据 vector<PPER_IO_CONTEXT>::iterator it; for (it = m_arrayIoContext.begin();it!=m_arrayIoContext.end();it++){ delete *it; } m_arrayIoContext.clear(); } //获取一个新的IoContext _PER_IO_CONTEXT* GetNewIoContext(){ _PER_IO_CONTEXT* p = new _PER_IO_CONTEXT; m_arrayIoContext.push_back(p); return p; } //从数组中移除一个指定的IoContext void RemoveContext(_PER_IO_CONTEXT* pContext){ assert(pContext != NULL); vector<PPER_IO_CONTEXT>::iterator it; for (it = m_arrayIoContext.begin(); it != m_arrayIoContext.end(); it++){ if (*it == pContext){ m_arrayIoContext.erase(it); delete pContext; break; } } } }PER_SOCKET_CONTEXT,*PPER_SOCKET_CONTEXT; ////////////////////////////////////////////////////////////////////////// class CIOCPModel; typedef struct _tagThreadParams_WORKER { CIOCPModel* pIOCPModel;//类指类,用于调用类中的函数 int nThreadNo;//线程编号 }THREADPARAMS_WORKER, *PTHREADPARAMS_WORKER; ////////////////////////////////////////////////////////////////////////// //CIOCPModel类 class CIOCPModel { public: CIOCPModel(); ~CIOCPModel(); public: //启动服务器 BOOL Start(); //停止服务器 void Stop(); //加载Socket库 BOOL LoadSocketLib(); //卸载Socket库 void UnloadSocketLib(){ WSACleanup(); } //获得本机IP TCHAR* GetLocalIP(); //设置监听端口 void SetPort(const int& nPort){m_nPort = nPort;} //保存主对话框句柄 void SetMainDlg(HWND hwnd){ m_hWnd = hwnd; } protected: //线程函数,用来处理IOCP请求服务的 static DWORD WINAPI _WorkerThread(PVOID pParam); //初始化IOCP BOOL _InitializeIOCP(); //初始化Socket BOOL _InitializeListenSocket(); //释放资源 BOOL _DeInitialize(); //投递Accept请求 BOOL _PostAccept(PER_IO_CONTEXT* pAcceptIoContext); //投递接收数据请求 BOOL _PostRecv(PER_IO_CONTEXT* pIoContext); //当客户端连入的时候,进行处理 BOOL _DoAccept(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext); //当有接收的数据到达的时候进行处理 BOOL _DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext); //将设备绑定到完成端口中 BOOL _AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext); //获取本机的处理器数量 int _GetNumberOfProcessors(); //在主界面上显示信息 void _ShowMessage(PCTSTR pszFormat, ...) const; //将客户端的相关信息存储到数组中 void _AddToContextList(PER_SOCKET_CONTEXT* pSocketContext); //将客户端的信息从数组中移除 void _RemoveContext(PER_SOCKET_CONTEXT* pSocketContext); //清空客户端信息 void _ClearContextList(); //获得AcceptEx函数的地址 BOOL _GetAcceptExPointer(SOCKET sock); //获取GetAcceptExSockAddrs函数的指针 BOOL _GetAcceptExSockAddrsPointer(SOCKET sock); //处理完成端口上的错误 BOOL _HandleError(PER_SOCKET_CONTEXT* pContext, const DWORD& dwErr); //检测客户端Socket是否己断开 BOOL _IsSocketAlive(SOCKET s); private: TCHAR* m_szIP; //服务器IP int m_nPort; //服务器监听端口 HWND m_hWnd; int m_nThreads; HANDLE* m_phWorkerThreads; //所有工作线程(Worker)句柄数组 HANDLE m_hShutdownEvent; HANDLE m_hIOCP; CRITICAL_SECTION m_csContextList; // 用于Worker线程同步的互斥量 vector<PER_SOCKET_CONTEXT*> m_arrayClientContext;//客户端列表(Socket的Context信息) PER_SOCKET_CONTEXT* m_pListenContext; //用于监听Socket的Context信息 LPFN_ACCEPTEX m_lpfnAcceptEx; //AcceptEx函数指针 LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;//GetAcceptExSockaddrs函数的指针 };
//IOCPModel.cpp
#include "IOCPModel.h" #include "resource.h" #include <tchar.h> #include <ws2tcpip.h> //For getaddrinfo函数 ////////////////////////////////////////////////////////////////////////// #define WORKER_THREADS_PER_PROCESSOR 2 //每个处理器最大处理的线程数 #define EXIT_CODE NULL //传递给工作线程退出的信号 #define MAX_POST_ACCEPT 10 //同时投递Accept请求的数量(可根据实际调整) ////////////////////////////////////////////////////////////////////////// //释放指针宏 #define RELEASE(x) {if(x!=NULL){delete x;x=NULL;}} //释放句柄宏 #define RELEASE_HANDLE(x) {if(x!=NULL && x!=INVALID_HANDLE_VALUE) {CloseHandle(x);x = NULL;}} //释放Socket宏 #define RELEASE_SOCKET(x) {if(x!=INVALID_SOCKET){closesocket(x);x=INVALID_SOCKET;}} ////////////////////////////////////////////////////////////////////////// //工作者线程:为IOCP请求服务的线程函数,也就是当完成端口出现完成数据包里, //就将之取走进行处理的线程 DWORD WINAPI CIOCPModel::_WorkerThread(PVOID pParam){ PTHREADPARAMS_WORKER pParams = (PTHREADPARAMS_WORKER)pParam; CIOCPModel* pIOCPModel = (CIOCPModel*)pParams->pIOCPModel; int nThreadNo = (int)pParams->nThreadNo; pIOCPModel->_ShowMessage(_T("工作者线程启动,ID:%d"), nThreadNo); OVERLAPPED* pOverlapped = NULL; PER_SOCKET_CONTEXT *pSocketContext = NULL; DWORD dwBytesTransfered = 0; //循环处理请求,直接收到Shutdown信息为止 while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0)){ BOOL bRet = GetQueuedCompletionStatus(pIOCPModel->m_hIOCP, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE); //退出时,程序要手动PostQueuedCompletionStatus(m_hIOCP, 0, (DWORD)EXIT_CODE, NULL); if (EXIT_CODE ==(DWORD)pSocketContext){ break; } //判断是否出现了错误 if (!bRet){ DWORD dwErr = WSAGetLastError(); //显示提示信息 if (!pIOCPModel->_HandleError(pSocketContext, dwErr)){ break; } continue;; } else{ //读取传入的参数 //CONTAINING_RECORD(pInt,A,a)展开来后:(A*)((char*)pInt - (unsigned long)(&((A*)0)->a)) /* CONTAINING_RECORD 宏返回一个结构实例的基地址,该结构的类型和结构所包含的一个域(成员)地址已知。 PCHAR CONTAINING_RECORD(IN PCHAR Address,IN TYPE Type,IN PCHAR Field); 参数 Address 指向Type类型结构实例中某域(成员)的指针。 Type 需要得到基地址的结构实例的结构类型名。 Field Type类型结构包含的域(成员)的名称。 */ PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped); //(PER_IO_CONTEXT*)pOverlapped; //判断是否客户端断开 if ((0==dwBytesTransfered)&& (RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType)){ TCHAR szIP[32]; InetNtop(AF_INET, &pSocketContext->m_ClientAddr.sin_addr, szIP, sizeof(szIP)); pIOCPModel->_ShowMessage(TEXT("客户端%s:%d断开连接."), szIP, ntohs(pSocketContext->m_ClientAddr.sin_port)); //释放掉对应的资源 pIOCPModel->_RemoveContext(pSocketContext); continue; } else{ switch (pIoContext->m_OpType) { case ACCEPT_POSTED: //Accept pIOCPModel->_DoAccept(pSocketContext, pIoContext); break; case RECV_POSTED: //Recv pIOCPModel->_DoRecv(pSocketContext, pIoContext); break; case SEND_POSTED: //Send //这里略过 break; default: //不应该执行到这里 break; } } } } //end while //TRACE(_T("工作者线程%d退出."), nThreadNo); //释放线程参数 RELEASE(pParam); return TRUE; } ////////////////////////////////////////////////////////////////////////// CIOCPModel::CIOCPModel() : m_szIP(DEFAULT_IP), m_nPort(DEFAULT_PORT), m_hWnd(NULL), m_nThreads(0), m_hIOCP(NULL), m_hShutdownEvent(NULL), m_phWorkerThreads(NULL), m_pListenContext(NULL),m_lpfnGetAcceptExSockAddrs(NULL), m_lpfnAcceptEx(NULL) { //初始化线程互斥量 InitializeCriticalSection(&m_csContextList); } CIOCPModel::~CIOCPModel(){ this->Stop(); DeleteCriticalSection(&m_csContextList); } //初始化WinSock2.2 BOOL CIOCPModel::LoadSocketLib() { WSADATA wsaData; int nRet; nRet = WSAStartup(MAKEWORD(2, 2), &wsaData); return (nRet); } // BOOL CIOCPModel::Start() { //建立系统退出的事件通知 m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL); //初始化IOCP if (FALSE == _InitializeIOCP()){ _ShowMessage(_T("初始化IOCP失败! ")); return FALSE; } else{ _ShowMessage(_T("初始化IOCP成功! ")); } //初始化Socket if (FALSE == _InitializeListenSocket()){ _ShowMessage(_T("Listen Socket初始化失败! ")); _DeInitialize(); return FALSE; } else{ _ShowMessage(_T("Listen Socket初始化! ")); } // _ShowMessage(_T("系统准备就绪,等候连接... ")); return TRUE; } //发送系统退出消息,退出完成端口和线程资源 void CIOCPModel::Stop(){ //激活关闭消息 SetEvent(m_hShutdownEvent); for (int i = 0; i < m_nThreads; i++) { //通知所有的完成端口操作退出 PostQueuedCompletionStatus(m_hIOCP, 0, (DWORD)EXIT_CODE, NULL); } //等待所有的客户端资源退出 if (NULL != m_phWorkerThreads) WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE); //清除客户端列表信息 _ClearContextList(); //清除其他资源 _DeInitialize(); _ShowMessage(_T("停止监听! ")); } ////////////////////////////////////////////////////////////////////////// //初始化IOCP BOOL CIOCPModel::_InitializeIOCP(){ //建立一个完成端口 m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (NULL == m_hIOCP) return FALSE; //根据本机的处理器数量,建立对应的线程数 m_nThreads = WORKER_THREADS_PER_PROCESSOR*_GetNumberOfProcessors(); //为工作者线程分配句柄数组 m_phWorkerThreads = new HANDLE[m_nThreads]; //根据计算出出的数量建立工作者(Worker)线程 DWORD nThreadID; for (int i = 0; i < m_nThreads;i++){ THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER; pThreadParams->pIOCPModel = this; pThreadParams->nThreadNo = i + 1; m_phWorkerThreads[i] = chBEGINTHREADEX(NULL, 0, _WorkerThread, (PVOID)pThreadParams, 0, &nThreadID); if (m_phWorkerThreads[i] == NULL) return FALSE; } return TRUE; } //初始化Socket BOOL CIOCPModel::_InitializeListenSocket(){ m_pListenContext = new PER_SOCKET_CONTEXT; //用WSASocket创建监听Socket(支持重叠IO操作) m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET==m_pListenContext->m_Socket){ _ShowMessage(TEXT("初始化Socket失败,错误代码:%d. "), WSAGetLastError()); return FALSE; }; //将监听Socket绑定到完成端口 if (NULL == CreateIoCompletionPort((HANDLE)m_pListenContext->m_Socket, m_hIOCP,(ULONG_PTR)m_pListenContext,0)){ _ShowMessage(TEXT("绑定Listen Socket到完成端口失败!错误代码:%d "),WSAGetLastError()); RELEASE_SOCKET(m_pListenContext->m_Socket); return FALSE; } //填充地址信息 struct sockaddr_in ServerAddress = { 0 }; ServerAddress.sin_family = AF_INET; //这里可以绑定到任何IP,但也可以指定一个IP //ServerAddress.sin_addr.S_un.S_addr = htonl(INADDR_ANY); InetPton(AF_INET, m_szIP,(PVOID)&ServerAddress.sin_addr.S_un.S_addr); ServerAddress.sin_port = htons(m_nPort); //绑定地址和端口到监听Socket if (SOCKET_ERROR == bind(m_pListenContext->m_Socket,(sockaddr*)&ServerAddress, sizeof(ServerAddress))){ _ShowMessage(TEXT("bind()函数执行错误.错误代码:%d "), WSAGetLastError()); return FALSE; } //开始监听 if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN)){ _ShowMessage(TEXT("Listen()函数执行出错,错误代码:%d "), WSAGetLastError()); return FALSE; } //获取AcceptEx函数地址 if (!_GetAcceptExPointer(m_pListenContext->m_Socket)) { _ShowMessage(TEXT("WSAIoCtl未能获取AcceptEx函数指针,错误代码:%d "), WSAGetLastError()); _DeInitialize(); return FALSE; } //获取GetAcceptExSockAddrs函数指针 if (!_GetAcceptExSockAddrsPointer(m_pListenContext->m_Socket)) { _ShowMessage(TEXT("WSAIoCtl未能获取GetAcceptExSockAddrs函数指针,错误代码:%d "), WSAGetLastError()); _DeInitialize(); return FALSE; } //为AcceptEx准备参数,然后投递I/O请求 for (int i = 0; i < MAX_POST_ACCEPT;i++){ //新建一个IO_CONTEXT PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext(); if (FALSE == _PostAccept(pAcceptIoContext)){ m_pListenContext->RemoveContext(pAcceptIoContext); return FALSE; } } _ShowMessage(TEXT("投递%d个AcceptEx请求完毕"), MAX_POST_ACCEPT); return TRUE; } //释放资源 BOOL CIOCPModel::_DeInitialize(){ //关闭系统退出事件句柄 RELEASE_HANDLE(m_hShutdownEvent); //释放工作者线程句柄 if (m_phWorkerThreads!=NULL) for (int i = 0; i < m_nThreads; i++) { RELEASE_HANDLE(m_phWorkerThreads[i]); } RELEASE(m_phWorkerThreads); //关闭IOCP句柄 RELEASE_HANDLE(m_hIOCP); //关闭监听Socket句柄 RELEASE(m_pListenContext); _ShowMessage(TEXT("释放资源完毕! ")); return TRUE; } //将Socket设备绑定到完成端口中 BOOL CIOCPModel::_AssociateWithIOCP(PER_SOCKET_CONTEXT *pContext){ //将用于和客户端通信的SOCKET绑定到完成端口中 HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCP,(DWORD)pContext,0); if (NULL ==hTemp){ _ShowMessage(TEXT("执行Socket绑定到完成端口失败!错误代码:%u"), GetLastError()); return FALSE; } return TRUE; } //投递Accept请求 BOOL CIOCPModel::_PostAccept(PER_IO_CONTEXT* pAcceptIoContext){ assert(INVALID_SOCKET != m_pListenContext->m_Socket); //准备参数 DWORD dwBytes = 0; pAcceptIoContext->m_OpType = ACCEPT_POSTED; WSABUF* p_wbuf = &pAcceptIoContext->m_wsaBuf; OVERLAPPED* pol = &pAcceptIoContext->m_Overlapped; //为以后新连入的客户端先准备好Socket(这与传统的Accept最大的差别) pAcceptIoContext->m_sockAccpet = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == pAcceptIoContext->m_sockAccpet){ _ShowMessage(TEXT("创建用于Accept的Socket失败!错误代码:%d"), WSAGetLastError()); return FALSE; } //投递AcceptEx请求 if (FALSE == m_lpfnAcceptEx(m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccpet, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, pol)) { if (WSA_IO_PENDING != WSAGetLastError()){ _ShowMessage(TEXT("投递AcceptEx请求失败,错误代码:%d"),WSAGetLastError()); return FALSE; } } return TRUE; } //投递接收数据请求 BOOL CIOCPModel::_PostRecv(PER_IO_CONTEXT* pIoContext){ //初始化变量 DWORD dwFlags = 0; DWORD dwBytes = 0; WSABUF* p_wbuf = &pIoContext->m_wsaBuf; OVERLAPPED* pol = &pIoContext->m_Overlapped; pIoContext->ResetBuffer(); pIoContext->m_OpType = RECV_POSTED; //投递WSARecv请求 int nBytesRecv = WSARecv(pIoContext->m_sockAccpet, p_wbuf, 1, &dwBytes, &dwFlags, pol, NULL); //如果返回错误,并且错误代码不是Pending,说明这个请求失败了 if (SOCKET_ERROR == nBytesRecv && (WSA_IO_PENDING != WSAGetLastError())){ _ShowMessage(TEXT("投递第一个WSARecv失败!")); return FALSE; } return TRUE; } //当客户端连入的时候,进行处理 //传入监听端口的Context, BOOL CIOCPModel::_DoAccept(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext){ LPSOCKADDR_IN pLocalAddr, pRemoteAddr; int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN); //1.取得连入客户端的地址信息(客户端发来的第1组数据,本地地址、客户端地址信息) m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len -((sizeof(SOCKADDR_IN)+16)*2), sizeof(SOCKADDR_IN)+16,sizeof(SOCKADDR_IN)+16, (LPSOCKADDR *)&pLocalAddr, &localLen, (LPSOCKADDR *)&pRemoteAddr, &remoteLen); TCHAR szIP[32]; InetNtop(AF_INET, (char*)&(pRemoteAddr->sin_addr.S_un.S_addr), szIP, sizeof(szIP)); _ShowMessage(TEXT("客户端:%s:%d连入."), szIP, ntohs(pRemoteAddr->sin_port)); _ShowMessage(TEXT("收到%s:%d信息:%s"), szIP, ntohs(pRemoteAddr->sin_port), pIoContext->m_wsaBuf.buf); //2.将AcceptSocket绑定到完成端口(但是要为这个Soket创建一个上下文,即完成键) PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT; pNewSocketContext->m_Socket = pIoContext->m_sockAccpet; memcpy(&(pNewSocketContext->m_ClientAddr), pRemoteAddr, sizeof(SOCKADDR_IN)); if (FALSE == _AssociateWithIOCP(pNewSocketContext)){ RELEASE(pNewSocketContext); return FALSE; } //3.继续建立与这个Socket中Io,用于在这个Socket上投递第1个Recv数据 PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext(); pNewIoContext->m_OpType = RECV_POSTED; pNewIoContext->m_sockAccpet = pNewSocketContext->m_Socket; //如果Buffer需要保留,则Copy一份出来 //memcpy(pNewIoContext->m_szBuffer, pIoContext->m_szBuffer, MAX_BUFFER_LEN); //开始这个Socket上投递请求 if (FALSE == _PostRecv(pNewIoContext)){ pNewSocketContext->RemoveContext(pNewIoContext); return FALSE; } //4.如果投递成功,将这个有效的客户端信息加入ContextList(有利用统一管理) _AddToContextList(pNewSocketContext); //5.使用完毕后,将监听Socket的那个IO重置,然后准备投递新的AcceptEx pIoContext->ResetBuffer(); return _PostAccept(pIoContext); } //当有接收的数据到达的时候进行处理 BOOL CIOCPModel::_DoRecv(PER_SOCKET_CONTEXT* pSocketContext, PPER_IO_CONTEXT pIoContext){ //先把收到的数据显示出来,然后重置状态,发送一个Recv请求 SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr; TCHAR szIP[32]; InetNtop(AF_INET, &ClientAddr->sin_addr, szIP, sizeof(szIP)); _ShowMessage(TEXT("收到%s:%d信息:%s"),szIP,ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf); //投递一下个WSARecv请求 return _PostRecv(pIoContext); } //========================================================================= // ContextList相关操作 //========================================================================= //将客户端的相关信息存储到数组中 void CIOCPModel::_AddToContextList(PER_SOCKET_CONTEXT* pSocketContext){ EnterCriticalSection(&m_csContextList); m_arrayClientContext.push_back(pSocketContext); LeaveCriticalSection(&m_csContextList); } //移除某个特定的Context void CIOCPModel::_RemoveContext(PER_SOCKET_CONTEXT* pSocketContext){ EnterCriticalSection(&m_csContextList); vector<PER_SOCKET_CONTEXT*>::iterator it; for (it = m_arrayClientContext.begin(); it != m_arrayClientContext.end();it++){ if (*it==pSocketContext){ m_arrayClientContext.erase(it); delete pSocketContext; break; } } LeaveCriticalSection(&m_csContextList); } //清空客户端列表 void CIOCPModel::_ClearContextList(){ EnterCriticalSection(&m_csContextList); vector<PER_SOCKET_CONTEXT*>::iterator it; for (it = m_arrayClientContext.begin();it!=m_arrayClientContext.end();it++){ delete *it; } m_arrayClientContext.clear(); LeaveCriticalSection(&m_csContextList); } //========================================================================= // 其他辅助函数 //========================================================================= //获取本机的处理器数量 inline int CIOCPModel::_GetNumberOfProcessors(){ SYSTEM_INFO si; GetSystemInfo(&si); return si.dwNumberOfProcessors; } // 在主界面中显示提示信息 void CIOCPModel::_ShowMessage(PCTSTR pszFormat, ...) const{ if (NULL == m_hWnd) return; //根据传入的参数格式化字符串 TCHAR szMsg[1024]; va_list arglist; va_start(arglist, pszFormat); _vstprintf_s(szMsg, _countof(szMsg), pszFormat, arglist); va_end(arglist); AddInformation(GetDlgItem(m_hWnd, IDC_LIST_INFO), szMsg); } //获取本机IP TCHAR* CIOCPModel::GetLocalIP(){ ////获得本机主机名 char hostname[MAX_PATH] = { 0 }; gethostname(hostname, MAX_PATH); struct addrinfo *pRet = NULL;//通过result指针指向addrinfo结构链表的指针 struct addrinfo hints;//需要<ws2tcpip.h> ZeroMemory(&hints, sizeof(hints)); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; DWORD dwRetval = getaddrinfo(hostname, NULL, &hints, &pRet);//返回值0代表成功 if (dwRetval != 0) return m_szIP; SOCKADDR_IN *sockaddr_ipv4 = (SOCKADDR_IN*)pRet->ai_addr; //将IP地址转化成字符串形式 char inAddr[16]; inet_ntop(AF_INET, (void*)&sockaddr_ipv4->sin_addr, inAddr, sizeof(inAddr)); #ifdef UNICODE USES_CONVERSION; m_szIP = A2W(inAddr); #else m_szIP =(TCHAR*)inAddr; #endif return m_szIP; } //获得AcceptEx函数的地址 BOOL CIOCPModel::_GetAcceptExPointer(SOCKET sock){ //使用AcceptEx函数,因为属于WinSock2规范之外的扩展函数,所以 //需要通过SWAIoCtl函数来获取其函数指针 DWORD dwBytes = 0; GUID GuidAcceptEx = WSAID_ACCEPTEX; return (SOCKET_ERROR != WSAIoctl( sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL )); } //获取GetAcceptExSockAddrs函数的指针 BOOL CIOCPModel::_GetAcceptExSockAddrsPointer(SOCKET sock){ //使用AcceptEx函数,因为属于WinSock2规范之外的扩展函数,所以 //需要通过WSAIoCtl函数来获取其函数指针 DWORD dwBytes = 0; GUID GuidAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; return (SOCKET_ERROR != WSAIoctl( sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptExSockAddrs, sizeof(GuidAcceptExSockAddrs), &m_lpfnGetAcceptExSockAddrs, sizeof(m_lpfnGetAcceptExSockAddrs), &dwBytes, NULL, NULL )); } //检测客户端Socket是否己断开 BOOL CIOCPModel::_IsSocketAlive(SOCKET s){ int nBytesSend = send(s, "", 0, 0); if (-1 == nBytesSend) return FALSE; return TRUE; } //处理完成端口上的错误 BOOL CIOCPModel::_HandleError(PER_SOCKET_CONTEXT* pContext, const DWORD& dwErr){ //如果超时,就再继续等待 if (WAIT_TIMEOUT ==dwErr){ //确认客户端是否还活着 if (!_IsSocketAlive(pContext->m_Socket)){ _ShowMessage(TEXT("检测到客户端异常退出!")); _RemoveContext(pContext); return TRUE; }else{ _ShowMessage(TEXT("网络操作超时!重试中...")); return TRUE; } //可能客户端异常退出了 } else if (ERROR_NETNAME_DELETED == dwErr){ _ShowMessage(TEXT("检测到客户端异常退出!")); _RemoveContext(pContext); return TRUE; } else{ _ShowMessage(TEXT("完成端口操作出现错误,线程退出。错误代码:%d"), dwErr); return FALSE; } return TRUE; }
//Resouce.h
//{{NO_DEPENDENCIES}} // Microsoft Visual C++ 生成的包含文件。 // 供 10_IOCPServer.rc 使用 // #define IDC_STOP 3 #define IDD_IOCPSERVER 101 #define IDC_SERVERIP 1003 #define IDC_PORT 1004 #define IDC_LIST_INFO 1006 // Next default values for new objects // #ifdef APSTUDIO_INVOKED #ifndef APSTUDIO_READONLY_SYMBOLS #define _APS_NEXT_RESOURCE_VALUE 102 #define _APS_NEXT_COMMAND_VALUE 40001 #define _APS_NEXT_CONTROL_VALUE 1009 #define _APS_NEXT_SYMED_VALUE 101 #endif #endif
//Serve.rc
// Microsoft Visual C++ generated resource script. // #include "resource.h" #define APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 2 resource. // #include "winres.h" ///////////////////////////////////////////////////////////////////////////// #undef APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // 中文(简体,中国) resources #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS) LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED #ifdef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // TEXTINCLUDE // 1 TEXTINCLUDE BEGIN "resource.h " END 2 TEXTINCLUDE BEGIN "#include ""winres.h"" " "