zoukankan      html  css  js  c++  java
  • 重叠IO与IOCP

    (一) IO模型

    I/O设备处理方式一般有两种 同步和异步

    同步阻塞:这种方式就一直读写IO直道操作完成或者失败。

    异步IO:使用overlapped I/O。overlapped I/O是WIN32的一项技术,你可以要求操作系统为你传送数据,并且在传送完毕时通知你。

    (二)使用overlapped I/O: 

    先来看看OVERLAPPED 结构体有两种定义: 

    typedef struct _OVERLAPPED 
    { 
      DWORD Internal; 
      DWORD InternalHigh; 
      DWORD Offset; 
      DWORD OffsetHigh; 
      HANDLE hEvent; 
    }OVERLAPPED
    
    typedef struct _OVERLAPPED 
    {
        ULONG_PTR Internal; //操作系统保留,指出一个和系统相关的状态
        ULONG_PTR InternalHigh; //指出发送或接收的数据长度
        union
        {
            struct
            {
                 DWORD Offset; //文件传送的字节偏移量的低位字
                 DWORD OffsetHigh; //文件传送的字节偏移量的高位字
            };
            PVOID Pointer; //指针,指向文件传送位置
        };
        HANDLE hEvent; //指定一个I/O操作完成后触发的事件
    } OVERLAPPED, *LPOVERLAPPED;

      1> 进行I/O操作时,指定overlapped方式使用CreateFile (),将其第6个参数指定为FILE_FLAG_OVERLAPPED,就是准备使用overlapped的方式构造或打开文件;

      2> 如果采用 overlapped,那么ReadFile()、WriteFile()的第5个参数必须提供一个指针,指向一个OVERLAPPED结构。 OVERLAPPED用于记录了当前正在操作的文件一些相关信息。 详细的看下面代码例子:  

    int main()
    {
        BOOL rc;
        HANDLE hFile;
        DWORD numread;
        OVERLAPPED overlap;
        char buf[512];
        char szPath=”c:\xxxxxxxx”;
        hFile = CreateFile( szPath,
                        GENERIC_READ,
                        FILE_SHARE_READ|FILE_SHARE_WRITE,
                        NULL,
                        OPEN_EXISTING,
                        FILE_FLAG_OVERLAPPED, // 以overlapped打开文件
                        NULL
                    );
    
        // OVERLAPPED结构实始化为0
        memset(&overlap, 0, sizeof(overlap));
        
        //指定文件位置是1500;
        overlap.Offset = 1500;    
        rc = ReadFile(hFile,buf,300,&numread,&overlap);
        //因为是overlapped操作,ReadFile会将读文件请求放入读队列之后立即返回(false),而不会等到文件读完才返回(true)
        if (rc)
        {
    
            //…………此处即得到数据了。
           //文件真是被读完了,rc为true
           // 或当数据被放入cache中,或操作系统认为它可以很快速地取得数据,rc为true
        }
        else
        {
            if (GetLastError() == ERROR_IO_PENDING)
            {//当错误是ERROR_IO_PENDING,那意味着读文件的操作还在进行中
             //等候,直到文件读完
                WaitForSingleObject(hFile, INFINITE);
                rc = GetOverlappedResult(hFile,&overlap,&numread,FALSE);
                //上面二条语句完成的功能与下面一条语句的功能等价:一只阻塞等到得到数据才继续下面。
                // GetOverlappedResult(hFile,&overlap,&numread,TRUE);
             }
             else
             {
                //出错了
            }
        }
        CloseHandle(hFile);
        return EXIT_SUCCESS;
    }

    注意:

      如果多个操作同时访问同一 操作i/o,这时我们可以使用OVERLAPPED结构体中的event字段。这个envent事件必须是手动方式。在转移完成时处理一个事件设置为有信号状态,调用进程集这个成员在调用ReadFile、 WriteFile、TransactNamedPipe、 ConnectNamedPipe函数之后事件会设置成武信号状态,当完成I/O操作之后信号有设置为由信号状态。

    (三) socket IOCP模型

      socket通信也是一种I/O操作,既然文件I0有异步模型,那么socket也有异步模型,sokcet的异步操作模型由很多种,我们这里重点说明IOCP也叫完成端口。

    使用普通socket通信和iocp通信流程图如下, 此图来自于博客http://blog.csdn.net/neicole/article/details/7549497

      

    //filename Iocp.h
    #pragma once
    #include <WinSock2.h>
    #include <stdio.h>
    #include <process.h>
    
    #define  IP_SIZE  32        //ip地址长度
    #define  BUFFER_SIZE 1024
    
    
    
    enum SOCKET_STATE
    {
        ACCEPT = 1,
        SEND,
        RECV
    };
    /*传送给处理函数的参数*/
    typedef struct tagPleData
    {
        SOCKET sSocket;
        CHAR szClientIP[IP_SIZE];
        UINT  uiClientPort;
        /*
        其他信息
        */
    }PLEDATA, * LPPLEDATA;
    
    typedef struct tagIOData
    {
        OVERLAPPED oOverlapped;
        WSABUF wsBuffer;
        CHAR szBuffer[BUFFER_SIZE];
        DWORD dSend;
        DWORD dRecv;
        SOCKET_STATE sState;
    }IOData, *LPIOData;
    
    typedef void (*ReadProc)(LPPLEDATA lpData,  CHAR * RecvData);
    
    class Iocp
    {
    public:
        Iocp(const CHAR * host, UINT port);
        ~Iocp(void);
        
        static VOID ServerWorkThread( VOID * _this );   //监听完成端口线程
        VOID SetReadProc(VOID * lprFun);                //设置读取回掉函数
        bool ListenEx(UINT backlog);
        static VOID AcceptEx(VOID  * _this);
    
    
    public:
        
        ReadProc lpFun;             //读取回调函数
        HANDLE h_ComPlePort;        //完成端口句柄
    
        bool bIsListen;                 //是否是服务端socket      
        SOCKADDR_IN m_SockAddr;         //socket地址
        SOCKET  m_ListenSocketID;       //socket 
        CHAR m_Host[IP_SIZE];           //连接socketIp 
        UINT m_Port;                    //连接IP 
    };
    //filenameIocp.cpp
    
    #include "Iocp.h"
    
    Iocp::Iocp(const CHAR * host, UINT port)
    {
        /*协商套接字版本*/
        WSADATA wsaData;
        DWORD dwRet = WSAStartup( 0x0202, &wsaData );
        if (0 != dwRet )
        {
            WSACleanup();
            throw 1;
        }
    
        m_ListenSocketID = INVALID_SOCKET ;
        memset( &m_SockAddr, 0, sizeof(SOCKADDR_IN) ) ;
        memset( m_Host, 0, IP_SIZE ) ;
        m_Port = 0 ;
        SYSTEM_INFO mySysInfo;
        GetSystemInfo( &mySysInfo );
        iThreadNums = mySysInfo.dwNumberOfProcessors * 2 + 1;
    
        BOOL ret = FALSE ;
        bIsListen = TRUE;
        strncpy_s(m_Host,  host, IP_SIZE - 1);
        m_SockAddr.sin_family = AF_INET;
        m_SockAddr.sin_addr.s_addr =inet_addr(host);
        m_SockAddr.sin_port = htons(port);
    
        /*创建监听套接字*/
    
        m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
    
        if( m_ListenSocketID== INVALID_SOCKET )
        {
            throw 1;
        }
    
        /*设置套接字选项*/
        CHAR opt = 1;
        ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) );
        if( ret != 0 )
        {
            throw 1 ;
        }
    
        /*绑定套接字*/
        if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&m_SockAddr, sizeof(struct sockaddr)))
        {
            throw 1 ;
        }
    
        /*创建完成端口*/
        h_ComPlePort  = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
        if ( h_ComPlePort == NULL )
        {
            throw 1 ;
        }
        for ( DWORD i = 0; i < ( mySysInfo.dwNumberOfProcessors * 2 + 1 ); ++i )
        {
            _beginthread(Iocp::ServerWorkThread,  0,  (VOID *)this);
        }
    
    }
    
    
    Iocp::~Iocp(void)
    {
        WSACleanup();
    }
    
    
    /*************************************************
    Function:AcceptEx
    Description:接受套接字的线程函数
    Input:
    Output:
    Others: 
    *************************************************/
    
    
    VOID Iocp::AcceptEx(VOID  * _this)
    {
        SOCKET acSocket;
        DWORD dwRecvBytes;
        Iocp * pTemp = (Iocp *)_this;
        SOCKADDR_IN sAddr;
        INT uiClientSize = sizeof(sAddr);
        //struct socketaddrin
        while (TRUE)
        {
            int x = 6;
            acSocket = WSAAccept( pTemp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 );
            if ( acSocket == SOCKET_ERROR )
            {
                return;
            }
    
            LPPLEDATA lpSocketData = (LPPLEDATA)malloc(sizeof(PLEDATA));
            if ( NULL == lpSocketData )
            {
                return;
            }
    
            lpSocketData->sSocket = acSocket;
           sprintf(lpSocketData->szClientIP, inet_ntoa(sAddr.sin_addr));
           lpSocketData->uiClientPort = sAddr.sin_port;
           
           //将连接socket 加入完成端口中
            if ( CreateIoCompletionPort( (HANDLE)acSocket, pTemp->h_ComPlePort, (ULONG_PTR)lpSocketData, 0 ) == NULL )
            {
                return;
            }
    
            /*这里停止监听会有问题*/
    
            if (pTemp->bIsListen = FALSE)
            {
                break;
            }
            LPIOData lpIoData = (LPIOData )malloc(sizeof(IOData));
            if ( lpIoData == NULL )
            {
                return;
            }
    
    #pragma region 投递线程事件
    
            ZeroMemory( &( lpIoData->oOverlapped ), sizeof( lpIoData->oOverlapped) );
            lpIoData->dSend = 0;
            lpIoData->dRecv = 0;
            lpIoData->wsBuffer.len = BUFFER_SIZE;
            lpIoData->wsBuffer.buf = lpIoData->szBuffer;
            lpIoData->sState = SEND;
    
            DWORD flags = 0;
            if ( WSARecv(acSocket, &(lpIoData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
            {
                if ( WSAGetLastError() != ERROR_IO_PENDING )
                {
                    return;
                }
                else
                {
                    //return;
                    printf("ERROR_IO_PENDING:ok
    ");
                }
            }
    #pragma endregion 投递线程事件
        }
    }
    
    /*************************************************
    Function:ListenEx
    Description:监听函数
    Input:
    Output:
    Others: 
    *************************************************/
    
    BOOL Iocp::ListenEx(UINT backlog)
    {
        if (SOCKET_ERROR == listen(m_ListenSocketID, backlog))
        {
            return FALSE;
        }
        /*创建监听线程*/
        if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this))
        {
            return FALSE;
        }
        return TRUE;
    }
    
    /*************************************************
    Function:ServerWorkThread
    Description:端口上的工作线程
    Input:
    Output:
    Others: 
    *************************************************/
    
    VOID Iocp:: ServerWorkThread( VOID * _this )
    {
        Iocp * lpTemp = (Iocp *)_this;
        HANDLE hPlePort  = (HANDLE)lpTemp->h_ComPlePort;
        DWORD dwBytes;
        LPPLEDATA lpPleData = NULL;
        LPIOData lpIoData = NULL;
        DWORD sendBytes = 0;
        DWORD recvBytes = 0;
        DWORD dwFlag = 0;
        while (TRUE)
        {
            int x = 89;
            if ( GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpPleData, (LPOVERLAPPED *)&lpIoData, INFINITE ) == 0 )
            {
                return ;
            }
            if ( dwBytes == 0 || NULL == lpIoData)
            {
                printf("there is a socket away
    ");
                free( lpPleData );
                free( lpIoData );
                continue;
            }
            else
            {
    
    #pragma region 接受到数据
    
                lpIoData->dRecv = dwBytes;
                lpIoData->szBuffer[lpIoData->dRecv] = 0;
                //printf("ServerWorkThread:R[%s]
    ", lpIoData->szBuffer);
                lpTemp->lpFun(lpPleData, lpIoData->szBuffer);
    
    #pragma endregion 接受到数据
    
    #pragma region 再次投递
                lpIoData->dRecv = 0;
                ZeroMemory( &(lpIoData->oOverlapped), sizeof( OVERLAPPED ) );
                lpIoData->wsBuffer.len = BUFFER_SIZE;
                lpIoData->wsBuffer.buf = lpIoData->szBuffer;
    
                if ( WSARecv( lpPleData->sSocket, &(lpIoData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
                {
                    if ( WSAGetLastError() != ERROR_IO_PENDING )
                    {
                        return ;
                    }
                }
    #pragma endregion 再次投递
            }    
        }
    }
    
    VOID Iocp::SetReadProc(VOID * lprFun)
    {
        lpFun  = (ReadProc)lprFun;
    }
    //finename main.cpp
    #include <iostream>
    #include "Iocp.h"
    
    using namespace std;
    
    #pragma comment( lib, "Ws2_32.lib" )
    //客户端的发送的数据会在这个函数通知
    void OnRead(LPPLEDATA lpData, CHAR * lpRecvData)
    {
        SOCKET sSock = lpData->sSocket;
        printf("socket:IP[%s:%d] send data[%s]
    ",lpData->szClientIP, lpData->uiClientPort, lpRecvData);
    }
    
    void main()
    {
    
        Iocp server("127.0.0.1",  20000);
        server.SetReadProc((VOID *)OnRead);
        server.ListenEx(10);
    }

      

        

  • 相关阅读:
    【PAT甲级】1043 Is It a Binary Search Tree (25 分)(判断是否为BST的先序遍历并输出后序遍历)
    Educational Codeforces Round 73 (Rated for Div. 2)F(线段树,扫描线)
    【PAT甲级】1042 Shuffling Machine (20 分)
    【PAT甲级】1041 Be Unique (20 分)(多重集)
    【PAT甲级】1040 Longest Symmetric String (25 分)(cin.getline(s,1007))
    【PAT甲级】1039 Course List for Student (25 分)(vector嵌套于map,段错误原因未知)
    Codeforces Round #588 (Div. 2)E(DFS,思维,__gcd,树)
    2017-3-9 SQL server 数据库
    2017-3-8 学生信息展示习题
    2017-3-5 C#基础 函数--递归
  • 原文地址:https://www.cnblogs.com/wolfrickwang/p/3301205.html
Copyright © 2011-2022 走看看