为什么采用IOCP(I/O Completion Port)完成端口模型?
在网络发展飞速的今天,客户端的数量日益倍增,如何高效地处理多个客户端的并发,成为服务器需要处理的关键问题之一。在处理并发的问题上,处理器的做法是采用“一客户一线程”的方式,一个客户为其创建一个线程,如果客户端数量很多时,线程数也随之增加,这就需要服务器来协调各线程之间的工作,一旦处理不当,就会引起并发中的一系列问题,甚至可能导致系统瘫痪。IOCP机制可以较好的处理各个客户端之间的协调和并发问题,可以使系统资源可以被合理地利用,有效地解决了因多线程竞争所带来的问题。
IOCP优缺点:
② 去除删除线程创建/终结负担。
③ 利于管理,分配线程,控制并发,最小化的线程上下文切换。
④ 优化线程调度,提高CPU和内存缓冲的命中率。
阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。函数只有在得到结果之后才会返回。
非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回。
如果使用“同步”的方式来通信的话,这里说的同步的方式就是说所有的操作都在一个线程内顺序执行完成,这么做缺点是很明显的:因为同步的通信操作会阻塞住来自同一个线程的任何其他操作,只有这个操作完成了之后,后续的操作才可以完成;一个最明显的例子就是咱们在MFC的界面代码中,直接使用阻塞Socket调用的代码,整个界面都会因此而阻塞住没有响应!所以我们不得不为每一个通信的Socket都要建立一个线程,而异步则没有这些缺点,它可以同时完成两件或两件以上的事务。
单线程在程序执行时,所走的程序路径按照连续顺序排下来,前面的必须处理好,后面的才会执行。多线程是为了同步完成多项任务,不是为了提高运行效率,而是为了提高资源使用效率来提高系统的效率。线程是在同一时间需要完成多项任务的时候实现的。
重叠I/O技术:
针对每个I/O操作绑定一个内核事件对象,并将等待事件等待函数等待该事件的受信,当I/O操作完成后系统使得与该操作绑定的事件受信,从而判断那个操作完成。该技术解决了使一个设备内核对象变为有信号技术中一个设备只能对应一个操作的不足。
IOCP模型实现流程:
(1) 调用 CreateIoCompletionPort() 函数创建一个完成端口句柄,而且在一般情况下,我们需要且只需要建立这一个完成端口,该句柄直接调用API;
(2) 建立工作者(Worker)线程,专门用来和客户端进行通信的;
(3) 接收连入的Socket连接,启动一个独立的线程,专门用来accept客户端的连接请求;
(4) 每当有客户端连入的时候,我们就还是得调用CreateIoCompletionPort()函数,这里却不是新建立完成端口了,而是把新连入的Socket(也就是前面所谓的设备句柄),与目前的完成端口绑定在一起。
(5) 前面建立的Worker线程中提交网络请求,例如WSARecv(),让系统执行接收数据的操作。同时线程都需要分别调用GetQueuedCompletionStatus() 函数在扫描完成端口的队列里是否有网络通信的请求存在(例如读取数据,发送数据等),一旦有的话,就将这个请求从完成端口的队列中取回来,继续执行本线程中后面的处理代码,处理完毕之后,我们再继续投递下一个网络通信的请求。
代码:
相关数据结构:
1 typedef struct _PER_HANDLE_DATA 2 { 3 SOCKET SockClient; 4 SOCKADDR_IN addr; 5 }PER_HANDLE_DATA,*PPER_HANDLE_DATA; 6 7 typedef struct _PER_IO_DATA 8 { 9 OVERLAPPED ol; // 重叠I/O结构 10 char buf[BUFFER_SIZE]; // 数据缓冲区 11 int nOperationType; //I/O操作类型 12 #define OP_READ 1 13 #define OP_WRITE 2 14 #define OP_ACCEPT 3 15 }PER_IO_DATA,*PPER_IO_DATA;
(1)初始化Socket
#include <winsock2.h> #pragma comment(lib, "WS2_32") // 链接到WS2_32.lib class CInitSock { public: CInitSock(BYTE minorVer = 2, BYTE majorVer = 2) { // 初始化WS2_32.dll WSADATA wsaData; WORD sockVersion = MAKEWORD(minorVer, majorVer); if(::WSAStartup(sockVersion, &wsaData) != 0) { exit(0); } } ~CInitSock() { ::WSACleanup(); TRACE0("WSACleanup()"); } };
(2)用Listen类封装监听线程:CreateIoCompletionPort()创建完成端口句柄,监听端口,投递WSARecv(接收数据)
BOOL CListen::Initial() { m_sListen=socket(AF_INET,SOCK_STREAM,0); SOCKADDR_IN addrSrv; addrSrv.sin_family=AF_INET; addrSrv.sin_port=::ntohs(6666); /////////////// addrSrv.sin_addr.S_un.S_addr=INADDR_ANY; if (::bind(m_sListen,(SOCKADDR*)&addrSrv,sizeof(addrSrv))==SOCKET_ERROR) { TRACE("绑定套接字失败!"); int err; if ((err=WSAGetLastError())!=WSA_IO_PENDING) { TRACE1("err is %d",err); //TRACE("投递失败"); } closesocket(m_sListen); return FALSE; } if (listen(m_sListen,20)==SOCKET_ERROR) { closesocket(m_sListen); return FALSE; } AfxBeginThread(ListenFun,this,THREAD_PRIORITY_NORMAL,0,0,NULL); return TRUE; } int CListen::Run() { PPER_HANDLE_DATA pHandleData; PPER_IO_DATA pPerIO; while (TRUE) { SOCKADDR_IN addrRemote; int iRemoteLen=sizeof(addrRemote); SOCKET sNew=accept(m_sListen,(SOCKADDR*)&addrRemote,&iRemoteLen); //TRACE1("sNew的值为:%d",sNew); // 更新界面数据 PostMessage(m_hWnd,USER_UPDATEWINDOW,(long)sNew,(long)&addrRemote); //pHandleData=new PER_HANDLE_DATA; pHandleData=(PPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA)); pHandleData->SockClient=sNew; memcpy(&pHandleData->addr,&addrRemote,iRemoteLen); // 将此套接字关联到端口上 CreateIoCompletionPort((HANDLE)pHandleData->SockClient,m_hCompletion,(DWORD)pHandleData,0); // 投递一个请求 // pPerIO=new PER_IO_DATA; pPerIO=(PPER_IO_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_DATA)); // memset(&pPerIO->ol,0,sizeof(pPerIO->ol)); ///////////////// pPerIO->nOperationType=OP_READ; WSABUF WsaBuf; WsaBuf.buf=pPerIO->buf; WsaBuf.len=BUFFER_SIZE; DWORD dwRecv; DWORD dwFlag=0; int ret; if((ret=::WSARecv(pHandleData->SockClient,&WsaBuf,1,&dwRecv,&dwFlag,&pPerIO->ol,NULL))!=NO_ERROR) { TRACE1("ret is %d",ret); int err; if ((err=WSAGetLastError())!=WSA_IO_PENDING) { TRACE1("err is %d",err); TRACE("投递失败"); } } } return 1; }
(3)用Server类封装工作线程:GetQueuedCompletionStatus() 监控完成端口,提交请求
1 int CServer::Run() 2 { 3 DWORD dwTrans; 4 while (TRUE) 5 { 6 TRACE("进入工作线程处理函数"); 7 BOOL bOK=GetQueuedCompletionStatus(m_hCompletion,&dwTrans,(LPDWORD)&m_spPerHandle, 8 (LPOVERLAPPED*)&m_spPerIO,WSA_INFINITE); 9 if (!bOK && dwTrans!=0) // 在此套接字上发生错误 10 { 11 TRACE("取包出错"); 12 int err=WSAGetLastError(); 13 if ((err=WSAGetLastError())!=WSA_IO_PENDING) 14 { 15 TRACE1("err is %d",err); 16 } 17 closesocket(m_spPerHandle->SockClient); 18 GlobalFree(m_spPerHandle); 19 GlobalFree(m_spPerIO); 20 //delete m_spPerHandle; 21 //delete m_spPerIO; 22 continue; 23 } 24 if (bOK==ERROR_SUCCESS && dwTrans==0 && (m_spPerIO->nOperationType==OP_READ || m_spPerIO->nOperationType==OP_WRITE)) 25 { 26 TRACE("对方关闭连接"); 27 SendMessage(m_hWnd,USER_UPDATEWINDOW,(long)m_spPerHandle->SockClient,(long)1); 28 closesocket(m_spPerHandle->SockClient); 29 //delete m_spPerHandle; 30 //delete m_spPerIO; 31 GlobalFree(m_spPerHandle); 32 GlobalFree(m_spPerIO); 33 continue; 34 } 35 switch (m_spPerIO->nOperationType) 36 { 37 case OP_READ: 38 { 39 TRACE("收到读"); 40 TRACE1("dwTrans的值为:%d",dwTrans); 41 m_spPerIO->buf[dwTrans]='