zoukankan      html  css  js  c++  java
  • soket通讯完成端口IOCP和select对比

    自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

    https://www.cnblogs.com/bclshuai/p/11380657.html

     

    Socket网络通讯

    一.            socket网络通讯之select

    socket服务器端连接多个客户端,通过select函数去遍历每个连接,获取客户端发过来的数据信息。

    1.select的作用

    将多个套接字放入数组中,检查数组中的套接字是否有信号,连接请求,读写请求,一旦有立即返回有信号套接字的数量,超时返回0,错误返回负值。

    详细介绍本函数用于确定一个或多个套接口的状态。对每一个套接口,调用者可查询它的可读性、可写性及错误状态信息。用fd_set结构来表示一组等待检查的套接口。在调用返回时,这个结构存有套接口数组集合,并且select()返回有信号的套接口的数目。

    2.Select函数解析

    2.1select函数参数

    int select ( int nfds,fd_set *readfds, *writefds, *exceptfds,timeval *timeout )

    ndfs:select监视的文件句柄数,视进程中打开的文件数而定,一般设为呢要监视各文件 中的最大文件号加一。Window下这个参数无用,liniux下这个参数有用。

    readfds:select监视的可读文件句柄集合。

    writefds: select监视的可写文件句柄集合。

    exceptfds:select监视的异常文件句柄集合。

    readfds参数标识等待可读性检查的套接口。如果该套接口正处于监听listen()状态,则若有连接请求到达,该套接口便被标识为可读,这样一个accept()调用保证可以无阻塞完成。对其他套接口而言,可读性意味着有排队数据供读取。或者对于SOCK_STREAM类型套接口来说,相对于该套接口的虚套接口已关闭,于是recv()或recvfrom()操作均能无阻塞完成,返回值为负值,可以通过这个来判断sokect已经断开连接。writefds参数标识等待可写性检查的套接口。

    typedef struct fd_set {

            u_int fd_count;               /* how many are SET? */

            SOCKET  fd_array[FD_SETSIZE];   /* an array of SOCKETs */

    } fd_set;

    这三个参数传入的是一个socket的结构体指针,结构体是一个存socket的数组加上一个存socket数量的变量;SETSIZE等于64,是个宏定义,可修改。

    timeout:本次select()的超时结束时间。(见/usr/sys/select.h,可精确至百万分之一秒!)传入NULL,阻塞模式,传入时间结构体对象,则超时select返回0,timeout两个值都为0时,则非阻塞模式,立即返回。

    2.2 select函数返回值

    负值:select错误

    正值:某些文件可读写或出错

    0:等待超时,没有可读写或错误的文件

    2.3宏定义操作

    FD_ZERO(fd_set *fdset):FD_ZERO 是将fd_set结构体中的count置为0

    FD_SET(int fd, fd_set *fdset):是将套接字插入数组,并将count 加1

    FD_CLR(int fd, fd_set *fdset):将套接字从数组中删除,将后续套接字前移,将count减1.

    FD_ISSET(int fd, fdset *fdset):检查fdset联系的文件句柄fd是否在set数组中,可读写,>0表示可读写。

    2.4调用流程

    (1)定义fd_set变量,调用FD_ZERO宏将fd_set清空,实际上只是将count置0.

    (2)调用FD_SET将关注的套接字插入到结构体数组中

    (3)使用 select函数查询结构体的数组中的套接字是否有信号,

    (4)调用FD_ISSET判断套接字是否在有信号的列表中,底层维护的列表。

    (5)在列表中则调用读写函数进行操作。

    3.例程

    3.1select服务器

    // MulitSocket.cpp : 定义控制台应用程序的入口点。

    //

     

    #include "stdafx.h"

    #include "winsock2.h"

    #pragma  comment(lib,"ws2_32.lib")

    #include<iostream>

    using namespace std;

    #define PORT 5050

    #define BUFFSIZE  1024

    SOCKET g_socketclientArray[FD_SETSIZE];

    int g_num=0;

    bool g_bRun=true;

    char sendbuff[1024]="what's up ? dute !";

    SOCKET g_socketserver;

    DWORD WINAPI AcceptWork(LPVOID pPama)

    {

           if (g_socketserver==INVALID_SOCKET)

           {

                  cout<<"socket server is invalid, thread is stoped"<<endl;

                  return-1;

           }

           fd_set fdset;

           timeval outtime;

           outtime.tv_sec=5;

           outtime.tv_usec=0;

           while (g_bRun)

           {

                  FD_ZERO(&fdset);//清空socket数组

                  FD_SET(g_socketserver,&fdset);//将g_socketserver保存到socket数组中

                  int iRet=-1;

                  //(最大socket值linux有效,socket数组读,socket数组写,socket数组错误,超时时间)

                  iRet=::select(g_socketserver+1,&fdset,NULL,NULL,&outtime);//查询数组中套接字是否有信号,返回有信号的socket数目

                  if (iRet==0)//超时

                  {

                         cout<<"accept work wait outtime "<<endl;

                         continue;

                  }

                  else if (iRet<0)//错误

                  {

                         int iRetErr = WSAGetLastError();

                         cout<<"select error code"<<iRetErr<<endl;

                         continue;

                  }

                  if (FD_ISSET(g_socketserver,&fdset)<=0)///判断是否准备好

                  {

                         cout<<"FD_ISSET return value is <=0"<<endl;

                         continue;

                  }

                  sockaddr addr;

                  int addresslen=sizeof(sockaddr);

                  SOCKET sok=accept(g_socketserver,&addr,&addresslen);//接受连接

                  if (sok>0)

                  {

                        

                         if (g_num>=FD_SETSIZE)

                         {

                                closesocket(sok);

                                cout<<"client number exceed the range,can't accept client connect anymore ! "<<endl;

                                continue;

                         }

                         g_socketclientArray[g_num]=sok;

                         g_num++;

                         cout<<"one client connected !"<<endl;

                  }

                  else

                  {

                         cout<<"accept socket return value <=0"<<endl;

                  }

                 

           }

           return 0;

    }

     

           DWORD WINAPI RecvWork(LPVOID pPama)

           {

                  timeval timeout;

                  timeout.tv_sec=0;

                  timeout.tv_usec=5;

                  char recvbuff[BUFFSIZE];

                  while (g_bRun)

                  {

                        

                         if (g_num==0)

                         {

                                Sleep(1000);

                                continue;

                         }

                         SOCKET maxsock=0;            

                         for (int i =0;i<g_num;i++)

                         {

                                fd_set  fdset;

                                FD_ZERO(&fdset);

                                FD_SET(g_socketclientArray[i],&fdset);

                                int iRet=-1;

                                if ((iRet=select(maxsock,&fdset,NULL,NULL,&timeout))==0)

                                {

                                       //cout<<"select wait time out"<<endl;

                                       continue;

                                }

                                else if (iRet<0)

                                {

                                       cout<<"select return value is below zero"<<endl;

                                       Sleep(2000);

                                       //continue;

                                }

                                if (FD_ISSET(g_socketclientArray[i],&fdset)<=0)

                                {

                                       continue;

                                }

                                memset(recvbuff,0,BUFFSIZE);

                                if (recv(g_socketclientArray[i],recvbuff,BUFFSIZE,0)>0)

                                {

                                       cout<<"recvfrom client"<<i+1<<":"<<recvbuff<<endl;

                                       send(g_socketclientArray[i],sendbuff,1024,0);

                                       cout<<"send to client"<<i+1<<":"<<sendbuff<<endl;

                                }

                                else

                                {

                                       closesocket(g_socketclientArray[i]);

                                       int j=i;

                                       for (;(j+1)<g_num;j++)

                                       {

                                              g_socketclientArray[j]=g_socketclientArray[j+1];

                                       }

                                       g_socketclientArray[j+1]=INVALID_SOCKET;

                                       g_num--;

                                       cout<<"one client has lost connect ,left client num is "<<g_num<<endl;

                                }

                                             

                         }

                  }

                  return 0;

           }

     

     

    int _tmain(int argc, _TCHAR* argv[])

    {

          

           WSADATA wsadata;

           if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)

           {

                  cout<<"WSAStartup failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           g_socketserver=::socket(AF_INET,SOCK_STREAM,0);

           if (g_socketserver==INVALID_SOCKET)

           {

                  cout<<"create socket failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           SOCKADDR_IN  serverAddressIn;

           serverAddressIn.sin_addr.S_un.S_addr=htonl(INADDR_ANY);

           serverAddressIn.sin_family=AF_INET;

           serverAddressIn.sin_port=htons(PORT);

           if (::bind(g_socketserver,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN))!=0)

           {

                  cout<<"bind address to m_socketserver failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           if (::listen(g_socketserver,5)!=0)

           {

                  cout<<"listen socket failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           HANDLE m_socketServerAccept=NULL;

           m_socketServerAccept=CreateThread(NULL,0,AcceptWork,NULL,0,NULL);

           if (m_socketServerAccept==NULL)

           {

                  cout<<"create thread failed "<<endl;

                  Sleep(2000);

                  return -1;

           }

           HANDLE m_threadRev=NULL;

           m_threadRev=CreateThread(NULL,0,RecvWork,NULL,0,NULL);

           if(m_threadRev==NULL)

           {

                  cout<<"create thread is failed "<<endl;

                  Sleep(3000);

                  return -1;

           }

           system("PAUSE");

           cout<<"stop program now"<<endl;

           Sleep(3000);

           g_bRun=false;

           return 0;

    }

     

     

    3.2客户端

    // SocketClient.cpp : 定义控制台应用程序的入口点。

    //

     

    #include "stdafx.h"

    #include "winsock2.h"

    #pragma  comment(lib,"ws2_32.lib")

    #include<iostream>

    using namespace std;

    #define PORT 5050

    bool g_bRun=true;

    SOCKET g_socketClient;

    char sendbuff[1024]="hello beautiful girl !";

    char recvBuff[1024];

    DWORD WINAPI SendWork(LPVOID pPama)

    {

           timeval timeout;

           timeout.tv_sec=5;

           timeout.tv_usec=0;

           while(g_bRun)

           {

                  if (send(g_socketClient,sendbuff,1024,0)<=0)

                  {

                         cout<<"send data error"<<endl;

                  }

                  cout<<"send: "<<sendbuff<<endl;

                  fd_set fdset_r;

                  FD_ZERO(&fdset_r);

                  FD_SET(g_socketClient,&fdset_r);

                  int iRval=0;

                  if ((iRval=select(g_socketClient+1,&fdset_r,NULL,NULL,&timeout))==0)//查看g_socketClient是否有可读信号

                  {

                         continue;

                  }

                  else if (iRval<0)//返回错误

                  {

                         cout<<"socket connect is broken,thread is stoped "<<endl;

                         g_bRun=false;

                         break;

                  }

                  else

                  {

                         if (FD_ISSET(g_socketClient,&fdset_r)<=0)//判断是否准备好接受数据

                         {

                                continue;

                         }

                         else

                         {

                                memset(recvBuff,0,1024);

                                recv(g_socketClient,recvBuff,1024,0);

                                cout<<"recv: "<<recvBuff<<endl;

                         }           

                  }

                  Sleep(5000);

           }

           return 0;

    }

    int _tmain(int argc, _TCHAR* argv[])

    {

           WSADATA wsadata;

           if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)//初始化函数库

           {

                  cout<<"WSAStartup failed"<<endl;

                  Sleep(3000);

                  return -1;

           }

           g_socketClient=::socket(AF_INET,SOCK_STREAM,0);//创建套接字

           if (g_socketClient==INVALID_SOCKET)

           {

                  cout<<"create socket failed"<<endl;

                  Sleep(3000);

                  return -1;

           }

           SOCKADDR_IN  serverAddressIn;

           serverAddressIn.sin_addr.S_un.S_addr=inet_addr("10.20.48.12");//输入服务器地址和端口

           serverAddressIn.sin_family=AF_INET;

           serverAddressIn.sin_port=htons(PORT);//和端口

           int reval=0;

           reval=connect(g_socketClient,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN));//连接服务器

           if (reval==SOCKET_ERROR)

           {

                  cout<<"connect socket failed"<<endl;

                  Sleep(3000);

                  return -1;

           }

           HANDLE m_recvThread=NULL;

           m_recvThread=CreateThread(NULL,0,SendWork,NULL,0,NULL);

           system("PAUSE");

           cout<<"stop program now"<<endl;

           Sleep(3000);

           g_bRun=false;

           return 0;

    }

     

    socket网络通讯之完成端口

    1. 1.      完成端口的作用

    使用单线程处理服务器的收发消息,会发生阻塞,使界面卡顿。使用一个线程处理一个客户端的通讯,客户端数量多时,线程间的切换,会使cpu不堪重负。完成端口的作用就是解决一个线程对应一个客户端的情况,采用几个线程处理所有的客户端通讯。

    2.完成端口的原理

    事先开好几个线程,你有几个CPU我就开几个,首先是避免了线程的上下文切换,因为线程想要执行的时候,总有CPU资源可用,然后让这几个线程等着,等到有用户请求来到的时候,就把这些请求都加入到一个公共消息队列中去,然后这几个开好的线程就排队逐一去从消息队列中取出消息并加以处理,这种方式就很优雅的实现了异步通信和负载均衡的问题,因为它提供了一种机制来使用几个线程“公平的”处理来自于多个客户端的输入/输出,并且线程如果没事干的时候也会被系统挂起,不会占用CPU周期,这个关键的作为交换的消息队列,就是完成端口。

    3.完成端口的使用步骤

    使用完成端口只用遵循如下几个步骤:

     (1) 调用 CreateIoCompletionPort() 函数创建一个完成端口,把它的句柄保存好。

     (2) 根据系统中有多少个处理器,就建立多少个工作者(为了醒目起见,下面直接说Worker)线程,这几个线程是专门用来和客户端进行通信的。

     (3) 下面就是接收连入的Socket连接了,这里有两种实现方式:一是和别的编程模型一样,还需要启动一个独立的线程,专门用来accept客户端的连接请求;二是用性能更高更好的异步AcceptEx()请求。

    (4) 每当有客户端连入的时候,我们还是得调用CreateIoCompletionPort()函数,这里却不是新建立完成端口了,而是把新连入的Socket(也就是前面所谓的设备句柄),与目前的完成端口绑定在一起。

    (5) 例如,客户端连入之后,我们可以在这个Socket上提交一个网络请求,例如WSARecv(),然后系统就会帮咱们乖乖的去执行接收数据的操作,我们大可以放心的去干别的事情了;

     (6) 而此时,我们预先准备的那几个Worker线程就不能闲着了, 我们在前面建立的几个Worker就要忙活起来了,都需要分别调用GetQueuedCompletionStatus() 函数在扫描完成端口的队列里是否有网络通信的请求存在(例如读取数据,发送数据等),一旦有的话,就将这个请求从完成端口的队列中取回来,继续执行本线程中后面的处理代码,处理完毕之后,我们再继续投递下一个网络通信的请求就OK了,如此循环。

     

     

    4. acceptEx相对accept的优点

    (1) 这个好处是最关键的,是因为AcceptEx是在客户端连入之前,就把客户端的Socket建立好了,也就是说,AcceptEx是先建立的Socket,然后才发出的AcceptEx调用,也就是说,在进行客户端的通信之前,无论是否有客户端连入,Socket都是提前建立好了;而不需要像accept是在客户端连入了之后,再现场去花费时间建立Socket。

     (2) 相比accept只能阻塞方式建立一个连入的入口,对于大量的并发客户端来讲,入口实在是有点挤;而AcceptEx可以同时在完成端口上投递多个请求。

     (3) AcceptEx还有一个非常体贴的优点,就是在投递AcceptEx的时候,我们还可以顺便在AcceptEx的同时,收取客户端发来的第一组数据,这个是同时进行的,如果客户端只是连入但是不发送数据的话,我们就不会收到这个AcceptEx完成的通知。

    5.使用完成端口的步骤

    (1)创建完成端口

    HANDLE m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 ); 

    返回值不为空则认为创建成功

    2)根据系统CPU个数创建工作线程

    我们最好是建立CPU核心数量*2那么多的线程,这样更可以充分利用CPU资源,

    SYSTEM_INFO si; 

    GetSystemInfo(&si); 

    int m_nProcessors = si.dwNumberOfProcessors;//CPU个数

    m_nThreads = 2 * m_nProcessors; 

     HANDLE* m_phWorkerThreads = new HANDLE[m_nThreads]; 

     for (int i = 0; i < m_nThreads; i++) 

     { 

         m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, …); 

     } 

    3)创建一个监听的socket绑定到完成端口,监听

    // 初始化Socket库 

    WSADATA wsaData; 

    WSAStartup(MAKEWORD(2,2), &wsaData); 

    //初始化Socket 

    struct sockaddr_in ServerAddress; 

    // 这里需要特别注意,如果要使用重叠I/O的话,这里必须要使用WSASocket来初始化Socket 

    // 注意里面有个WSA_FLAG_OVERLAPPED参数 

    SOCKET m_sockListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //表示异步

    // 填充地址结构信息 

    ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress)); 

    ServerAddress.sin_family = AF_INET; 

    // 这里可以选择绑定任何一个可用的地址,或者是自己指定的一个IP地址  

    //ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);                       

    ServerAddress.sin_addr.s_addr = inet_addr(“你的IP”);          

    ServerAddress.sin_port = htons(11111);                           

    // 绑定端口 

    if (SOCKET_ERROR == bind(m_sockListen, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))  

    // 开始监听 

    listen(m_sockListen,SOMAXCONN)) //在winsock32.h中的宏定义Maximum queue length specifiable by listen.,

    //调用创建完成端口的函数将监听套接字绑定到完成端口

    HANDLE WINAPI CreateIoCompletionPort( 

        __in      HANDLE  FileHandle,             // 这里当然是连入的这个套接字句柄了 

         __in_opt  HANDLE  ExistingCompletionPort, // 这个就是前面创建的那个完成端口 

         __in      ULONG_PTR CompletionKey,        // 这个参数就是类似于线程参数一样,在 

                                                   // 绑定的时候把自己定义的结构体指针传递 

                                                   // 这样到了Worker线程中,也可以使用这个 

                                                   // 结构体的数据了,相当于参数的传递 

         __in      DWORD NumberOfConcurrentThreads // 这里同样置0 

    ); 

    4)启动监听线程,开始接收客户端连接

    方法1:开通线程接收客户端连接

    DWORD WINAPI AcceptThread(LPVOID  pVoid)

    {

           while(g_bRun)

           {

                  SOCKET client=INVALID_SOCKET;

                  sockaddr sockadd;

                  int len =sizeof(sockaddr);

                  client=accept(g_socketserver,&sockadd,&len);//阻塞连接客户端

                  if (client==INVALID_SOCKET)

                  {

                         cout<<"accept socket failed !"<<endl;

                         continue;

                  }

                  else

                  {

                         cout<<"one client connected ip is "<<sockadd.sa_data;

                  }

                  IOPORTCINENT* pIoClient=new IOPORTCINENT;//创建客户端变量保存信息

                  if (pIoClient==NULL)

                  {

                         cout<<"IOPORTCINENT  malloc failed "<<endl;

                         continue;

                  }

                  pIoClient->m_sock=client;

                  IODATA* pSendData=NULL;

                  IODATA* pRecvData=NULL;

                  pRecvData=pIoClient->GetNewIOData();

                  pSendData=pIoClient->GetNewIOData();

                  if (pRecvData==NULL||pRecvData==NULL)

                  {

                         cout<<"GetNewIOData failed !"<<endl;

                         delete(pIoClient);

                         continue;

                  }

                  pSendData->OperationType=SOCK_SEND;

                  pSendData->m_sock=pIoClient->m_sock;

                  pRecvData->OperationType=SOCK_RECV;

                  pRecvData->m_sock=pIoClient->m_sock;

                  CreateIoCompletionPort((HANDLE)client, g_IOPort,(ULONG_PTR) pIoClient, 0);//绑定socket到完成端口用pIoClient作为键值,

                  if (WSARecv(client,&(pRecvData->buff),1,&(pRecvData->NumberofByteRevc),&pRecvData->Flags,&pRecvData->overlap,NULL)==SOCKET_ERROR)

                  {//投递接收请求

                         cout<<"WSARecv post recv request failed "<<client<<endl;

                         delete(pIoClient);

                         continue;

                  }

                 

                  strcpy_s(pSendData->szMessage,"server to client data");

                  pSendData->NumberofByteRevc=21;

                  if (WSASend(client,&(pSendData->buff),1,&(pSendData->NumberofByteRevc),pSendData->Flags,&pSendData->overlap,NULL)==SOCKET_ERROR)

                  {//投递发送请求

                         cout<<"WSASend post send request failed "<<client<<endl;

                         delete(pIoClient);

                         continue;

                  }

                 

     

                  AddToClinetMap(pIoClient);

                  Sleep(1000);

           }

    return 0;

    }

    方法2:投递Accept请求

    这是为什么呢?因为我们在未取得函数指针的情况下就调用AcceptEx的开销是很大的,因为AcceptEx 实际上是存在于Winsock2结构体系之外的(因为是微软另外提供的),所以如果我们直接调用AcceptEx的话,首先我们的代码就只能在微软的平台上用了,没有办法在其他平台上调用到该平台提供的AcceptEx的版本(如果有的话), 而且更糟糕的是,我们每次调用AcceptEx时,Service Provider都得要通过WSAIoctl()获取一次该函数指针,效率太低了,所以还不如我们自己直接在代码中直接去这么获取一下指针好了。微软的实现是通过mswsock.dll中提供的,所以我们可以通过静态链接mswsock.lib来使用AcceptEx。但是这是一个不推荐的方式,我们应该用WSAIoctl 配合SIO_GET_EXTENSION_FUNCTION_POINTER参数来获取函数的指针,然后再调用AcceptEx。

    // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数

           // 所以需要额外获取一下函数的指针,

           // 获取AcceptEx函数指针

    BOOL AcceptEx (      

                   SOCKET sListenSocket,  

                   SOCKET sAcceptSocket,  

                   PVOID lpOutputBuffer,  

                   DWORD dwReceiveDataLength,  

                   DWORD dwLocalAddressLength,  

                   DWORD dwRemoteAddressLength,  

                   LPDWORD lpdwBytesReceived,  

                   LPOVERLAPPED lpOverlapped  

    );

    参数1--sListenSocket, 这个就是那个唯一的用来监听的Socket了,没什么说的;

     

    参数2--sAcceptSocket, 用于接受连接的socket,这个就是那个需要我们事先建好的,等有客户端连接进来直接把这个Socket拿给它用的那个,是AcceptEx高性能的关键所在。

     

    参数3--lpOutputBuffer,接收缓冲区,这也是AcceptEx比较有特色的地方,既然AcceptEx不是普通的accpet函数,那么这个缓冲区也不是普通的缓冲区,这个缓冲区包含了三个信息:一是客户端发来的第一组数据,二是server的地址,三是client地址,都是精华啊…但是读取起来就会很麻烦,不过后面有一个更好的解决方案。

     

    参数4--dwReceiveDataLength,前面那个参数lpOutputBuffer中用于存放数据的空间大小。如果此参数=0,则Accept时将不会待数据到来,而直接返回,如果此参数不为0,那么一定得等接收到数据了才会返回…… 所以通常当需要Accept接收数据时,就需要将该参数设成为:sizeof(lpOutputBuffer) - 2*(sizeof sockaddr_in +16),也就是说总长度减去两个地址空间的长度就是了,看起来复杂,其实想明白了也没啥……

    参数5--dwLocalAddressLength,存放本地址地址信息的空间大小;

    参数6--dwRemoteAddressLength,存放本远端地址信息的空间大小;

    参数7--lpdwBytesReceived,out参数,对我们来说没用,不用管;

    参数8--lpOverlapped,本次重叠I/O所要用到的重叠结构。

    包含头文件

    #include <MSWSock.h>

     

    定义函数指针

    LPFN_ACCEPTEX                m_lpfnAcceptEx;                // AcceptEx 和GetAcceptExSockaddrs 的函数指针,用于调用这两个扩展函数

    LPFN_GETACCEPTEXSOCKADDRS    m_lpfnGetAcceptExSockAddrs;

     

    定义GUID变量

    GUID GuidAcceptEx = WSAID_ACCEPTEX; 

           GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;

     

    根据GUID获取函数指针

           DWORD dwBytes = 0; 

           if(SOCKET_ERROR == WSAIoctl(

                  m_pListenContext->m_Socket,

                  SIO_GET_EXTENSION_FUNCTION_POINTER,

                  &GuidAcceptEx,

                  sizeof(GuidAcceptEx),

                  &m_lpfnAcceptEx,

                  sizeof(m_lpfnAcceptEx),

                  &dwBytes,

                  NULL,

                  NULL)) 

           { 

                  this->_ShowMessage("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d ", WSAGetLastError());

                  this->_DeInitialize();

                  return false; 

           } 

     

           // 获取GetAcceptExSockAddrs函数指针,也是同理

           if(SOCKET_ERROR == WSAIoctl(

                  m_pListenContext->m_Socket,

                  SIO_GET_EXTENSION_FUNCTION_POINTER,

                  &GuidGetAcceptExSockAddrs,

                  sizeof(GuidGetAcceptExSockAddrs),

                  &m_lpfnGetAcceptExSockAddrs,

                  sizeof(m_lpfnGetAcceptExSockAddrs),  

                  &dwBytes,

                  NULL,

                  NULL)) 

           { 

                  this->_ShowMessage("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d ", WSAGetLastError()); 

                  this->_DeInitialize();

                  return false;

           } 

     

     

           // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求

           for( int i=0;i<MAX_POST_ACCEPT;i++ )

           {

                  // 新建一个IO_CONTEXT

                  PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();

     

                  if( false==this->_PostAccept( pAcceptIoContext ) )

                  {

                         m_pListenContext->RemoveContext(pAcceptIoContext);

                         return false;

                  }

           }

           this->_ShowMessage( _T("投递%d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );

           return true;

    6. GetQueuedCompletionStatus函数介绍

    BOOL WINAPI GetQueuedCompletionStatus( 

        __in   HANDLE          CompletionPort,    // 这个就是我们建立的那个唯一的完成端口 

        __out  LPDWORD         lpNumberOfBytes,   //这个是操作完成后返回的字节数 

        __out  PULONG_PTR      lpCompletionKey,   // 这个是我们建立完成端口的时候绑定的那个自定义结构体参数 

        __out  LPOVERLAPPED    *lpOverlapped,     // 这个是我们在连入Socket的时候一起建立的那个重叠结构 

        __in   DWORD           dwMilliseconds     // 等待完成端口的超时时间,如果线程不需要做其他的事情,那就INFINITE就行了 

    );

     

    7.完成端口服务例程

    // IOPortSocketServer.cpp : 定义控制台应用程序的入口点。

    //

     

    #include "stdafx.h"

    #include "winsock2.h"

    #pragma  comment(lib,"ws2_32.lib")

    #include<iostream>

    #include <map>

    using namespace std;

    #define PORT 5050

    #define  MSGLEN 1024

    bool g_bRun=true;

     

    HANDLE g_IOPort=NULL;

    SOCKET g_socketserver=INVALID_SOCKET;

    CRITICAL_SECTION             m_csContextList; 

     

    enum SocketType

    {

           SOCK_RECV=1,

           SOCK_ACCEPT=2,

           SOCK_SEND=3

    };

    char sendbuff[1024]="what's up ? dute !";

     

    struct  IODATA//用来接收完成端口数据的结构体

    {

           WSAOVERLAPPED overlap;//重叠结构体,完成端口内部用于标志客户端请求的标志

           WSABUF buff;//一个指针和一个长度变量

           char szMessage[MSGLEN];//实际存储数据的缓冲区

           DWORD NumberofByteRevc;//缓冲区的长度

           DWORD Flags;//标志位

           int OperationType;//发送,接收,连接标识符

           SOCKET m_sock;//连接客户端的套接字

           IODATA()

           {

                  memset(&overlap, 0,sizeof(WSAOVERLAPPED));

                  NumberofByteRevc=MSGLEN;

                  buff.buf=szMessage;

                  buff.len=NumberofByteRevc;

                  memset(szMessage,0,MSGLEN);

                  Flags=0;

                  OperationType=0;

                  m_sock=INVALID_SOCKET;

           }

    };

     

    struct  IOPORTCINENT//每个客户端连接,都会建立一个变量用于保存客户端信息

    {

           SOCKET m_sock;//连接客户端的套接字

           map<IODATA*,IODATA*> m_mapIOData;//保存发送请求的结构体

           IOPORTCINENT()

           {

                  m_sock=INVALID_SOCKET;

           }

           ~IOPORTCINENT()//析构函数

           {

                  if (m_sock!=INVALID_SOCKET)

                  {

                         closesocket(m_sock);

                         m_sock=INVALID_SOCKET;

                  }

                  if (m_mapIOData.size()>0)

                  {

                         map<IODATA*,IODATA*>::iterator it=m_mapIOData.begin();

                         for(;it!=m_mapIOData.end();it++)

                         {

                                delete it->second;

                         }

                         m_mapIOData.clear();

                  }

           }

           IODATA*GetNewIOData()//客户端连接时,申请数据结构保存完成端口返回的数据

           {

                  IODATA*p=new IODATA;

                  if (p!=NULL)

                  {

                         m_mapIOData.insert(pair<IODATA*,IODATA*>(p,p));

                         return p;

                  }

                  else

                  {

                         return NULL;

                  }

           }

           bool DeleteIOData(IODATA* p)//释放内存

           {

                  map<IODATA*,IODATA*>::iterator it=m_mapIOData.find(p);

                  if (it!=m_mapIOData.end())

                  {

                         delete p;

                         m_mapIOData.erase(it);

                         return true;

                  }

                  else

                  {

                         return false;

                  }

           }

    };

    map<SOCKET,IOPORTCINENT*> g_mapIoPortClient;//用于保存客户端连接map

     

    bool AddToClinetMap(IOPORTCINENT*p)//添加到map,方便后续释放内存

    {

           EnterCriticalSection(&m_csContextList);

           map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.find(p->m_sock);

           if (it==g_mapIoPortClient.end())

           {

                  g_mapIoPortClient.insert(pair<SOCKET,IOPORTCINENT*>(p->m_sock,p));

                  LeaveCriticalSection(&m_csContextList);

                  return true;

           }

           else

           {

                  LeaveCriticalSection(&m_csContextList);

                  return false;

           }

          

    }

    bool DeleteFromClientMap(SOCKET sock)//客户端断开连接后将map中对应的变量删除

    {

           EnterCriticalSection(&m_csContextList);

           map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.find(sock);

           if (it!=g_mapIoPortClient.end())

           {

                  closesocket(it->first);

                  delete it->second;

                  g_mapIoPortClient.erase(it);

                  LeaveCriticalSection(&m_csContextList);

                  return true;

           }

           else

           {

                  LeaveCriticalSection(&m_csContextList);

                  return false;

           }

          

    }

    void ClearClientMap()//服务器退出时,释放所有客户端的连接数据,并清空map

    {

           EnterCriticalSection(&m_csContextList);

           map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.begin();

           if (g_mapIoPortClient.size()>0)

           {

                  for (;it!=g_mapIoPortClient.end();it++)

                  {

                         closesocket(it->first);

                         delete it->second;

                  }

                  g_mapIoPortClient.clear();

           }

           LeaveCriticalSection(&m_csContextList);

    }

     

    //接收完成端口发送和接收数据的工作线程

    DWORD WINAPI WorkThread(LPVOID  pVoid)

    {

          

          

           while(g_bRun)

           {

                  IODATA* pRecvData=NULL;

                  OVERLAPPED           *pOverlapped = NULL;

                  DWORD   dwBytesTransferred=0;

                  IOPORTCINENT* pIoClient=NULL;

                  /*从队列中取出完成端口完成的操作(发送、接收、连接操作)

                  pOverlapped参数返回的是WSARecv中传入的那个&pRecvData->overlap,因为overlapp是结构体IODATA的第一个

                  变量,所以overlapp的地址就是pRecvData的地址

                  */

                  if (GetQueuedCompletionStatus(g_IOPort,&dwBytesTransferred,(PULONG_PTR)&pIoClient,&pOverlapped,INFINITE)==FALSE)

                  {

                         DWORD dwErr = GetLastError();

                         if (dwErr==WAIT_TIMEOUT)//这里是INFINITE是一直等待,所以不会出现超时情况

                         {

                                continue;

                         }

                         else//连接出现问题要删除

                         {

                                if (DeleteFromClientMap(pIoClient->m_sock)==false)

                                {

                                       cout<<"DeleteFromClientMap failed "<<pIoClient->m_sock<<endl;

                                }    

                         }

                         cout<<"GetQueuedCompletionStatus return false "<<dwErr<<endl;

                         Sleep(3000);

                         continue;

                  }

                  else

                  {

                         if (dwBytesTransferred==0)//客户端关闭

                         {

                                if (DeleteFromClientMap(pIoClient->m_sock)==false)

                                {

                                       cout<<"DeleteFromClientMap failed "<<pIoClient->m_sock<<endl;

                                }    

                         }

                         else

                         {

                                pRecvData=(IODATA*)pOverlapped;

                                //pRecvData=CONTAINING_RECORD(pOverlapped, IODATA, overlap); 

                                /***************

                                主线程向完成端口发送接收数据请求,然后继续执行其他事情,不会阻塞

                                当完成端口完成某个操作后,会向队列里放入消息,工作线程中GetQueuedCompletionStatus

                                不断的读取队列中的消息,通过pOverlapped这个指针的地址判断是哪个客户端发来的数据

                               

                                #define CONTAINING_RECORD(address, type, field) ((type *)(

                                (PCHAR)(address) -

                                (ULONG_PTR)(&((type *)0)->field)))

                                这个宏定义的作用是通过结构体中某个变量的地址,计算这个变量的首地址

                                address是结构体中某个变量的地址,(ULONG_PTR)(&((type *)0)->field))这部分计算的是成员变量在结构体中的

                                偏移量,成员变量地址减去偏移量,得到结构体变量的首地址。如果成员变量是结构体的第一个变量,则

                                成员变量的地址就是结构体变量的地址,直接强制转换即可。这个是很多人难以理解的地方

                                **************/

                                if (pRecvData->OperationType==SOCK_RECV)//接收请求完成

                                {

                                       cout<<"recv:"<<pRecvData->m_sock<<"~"<<pRecvData->szMessage<<endl;

                                       memset(pRecvData->szMessage,0,MSGLEN);//清空接收区,再次用相同的pRecvData发送接收数据请求

                                WSARecv(pRecvData->m_sock,&(pRecvData->buff),1,&pRecvData->NumberofByteRevc,&pRecvData->Flags,&pRecvData->overlap,NULL);//投递接收请求

                                       send(pRecvData->m_sock,sendbuff,18,0);//向客户端发送数据,告知接收到数据

                                       cout<<"send:"<<pRecvData->m_sock<<"~"<<sendbuff<<endl;

                                }

                                else if (pRecvData->OperationType==SOCK_SEND)//发送请求完成

                                {

                                       cout<<"WSASend success"<<endl;

                                       cout<<"WSASend:"<<pRecvData->szMessage<<endl;

                                       pRecvData->NumberofByteRevc=21;

                                       //向完成端口投递发送请求

                                WSASend(pRecvData->m_sock,&(pRecvData->buff),1,&pRecvData->NumberofByteRevc,pRecvData->Flags,&pRecvData->overlap,NULL);//投递发送请求

                                       Sleep(3000);

                                }

                               

                         }

                        

                  }

           }

           return 0;

    }

    DWORD WINAPI AcceptThread(LPVOID  pVoid)

    {

           while(g_bRun)

           {

                  SOCKET client=INVALID_SOCKET;

                  sockaddr sockadd;

                  int len =sizeof(sockaddr);

                  client=accept(g_socketserver,&sockadd,&len);//阻塞连接客户端

                  if (client==INVALID_SOCKET)

                  {

                         cout<<"accept socket failed !"<<endl;

                         continue;

                  }

                  else

                  {

                         cout<<"one client connected ip is "<<sockadd.sa_data;

                  }

                  IOPORTCINENT* pIoClient=new IOPORTCINENT;//创建客户端变量保存信息

                  if (pIoClient==NULL)

                  {

                         cout<<"IOPORTCINENT  malloc failed "<<endl;

                         continue;

                  }

                  pIoClient->m_sock=client;

                  IODATA* pSendData=NULL;

                  IODATA* pRecvData=NULL;

                  pRecvData=pIoClient->GetNewIOData();

                  pSendData=pIoClient->GetNewIOData();

                  if (pRecvData==NULL||pRecvData==NULL)

                  {

                         cout<<"GetNewIOData failed !"<<endl;

                         delete(pIoClient);

                         continue;

                  }

                  pSendData->OperationType=SOCK_SEND;

                  pSendData->m_sock=pIoClient->m_sock;

                  pRecvData->OperationType=SOCK_RECV;

                  pRecvData->m_sock=pIoClient->m_sock;

                  CreateIoCompletionPort((HANDLE)client, g_IOPort,(ULONG_PTR) pIoClient, 0);//绑定socket到完成端口用pIoClient作为键值,

                  if (WSARecv(client,&(pRecvData->buff),1,&(pRecvData->NumberofByteRevc),&pRecvData->Flags,&pRecvData->overlap,NULL)==SOCKET_ERROR)

                  {//投递接收请求

                         cout<<"WSARecv post recv request failed "<<client<<endl;

                         delete(pIoClient);

                         continue;

                  }

                 

                  strcpy_s(pSendData->szMessage,"server to client data");

                  pSendData->NumberofByteRevc=21;

                  if (WSASend(client,&(pSendData->buff),1,&(pSendData->NumberofByteRevc),pSendData->Flags,&pSendData->overlap,NULL)==SOCKET_ERROR)

                  {//投递发送请求

                         cout<<"WSASend post send request failed "<<client<<endl;

                         delete(pIoClient);

                         continue;

                  }

                 

     

                  AddToClinetMap(pIoClient);

                  Sleep(1000);

           }

    return 0;

    }

    void finit()

    {

           g_bRun=false;

           WSACleanup();

           closesocket(g_socketserver);

           ClearClientMap();

    }

    int _tmain(int argc, _TCHAR* argv[])

    {

           /*IODATA data;

           cout<<"data's address is :"<<&data<<endl;

           cout<<"data's member WSABUF's address: "<<&data.buff<<endl;

           unsigned long pianyi=(unsigned long)(&(((IODATA*)0)->buff));

           cout<<"offset is :"<<pianyi<<endl;

           IODATA* p=(IODATA*)((char*)(&data.buff)-pianyi);

           cout<<"data address is :"<<p<<endl;

           system("PAUSE");*/

           //初始化套接字库

           WSADATA wsadata;

           if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)

           {

                  cout<<"WSAStartup failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           InitializeCriticalSection(&m_csContextList);

           //创建完成端口

           g_IOPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);

           if (g_IOPort==NULL)

           {

                  cout<<"CreateIoCompletionPort failed"<<endl;

                  Sleep(2000);

                  finit();

                  return -1;

           }

           //创建监听套接字

           g_socketserver=::WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);

           if (g_socketserver==INVALID_SOCKET)

           {

                  cout<<"create socket failed"<<endl;

                  Sleep(2000);

                  finit();

                  return -1;

           }

           //绑定地址到套接字

           SOCKADDR_IN  serverAddressIn;

           serverAddressIn.sin_addr.S_un.S_addr=htonl(INADDR_ANY);

           serverAddressIn.sin_family=AF_INET;

           serverAddressIn.sin_port=htons(PORT);

           if (::bind(g_socketserver,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN))!=0)

           {

                  cout<<"bind address to m_socketserver failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           //监听套接字

           if (::listen(g_socketserver,SOMAXCONN)!=0)

           {

                  cout<<"listen socket failed"<<endl;

                  Sleep(2000);

                  return -1;

           }

           //获取CPU个数

           SYSTEM_INFO systeminfo;

           GetSystemInfo(&systeminfo);

           //创建工作线程

           for (int i = 0; i < systeminfo.dwNumberOfProcessors*2; i++)

           {

                  HANDLE hthread=CreateThread(NULL, 0, WorkThread, g_IOPort, 0, NULL);

                  CloseHandle(hthread);//关闭句柄,线程仍然运行

           }

           //创建接收客户端连接的线程

           HANDLE hthread=CreateThread(NULL, 0, AcceptThread, g_IOPort, 0, NULL);

           CloseHandle(hthread);

           cout<<"server is started, waiting for client to connect!"<<endl;

           system("PAUSE");

           g_bRun=false;//让线程自动退出

           ClearClientMap();//清除客户端信息map

           DeleteCriticalSection(&m_csContextList);

           cout<<"stop program now"<<endl;

           Sleep(3000);

           return 0;

    }

     

     参考文献

    http://www.cnblogs.com/c1230v/archive/2012/11/25/2788280.html

  • 相关阅读:
    spark[源码]-TaskSchedulerlmpl类源码
    spark[源码]-SparkEnv执行环境创建
    spark[源码]-sparkContext概述
    spark学习(基础篇)--(第三节)Spark几种运行模式
    spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)
    Spark --【宽依赖和窄依赖】
    CURL常用命令
    Foundation框架基本数据类型
    Object-C Categories和Protocols
    Object c 基础知识
  • 原文地址:https://www.cnblogs.com/bclshuai/p/13407972.html
Copyright © 2011-2022 走看看