zoukankan      html  css  js  c++  java
  • IOCP模型总结(总结回想)

    IOCP旧代码重提。近期一直在玩其它方面的东东。时不时回想一下,收益多多。

    IOCP(I/O Completion Port,I/O完毕port)是性能最好的一种I/O模型。它是应用程序使用线程池处理异步I/O请求的一种机制。在处理多个并发的异步I/O请求时,以往的模型都是在接收请求是创建一个线程来应答请求。这样就有非常多的线程并行地执行在系统中。

    而这些线程都是可执行的,Windows内核花费大量的时间在进行线程的上下文切换。并没有多少时间花在线程执行上。再加上创建新线程的开销比較大。所以造成了效率的低下。

    调用的过程例如以下:
    抽象出一个完毕port大概的处理流程:
    1:创建一个完毕port。
    2:创建一个线程A。


    3:A线程循环调用GetQueuedCompletionStatus()函数来得到IO操作结果,这个函数是个堵塞函数。


    4:主线程循环里调用accept等待client连接上来。
    5:主线程里accept返回新连接建立以后,把这个新的套接字句柄用CreateIoCompletionPort关联到完毕port,然后发出一个异步的WSASend或者WSARecv调用,由于是异步函数,WSASend/WSARecv会立即返回,实际的发送或者接收数据的操作由WINDOWS系统去做。
    6:主线程继续下一次循环。堵塞在accept这里等待client连接。
    7:WINDOWS系统完毕WSASend或者WSArecv的操作。把结果发到完毕port。
    8:A线程里的GetQueuedCompletionStatus()立即返回,并从完毕port取得刚完毕的WSASend/WSARecv的结果。
    9:在A线程里对这些数据进行处理(假设处理过程非常耗时,须要新开线程处理),然后接着发出WSASend/WSARecv,并继续下一次循环堵塞在GetQueuedCompletionStatus()这里。


    归根究竟概括完毕port模型一句话:
    我们不停地发出异步的WSASend/WSARecv IO操作。详细的IO处理过程由WINDOWS系统完毕,WINDOWS系统完毕实际的IO处理后,把结果送到完毕port上(假设有多个IO都完毕了,那么就在完毕port那里排成一个队列)。

    我们在另外一个线程里从完毕port不断地取出IO操作结果,然后依据须要再发出WSASend/WSARecv IO操作。

    而IOCP模型是事先开好了N个线程。存储在线程池中,让他们hold。

    然后将全部用户的请求都投递到一个完毕port上,然后N个工作线程逐一地从完毕port中取得用户消息并加以处理。

    这样就避免了为每一个用户开一个线程。

    既降低了线程资源。又提高了线程的利用率。

    完毕port模型是如何实现的呢?我们先创建一个完毕port(::CreateIoCompletioPort())。然后再创建一个或多个工作线程,并指定他们到这个完毕port上去读取数据。

    我们再将远程连接的套接字句柄关联到这个完毕port(还是用::CreateIoCompletionPort())。一切就OK了。

    工作线程都干些什么呢?首先是调用::GetQueuedCompletionStatus()函数在关联到这个完毕port上的全部套接字上等待I/O的完毕。再推断完毕了什么类型的I/O。一般来说。有三种类型的I/O。OP_ACCEPT,OP_READ和OP_WIRTE。我们到数据缓冲区内读取数据后,再投递一个或是多个同类型的I/O就可以(::AcceptEx()、::WSARecv()、::WSASend())。

    对读取到的数据,我们能够依照自己的须要来进行对应的处理。

    为此,我们须要一个以OVERLAPPED(重叠I/O)结构为第一个字段的per-I/O数据自己定义结构。

    typedef struct _PER_IO_DATA
    {
             OVERLAPPED ol;       // 重叠I/O结构
             char buf[BUFFER_SIZE];   // 数据缓冲区
             int nOperationType;         //I/O操作类型
    #define OP_READ 1
    #define OP_WRITE 2
    #define OP_ACCEPT 3
    } PER_IO_DATA, *PPER_IO_DATA;

    将一个PER_IO_DATA结构强制转化成一个OVERLAPPED结构传给::GetQueuedCompletionStatus()函数,返回的这个PER_IO_DATA结构的的nOperationType就是I/O操作的类型。当然。这些类型都是在投递I/O请求时自己设置的。

    这样一个IOCPserver的框架就出来了。

    当然,要做一个好的IOCPserver。还有考虑非常多问题,如内存资源管理、接受连接的方法、恶意的客户连接、包的重排序等等。

    以上是个人对于IOCP模型的一些理解与看法。还有待完好。另外各Winsock API的使用方法參见MSDN。


    补充IOCP模型的实现:

    //创建一个完毕port
    HANDLE FCompletPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0,0,0 );

    //接受远程连接,并把这个连接的socket句柄绑定到刚才创建的IOCP上
    AConnect = accept( FListenSock, addr, len);
    CreateIoCompletionPort( AConnect, FCompletPort, NULL, 0 );

    //创建CPU数*2 + 2个线程
    SYSTEM_INFO si;
    GetSystemInfo(&si);
    for (int i=1;si.dwNumberOfProcessors*2+2;i++)
    {
       AThread = TRecvSendThread.Create( false );
       AThread.CompletPort = FCompletPort;//告诉这个线程,你要去这个IOCP去訪问数据
    }

    OK,就这么简单,我们要做的就是建立一个IOCP,把远程连接的socket句柄绑定到刚才创建的IOCP上。最后创建n个线程。并告诉这n个线程到这个IOCP上去訪问数据就能够了。

    再看一下TRecvSendThread线程都干些什么:

    void TRecvSendThread.Execute(...)
    {
       while (!self.Terminated)
       {
         //查询IOCP状态(数据读写操作是否完毕)
         GetQueuedCompletionStatus( CompletPort, BytesTransd, CompletKey, POVERLAPPED(pPerIoDat), TIME_OUT );

         if (BytesTransd !=0) .......
           ....;//数据读写操作完毕

         //再投递一个读数据请求
         WSARecv( CompletKey, &(pPerIoDat->BufData), 1, BytesRecv, Flags, &(pPerIoDat->Overlap), NULL );
       }
    }

    读写线程仅仅是简单地检查IOCP是否完毕了我们投递的读写操作,假设完毕了则再投递一个新的读写请求。


    应该注意到。我们创建的全部TRecvSendThread都在訪问同一个IOCP(由于我们仅仅创建了一个IOCP),而且我们没有使用临界区!

    难道不会产生冲突吗?不用考虑同步问题吗?
    呵呵,这正是IOCP的奥妙所在。IOCP不是一个普通的对象,不须要考虑线程安全问题。它会自己主动调配訪问它的线程:假设某个socket上有一个线程A正在訪问,那么线程B的訪问请求会被分配到另外一个socket。这一切都是由系统自己主动调配的,我们无需过问。

    实例:

    简单实现。适合IOCP入门
    參考:《WINDOWS网络与通信程序设计》

    /******************************************************************
    *
    *
    * 文件名:IOCPHeader.h
    * 摘   要: IOCP定义文件
    *
    * 作    者:Jeson Yang
    * 完毕日期:2009-3-21
    *
    * 代替版本号:
    * 原 作者:
    * 完毕日期:
    *
    ******************************************************************/

    #ifndef _IOCPHEADER_H_20080916_
    #define _IOCPHEADER_H_20080916_

    #include <WINSOCK2.H>
    #include <windows.h>

    #define BUFFER_SIZE 1024

    /******************************************************************
    * per_handle 数据
    *******************************************************************/
    typedef struct _PER_HANDLE_DATA
    {
        SOCKET      s;      // 相应的套接字句柄
        sockaddr_in addr;   // 对方的地址

    }PER_HANDLE_DATA, *PPER_HANDLE_DATA;

    /******************************************************************
    * per_io 数据
    *******************************************************************/
    typedef struct _PER_IO_DATA
    {
        OVERLAPPED ol;                 // 重叠结构
        char        buf[BUFFER_SIZE];   // 数据缓冲区
        int         nOperationType;     // 操作类型

    #define OP_READ   1
    #define OP_WRITE 2
    #define OP_ACCEPT 3

    }PER_IO_DATA, *PPER_IO_DATA;

    #endif

    /******************************************************************
    *
    *
    * 文件名:main.cpp
    * 摘   要: iocp demo
    *
    * 当前版本号:1.0
    * 作    者:Jeson Yang
    * 完毕日期:2009-3-21
    *
    * 代替版本号:
    * 原 作者:
    * 完毕日期:
    *
    ******************************************************************/

    #include <iostream>
    #include <string>
    #include "IOCPHeader.h"
    using namespace std;

    DWORD WINAPI ServerThread( LPVOID lpParam );

    int main( int argc, char *argv[] )
    {
        //////////////////////////////////////////////////////////////////////////  
        WSADATA wsaData;

        if( 0 != WSAStartup( MAKEWORD( 2, 2 ), &wsaData ) )
        {
            printf( "Using %s (Status:%s) ", wsaData.szDescription, wsaData.szSystemStatus );
            printf( "with API versions: %d.%d to %d.%d",
                     LOBYTE( wsaData.wVersion), HIBYTE( wsaData.wVersion ),
                     LOBYTE( wsaData.wHighVersion), HIBYTE( wsaData.wHighVersion) );

            return -1;
        }
        else
        {
            printf("Windows sockets 2.2 startup ");
        }
        //////////////////////////////////////////////////////////////////////////

        int nPort = 20055;

        // 创建完毕port对象
        // 创建工作线程处理完毕port对象的事件
        HANDLE hIocp = ::CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0 );
        ::CreateThread( NULL, 0, ServerThread, (LPVOID)hIocp, 0, 0 );

        // 创建监听套接字。绑定本地port,開始监听
        SOCKET sListen = ::socket( AF_INET,-SOCK_STREAM, 0 );

        SOCKADDR_IN addr;
        addr.sin_family = AF_INET;
        addr.sin_port = ::htons( nPort );
        addr.sin_addr.S_un.S_addr = INADDR_ANY;
        ::bind( sListen, (sockaddr *)&addr, sizeof( addr ) );
        ::listen( sListen, 5 );

        printf( "iocp demo start...... " );

        // 循环处理到来的请求
        while ( TRUE )
        {
            // 等待接受未决的连接请求
            SOCKADDR_IN saRemote;
            int nRemoteLen = sizeof( saRemote );
            SOCKET sRemote = ::accept( sListen, (sockaddr *)&saRemote, &nRemoteLen );

            // 接受到新连接之后,为它创建一个per_handle数据,并将他们关联到完毕port对象
            PPER_HANDLE_DATA pPerHandle = ( PPER_HANDLE_DATA )::GlobalAlloc( GPTR, sizeof( PPER_HANDLE_DATA ) );
            if( pPerHandle == NULL )
            {
                break;
            }

            pPerHandle->s = sRemote;
            memcpy( &pPerHandle->addr, &saRemote, nRemoteLen );

            ::CreateIoCompletionPort( ( HANDLE)pPerHandle->s, hIocp, (DWORD)pPerHandle, 0 );

            // 投递一个接受请求
            PPER_IO_DATA pIoData = ( PPER_IO_DATA )::GlobalAlloc( GPTR, sizeof( PPER_IO_DATA ) );
            if( pIoData == NULL )
            {
                break;
            }

            pIoData->nOperationType = OP_READ;
            WSABUF buf;
            buf.buf = pIoData->buf;
            buf.len = BUFFER_SIZE;
            
            DWORD dwRecv = 0;
            DWORD dwFlags = 0;

            ::WSARecv( pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pIoData->ol, NULL );

        }

        //////////////////////////////////////////////////////////////////////////
    ERROR_PROC:
        WSACleanup();
        //////////////////////////////////////////////////////////////////////////

        return 0;
    }

    /******************************************************************
    * 函数介绍:处理完毕port对象事件的线程
    * 输入參数:
    * 输出參数:
    * 返回值 :
    *******************************************************************/
    DWORD WINAPI ServerThread( LPVOID lpParam )
    {
        HANDLE hIocp = ( HANDLE )lpParam;
        if( hIocp == NULL )
        {
            return -1;
        }

        DWORD dwTrans = 0;
        PPER_HANDLE_DATA pPerHandle;
        PPER_IO_DATA     pPerIo;
        
        while( TRUE )
        {
            // 在关联到此完毕port的全部套接字上等待I/O完毕
            BOOL bRet = ::GetQueuedCompletionStatus( hIocp, &dwTrans, (LPDWORD)&pPerHandle, (LPOVERLAPPED*)&pPerIo, WSA_INFINITE );
            if( !bRet )     // 错误发生
            {
                ::closesocket( pPerHandle->s );
                ::GlobalFree( pPerHandle );
                ::GlobalFree( pPerIo );

                cout << "error" << endl;
                continue;
            }

            // 套接字被对方关闭
            if( dwTrans == 0 && ( pPerIo->nOperationType == OP_READ || pPerIo->nOperationType&nb-sp;== OP_WRITE ) )
            {
                ::closesocket( pPerHandle->s );
                ::GlobalFree( pPerHandle );
                ::GlobalFree( pPerIo );

                cout << "client closed" << endl;
                continue;
            }

            switch ( pPerIo->nOperationType )
            {
            case OP_READ:       // 完毕一个接收请求
                {
                    pPerIo->buf[dwTrans] = '';
                    printf( "%s ", pPerIo->buf );

                    // 继续投递接受操作
                    WSABUF buf;
                    buf.buf = pPerIo->buf;
                    buf.len = BUFFER_SIZE;
                    pPerIo->nOperationType = OP_READ;
                    
                    DWORD dwRecv = 0;
                    DWORD dwFlags = 0;
                    
                    ::WSARecv( pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIo->ol, NULL );

                }
                break;
            case OP_WRITE:
            case OP_ACCEPT:
                break;

            }

        }

        return 0;
    }

  • 相关阅读:
    SSL JudgeOnline 1194——最佳乘车
    SSL JudgeOnline 1457——翻币问题
    SSL JudgeOnlie 2324——细胞问题
    SSL JudgeOnline 1456——骑士旅行
    SSL JudgeOnline 1455——电子老鼠闯迷宫
    SSL JudgeOnline 2253——新型计算器
    SSL JudgeOnline 1198——求逆序对数
    SSL JudgeOnline 1099——USACO 1.4 母亲的牛奶
    SSL JudgeOnline 1668——小车载人问题
    SSL JudgeOnline 1089——USACO 1.2 方块转换
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7137164.html
Copyright © 2011-2022 走看看