zoukankan      html  css  js  c++  java
  • 套接字I/O模型-完成端口IOCP

    “完成端口”模型是迄今为止最为复杂的一种I/O模型。然而,假若一个应用程序同时需要管理为数众多的套接字,那么采用这种模型,往往可以达到最佳的系统性能!但不幸的是,该模型只适用于Windows NT和Windows 2000操作系统。因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千个套接字的时候,而且希望随着系统内安装的CPU数量的增多,应用程序的性能也可以线性提升,才应考虑采用“完成端口”模型。要记住的一个基本准则是,假如要为Windows NT或Windows 2000开发高性能的服务器应用,同时希望为大量套接字I/O请求提供服务(Web服务器便是这方面的典型例子),那么I/O完成端口模型便是最佳选择!
    从本质上说,完成端口模型要求创建一个windows完成端口对象,该对象通过指定数量的线程,对重叠I/O进行管理,以便为已完成的重叠I/O请求提供服务。要注意的是,所谓完成端口,实际上是windows采用的一种I/O构造机制,除套接字句柄之外,还可以接受其他东西。
    使用这种模型之前,首先要创建一个I/O完成端口对象,用它面向任意数量的套接字句柄,管理多个I/O请求,要做到这一点,首先调用函数:
    HANDLE CreateIoCompletionPort( 
      HANDLE FileHandle, 
      HANDLE ExistingCompletionPort, 
      DWORD CompletionKey, 
      DWORD NumberOfConcurrentThreads 
    ); 
    首先注意该函数实际用于两个截然不同的两个目的:
    1.用于创建一个完成端口对象
    2.将一个句柄同完成端口关联在一起
    最开始创建完成端口时,我们唯一感兴趣的是NumberOfConccurrentThreads,前三个参数不太重要。 NumberOfConccurrentThreads定义了在一个完成端口上,同时允许执行的线程数量。若将该参数设为0,则告诉系统安装了多少个处理器,则允许同时运行多少个线程,可用如下代码创建一个I/O完成端口:
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
    该语句的作用是返回一个句柄,在为完成端口分配了一个套接字句柄后,用来对那个端口进行标识。
     
    工作器线程与完成端口
    成功创建一个完成端口后,便可开始将套接字句柄与对象关联到一起。但在关联套接字之前,首先必须创建一个或多个工作器线程,以便在套接字的I/O请求投递给完成端口后,为完成端口提供服务。
    假如事先预计到线程有可能暂时处于阻塞状态,那么最好能够创建比CreateIoCompletionPort的NumberOfConccurrentThreads值更多的线程。以便到时候充分发挥系统的潜力。
    一旦在完成端口上拥有足够多的工作器线程来为I/O请求提供服务,便可着手将套接字句柄同完成端口关联在一起。需要在一个完成端口上调用CreateIoCompletionPort函数,同时为前三个参数FileHandle,ExistingCompletionPort和CompletionKey提供套接字信息。其中,FileHandle参数指定一个要同完成端口关联在一起的套接字句柄,ExistingCompletionPort参数标识的是一个现有的完成端口套接字句柄已经与他关联在一起。CompletionKey参数标识的是要与某个特定套接字句柄关联在一起的单句柄数据;在这个参数中,应用程序可保持与一个套接字对应的任意类型信息。之所以叫它单句柄数据,是由于它代表了与套接字句柄关联在一起的数据。可将它作为指向一个数据结构的指针;在这个结构中,同时包含了套接字的句柄,以及与该套接字有关的其他信息。为完成端口提供服务的线程的例程可通过这个参数,取得与套接字句柄有关的信息。
    下面示例阐述了如何使用完成端口模型,来开发一个回应服务器应用程序,这个程序基本按照如下步骤进行:
    1.创建一个完成端口,第四个参数为0,它指定完成端口上每个处理器一次只允许执行一个工作器线程
    2.判断系统内有多少个处理器
    3.创建工作器线程,根据步骤2得到的处理器信息,在完成端口上为已完成的I/O请求提供服务。在这个简单的例子中,我们为每个处理器只创建一个工作器线程。调用CreateThread函数时,必须同时提供一个工作器例程,由线程在创建好后执行
    4.准备好一个监听套接字,在端口上监听传入的连接
    5.使用accept接收入站的连接请求
    6.创建一个数据结构,用于容纳单句柄数据,同时在结构中存入接收的套接字句柄
    7.调用CreateIoCompletionPort,将自accept返回的新套接字句柄同完成端口关联在一起。通过 CompletionKey 参数,将单句柄数据结构传递给CreateIoCompletionPort
    8.开始在已结束的连接上进行I/O操作,在此,我们希望通过重叠I/O机制,在新建套接字投递一个或多个WSARecv或WSASend请求。这些I/O请求完成后,工作器线程会为I/O请求提供服务,同时继续处理以后的I/O请求
    9.重复步骤5~8,直到服务器终止
    HANDLE CompletionPort; 
    WSADATA wsd; 
    SYSTEM_INFO SystemInfo; 
    SOCKADDR_IN addr; 
    SOCKET Listen; 
    int i; 
    typedef struct _PER_HANDLE_DATA 
    { 
        SOCKET Socket; 
        SOCKADDR_STORAGE ClientAddr; 
        //将和这个句柄关联的其他信息  
    }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; 
     
    //加载Winsock 
    StartWinsock(MAKEWORD(2,2), &wsd); 
     
    //第一步 
    //创建一个I/O完成端口 
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
     
    //第二步 
    //确定系统有多少个处理器 
    GetSystemInfo(&SystemInfo); 
     
    //第三步 
    //基于系统中可用的处理器数量创建工作器线程 
    //对这个例子,为每个处理器创建一个工作器线程 
    for(i=0; i<SystemInfo.dwNumberOfProcessors; i++) 
    { 
        HANDLE ThreadHandle; 
         
        //创建一个服务器的工作线程,并将完成端口传递到该线程 
        ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread,  
                                    CompletionPort, 0, NULL); 
        //关闭线程句柄 
        CloseHandle(ThreadHandle);  
    }   
     
    //第四步 
    //创建一个监听套接字 
    Listen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); 
    addr.sin_family = AF_INET; 
    addr.sin_port = htons(5050); 
    addr.sin_addr.s_addr = htonl(INADDR_ANY); 
    bind(Listen, (PSOCKADDR)&addr, sizeof(SOCKADDR_IN)); 
    listen(Listen, 5); 
     
    while(TRUE) 
    { 
        PER_HANDLE_DATA *PerHandleData = NULL; 
        SOCKADDR_IN saRemote; 
        SOCKET Accept; 
        int RemoteLen; 
         
        //第五步 
        //接收连接,并分配到完成端口 
        RemoteLen = sizeof(SOCKADDR_IN); 
        Accept = WSAAccept(Listen, (SOCKADDR*)&saRemote, &RemoteLen); 
         
        //第六步 
        //创建用来和套接字关联的单句柄数据信息结构 
        PerHandleData  = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA)); 
        printf("Socket Number %d connected
    ",Accept); 
        PerHandleData->Socket = Accept; 
        memcpy(&PerHandleData->ClientAddr,&saRemote,RemoteLen); 
         
        //第七步 
        //将接收套接字和完成端口关联起来 
        CreateIoCompletionPort((HANDLE)Accept, 
                               CompletionPort, 
                               (DWORD)PerHandleData, 
                               0); 
         
        //第八步 
        //开始在接受套接字上处理I/O 
        //使用重叠I/O,在套接字上投递一个或多个WSASend或WSARecv调用 
        WSARecv(...);              
    }  
     
    DWORD WINAPI ServerWorkerThread(LPVOID lpParam) 
    { 
        //工作器线程 
        return 0;  
    } 
     
    完成端口和重叠I/O
    将套接字句柄与一个完成端口关联在一起后,便能以套接字句柄为基础,投递重叠发送与接收请求,开始对I/O请求进行处理,之后可开始依赖完成端口,接收有关I/O操作完成情况通知。从本质上说,完成端口模型利用了Windows重叠I/O机制。在这种机制中,类似WSASend和WSARecv这样的WindowsAPI调用会立即返回。此时,需要由应用程序负责在以后的某个时间,通过OVERLAPPED结构来检索调用的结果。在完成端口模型中,想要做到这一点需要使用GetQueuedCompletionStatus函数,让一个或多个工作器线程在完成端口上等待:
    BOOL GetQueuedCompletionStatus( 
      HANDLE CompletionPort, 
      LPWORD lpNumberOfBytesTransferred, 
      PULONG_PTR lpCompletionkey, 
      LPOVERLAPPED * lpOverlapped, 
      DWORD dwMilliseconds 
    ); 
    其中,CompletionPort对应与线程所在的完成端口。lpNumberOfBytesTransferred参数负责在完成一次I/O操作后,接收实际传输的字节数。lpCompletionkey参数为原先传递到CreateIoCompletionPort函数的套接字返回单句柄数据。如前所述,大家最好将套接字句柄保持在这个键中。lpOverlapped参数用于接收已完成的I/O操作的WSAOVERLAPPED结构。因为可用它获取每个I/O操作的数据,所有这实际上也是一个相当重要的参数。dwMilliseconds用于指明调用者等待一个完成数据包在完成端口上出现时,希望等候的毫秒数。假如将其设为INFINITE,调用会无休止的等待下去。
     
    单句柄数据和单I/O操作数据
    当一个工作器线程从GetQueuedCompletionStatus这个API调用中接收到I/O完成通知后,在lpCompletionKey和lpOverlapped参数中,会包含一些必要的套接字信息。利用这些信息,可通过完成端口,继续在一个套接字上进行I/O处理。通过这些参数,可获得两种重要的套接字数据类型:单句柄数据和单I/O操作数据。
    因为在一个套接字首次与完成端口关联到一起的时候,单句柄数据便与一个特定的套接字句柄对应起来了,所有lpCompletionKey参数也包含了单句柄数据。这些数据真是在进行CreateIoCompletionPort调用的时候,通过CompletionKey参数传递的。通常情况下,应用程序会将与I/O请求有关的套接字句柄保存在这里。
    lpOverlappde则包含了一个OVERLAPPED结构,在它后面跟随单I/O操作数据。工作器线程处理一个完成数据包时(回应数据,接受连接以及投递另一个线程等),这些信息是它必须知道的。单I/O操作数据是包含在一个结构内的,任意数量的字节,这个结果本身也包含了一个OVERLAPPED结构,假如一个函数要求用到一个OVERLAPPED结构,我们便必须将这样的一个结构传递进去,以满足它的要求。要想做到这一点,一个简单的方法是定义一个结构,然后将OVERLAPPED结构作为新结构的第一个元素使用,举个例子:
    typedef struct 
    { 
      OVERLAPPED Overlapped; 
      char Buffer[DATA_BUFSIZE]; 
      int BufferLen; 
      int OperationType; 
    }PER_IO_DATA 
    要想调用windowsAPI函数,同时为其分配一个OVERLAPPED结构,只要简单的撤销对结构中OVERLAPPED机构的引用即可,如下所示:
    PER_IO_OPERATION_DATA PerIoData; 
    WSABUF wbuf; 
    DWORD Bytes,Flags; 
     
    //初始化wbuf 
     
    WSARecv(socket,&wbuf,1,&Bytes,&Flags,&(PerIoData.Overlapped),NULL); 
    在工作器线程的后面部分,GetQueuedCompletionStatus函数返回了一个重叠结构和完成键,获取单I/O数据应使用宏CONTAINING_RECORD,例如:
    PER_IO_DATA *PerIoData = NULL; 
    OVERLAPPED *lpOverlapped = NULL; 
     
    ret = GetQueuedCompletionStatus( 
          ComPortHandle, 
          &Transferred, 
          (PULONG_PTR)&CompletionKey, 
           &lpOverlapped, 
           INFINITE); 
    //检查成功的返回 
    PerIoData = CONTAINING_RECORD(lpOverlapped,PER_IO_DATA,Overlapped); 
    应该使用这个宏;否则,结构PER_IO_DATA的成员OVERLAPPED就始终不得不首先出现,这会成为一个危险的假设(多个开发者开发同一段代码时尤为严重)。
    可以使用单I/O结构的一个字段来表示被投递的操作类型,从而可以确定到底是哪个操作投递到了句柄上。在我们的例子中,OpdrationType字段应设为可以指示读写等操作的值。对单I/O操作数据来说,它最大的优点便是允许我们在同一个句柄上,同时管理多个I/O操作(读写,多个读写操作等等)。
    Windows完成端口的一个重要方面是,所有重叠操作可确保按照应用程序安排好的顺序执行。然而,不能确保从完成端口返回的完成通知也按上述顺序执行。
    设计一个工作器线程,令其使用单句柄数据和单I/O操作数据为I/O请求提供服务:
    DWORD WINAPI ServerWorkerThread(LPVOID lpParam) 
    { 
        HANDLE CompletionPort = (HANDLE)lpParam; 
        DWORD BytesTransferred; 
        LPOVERLAPPED Overlapped; 
        LPPER_HANDLE_DATA PerHandleData; 
        LPPER_IO_DATA PerIoData; 
        DWORD SendBytes, RecvBytes; 
        DWORD Flags; 
        while(TRUE) 
        { 
            //等待和完成端口关联的任意套接字上的I/O完成 
            ret = GetQueuedCompletionStauts(CompletionPort, 
                                            &BytesTransferred, 
                                            (LPWORD)&PerHandleData, 
                                            (LPOVERLAPPED*)&PerIoData, 
                                            INFINITE); 
            //先检查一下,看是否在套接字上发生错误; 
            //如果发生了,关闭套接字,并清除和这个套接字关联的单句柄数据和单I/O操作数据 
            if(BytesTransferred==0 && 
               (PerIoData->OperationType == RECV_POSTED || PerIoData->OperationType == SEND_POSTED)) 
            { 
                //BytesTransferred为0时,表明套接字已被通信对方关闭,因此我们也要关闭套接字 
                //注意:单句柄数据用来引用和I/O关联的套接字 
                closesocket(PerHandleData->Socket); 
                GlobalFree(PerHandleData); 
                GlobalFree(PerIoData); 
                continue;  
            }  
            //为完成的I/O请求提供服务。可以通过查看单I/O操作数据中包含的 OperationType字段, 
            //来确定刚完成的是哪个I/O请求 
            if(PerIoData->OperationType == RECV_POSTED)  
            { 
                //对PerIoData->Buffer中接收到的数据施加某种操作  
            } 
             
            //投递另外一个WSASend或WSARecv操作 
            //这里只投递一个WSARecv操作 
            Flags = 0; 
             
            //为下一个重叠调用建立单I/O操作数据 
            ZeroMemory(&(PerIoData->Overlapped),sizeof(OVERLAPPED)); 
            PerIoData->DataBuf.len = DATA_BUFSIZE; 
            PerIoData->DataBuf.buf = PerIoData->Buffer; 
            PerIoData->OperationType = RECV_POSTED; 
            WSARecv(PerHandleData->Socket,  
                    &(PerIoData->DataBuf), 
                    1, 
                    &RecvBytes, 
                    &Flags, 
                    &(PerIoData->Overlapped), 
                    NULL); 
        } 
    } 
    对于一个给定的重叠操作,如果发生错误,则GetQueuedCompletionStatus将返回FALSE,因为完成端口是Windows采用的一种I/O构造机制,所有,如果调用GetLastError或WSAGetLastError,则错误代码及可能是一个Windows错误代码,而非Winsock错误。要想得到winsock错误代码,可以在指定了套接字句柄和结构WSAOVERLAPPED的情况下,对已完成的操作调用WSAGetOverlappedResult,之后WSAGetLastError将返回转换后的Winsock错误代码。
    最后要注意一处细节,是如何正确关闭I/O完成端口--特别是同时运行一个或多个线程,在几个不同的套接字上执行I/O操作时。要注意的一个主要问题是,在进行重叠I/O操作时,应避免强行释放OVERLAPPED结构。要想不出现这种情况,最好的办法是针对每个套接字句柄,调用closesocket函数,则任何尚未进行的重叠I/O操作都会完成。一旦所有套接字句柄都已关闭,便须在完成端口上终止所有工作器线程的运行。要想做到这一点可以使用PostQueuedCompletionStatus函数,向每个工作器线程都发送一个特殊的完成数据包。该函数会提示每个线程立即结束并推出:
    BOOL PostQueuedCompletionStatus( 
      HANDLE CompletionPort, 
      DWORD dwNumberOfBytesTransferred, 
      ULONG_PTR dwCompletionKey, 
      LPOVERLAPPED lpOverlapped 
    ); 
    CompletionPort参数指明程序想向其发送一个完成数据包的完成端口对象。而就dwNumberOfBytesTransferred,dwCompletionKey,lpOverlapped这3个参数来说,每一个都允许指定一个值,直接传递给GetQueuedCompletionStatus函数中对应的参数,这样,根据参数,决定何时退出。 
     
    =========================================================================
    #include<stdio.h> 
    #include<winsow2.h> 
    #pragma comment(lib, "ws2_32.lib") 
     
    #define PORT 5050 
    #define MSGSIE 1024 
     
    typedef enum 
    { 
        RECV_POSTED 
    }OPERATION_TYPE; 
     
    typedef struct 
    { 
        OVERLAPPED overlap; 
        WSABUF     Buffer; 
        char       szMessage[MSGSIZE]; 
        DWORD      NumberOfBytesRecvd; 
        DWORD      Flags; 
        OPERATION_TYPE OpetationType; 
    }PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA; 
     
    DWORD WINAPI WorkerThread(LPVOID lpParam); 
     
    int main() 
    { 
        WSADATA wsaData; 
        SOCKET sListen, sClient; 
        SOCKADDR_IN local, client; 
        DWORD i, dwThreadId; 
        int iAddrSize = sizeof(SOCKADDR_IN); 
        HANDLE CompletionPort = INVALID_HANDLE_VALUE; 
        SYSTEM_INFO sysinfo; 
        LPPER_IO_OPERATION_DATA lpPerIoData = NULL; 
         
        WSAStartup(MAKEWORD(2,2), &wsaData); 
        CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); 
        GetSystemInfo(&sysinfo); 
        for(i = 0; i<sysinfo.dwNumberOfProcessors; i++) 
        { 
            CreateThread(NULL,0,WorkerThread,CompletionPort,0,&dwThreadId); 
        } 
         
        sListen = socket(AF_INET,SOCK_STREAM,0); 
        memset(&local,0,sizeof(SOCKADDR_IN)); 
        local.sin_family = AF_INET; 
        local.sin_port = htons(PORT); 
        local.sin_addr.s_addr = htonl(INADDR_ANY); 
        bind(sListen,(SOCKADDR*)&local,sizeof(SOCKADDR_IN)); 
        listen(sListen, 5); 
        while(TRUE) 
        { 
            sCient = accept(sListen,(SOCKADDR*)&client,&iAddrSize); 
            printf("Accept Client:%s:%d
    ",inet_ntoa(client.sin_addr),ntohs(client.sin_port)); 
            CreateIoCompletionPort((HANDLE)sClient,CompletionPort,(DWORD)sClient,0); 
            lpPetIoData=(LPPER_IO_OPERATION_DATA)HeapAlloc( 
                                  GetProcessHeap(), 
                                  HEAP_ZERO_MEMORY, 
                                  sizeof(PER_IO_OPERATION_DATA)); 
            lpPerIoData->Buffer.len = MSGSIZE; 
            lpPerIoData->Buffer.buf = lpPerIoData->szMessage; 
            lpPerIoData->OpetationType = RECV_POSTED; 
            WSARecv(sClient, 
                    &lpPerIoData->Buffer, 
                    1, 
                    &lpPerIoData->NumberOfBytesRecvd, 
                    &lpPerIoData->Flags, 
                    &lpPerIoData->overlap, 
                    NULL); 
        } 
         
        PostQueuedCompletionStauts(CompletionPort,0xFFFFFFFF,0,NULL); 
        CloseHandle(CompletionPort); 
        closesocket(sListen); 
        WSACleanup(); 
        return 0;     
    } 
     
    DWORD WINAPI WorkerThread(LPVOID lpParam) 
    { 
        HANDLE CompletionPort = (HANDLE)lpParam; 
        DWORD dwBytesTransferred; 
        SOCKET sClient; 
        LPPER_IO_OPERATION_DATA lpPerIoData = NULL; 
        while(TRUE) 
        { 
            GetQueuedCompletionStatus(CompletionPort, 
                                      &dwBytesTransferred, 
                                      (DWORD*)sClient, 
                                      (LPOVERLAPPED*)&lpPerIoData, 
                                      INFINITE); 
            if(dwBytesTransferred==0xFFFFFFFF) 
            { 
                return 0; 
            } 
            if(lpPerIoData->OpetationType==RECV_POSTED) 
            { 
                if(dwBytesTransferred==0) 
                { 
                    closesocket(sClient); 
                    HeapFree(GetProcessHeap(),0,lpPerIoData); 
                } 
                else 
                { 
                    lpPerIoData->szMessage[dwBytesTransferred]=''; 
                    send(sClient,lpPerIoData->szMessage,dwBytesTransferred,0); 
                     
                    memset(lpPerIoData,0,sizeof(PER_IO_OPERATION_DATA)); 
                    lpPerIoData->Buffer.len = MSGSIZE; 
                    lpPerIoData->Buffer.buf = lpPerIoData->szMessage; 
                    lpPerIoData->OpetationType = RECV_POSTED; 
                    WSARecv(sClient, 
                            &lpPerIoData->Buffer, 
                            1, 
                            &lpPerIoData->NumberOfBytesRecvd, 
                            &lpPerIoData->Flags, 
                            &lpPerIoData->overlap, 
                            NULL); 
                } 
            } 
        } 
        return 0; 
    } 
    服务器端得主要流程: 
    1.创建完成端口对象 
    2.创建工作者线程(这里工作者线程的数量是按照CPU的个数来决定的,这样可以达到最佳性能) 
    3.创建监听套接字,绑定,监听,然后程序进入循环 
    4.在循环中,我做了以下几件事情: 
    (1).接受一个客户端连接 
    (2).将该客户端套接字与完成端口绑定到一起(还是调用CreateIoCompletionPort,但这次的作用不同),注意,按道理来讲,此时传递给CreateIoCompletionPort的第三个参数应该是一个完成键,一般来讲,程序都是传递一个单句柄数据结构的地址,该单句柄数据包含了和该客户端连接有关的信息,由于我们只关心套接字句柄,所以直接将套接字句柄作为完成键传递; 
    (3).触发一个WSARecv异步调用,这次又用到了“尾随数据”,使接收数据所用的缓冲区紧跟在WSAOVERLAPPED对象之后,此外,还有操作类型等重要信息。 
     
    在工作者线程的循环中,我们 
    1.调用GetQueuedCompletionStatus取得本次I/O的相关信息(例如套接字句柄、传送的字节数、单I/O数据结构的地址等等) 
    2.通过单I/O数据结构找到接收数据缓冲区,然后将数据原封不动的发送到客户端 
    3.再次触发一个WSARecv异步操作 
  • 相关阅读:
    开源数据汇集工具
    scrapy定时执行抓取任务
    xpath的常见操作
    ubuntu 安装python mysqldb
    sudo: /etc/sudoers is owned by uid 755, should be 0
    ubuntu 14.04安装mysql数据库
    win7 远程桌面连接centos 6.5
    本地启动spark-shell
    ubuntu 安装 2.10.x版本的scala
    unfolding maps支持中文
  • 原文地址:https://www.cnblogs.com/profession/p/4464416.html
Copyright © 2011-2022 走看看