zoukankan      html  css  js  c++  java
  • IOCP用法

    IOCP:即是IO完成端口。I/O完成端口(s)是一种机制,通过这个机制,应用程序在启动时会首先创建一个线程池,然后该应用程序使用线程池处理异步I/O请求。这些线程被创建的唯一目的就是用于处理I/O请求。对于处理大量并发异步I/O请求的应用程序来说,相比于在I/O请求发生时创建线程来说,使用完成端口(s)它就可以做的更快且更有效率。IOCP是windows平台最高效的通信模型,可以在一个应用程序同时管理为数众多的套接字,以达到最佳的系统性能!从本质上说,完成端口模型要求我们创建一个Win32完成端口对象,通过指定数量的线程,对重叠I/O请求进行管理,以便为已经完成的重叠I/O请求提供服务。


    一、完成端口的相关函数或结构:

    1、创建完成端口或绑定句柄

    HANDLE CreateIoCompletionPort(
    HANDLE FileHandle,
    HANDLE ExistingCompletionPort,
    ULONG_PTR CompletionKey,
    DWORD NumberOfConcurrentThreads
    );

    2、获取排队完成状态

    BOOL GetQueuedCompletionStatus(
        HANDLE CompletionPort,
        LPDWORD lpNumberOfBytes,
        PULONG_PTR lpCompletionKey,
        LPOVERLAPPED* lpOverlapped,
        DWORD dwMilliseconds
    );

    3、用OVERLAPPED Overlapped;开头封装一个自定义OVERLAPPED结构

    typedef struct _PER_IO_OPERATION_DATA
    {
     //重叠结构
     OVERLAPPED OverLapped;
     //数据缓冲区
     WSABUF RecvDataBuf;
     WSABUF SendDataBuf;
     char RecvBuf[BUFFER_SIZE];
     char SendBuf[BUFFER_SIZE];
     //操作类型表示
     bool OperType;
    }PER_IO_OPERATION_DATA,*PPER_IO_OPERATION_DATA;

    此结构用来投递IO工作者
    PER_IO_OPERATION_DATA PerIoWorker;
    WSARecv(socket, ..., (OVERLAPPED *)&PerIoWorker;);

    4、向工作者线程都发送一个完成数据包

    BOOL PostQueuedCompletionStatus(
        HANDLE CompletionPort,
        DWORD dwNumberOfBytesTransferred,
        ULONG_PTR dwCompletionKey,
        LPOVERLAPPED lpOverlapped
    );

    二、IOCP实例
    包括两个线程工作函数:
    1、工作者函数封装GetQueuedCompletionStatus,实现各种状态处理
    2、监听工作线程函数,接收链接请求,创建完成端口CreateIoCompletionPort、投递工作者WSARecv

    一个发送消息函数,创建PPER_IO_OPERATION_DATA结构工作对象,直接发送WSASend

    一个初始化过程:
    1、用WSASocket启动SOCKET,绑定并监听
    2、启动工作者线程,线程数一般为CPU处理器数量*2+2
    3、启动侦听线程

    退出IOCP:
    1、退出工作线程
    2、退出侦听线程
    3、关闭网络的侦听
    4、切断当前所有连接

    具体:
    1、工作者线程函数

    DWORD WINAPI WorkerProc(LPVOID lParam)
    {
     CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
     HANDLE CompletionPort=pSvr->CompletionPort;
     DWORD ByteTransferred;
     LPPER_HANDLE_DATA PerHandleData;
     PPER_IO_OPERATION_DATA PerIoData;
     DWORD RecvByte;
     while(true)
     {
      bool bSuccess=GetQueuedCompletionStatus(CompletionPort,
                &ByteTransferred,
                (LPDWORD)&PerHandleData,
                (LPOVERLAPPED* )&PerIoData,
                INFINITE);
      //1、退出信号到达,退出线程
      if(ByteTransferred==-1 && PerIoData==NULL)
      {
       return 1L;
      }
      //2、客户机已经断开连接或者连接出现错误
      if(ByteTransferred==0 &&
         (PerIoData->OperType==RECV_POSTED || PerIoData->OperType==SEND_POSTED))
      {
       //1、将该客户端数据删除
       //2、通知上层该客户端已经断开
       //3、关闭套接口   
       continue;
      }
      //3、接收数据
      if(PerIoData->OperType==RECV_POSTED)
      {
       //调用回调函数,处理数据
       pSvr->m_pProcessRecvData(PerHandleData->IpAddr,
                             PerHandleData->sClient,
              PerIoData->RecvBuf,
              ByteTransferred);
       //将源数据置空
       memset(PerIoData->RecvBuf,0,BUFFER_SIZE);
       ByteTransferred=0;
       //重置IO操作数据
       unsigned long Flag=0;
       ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
       
       PerIoData->RecvDataBuf.buf=PerIoData->RecvBuf;
       PerIoData->RecvDataBuf.len=BUFFER_SIZE;
       PerIoData->OperType=RECV_POSTED;
       //提交另一个Recv请求
       WSARecv(PerHandleData->sClient,
        &(PerIoData->RecvDataBuf),
        1,
        &RecvByte,
        &Flag,
        &(PerIoData->OverLapped),
        NULL);
      }//4、发送数据完成,置空缓冲区,释放缓冲区  
      else if(PerIoData->OperType==SEND_POSTED)
      {
       memset(PerIoData,0,sizeof(PER_IO_OPERATION_DATA));
       GlobalFree(PerIoData);
       ByteTransferred=0;
      }
     }
     return 0L;
    }

    2、监听线程并启动工作者

    DWORD WINAPI ListenWorkerProc(LPVOID lParam)
    {
     CIocpModeSvr* pSvr=(CIocpModeSvr*)lParam;
     SOCKET Accept;
     while(true)
     {
      //1、接收客户的请求
      Accept = WSAAccept(pSvr->ListenSocket,NULL,&nLen,ConnectAcceptCondition,(DWORD)lParam);  

      //2、取得客户端信息
      sockaddr soad;
      sockaddr_in in;
      int len=sizeof(soad);
      if(getpeername(Accept,&soad,&len)==SOCKET_ERROR)
      {
       CString LogStr;
       LogStr.Format("getpeername() faild : %d",GetLastError());
       pSvr->WriteLogString(LogStr);
      }
      else
      {
       memcpy(&in,&soad,sizeof(sockaddr));
      }
      //3、给Socket创建完成端口

      //申请新的句柄操作数据
      LPPER_HANDLE_DATA PerHandleData=(LPPER_HANDLE_DATA) GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));
      //句柄数据
      PerHandleData->sClient=Accept;
      PerHandleData->IpAddr=in.sin_addr.S_un.S_addr;
      //存储客户信息
      ::EnterCriticalSection(&pSvr->cInfoSection);
      pSvr->ClientInfo.Add(*PerHandleData);
      ::LeaveCriticalSection(&pSvr->cInfoSection);
      //转储信息
      CString LogStr;
      LogStr.Format("UserIP: %s ,Socket : %d Connected!",inet_ntoa(in.sin_addr),Accept);
      pSvr->WriteLogString(LogStr);
      TRACE("\nUserIP: %s ,Socket : %d Connected!",inet_ntoa(in.sin_addr),Accept);
      //关联客户端口到完成端口,句柄数据在此时被绑定到完成端口
      CreateIoCompletionPort((HANDLE)Accept, pSvr->CompletionPort,(DWORD)PerHandleData,0);
      
      4、创建并投递工作者
      for(int i=0;i<pSvr->IOWorkerNum;i++)
      {
       //Io操作数据标志   
       PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA)GlobalAlloc(GPTR,sizeof(PER_IO_OPERATION_DATA));
       unsigned long  Flag=0;
       DWORD RecvByte;
       ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
       
       PerIoData->RecvDataBuf.buf=PerIoData->RecvBuf;
       PerIoData->RecvDataBuf.len=BUFFER_SIZE;
       PerIoData->OperType=RECV_POSTED;
       //提交首个接收数据请求
       //这时
       //如果客户端断开连接
       //则也可以以接收数据时得到通知 
       WSARecv(PerHandleData->sClient,
        &(PerIoData->RecvDataBuf),
        1,
        &RecvByte,
        &Flag,
        &(PerIoData->OverLapped),
        NULL);
      }
     }
    }

    3、发消息

    //提交发送消息请求,
    //如果提交发送消息失败,
    //则将导致在工作线程里将目标客户端的连接切断
    bool CIocpModeSvr::SendMsg(SOCKET sClient,char * pData,unsigned long Length)
    {
     if(sClient==INVALID_SOCKET || pData==NULL || Length==0 || !IsStart)return false;

     //申请操作键
     PPER_IO_OPERATION_DATA PerIoData=(PPER_IO_OPERATION_DATA) \
               GlobalAlloc(GPTR,
                                    sizeof(PER_IO_OPERATION_DATA));

     //准备缓冲
     unsigned long  Flag=0;
     DWORD SendByte;
     ZeroMemory(&(PerIoData->OverLapped),sizeof(OVERLAPPED));
     memcpy(PerIoData->SendBuf,pData,Length);
     PerIoData->SendDataBuf.buf=PerIoData->SendBuf;
     PerIoData->SendDataBuf.len=Length;
     PerIoData->OperType=SEND_POSTED;
     int bRet=WSASend(sClient,
                   &(PerIoData->SendDataBuf),
                   1,
                   &SendByte,
                   Flag,
                   &(PerIoData->OverLapped),
                   NULL);
     if(bRet==SOCKET_ERROR && GetLastError()!=WSA_IO_PENDING)
     {
      CString LogStr;
      LogStr.Format("WSASend With Error : %d",GetLastError());
      WriteLogString(LogStr);
      return false;
     }
     else return true;
     
     return false;
    }

    4、启动IOCP

    int CIocpModeSvr::InitNetWork(unsigned int SvrPort,std::string *pHostIpAddress)
    {
     //启动网络
     CString LogStr;
     int Error=0;
     WSADATA wsaData;
     char Name[100];
     hostent *pHostEntry;
     in_addr rAddr;
     //Net Start Up
     Error=WSAStartup(MAKEWORD(0x02,0x02),&wsaData);
     if(Error!=0)
     {
      Error = WSAGetLastError();
      pHostIpAddress->assign( "" );
      
      LogStr.Format("WSAStartUp Faild With Error: %d",Error);
      WriteLogString(LogStr);
      
      return Error;
     }
     //Make Version
     if ( LOBYTE( wsaData.wVersion ) != 2 ||
      HIBYTE( wsaData.wVersion ) != 2 )
     {
      WSACleanup( );
      
      WriteLogString("The Local Net Version Is not 2");
      
      return -1;
     }
     //Get Host Ip
     Error = gethostname ( Name, sizeof(Name) );
     if( 0 == Error )
     {
      pHostEntry = gethostbyname( Name );
      if( pHostEntry != NULL )
      {
       memcpy( &rAddr, pHostEntry->h_addr_list[0], sizeof(struct in_addr) );
       pHostIpAddress->assign( inet_ntoa( rAddr ) );
      }
      else
      {
       Error = WSAGetLastError();
       LogStr.Format("GetHostIp faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;
       
      }
     }
     else
     {
      Error = WSAGetLastError();
      LogStr.Format("gethostname faild with Error: %d",Error);
      WriteLogString(LogStr);
      return Error;
     }
     if(0==Error)
     {
      //创建侦听端口
      ListenSocket=WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
      if(ListenSocket==INVALID_SOCKET)
      {
       Error = WSAGetLastError();
       LogStr.Format("CreateSocket faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;
      }
     }
     //绑定到目标地址
     if(0==Error)
     {
      sockaddr_in InternetAddr;
      InternetAddr.sin_family=AF_INET;
      InternetAddr.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
      InternetAddr.sin_port=htons(SvrPort);
      if(bind(ListenSocket,
           (PSOCKADDR )&InternetAddr,
        sizeof(InternetAddr))==SOCKET_ERROR)
      {
       Error=GetLastError();
       LogStr.Format("bind Socket faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;
      }
     }
     //侦听端口上的连接请求
     if(0==Error)
     {
      if( listen(ListenSocket,5)==SOCKET_ERROR)
      {
       Error=GetLastError();
       LogStr.Format("listen Socket faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;
      }
     }
     //创建完成端口句柄
     if(0==Error)
     {
      CompletionPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
      if(CompletionPort==INVALID_HANDLE_VALUE)
      {
       Error=GetLastError();
       LogStr.Format("CreateIoCompletionPort faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;
      }
     }
     //启动工作线程,线程数为CPU处理器数量*2+2
     if(0==Error)
     { 
      SYSTEM_INFO sys_Info;
      GetSystemInfo(&sys_Info);
      for(int i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
      {
       HANDLE ThreadHandle;
       DWORD ThreadID;
       
       ThreadHandle=CreateThread(NULL,
        0,
        ServerWorkerProc,
        this,
        0,
        &ThreadID);
       if(ThreadHandle==NULL)
       {
        Error = WSAGetLastError();
        LogStr.Format("Create Server Work Thread faild with Error: %d",Error);
        WriteLogString(LogStr);
        return Error;
       } 
       CloseHandle(ThreadHandle);
      }
     }
     //启动侦听线程
     if(0==Error)
     {
      DWORD thID;
      ListenThreadHandle=CreateThread(NULL,
                                   0,
              ListenProc,
              this,
              0,
              &thID);
      if(ListenThreadHandle==NULL)
      {
       Error = WSAGetLastError();
       LogStr.Format("Create Listen Thread faild with Error: %d",Error);
       WriteLogString(LogStr);
       return Error;  
      }
     }
     return Error;
    }

    4、退出IOCP

    void CIocpModeSvr::UnInit()
    {
     if(!IsStart)return;
     //退出工作线程
     SYSTEM_INFO sys_Info;
     GetSystemInfo(&sys_Info);
     for(int i=0;i<sys_Info.dwNumberOfProcessors*2+2;i++)
     {
      //寄出退出消息
      PostQueuedCompletionStatus(CompletionPort, -1, -1,NULL);
     }
     //退出侦听线程
     ::TerminateThread(ListenThreadHandle,1L);
     ::WaitForSingleObject(ListenThreadHandle,10);
     CloseHandle(ListenThreadHandle);
     //关闭网络的侦听
     shutdown(ListenSocket,0);
     closesocket(ListenSocket);
     //切断当前所有连接
     DisConnectAll(); 
     ::DeleteCriticalSection(&cInfoSection);
     m_pProcessRecvData=NULL;
     IsStart=false;
    }
  • 相关阅读:
    实例属性 类属性 实例域 类域
    研究数据集
    static 静态域 类域 静态方法 工厂方法 he use of the static keyword to create fields and methods that belong to the class, rather than to an instance of the class 非访问修饰符
    accessor mothod mutator mothod 更改器方法 访问器方法 类的方法可以访问类的任何一个对象的私有域!
    上钻 下钻 切片 转轴 降采样
    识别会话
    Performance Tuning Using Linux Process Management Commands
    Secure Hash Algorithm 3
    grouped differently across partitions
    spark 划分stage Wide vs Narrow Dependencies 窄依赖 宽依赖 解析 作业 job stage 阶段 RDD有向无环图拆分 任务 Task 网络传输和计算开销 任务集 taskset
  • 原文地址:https://www.cnblogs.com/virtualNatural/p/1918999.html
Copyright © 2011-2022 走看看