zoukankan      html  css  js  c++  java
  • 重叠I/O之事件通知

       在 Winsock 中,重叠 I/O(Overlapped I/O)模型能达到更佳的系统性能,高于select模型、异步选择和事件选择三种。重叠模型的基本设计原理便是让应用程序使

    用一个重叠的数据结构(WSAOVERLAPPED),一次投递一个或多个 Winsock I/O 请求。针对这些提交的请求,在它们完成之后,我们的应用程序会收到通知,于是

    我们就可以对数据进行处理了。

        要想在一个套接字上使用重叠 I/O 模型,首先必须使用 WSA_FLAG_OVERLAPPED 这个标志,创建一个套接字。例如:

    SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

        注:创建套接字的时候,假如使用的是 socket 函数,那么会默认设置 WSA_FLAG_OVERLAPPED 标志。

        成功建好一个套接字,同时将其与一个本地接口绑定到一起后,便可开始进行重叠 I/O 操作,为了要使用重叠结构,我们常用的 send、recv 等收发数据的函数也都要

    被 WSASend、WSARecv 替换掉了,方法是调用下述的 Winsock 函数,同时指定一个 WSAOVERLAPPED 结构(可选):

        WSASend()

        WSASendTo()

        WSARecv()

        WSARecvFrom()

        WSAIoctl()

        AcceptEx()

        TrnasmitFile()

        WSA_IO_PENDING : 最常见的返回值,这是说明我们的重叠函数调用成功了,但是 I/O 操作还没有完成。

        若随一个 WSAOVERLAPPED 结构一起调用这些函数,函数会立即完成并返回,无论套接字是否设为阻塞模式。那么我们如何来得知我们的 I/O 请求是否成功了呢?

    方法有两个:

        1、事件对象;(有64个socket的限制)

        2、完成例程。

        这两种方法用法差不多,区别即在于我们投递的操作:比如WSARecv操作,完成之后,操作系统以什么样的方式通知我们。这篇随笔主要讲得是事件通知,完成例程

    在另一篇随笔。

       这里只需要注意一点,重叠函数(如:WSARecv)的参数中都有一个 Overlapped 参数,我们可以假设是把我们的WSARecv这样的操作“绑定”到这个重叠结构上,

    提交一个请求,而不是将操作立即完成,其他的事情就交给重叠结构去做,而其中重叠结构又要与Windows的事件对象“绑定”在一起,这样我们调用完 WSARecv 以后

    就可以“坐享其成”,等到重叠操作完成以后,自然会有与之对应的事件来通知我们操作完成,然后我们就可以来根据重叠操作的结果取得我们想要的数据了。

        重叠 I/O 的事件通知方法要求将 Win32事件对象与 WSAOVERLAPPED 结构关联在一起,当 I/O 操作完成后,事件的状态会变成“已传信”状态,即激发态;下面来

    看一下 WSAOVERLAPPED 结构的定义:

    typedef struct _WSAOVERLAPPED {
        DWORD    Internal;
        DWORD    InternalHigh;
        DWORD    Offset;
        DWORD    OffsetHigh;
        WSAEVENT hEvent;
    } WSAOVERLAPPED, FAR * LPWSAOVERLAPPED;

        其中,Internal、InternalHigh、Offset 和 OffsetHigh 字段均由系统在内部使用,不应由应用程序直接进行处理或使用。而另一方面,hEvent 字段有点儿特殊,它

    允许应用程序将一个事件对象句柄同一个套接字关联起来。大家可能会觉得奇怪,如何将一个事件对象句柄分配给该字段呢?正如我们早先在 WSAEventSelect 模型中

    讲述的那样,可用 WSACreateEvent 函数来创建一个事件对象句柄。一旦创建好一个事件句柄,简单地将重叠结构的 hEvent 字段分配给事件句柄,再使用重叠结构,

    调用一个Winsock函数即可,比如 WSASend 或 WSARecv。

        一个重叠 I/O 请求最终完成后,我们的应用程序要负责取回重叠 I/O 操作的结果。一个重叠请求操作最终完成之后,在事件通知方法中,Winsock会更改与一个

    WSAOVERLAPPED 结构对应的一个事件对象的事件传信状态,将其从“未传信”变成“已传信”。由于一个事件对象已分配给 WSAOVERLAPPED 结构,所以只需简单地

    调用 WSAWaitForMultipleEvents 函数,从而判断出一个重叠 I/O 调用在什么时候完成。WSAWaitForMultipleEvents 函数已在事件选择模型中说过,这里不再细

    说。

        发现一次重叠请求完成之后,接着需要调用 WSAGetOverlappedResult(取得重叠结构)函数,判断那个重叠调用到底是成功,还是失败。该函数的定义如下:

    BOOL WSAAPI WSAGetOverlappedResult(
      __in          SOCKET s,
      __in          LPWSAOVERLAPPED lpOverlapped,
      __out         LPDWORD lpcbTransfer,
      __in          BOOL fWait,
      __out         LPDWORD lpdwFlags
    );

        s 参数用于指定在重叠操作开始的时候,与之对应的那个套接字。

        lpOverlapped 参数是一个指针,对应于在重叠操作开始时,指定的那个 WSAOVERLAPPED 结构。

        lpcbTransfer 参数也是一个指针,对应一个DWORD(双字)变量,负责接收一次重叠发送或接收操作实际传输的字节数。

        fWait 参数用于决定函数是否应该等待一次待决(未决)的重叠操作完成。若将 fWait设为 TRUE,那么除非操作完成,否则函数不会返回;若设为FALSE,而且操作

    仍然处于“待决”状态,那么WSAGetOverlappedResult 函数会返回 FALSE值,同时返回一个WSAIOINCOMPLETE(I/O操作未完成)错误。但就我们目前的情况来

    说,由于需要等候重叠操作的一个已传信事件完成,所以该参数无论采用什么设置,都没有任何效果。

        lpdwFlags参数对应于一个指针,指向一个DWORD(双字),负责接收结果标志(假如原先的重叠调用是用WSARecv或WSARecvFrom函数发出的)。

        返回值:若 WSAGetOverlappedResult 函数调用成功,返回值就是TRUE。这意味着我们的重叠 I/O 操作已成功完成,而且由 lpcbTransfer 参数指向的值已进行

    了更新。若返回值是FALSE,那么可能是由下述任何一种原因造成的:

        1、重叠 I/O操 作仍处在“待决”状态。

        2、重叠操作已经完成,但含有错误。

        3、重叠操作的完成状态不可判决,因为在提供给 WSAGetOverlappedResult函数的一个或多个参数中,存在着错误。

        失败后,由 lpcbTransfer 参数指向的值不会进行更新,而且我们的应用程序应调用 WSAGetLastError 函数,调查到底是何种原因造成了调用失败。

        在 Windows NT 和 Windows 2000 中,重叠 I/O 模型也允许应用程序以一种重叠方式,实现对客户端连接的接受。具体的做法是在监听套接字上调用 AcceptEx

    函数。AcceptEx 是一个特殊的 Winsock1.1 扩展函数,位于 Mswsock.h 头文件以及 Mswsock.lib 库文件内。AcceptEx 函数的定义如下:

    BOOL AcceptEx(
        __in SOCKET sListenSocket,
        __in SOCKET sAcceptSocket,
        __in PVOID lpOutputBuffer,
        __in DWORD dwReceiveDataLength,
        __in DWORD dwLocalAddressLength,
        __in DWORD dwRemoteAddressLength,
        __out LPDWORD lpdwBytesReceived,
        __in LPOVERLAPPED lpOverlapped
    );

        sListenSocket 参数指定的是一个监听套接字。

        sAcceptSocket 参数指定的是另一个套接字,负责对进入连接请求的“接受”。AcceptEx 函数和 accept 函数的区别在于,我们必须提供接受的套接字,而不是让函数

    自动为我们创建。正是由于要提供套接字,所以要求我们事先调用 socket 或 WSASocket 函数,创建一个套接字,以便通过 sAcceptSocket 参数,将其传递给

    AcceptEx。
        lpOutputBuffer 参数指定的是一个特殊的缓冲区,因为它要负责三种数据的接收:服务器的本地地址,客户机的远程地址,以及在新建连接上发送的第一个数据块。

        dwReceiveDataLength参数以字节为单位,指定了在 lpOutputBuffer 缓冲区中,保留多大的空间,用于数据的接收。

    如这个参数设为0,那么在连接的接受过程中,不会再一道接收任何数据。

        dwLocalAddressLength 和 dwRemoteAddressLength 参数也是以字节为单位,指定在 lpOutputBuffer 缓冲区中,保留多大的空间,在一个套接字被接受的时

    候,用于本地和远程地址信息的保存。要注意的是,和当前采用的传送协议允许的最大地址长度比较起来,这里指定的缓冲区大小至少应多出16字节。举个例子来说:假

    定正在使用的是 TCP/IP 协议,那么这里的大小应设为“SOCKADDRIN 结构的长度+16字节”。

        lpdwBytesReceived 参数用于返回接收到的实际数据量,以字节为单位。只有在操作以同步方式完成的前提下,才会设置这个参数。假如 AcceptEx 函数返回 ERROR_IO_PENDING,那么这个参数永远都不会设置,我们必须利用完成事件通知机制,获知实际读取的字节量。

        lpOverlapped 参数对应的是一个 OVERLAPPED 结构,允许 AcceptEx 以一种异步方式工作。如我们早先所述,只有在一个重叠 I/O 应用中,该函数才需要使用事

    件对象通知机制,这是由于此时没有一个完成例程参数可供使用。也就是说 AcceptEx 函数只能由本节课给大家讲的“事件通知”方式获取异步 I/O 请求的结果,而“完成例

    程”方法无法被使用。

        重叠 I/O 模型的编程步骤总结如下:

        1、创建一个套接字,开始在指定的端口上监听连接请求;

        2、 接受一个客户端进入的连接请求;

        3、为接受的套接字新建一个 WSAOVERLAPPED 结构,并为该结构分配一个事件对象句柄,同时将该事件对象句柄分配给一个事件数组,以便稍后由

    WSAWaitForMultipleEvents 函数使用。

        4、在套接字上投递一个异步 WSARecv 请求,指定参数为 WSAOVERLAPPED 结构。注意函数通常会以失败告终,返回 SOCKET_ERROR 错误状态

    WSA_IO_PENDING(I/O操作尚未完成);

        5、 使用步骤3)的事件数组,调用 WSAWaitForMultipleEvents 函数,并等待与重叠调用关联在一起的事件进入“已传信”状态(换言之,等待那个事件的“触发”);

        6、 WSAWaitForMultipleEvents 函数返回后,针对“已传信”状态的事件,调用 WSAResetEvent(重设事件)函数,从而重设事件对象,并对完成的重叠请求进行

    处理;

        7、 使用 WSAGetOverlappedResult 函数,判断重叠调用的返回状态是什么;

        8、 在套接字上投递另一个重叠 WSARecv 请求;

        9、 重复步骤5)~8)。

        下面我们来看看界面的演示:

        服务器端:

        

        客户端界面:

        

        客户端与服务器通信的信息:

        

        下面,我们来看看代码如何实现的,初始化socket等步骤我们就不再细说,我们直接来看看重叠IO之事件通知的核心部分。

        1、我们定义几个结构,做好准备工作

    #define BUF_SIZE 4096   //缓冲区大小4k
    
    enum IO_TYPE            //操作类型
    {
        IO_ACCEPT,
        IO_READ,
        IO_WRITE,
        IO_UNKNOWN
    };
    
    struct szOverlapped               //自定义结构,第一个成员必定是WSAOVERLAPPED
    { 
        WSAOVERLAPPED m_overlapped;   //不多说
        SOCKET m_socket;              //接受客户端连接的socket或者是与客户端通信的socket  
        SOCKET m_accSocket;           //客户端连接的socket
        IO_TYPE m_iotype;             //投递操作的类型
        char m_buf[BUF_SIZE];         //接受客户端发送的信息
        szOverlapped()
        {
            ZeroMemory(&m_overlapped, sizeof(WSAOVERLAPPED));
            m_socket = INVALID_SOCKET;
            m_accSocket = INVALID_SOCKET;
            m_overlapped.hEvent = WSACreateEvent();
            m_iotype = IO_UNKNOWN;
            ZeroMemory(m_buf, BUF_SIZE);
        }
    };

        2、启动一个工作线程,几乎所有工作都在这个线程里操作,防止主界面卡死,线程代码如下

    UINT ThreadProc(LPVOID lpParameter)
    {
        COverlapped1Dlg *pDlg = (COverlapped1Dlg*)lpParameter;
        ASSERT(pDlg != NULL);
    
        SOCKET listenSocket = INVALID_SOCKET;
        listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if (INVALID_SOCKET == listenSocket)
        {
            return FALSE;
        }
        char ipbuf[1024] = {0};
        wcstombs(ipbuf, pDlg->GetIpAddress(), pDlg->GetIpAddress().GetLength());
        const char *p = ipbuf;
        sockaddr_in serverAddress;
        serverAddress.sin_addr.S_un.S_addr = inet_addr(p);
        serverAddress.sin_family = AF_INET;
        serverAddress.sin_port = htons(pDlg->m_iPort);
    
        if (SOCKET_ERROR == bind(listenSocket, (sockaddr*)&serverAddress, sizeof(sockaddr_in)))
        {
            return FALSE;
        }
    
        listen(listenSocket, SOMAXCONN);
        pDlg->ShowText(_T("系统消息:服务器开始监听。。。"));
        pDlg->PostAccept(listenSocket);
        CString cstrText;
        DWORD cbTransfer;
        DWORD dwFlag;
    
        while (TRUE)
        {
            DWORD Index = WSAWaitForMultipleEvents(pDlg->m_dwTotal, pDlg->m_eventArray, FALSE, 100, FALSE);
            if (WSA_WAIT_TIMEOUT  == Index)
            {
                continue;
            }
            Index = Index - WSA_WAIT_EVENT_0;  //事件通知都是等待事件的发生,然后根据事件去获取究竟是哪个重叠结构和socket,然后根据获取的重叠结构和socket进行相应的操作
            szOverlapped *pOverlapped = pDlg->FindOverlappedByEvent(pDlg->m_eventArray[Index]);
            WSAResetEvent(pDlg->m_eventArray[Index]);
            ASSERT(pOverlapped != NULL);
            if (!WSAGetOverlappedResult(pDlg->FindSocketByEvent(pDlg->m_eventArray[Index]), &(pOverlapped->m_overlapped), &cbTransfer, TRUE, &dwFlag))
            {
                continue;
            }
            switch(pOverlapped->m_iotype)     
            {
            case IO_ACCEPT:                              //客户端接入的socket就是szOverlapped结构的成员m_accSocket,这里必定先投递
                pDlg->PostRecv(pOverlapped->m_accSocket);//这个socket接受数据,然后才在监听socket投递下一accept命令
                pDlg->PostAccept(pOverlapped);
                pDlg->ShowText(_T("系统消息:客户端接入成功。。。"));
                break;
            case IO_READ:
                if (cbTransfer > 0)
                {
                    cstrText.Format(_T("%s"), pOverlapped->m_buf);
                    pDlg->ShowText(_T("Client: >") + cstrText);
                    pDlg->PostRecv(pOverlapped);
                }
                else
                {
                    pDlg->CleanUp(pOverlapped);
                }
                break;
            default:
                break;
            }        
        }
    
        if (listenSocket != INVALID_SOCKET) {
            closesocket(listenSocket);
        }
    
        pDlg->CleanAll();   //清理操作
        WSACleanup();
        return TRUE;
    }

        同步调用和异步调用的区别:同步调用等待客户端接入时会一直卡在那里,直到有客户端接入,而异步调用等待客户端接入就好比发送一个等待客户端接入的命令给操

    作系统,意思就是说你(操作系统)帮我接入一个客户端,客户端接入成功之后你再告诉我,这就是为什么上面的代码中进入循环之前我需要调用PostAccept函数的原因。

    同样的,接受客户端的信息也一样,当收到客户端的信息之后,也要在这个客户端投递下一个recv命令。

        PostAccept()函数与PostRecv()函数,这里我都进行了重载,不过代码都大致一样

    BOOL COverlapped1Dlg::PostAccept(SOCKET sock)
    {
        if (INVALID_SOCKET == sock)
        {
            return FALSE;
        }
        DWORD dwBytesReceived = 0;
        szOverlapped *pItem = new szOverlapped();
        pItem->m_socket = sock;
        pItem->m_iotype = IO_ACCEPT;
        pItem->m_accSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if (!AddSzOverlapped(pItem))
        {
            return FALSE;
        }
        m_eventArray[m_dwTotal] = pItem->m_overlapped.hEvent;
        m_dwTotal++;
        if (!AcceptEx(pItem->m_socket, pItem->m_accSocket, pItem->m_buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytesReceived, &(pItem->m_overlapped)))
        {
            if (WSAGetLastError() != WSA_IO_PENDING)
            {
                return FALSE;
            }
        }
        return TRUE;
    }
    
    BOOL COverlapped1Dlg::PostAccept(szOverlapped * pOverlapped)
    {
        ASSERT(pOverlapped != NULL);
        DWORD dwBytesReceived;
        pOverlapped->m_iotype = IO_ACCEPT;
        pOverlapped->m_accSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    
        if (AcceptEx(pOverlapped->m_socket, pOverlapped->m_accSocket, pOverlapped->m_buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytesReceived, &(pOverlapped->m_overlapped)))
        {
            if (WSAGetLastError() != WSA_IO_PENDING)
            {
                return FALSE;
            }
        }
        return TRUE;
    }
    
    BOOL COverlapped1Dlg::PostRecv(SOCKET sock)
    {
        WSABUF wsabuf = {0};
        szOverlapped *pOverlapped = new szOverlapped();
        pOverlapped->m_socket = sock;
        pOverlapped->m_iotype = IO_READ;
        wsabuf.buf = pOverlapped->m_buf;
        wsabuf.len = BUF_SIZE;
        DWORD numberOfBytesRecvd = 0, flags = 0;
    
        if (!AddSzOverlapped(pOverlapped))
        {
            return FALSE;
        }
        m_eventArray[m_dwTotal] = pOverlapped->m_overlapped.hEvent;
        m_dwTotal++;
        int iRet = WSARecv(pOverlapped->m_socket, &wsabuf, 1, &numberOfBytesRecvd, &flags, &(pOverlapped->m_overlapped), NULL);
        if (NO_ERROR != iRet)
        {
            if (WSA_IO_PENDING != WSAGetLastError())
            {
                return FALSE;
            }
        }
        return TRUE;
    }
    
    BOOL COverlapped1Dlg::PostRecv(szOverlapped * pOverlapped)
    {
        WSABUF wsabuf = {0};
        pOverlapped->m_iotype = IO_READ;
        ZeroMemory(pOverlapped->m_buf, BUF_SIZE);
        wsabuf.buf = pOverlapped->m_buf;
        wsabuf.len = BUF_SIZE;
        DWORD numberOfBytesRecvd = 0, flags = 0;
    
        int iRet = WSARecv(pOverlapped->m_socket, &wsabuf, 1, &numberOfBytesRecvd, &flags, &(pOverlapped->m_overlapped), NULL);
        
        if (NO_ERROR != iRet)
        {
            if (WSA_IO_PENDING != WSAGetLastError())
            {
                return FALSE;
            }
        }
        return TRUE;
    }

        另外一些辅助函数都很简单,一看就明白了,代码如下:

    BOOL COverlapped1Dlg::AddSzOverlapped(szOverlapped * pOverlapped)  //添加szOverlapped结构函数,保存每一个重叠结构和socket
    {
        if (m_AllOverlapped.GetCount() > WSA_MAXIMUM_WAIT_EVENTS)
        {
            closesocket(pOverlapped->m_accSocket);
            delete pOverlapped;
            return FALSE;
        }
        m_AllOverlapped.Add(pOverlapped);
        return TRUE;
    }
    
    szOverlapped* COverlapped1Dlg::FindOverlappedByEvent(WSAEVENT wsaevent)  //通过事件查找重叠结构
    {
        szOverlapped* pItem = NULL;
        for (int i = 0; i < m_AllOverlapped.GetCount(); i++)
        {
            pItem = m_AllOverlapped.GetAt(i);
            if (pItem->m_overlapped.hEvent == wsaevent)
            {
                break;
            }
        }
        return pItem;
    }
    
    SOCKET COverlapped1Dlg::FindSocketByEvent(WSAEVENT wsaevent)     //通过事件查找socket
    {
        for (int i = 0; i < m_AllOverlapped.GetCount(); i++)
        {
            if (m_AllOverlapped.GetAt(i)->m_overlapped.hEvent == wsaevent)
            {
                return m_AllOverlapped.GetAt(i)->m_socket;
            }
        }
        return INVALID_SOCKET;
    }
    
    BOOL COverlapped1Dlg::RemoveEvent(WSAEVENT wsaevent)   //移除事件
    {
        DWORD idx;
        for (int i = 0; m_eventArray[i] != 0; i++)
        {
            if (wsaevent == m_eventArray[i])
            {
                idx = i;
            }
        }
        for (int i = idx; m_eventArray[i] != 0; i++)
        {
            m_eventArray[idx] = m_eventArray[idx + 1];
        }
        m_dwTotal--;
        return TRUE;
    }

         事件通知最多只能接受64个socket的接入,而完成例程就没有这样的限制,我将会在后面给出完成例程的演示。

  • 相关阅读:
    Java多态——代码示例
    使用zabbix监控oracle的后台日志
    使用zabbix监控linux的io
    Oracle
    Oracle
    Percona XtraDB Cluster简易入门
    Oracle
    使用zabbix监控oracle数据库
    Ogg
    Mysql
  • 原文地址:https://www.cnblogs.com/venow/p/2552094.html
Copyright © 2011-2022 走看看