zoukankan      html  css  js  c++  java
  • IO处理线程

    客户IO处理,是在工作线程,_WorkerThreadProc中完成的

    函数,在完成端口上调用GetQueuedCompletionStatus函数等待IO完成,并调用自定义函数HandleIO来处理IO,具体代码如下:

    DOWRD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
    {
        #ifdef _DEBUG
            ::OutputDebugString("Worker Thread startup...\n");
        #endif //_DEBUG
    
        CIOCPServer *pThis = (CIOCPServer*)lpParam;
        CIOCPBuffer *pBuffer;
        DWORD dwKey;
        DWORD dwTrans;
        LPOVERLAPPED lpol;
    
        while(TRUE)
        {
            //在关联到此完成端口的所有套接字上等待IO完成
            BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion,&dwTrans,(LPDWORD)&dwKey,(LPOVERLAPPED)&lpol,WSA_INFINITE);
    
            if(dwTrans == -1)
            {
                #ifdef _DEBUG
                    ::OutputDebugString("Worker Thread startup...\n");
                #endif //_DEBUG
                    
                ::ExitThread(0);
            }
            
            pBuffer=CONTAINING_RECORD(lpol,CIOCPBuffer,ol);
            int nError = NO_ERROR;
    
            if(!bOK)
            {
                SOCKET s;
    
                if(pBuffer->nOperation == OP_ACCEPT)
                {
                    s=pThis->m_sListen;
                }
                else
                {
                    if(dwKey == 0)
                        break;
                    s=((CIOCPContext*)dwKey)->s;
                }
    
                DWORD dwFlags = 0;
    
                if(!::WSAGetOverlappedResult(s,&pBuffer->ol,&dwTrans,FALSE,&dwFlags))
                {
                    nError = ::WSAGetLastError();
                }
            }
    
            pThis->HandleIO(dwKey,pBuffer,dwTrans,nError);
    
        }
        #ifdef _DEBUG
            ::OutputDebugString("Worker Thread out...\n");
        #endif //_DEBUG
    
        return 0;
    }

    SendText成员函数用于在连接上发送数据,执行时先申请一个缓冲区对象,把用户将要发送的数据复制到里面,然后调用postSend成员函数投递这个缓冲区对象

    BOOL CIOCPServer::SendText(CIOCPContext *pContext,char *pszText,int nLen)
    {
        CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
        if(pBuffer != NULL)
        {
            memcpy(pBuffer->buff,pszText,nLen);
            return PostSend(pContext,pBuffer);
        }
        return FALSE;
    }

    下面的HandleIO函数是关键,

    处理完成的IO,投递新的IO请求,释放完成的缓冲区对象,关闭客户上下文对象

    下面是主要的实现代码:

    void CIOCPServer::HandleIO(DWORD dwKey,CIOCPBuffer *pBuffer,DOWRD dwTrans,int nError)
    {
        CIOCPContext *pContext = (CIOCPContext*)dwKey;
    
        #ifdef _DEBUG
            ::OutputDebugString("HandleIO startup..\n");
        #endif //_DEBUG
    
        //减少套接字未决IO计数
        if(pContext!=NULL)
        {
            ::EnterCriticalSection(&pContext->Lock);
    
            if(pBuffer->nOperation == OP_READ)
                pContext->nOutstandingRecv--;
            else if(pBuffer->nOperation == OP_WRITE)
                pContext->nOutstandingSend--;
    
            ::LeaveCriticalSection(&pContext->Lock);
    
            //检查套接字是否已经打开
            if(pContext->bClosing)
            {
    
                #ifdef _DEBUG
                    ::OutputDebugString("HandleIO startup..\n");
                #endif //_DEBUG
                
                if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
                {
                    ReleaseContext(pContext);
                }
                //释放已关闭套接字的未决IO
                ReleaseBuffer(pBuffer);
                return;
            }
        }
        else
        {
            RemovePendingAccept(pBuffer);
        }
    
        //检查套接字上发生的错误,然后直接关闭套接字
    
        if(nError!=NO_ERROR)
        {
            if(pBuffer->nOperation != OP_ACCEPT)
            {
                OnConnectionError(pContext,pBuffer,nError);
                CloseAConnection(pContext);
                if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
                {
                    ReleaseContext(pContext);
                }
    
                #ifdef _DEBUG
                    ::OutputDebugString("HandleIO startup..\n");
                #endif //_DEBUG
            }
            else//在监听套接字上发生错误
            {
                if(pBuffer->sClient != INVALID_SOCKET)
                {
                    ::closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
                }
    
                #ifdef _DEBUG
                    ::OutputDebugString("HandleIO startup..\n");
                #endif //_DEBUG
            }
            ReleaseBuffer(pBuffer);
            return;
        }
    
        //开始处理
        if(pBuffer->nOperation == OP_ACCEPT)
        {
            if(dwTrans == 0)
            {
                #ifdef _DEBUG
                    ::OutputDebugString("HandleIO startup..\n");
                #endif //_DEBUG
    
                if(pBuffer->sClient != INVALID_SOCKET)
                {
                    ::closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
                }
            }
            else
            {
                //为接收新连接的申请客户上下文对象
                CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
                if(pClient != NULL)
                {
                    if(AddAConnection(pCliebt))
                    {
                        //取得用户地址
                        int nLocalLen,nRmoteLen;
                        m_lpfnGetAcceptExSockaddrs(
                                pBuffer->buff,
                                pBuffer->nLen-((sizeof(sockaddr_in)+16)*2),
                                sizeof(sockaddr_in)+16,
                                sizeof(sockaddr_in)+16,
                                (SOCKADDR **)&pLocalAddr,
                                &nLocalLen,
                                (SOCKADDR **)&pRemoteAddr,
                                &nRmoteLen);
                        memcpy(&pClient->addrLocal,pLocalAddr,nLocalLen);
                        memcpy(&pClient->addrRemote,pRemoteAddr,nRmoteLen);
    
                        //关联新连接到完成端口对象
                        ::CreateIoCompletionPort((HANDLE)pClient->s,m_hCompletion,(DWORD)pClient,0);
    
                        //通知用户
                        pBuffer->nLen = dwTrans;
                        OnConnectionEstablished(pClient,pBuffer);
    
                        //向新连接投递Read请求
                        for(int i=0;i<5;i++)
                        {
                            CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
                            if(p != NULL)
                            {
                                if(!PostRecv(pClient,p))
                                {
                                    CloseAConnection(pClient);
                                    break;
                                }
                            }
                        }
                    }
                    else
                    {
                        CloseAConnection(pClient);
                        ReleaseContext(pClient);
                    }
                }
                else
                {
                    //资源不足,关闭与客户的连接即可
                    ::closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
                }
            }
            //Accept请求完成,释放IO缓冲区
            ReleaseBuffer(pBuffer);
            //通知监听线程继续再投递一个Accept请求
            ::InterlockedDecrement(&m_nRepostCount);
            ::SetEvent(m_hRepostEvent);
        }
        else if(pBuffer->nOperation == OP_READ)
        {
            if(dwTrans == 0)
            {
                //先通知用户
                pBuffer->nLen = 0;
                OnConnectionClosing(pContext,pBuffer);
                //再关闭连接
                CloseAConnection(pContext);
                //释放客户上下文和缓冲区对象
                if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
                {
                    ReleaseContext(pContext);
                }
                ReleaseBuffer(pBuffer);
            }
            else
            {
                pBuffer->nLen = dwTrans;
                //按照IO投递的顺序读取接收到的数据
                CIOCPBuffer *p = GetNextReadBuffer(pContext,pBuffer);
                while(p!=NULL)
                {
                    OnReadCompleted(pContext,p);
                    //增加要读的序列号的值
                    ::InterlockedDecrement((LONG*)pContext->nCurrentReadSequence);
                    //释放IO
                    ReleaseBuffer(p);
                    p = GetNextReadBuffer(pContext,NULL);
                }
                //继续投递一个新的请求
                pBuffer = AllocateBuffer(BUFFER_SIZE);
                if(pBuffer==NULL || !PostRecv(pContext,pBuffer))
                {
                    CloseAConnection(pContext);
                }
            }
        }
        else if(pBuffer->nOperation == OP_WRITE)
        {
            if(dwTrans == 0)
            {
                //先通知用户
                pBuffer->nLen = 0;
                OnConnectionClosing(pContext,pBuffer);
                //再关闭连接
                CloseAConnection(pContext);
                //释放客户上下文和缓冲区对象
                if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
                {
                    ReleaseContext(pContext);
                }
                ReleaseBuffer(pBuffer);
            }
            else
            {
                //写操作完成,通知用户
                pBuffer->nLen = dwTrans;
                OnWriteCompleted(pContext,pBuffer);
    
                //释放SendText函数申请缓冲区
                ReleaseBuffer(pBuffer);
            }
        }
    }
  • 相关阅读:
    Kafka Replication: The case for MirrorMaker 2.0
    CentOs7.3 搭建 SolrCloud 集群服务
    Redis 集群:CLUSTERDOWN The cluster is down
    Redis Cluster: (error) MOVED
    Kafka重启出错:Corrupt index found
    redis cluster slots数量 为何是16384(2的14次方)
    redis监控工具汇总
    Redis运维利器 -- RedisManager
    redis三种集群策略
    Deploy custom service on non hadoop node with Apache Ambari
  • 原文地址:https://www.cnblogs.com/xing901022/p/2730119.html
Copyright © 2011-2022 走看看