zoukankan      html  css  js  c++  java
  • Server Develop (八) IOCP模型

    IOCP模型

      IOCP全称I/O Completion Port,中文译为I/O完成端口。IOCP是一个异步I/O的Windows API,它可以高效地将I/O事件通知给应用程序,类似于Linux中的Epoll。

    简介

      IOCP模型属于一种通讯模型,适用于Windows平台下高负载服务器的一个技术。在处理大量用户并发请求时,如果采用一个用户一个线程的方式那将造成CPU在这成千上万的线程间进行切换,后果是不可想象的。而IOCP完成端口模型则完全不会如此处理,它的理论是并行的线程数量必须有一个上限-也就是说同时发出500个客户请求,不应该允许出现500个可运行的线程。目前来说,IOCP完成端口是Windows下性能最好的I/O模型,同时它也是最复杂的内核对象。它避免了大量用户并发时原有模型采用的方式,极大的提高了程序的并行处理能力。

    原理图

      从图中可以看到,一共包括三部分:完成端口(存放重叠的I/O请求),客户端请求的处理,等待者线程队列(一定数量的工作者线程,一般采用CPU*2个)。

      完成端口中所谓的[端口]并不是我们在TCP/IP中所提到的端口,可以说是完全没有关系。它其实就是一个通知队列,由操作系统把已经完成的重叠I/O请求的通知放入其中。当某项I/O操作一旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。

      通常情况下,我们会在创建一定数量的工作者线程来处理这些通知,也就是线程池的方法。线程数量取决于应用程序的特定需要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的CPU时间,在此期间该线程可以运行,然后另一个线程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它线程来充分利用这些时间片。

    IOCP的优点

      基于IOCP的开发是异步IO的,决定了IOCP所实现的服务器的高吞吐量。

      完成端口的线程并发量可以在创建该完成端口时指定,从而限制了与该完成端口相关联的可运行线程的数目。

         通过引入IOCP,会大大减少Thread切换带来的额外开销,最小化的线程上下文切换,减少线程切换带来的巨大开销,让CPU把大量的事件用于线程的运行。当与该完成端口相关联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发量为止。

      Select是先查询再发起IO请求,IOCP是先发起IO请求再接收通知。但是Select方式在处理大量非活动连接时是比较低效的,因为每次Select需要对所有的Socket状态进行查询,而对非活动的Socket查询是没有意义的浪费,另外由于Socket句柄不能设置用户私有数据,当查询返回Socket句柄时还需要一个额外的查询来找到关联的用户对象,这两点是Select低效的关键。

    IOCP的具体实现步骤

      IOCP中用到单个函数,分为用于创建关联完成端口、获取完成状态和投递完成状态,函数原型:

    //功能:创建完成端口和关联完成端口
     HANDLE WINAPI CreateIoCompletionPort(
         *    __in   HANDLE FileHandle,              // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
         *    __in   HANDLE ExistingCompletionPort,  // 已经存在的IOCP句柄
         *    __in   ULONG_PTR CompletionKey,        // 完成键,包含了指定I/O完成包的指定文件
         *    __in   DWORD NumberOfConcurrentThreads // 真正并发同时执行最大线程数,一般推介是CPU核心数*2
         * );
    
    //例子
    //创建完成端口句柄
    HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    
    
    typedef struct{
        SOCKET socket;//客户端socket
        SOCKADDR_STORAGE ClientAddr;//客户端地址
    }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
    
    //与socket进行关联
    CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
    //功能:获取队列完成状态
    BOOL   GetQueuedCompletionStatus(
        HANDLE   CompletionPort,          //完成端口句柄
        LPDWORD   lpNumberOfBytes,    //一次I/O操作所传送的字节数
        PULONG_PTR   lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK
        LPOVERLAPPED   *lpOverlapped, //IOCP特定的结构体
        DWORD   dwMilliseconds);           //调用者的等待时间
    /*
    返回值:
    调用成功,则返回非零数值,相关数据存于lpNumberOfBytes、lpCompletionKey、lpoverlapped变量中。失败则返回零值。
    */
    
    //用于IOCP的特定函数
    typedef struct _OVERLAPPEDPLUS{
        OVERLAPPED ol;      //一个固定的用于处理网络消息事件返回值的结构体变量
        SOCKET s, sclient;  int OpCode;      //用来区分本次消息的操作类型(在完成端口的操作里面,是以消息通知系统,读数据/写数据,都是要发这样的消息结构体过去的)
        WSABUF wbuf;     //读写缓冲区结构体变量 
        DWORD dwBytes, dwFlags; //一些在读写时用到的标志性变量 
    }OVERLAPPEDPLUS;
    //功能:投递一个队列完成状态
    BOOL PostQueuedCompletionStatus(   HANDLE CompletlonPort,
    //指定想向其发送一个完成数据包的完成端口对象   DW0RD dwNumberOfBytesTrlansferred, //指定—个值,直接传递给GetQueuedCompletionStatus函数中对应的参数   DWORD dwCompletlonKey, //指定—个值,直接传递给GetQueuedCompletionStatus函数中对应的参数   LPOVERLAPPED lpoverlapped, ); //指定—个值,直接传递给GetQueuedCompletionStatus函数中对应的参数

      TCP IOCP实现具体步骤:

    1. 创建好 IOCP
    2. 创建 Socket ( socket 可以是由 Accept 得到)
    3.  将 Socket 关联到 IOCP
    4. socket 向 IOCP 提交各种所需请求
    5. IOCP 操作完成之后将结果返回给 socket
    6. 重复步骤 3 和 4 ,直到 socket 关闭

        例子:

      1 #include <winsock2.h>
      2 #include <windows.h>
      3 #include <string>
      4 #include <iostream>
      5 using namespace std;
      6 
      7 #pragma comment(lib,"ws2_32.lib")
      8 #pragma comment(lib,"kernel32.lib")
      9 
     10 HANDLE g_hIOCP;
     11 
     12 enum IO_OPERATION{IO_READ,IO_WRITE};
     13 
     14 struct IO_DATA{
     15     OVERLAPPED                  Overlapped;
     16     WSABUF                      wsabuf;
     17     int                         nBytes;
     18     IO_OPERATION                opCode;
     19     SOCKET                      client;
     20 };
     21 
     22 char buffer[1024];
     23 
     24 DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {
     25     IO_DATA *lpIOContext = NULL; 
     26     DWORD nBytes = 0;
     27     DWORD dwFlags = 0; 
     28     int nRet = 0;
     29 
     30     DWORD dwIoSize = 0; 
     31     void * lpCompletionKey = NULL;
     32     LPOVERLAPPED lpOverlapped = NULL;
     33 
     34     while(1){
     35         GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
     36         
     37         lpIOContext = (IO_DATA *)lpOverlapped;
     38         if(dwIoSize == 0)
     39         {
     40             cout << "Client disconnect" << endl;
     41             closesocket(lpIOContext->client);
     42             delete lpIOContext;
     43             continue;
     44         }
     45 
     46         if(lpIOContext->opCode == IO_READ) // a read operation complete
     47         {
     48             ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
     49             lpIOContext->wsabuf.buf = buffer;
     50             lpIOContext->wsabuf.len = strlen(buffer)+1;
     51             lpIOContext->opCode = IO_WRITE;
     52             lpIOContext->nBytes = strlen(buffer)+1;
     53             dwFlags = 0;
     54             nBytes = strlen(buffer)+1;
     55             nRet = WSASend(
     56                 lpIOContext->client,
     57                 &lpIOContext->wsabuf, 1, &nBytes,
     58                 dwFlags,
     59                 &(lpIOContext->Overlapped), NULL);
     60             if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
     61                 cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
     62                 closesocket(lpIOContext->client);
     63                 delete lpIOContext;
     64                 continue;
     65             }
     66             memset(buffer, NULL, sizeof(buffer));
     67         }
     68         else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
     69         {
     70             // Write operation completed, so post Read operation.
     71             lpIOContext->opCode = IO_READ; 
     72             nBytes = 1024;
     73             dwFlags = 0;
     74             lpIOContext->wsabuf.buf = buffer;
     75             lpIOContext->wsabuf.len = nBytes;
     76             lpIOContext->nBytes = nBytes;
     77             ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
     78 
     79             nRet = WSARecv(
     80                 lpIOContext->client,
     81                 &lpIOContext->wsabuf, 1, &nBytes,
     82                 &dwFlags,
     83                 &lpIOContext->Overlapped, NULL);
     84             if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
     85                 cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
     86                 closesocket(lpIOContext->client);
     87                 delete lpIOContext;
     88                 continue;
     89             } 
     90             cout<<lpIOContext->wsabuf.buf<<endl;
     91         }
     92     }
     93     return 0;
     94 }
     95 void main ()
     96 {
     97     WSADATA wsaData;
     98     WSAStartup(MAKEWORD(2,2), &wsaData);
     99 
    100     SOCKET    m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
    101     
    102     sockaddr_in server;
    103     server.sin_family = AF_INET;
    104     server.sin_port = htons(6000);
    105     server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    106     
    107     bind(m_socket ,(sockaddr*)&server,sizeof(server));
    108 
    109     listen(m_socket, 8);
    110 
    111     SYSTEM_INFO sysInfo;
    112     GetSystemInfo(&sysInfo);
    113     int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
    114 
    115     g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount);
    116     
    117     //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
    118 
    119     for( int i=0;i < g_ThreadCount; ++i){
    120         HANDLE  hThread;
    121         DWORD   dwThreadId;
    122         hThread = CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId);
    123         CloseHandle(hThread);
    124     }
    125 
    126     while(1)
    127     {
    128         SOCKET client = accept( m_socket, NULL, NULL );
    129         cout << "Client connected." << endl;
    130         
    131 
    132         if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){
    133             cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
    134             closesocket(client);
    135         }
    136         else { //post a recv request
    137             IO_DATA * data = new IO_DATA;
    138             memset(buffer, NULL ,1024);
    139             memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
    140             data->opCode = IO_READ;
    141             data->nBytes = 0;
    142             data->wsabuf.buf  = buffer;
    143             data->wsabuf.len  = sizeof(buffer);
    144             data->client = client;
    145             DWORD nBytes= 1024 ,dwFlags=0;
    146             int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,
    147                 &dwFlags,
    148                 &data->Overlapped, NULL);
    149             if(nRet == SOCKET_ERROR  && (ERROR_IO_PENDING != WSAGetLastError())){
    150                 cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
    151                 closesocket(client);
    152                 delete data;
    153             }
    154             cout<<data->wsabuf.buf<<endl;
    155         }
    156     }
    157     closesocket(m_socket);
    158     WSACleanup();
    159 }
    View Code
     1 #include <iostream>
     2 #include <WinSock2.h>
     3 using namespace std;
     4 
     5 #pragma comment(lib,"ws2_32.lib")
     6 
     7 void main()
     8 {
     9     WSADATA wsaData;  
    10     WSAStartup(MAKEWORD(2,2), &wsaData);
    11 
    12     sockaddr_in server;
    13     server.sin_family = AF_INET;
    14     server.sin_port   = htons(6000);
    15     server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
    16 
    17     SOCKET client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    18 
    19     int flag;
    20     flag = connect(client, (sockaddr*)&server, sizeof(server));
    21     if(flag < 0){
    22         cout<<"error!"<<endl;
    23         return;
    24     }
    25     while(1){
    26         cout<<"sent hello!!!!"<<endl;
    27         char buffer[1024];
    28         strcpy(buffer,"hello");
    29         send(client, buffer, 1024, 0);
    30 
    31         memset(buffer, NULL, sizeof(buffer));
    32         
    33         cout<<"recv: "<<endl;
    34         int rev = recv(client, buffer, 1024, 0);
    35         if(rev == 0)
    36             cout<<"recv nothing!"<<endl;
    37         cout<<buffer<<endl;
    38         Sleep(10000);
    39     }
    40 
    41     closesocket(client);
    42     WSACleanup();
    43 }
    View Code

    参考

    http://www.cnblogs.com/lidabo/archive/2012/12/10/2812230.html

    http://www.codeproject.com/KB/IP/iocp-multicast-udp.aspx

    http://blog.csdn.net/zhongguoren666/article/details/7386592

    http://www.baike.com/wiki/%E5%AE%8C%E6%88%90%E7%AB%AF%E5%8F%A3%E6%A8%A1%E5%9E%8B

    http://blog.csdn.net/neicole/article/details/7549497

    http://ycool.com/post/zgu6hbp

    知识共享许可协议
    IOCP模型cococo点点 创作,采用 知识共享 署名-相同方式共享 3.0 未本地化版本 许可协议进行许可。欢迎转载,请注明出处:
    转载自:cococo点点 http://www.cnblogs.com/coder2012
  • 相关阅读:
    设计模式开篇——7大设计原则
    MySQL MVCC专题
    Spring常考的面试题
    HashMap常考面试题
    Equals和==的比较
    高并发编程
    一文读懂JVM
    scala实现定时任务的方法
    PLAY2.6-SCALA(十二) 表单的处理
    PLAY2.6-SCALA(十一) 模板常用场景
  • 原文地址:https://www.cnblogs.com/coder2012/p/3185715.html
Copyright © 2011-2022 走看看