zoukankan      html  css  js  c++  java
  • 重叠IO与IOCP

    (一) IO模型

    I/O设备处理方式一般有两种 同步和异步

    同步阻塞:这种方式就一直读写IO直道操作完成或者失败。

    异步IO:使用overlapped I/O。overlapped I/O是WIN32的一项技术,你可以要求操作系统为你传送数据,并且在传送完毕时通知你。

    (二)使用overlapped I/O: 

    先来看看OVERLAPPED 结构体有两种定义: 

    typedef struct _OVERLAPPED 
    { 
      DWORD Internal; 
      DWORD InternalHigh; 
      DWORD Offset; 
      DWORD OffsetHigh; 
      HANDLE hEvent; 
    }OVERLAPPED
    
    typedef struct _OVERLAPPED 
    {
        ULONG_PTR Internal; //操作系统保留,指出一个和系统相关的状态
        ULONG_PTR InternalHigh; //指出发送或接收的数据长度
        union
        {
            struct
            {
                 DWORD Offset; //文件传送的字节偏移量的低位字
                 DWORD OffsetHigh; //文件传送的字节偏移量的高位字
            };
            PVOID Pointer; //指针,指向文件传送位置
        };
        HANDLE hEvent; //指定一个I/O操作完成后触发的事件
    } OVERLAPPED, *LPOVERLAPPED;

      1> 进行I/O操作时,指定overlapped方式使用CreateFile (),将其第6个参数指定为FILE_FLAG_OVERLAPPED,就是准备使用overlapped的方式构造或打开文件;

      2> 如果采用 overlapped,那么ReadFile()、WriteFile()的第5个参数必须提供一个指针,指向一个OVERLAPPED结构。 OVERLAPPED用于记录了当前正在操作的文件一些相关信息。 详细的看下面代码例子:  

    int main()
    {
        BOOL rc;
        HANDLE hFile;
        DWORD numread;
        OVERLAPPED overlap;
        char buf[512];
        char szPath=”c:\xxxxxxxx”;
        hFile = CreateFile( szPath,
                        GENERIC_READ,
                        FILE_SHARE_READ|FILE_SHARE_WRITE,
                        NULL,
                        OPEN_EXISTING,
                        FILE_FLAG_OVERLAPPED, // 以overlapped打开文件
                        NULL
                    );
    
        // OVERLAPPED结构实始化为0
        memset(&overlap, 0, sizeof(overlap));
        
        //指定文件位置是1500;
        overlap.Offset = 1500;    
        rc = ReadFile(hFile,buf,300,&numread,&overlap);
        //因为是overlapped操作,ReadFile会将读文件请求放入读队列之后立即返回(false),而不会等到文件读完才返回(true)
        if (rc)
        {
    
            //…………此处即得到数据了。
           //文件真是被读完了,rc为true
           // 或当数据被放入cache中,或操作系统认为它可以很快速地取得数据,rc为true
        }
        else
        {
            if (GetLastError() == ERROR_IO_PENDING)
            {//当错误是ERROR_IO_PENDING,那意味着读文件的操作还在进行中
             //等候,直到文件读完
                WaitForSingleObject(hFile, INFINITE);
                rc = GetOverlappedResult(hFile,&overlap,&numread,FALSE);
                //上面二条语句完成的功能与下面一条语句的功能等价:一只阻塞等到得到数据才继续下面。
                // GetOverlappedResult(hFile,&overlap,&numread,TRUE);
             }
             else
             {
                //出错了
            }
        }
        CloseHandle(hFile);
        return EXIT_SUCCESS;
    }

    注意:

      如果多个操作同时访问同一 操作i/o,这时我们可以使用OVERLAPPED结构体中的event字段。这个envent事件必须是手动方式。在转移完成时处理一个事件设置为有信号状态,调用进程集这个成员在调用ReadFile、 WriteFile、TransactNamedPipe、 ConnectNamedPipe函数之后事件会设置成武信号状态,当完成I/O操作之后信号有设置为由信号状态。

    (三) socket IOCP模型

      socket通信也是一种I/O操作,既然文件I0有异步模型,那么socket也有异步模型,sokcet的异步操作模型由很多种,我们这里重点说明IOCP也叫完成端口。

    使用普通socket通信和iocp通信流程图如下, 此图来自于博客http://blog.csdn.net/neicole/article/details/7549497

      

    //filename Iocp.h
    #pragma once
    #include <WinSock2.h>
    #include <stdio.h>
    #include <process.h>
    
    #define  IP_SIZE  32        //ip地址长度
    #define  BUFFER_SIZE 1024
    
    
    
    enum SOCKET_STATE
    {
        ACCEPT = 1,
        SEND,
        RECV
    };
    /*传送给处理函数的参数*/
    typedef struct tagPleData
    {
        SOCKET sSocket;
        CHAR szClientIP[IP_SIZE];
        UINT  uiClientPort;
        /*
        其他信息
        */
    }PLEDATA, * LPPLEDATA;
    
    typedef struct tagIOData
    {
        OVERLAPPED oOverlapped;
        WSABUF wsBuffer;
        CHAR szBuffer[BUFFER_SIZE];
        DWORD dSend;
        DWORD dRecv;
        SOCKET_STATE sState;
    }IOData, *LPIOData;
    
    typedef void (*ReadProc)(LPPLEDATA lpData,  CHAR * RecvData);
    
    class Iocp
    {
    public:
        Iocp(const CHAR * host, UINT port);
        ~Iocp(void);
        
        static VOID ServerWorkThread( VOID * _this );   //监听完成端口线程
        VOID SetReadProc(VOID * lprFun);                //设置读取回掉函数
        bool ListenEx(UINT backlog);
        static VOID AcceptEx(VOID  * _this);
    
    
    public:
        
        ReadProc lpFun;             //读取回调函数
        HANDLE h_ComPlePort;        //完成端口句柄
    
        bool bIsListen;                 //是否是服务端socket      
        SOCKADDR_IN m_SockAddr;         //socket地址
        SOCKET  m_ListenSocketID;       //socket 
        CHAR m_Host[IP_SIZE];           //连接socketIp 
        UINT m_Port;                    //连接IP 
    };
    //filenameIocp.cpp
    
    #include "Iocp.h"
    
    Iocp::Iocp(const CHAR * host, UINT port)
    {
        /*协商套接字版本*/
        WSADATA wsaData;
        DWORD dwRet = WSAStartup( 0x0202, &wsaData );
        if (0 != dwRet )
        {
            WSACleanup();
            throw 1;
        }
    
        m_ListenSocketID = INVALID_SOCKET ;
        memset( &m_SockAddr, 0, sizeof(SOCKADDR_IN) ) ;
        memset( m_Host, 0, IP_SIZE ) ;
        m_Port = 0 ;
        SYSTEM_INFO mySysInfo;
        GetSystemInfo( &mySysInfo );
        iThreadNums = mySysInfo.dwNumberOfProcessors * 2 + 1;
    
        BOOL ret = FALSE ;
        bIsListen = TRUE;
        strncpy_s(m_Host,  host, IP_SIZE - 1);
        m_SockAddr.sin_family = AF_INET;
        m_SockAddr.sin_addr.s_addr =inet_addr(host);
        m_SockAddr.sin_port = htons(port);
    
        /*创建监听套接字*/
    
        m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED );
    
        if( m_ListenSocketID== INVALID_SOCKET )
        {
            throw 1;
        }
    
        /*设置套接字选项*/
        CHAR opt = 1;
        ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) );
        if( ret != 0 )
        {
            throw 1 ;
        }
    
        /*绑定套接字*/
        if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&m_SockAddr, sizeof(struct sockaddr)))
        {
            throw 1 ;
        }
    
        /*创建完成端口*/
        h_ComPlePort  = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
        if ( h_ComPlePort == NULL )
        {
            throw 1 ;
        }
        for ( DWORD i = 0; i < ( mySysInfo.dwNumberOfProcessors * 2 + 1 ); ++i )
        {
            _beginthread(Iocp::ServerWorkThread,  0,  (VOID *)this);
        }
    
    }
    
    
    Iocp::~Iocp(void)
    {
        WSACleanup();
    }
    
    
    /*************************************************
    Function:AcceptEx
    Description:接受套接字的线程函数
    Input:
    Output:
    Others: 
    *************************************************/
    
    
    VOID Iocp::AcceptEx(VOID  * _this)
    {
        SOCKET acSocket;
        DWORD dwRecvBytes;
        Iocp * pTemp = (Iocp *)_this;
        SOCKADDR_IN sAddr;
        INT uiClientSize = sizeof(sAddr);
        //struct socketaddrin
        while (TRUE)
        {
            int x = 6;
            acSocket = WSAAccept( pTemp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 );
            if ( acSocket == SOCKET_ERROR )
            {
                return;
            }
    
            LPPLEDATA lpSocketData = (LPPLEDATA)malloc(sizeof(PLEDATA));
            if ( NULL == lpSocketData )
            {
                return;
            }
    
            lpSocketData->sSocket = acSocket;
           sprintf(lpSocketData->szClientIP, inet_ntoa(sAddr.sin_addr));
           lpSocketData->uiClientPort = sAddr.sin_port;
           
           //将连接socket 加入完成端口中
            if ( CreateIoCompletionPort( (HANDLE)acSocket, pTemp->h_ComPlePort, (ULONG_PTR)lpSocketData, 0 ) == NULL )
            {
                return;
            }
    
            /*这里停止监听会有问题*/
    
            if (pTemp->bIsListen = FALSE)
            {
                break;
            }
            LPIOData lpIoData = (LPIOData )malloc(sizeof(IOData));
            if ( lpIoData == NULL )
            {
                return;
            }
    
    #pragma region 投递线程事件
    
            ZeroMemory( &( lpIoData->oOverlapped ), sizeof( lpIoData->oOverlapped) );
            lpIoData->dSend = 0;
            lpIoData->dRecv = 0;
            lpIoData->wsBuffer.len = BUFFER_SIZE;
            lpIoData->wsBuffer.buf = lpIoData->szBuffer;
            lpIoData->sState = SEND;
    
            DWORD flags = 0;
            if ( WSARecv(acSocket, &(lpIoData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
            {
                if ( WSAGetLastError() != ERROR_IO_PENDING )
                {
                    return;
                }
                else
                {
                    //return;
                    printf("ERROR_IO_PENDING:ok
    ");
                }
            }
    #pragma endregion 投递线程事件
        }
    }
    
    /*************************************************
    Function:ListenEx
    Description:监听函数
    Input:
    Output:
    Others: 
    *************************************************/
    
    BOOL Iocp::ListenEx(UINT backlog)
    {
        if (SOCKET_ERROR == listen(m_ListenSocketID, backlog))
        {
            return FALSE;
        }
        /*创建监听线程*/
        if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this))
        {
            return FALSE;
        }
        return TRUE;
    }
    
    /*************************************************
    Function:ServerWorkThread
    Description:端口上的工作线程
    Input:
    Output:
    Others: 
    *************************************************/
    
    VOID Iocp:: ServerWorkThread( VOID * _this )
    {
        Iocp * lpTemp = (Iocp *)_this;
        HANDLE hPlePort  = (HANDLE)lpTemp->h_ComPlePort;
        DWORD dwBytes;
        LPPLEDATA lpPleData = NULL;
        LPIOData lpIoData = NULL;
        DWORD sendBytes = 0;
        DWORD recvBytes = 0;
        DWORD dwFlag = 0;
        while (TRUE)
        {
            int x = 89;
            if ( GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpPleData, (LPOVERLAPPED *)&lpIoData, INFINITE ) == 0 )
            {
                return ;
            }
            if ( dwBytes == 0 || NULL == lpIoData)
            {
                printf("there is a socket away
    ");
                free( lpPleData );
                free( lpIoData );
                continue;
            }
            else
            {
    
    #pragma region 接受到数据
    
                lpIoData->dRecv = dwBytes;
                lpIoData->szBuffer[lpIoData->dRecv] = 0;
                //printf("ServerWorkThread:R[%s]
    ", lpIoData->szBuffer);
                lpTemp->lpFun(lpPleData, lpIoData->szBuffer);
    
    #pragma endregion 接受到数据
    
    #pragma region 再次投递
                lpIoData->dRecv = 0;
                ZeroMemory( &(lpIoData->oOverlapped), sizeof( OVERLAPPED ) );
                lpIoData->wsBuffer.len = BUFFER_SIZE;
                lpIoData->wsBuffer.buf = lpIoData->szBuffer;
    
                if ( WSARecv( lpPleData->sSocket, &(lpIoData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR )
                {
                    if ( WSAGetLastError() != ERROR_IO_PENDING )
                    {
                        return ;
                    }
                }
    #pragma endregion 再次投递
            }    
        }
    }
    
    VOID Iocp::SetReadProc(VOID * lprFun)
    {
        lpFun  = (ReadProc)lprFun;
    }
    //finename main.cpp
    #include <iostream>
    #include "Iocp.h"
    
    using namespace std;
    
    #pragma comment( lib, "Ws2_32.lib" )
    //客户端的发送的数据会在这个函数通知
    void OnRead(LPPLEDATA lpData, CHAR * lpRecvData)
    {
        SOCKET sSock = lpData->sSocket;
        printf("socket:IP[%s:%d] send data[%s]
    ",lpData->szClientIP, lpData->uiClientPort, lpRecvData);
    }
    
    void main()
    {
    
        Iocp server("127.0.0.1",  20000);
        server.SetReadProc((VOID *)OnRead);
        server.ListenEx(10);
    }

      

        

  • 相关阅读:
    HTTP 错误 404.13
    C# 文件操作(全部) 追加、拷贝、删除、移动文件、创建目录 修改文件名、文件夹名
    设计模式---装饰模式(Decorator)
    设计模式---订阅发布模式(Subscribe/Publish)
    Merge into 详细介绍
    优化案例--多语句表值函数的影响
    常用脚本--Kill所有连接到指定数据库上的回话
    常用脚本--查看当前锁信息
    常用脚本--查看死锁和阻塞usp_who_lock
    常用脚本--在线重建或重整实例下所有索引
  • 原文地址:https://www.cnblogs.com/wolfrickwang/p/3301205.html
Copyright © 2011-2022 走看看