zoukankan      html  css  js  c++  java
  • socket编程完成端口模型

    目录:

    一、为什么选用完成端口

    二、完成端口的概念

    三、完成端口的基本流程

    四、实现详解—主线程

    五、实现详解—工作者线程

     

    一、为什么选用完成端口

    网络通信方式大致有以下几种:

    1. 同步方式:所有操作在一个线程内顺序执行,则通信操作会阻塞同一线程其他操作。
    2. 同步+多线程方式:服务器端为每一个连入的客户端建立一个线程进行通信,但当客户端过多时,建立大量线程占用过多资源,而且CPU需要很多时间进行线程的切换。
    3. 完成端口:让所有的通信请求排到队列中,利用事先建立的少量几个线程依次处理队列中的请求,因为线程数量少,无需用大量时间进行线程切换,提高通信性能。

    关于事先建立的工作者线程:

    这些线程的数量一般为有多少CPU就建立多少线程,这样可以让每个线程都有CPU可用,又避免了线程争夺CPU而引起的切换线程时间,但有些线程可能在某些情况下处于Sleep()状态,为了让CPU满负荷工作,所以实际的线程数量是CPU数量的两倍

    二、完成端口的概念

    异步通信的特点是,应用程序发出一个通信操作请求给内核后,可以继续做其他事,当内核完成操作后,通知应用程序。内核向应用程序反馈信息的时间点又有两种:

    1. 当连接建立后,通知应用程序可以发送、接受数据。
    2. 当连接建立,并且完成应用程序所请求的发送、接收数据后,通知应用程序:此数据已准备好,可以使用。此时,数据已存在于一个队列中,此队列就是完成端口(完成队列)。

    完成端口概念模型抽象:

    三、完成端口的基本流程

    完成端口执行步骤:

    1.主线程流程:

    (1)       调用 CreateIoCompletionPort() 函数创建一个完成端口,而且在一般情况下,我们需要且只需要建立这一个完成端口

    (2)       根据系统中有多少个处理器,建立多少个工作者线程(Worker线程),这几个线程是专门用来和客户端进行通信的,目前暂时没什么工作

    (3)       建立监听Socket,并置于绑定、监听状态。

    (4)       监听Socket探测到有新客户端连入,调用CreateIoCompletionPort()函数,将新连入的客户端Socket与完成端口绑定

    (5)       客户端连入后,可以进行通信

    2.工作者(worker)线程流程:

    (1)   调用GetQueuedCompletionStatus()扫描完成端口的队列里是否有网络通信的请求存在(例如读取数据,发送数据等)

    (2)   如果有通信请求的话,就将这个请求从完成端口的队列中取回来,利用CONTAINING_RECORD取得单IO操作数据结构内容,判断操作类型(读取、写入……),做出相应处理。

    3.主线程与工作者线程之间的数据联系:

    此处使用到“单句柄数据”、“单IO操作数据”和重叠结构,前两种结构包含的具体内容可自己定义。

    主线程接受新的客户端时,将客户端Socket有关信息存放到单句柄数据,并与完成端口绑定,将单IO操作数据中的部分变量(主要是重叠结构)与完成端口绑定。

    工作者线程处理新的请求时,将此请求的单句柄数据提取出来,用以判断Socket信息,将单IO操作数据中的重叠结构提取出来并据此重叠结构得到整个单IO操作数据结构,以此得到但IO操作数据结构中需要处理的数据。

    四、实现详解—主线程

    0.先定义单句柄数据、单IO数据

     1 //单句柄数据
     2 typedef struct tagPER_HANDLE_DATA
     3 {
     4 SOCKET Socket;
     5 SOCKADDR_STORAGE ClientAddr;
     6 // 其他有用信息都可写到这里
     7 } PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
     8 
     9 typedef struct tagPER_IO_DATA
    10 {
    11 OVERLAPPED Overlapped;
    12 WSABUF DataBuf;
    13 char buffer[1024];
    14 int BufferLen;
    15 int OperationType; // 可以作为操作类型
    16 // 其他有用信息都可写到这里
    17 }PER_IO_DATA, *LPPER_IO_DATA;

    单句柄数据主要存放相应的Socket信息

    单IO操作数据中,第一个变量一定要是OVERLAPPED重叠结构(在工作者线程中据此变量的地址可获得整个单IO操作数据的地址指针,所以要放在第一个变量位置)。

    1.加载WinSock2.2

    1 WSADATA  wsd;
    2 WSAStartup(MAKEWORD(2,2),&wsd);

    2.创建一个IO完成端口

    1 HANDLE  CompletionPort=CreateIoCompletionPort(
    2 INVALID_HANDLE_VALUE,
    3 NULL,
    4 0,
    5 0);

    此结构后面解释,对于参数中的最后一个零,它代表的是允许应用程序同时执行的线程数量,设置为0,也就是NumberOfConcurrentThreads,即有多少CPU,就允许多少线程同时执行。

    3.确定系统中有多少处理器

    1 SYSTEM_INFO  SystemInfo;
    2 GetSystemInfo(&SystemInfo);
    3 Int  numOfCPU= SystemInfo.dwNumberOfProcessors;

    前面讲过,有多少CPU,就建立CPU数×2个工作者线程

    4.创建工作者线程

    1 //创建服务器的工作者线程,并将完成端口传递到该线程
    2 for (int i = 0; i < numOfCPU * 2; ++i){
    3 HANDLE ThreadHandle;
    4 ThreadHandle = CreateThread(NULL,
    5 0,
    6 ServerWorkerThread,//工作者线程的函数入口
    7 CompletionPort,//完成端口(也可以是其他包含完成端口的结构体),作为工作者线程函数的参数
    8 0,
    9 NULL);

    5.创建监听套接字,并完成绑定,监听

     1 SOCKET  Listen = WSASocket(AF_INET,
     2 SOCK_STREAM,
     3 0,
     4 NULL,
     5 0,
     6 WSA_FLAG_OVERLAPPED);
     7 SOCKADDR_IN InternetAddr;
     8 InternetAddr.sin_family = AF_INET;
     9 InternetAddr.sin_port = htons(DEFAULT_PORT);
    10 InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    11 bind(Listen, (SOCKADDR*) &InternetAddr, sizeof (InternetAddr));
    12 listen(Listen, 5);

    使用重叠IO的话,初始化Socket的时候一定要使用WSASocket并带上WSA _FLAG_OVERLAPPED参数才可以(只有在服务器端需要这么做,在客户端是不需要的??);

    6.接受连接

    1 SOCKADDR_IN saRemote;
    2 int RemoteLen;
    3 SOCKET acceptSock=accept(
    4 Listen,
    5 (SOCKADDR*)&saRemote,
    6 &RemoteLen);

    7.将接受套接字与完成端口绑定

     1 //用来和套接字关联的单句柄数据
     2 PER_HANDLE_DATA * PerHandleData = NULL;
     3 PerHandleData= (LPPER_HANDLE_DATA) GlobalAlloc (
     4 GPTR,
     5 sizeof(PER_HANDLE_DATA));
     6 PerHandleData->Socket = Accept;
     7 memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);
     8 CreateIoCompletionPort(
     9 (HANDLE)Accept,
    10 CompletionPort,
    11 (DWORD)PerHandleData,
    12 0);

    函数:CreateIoCompletionPort()

    函数说明:有两个功能: 1. 用于创建一个完成端口对象。 2. 将一个句柄同完成端口关联到一起。

    函数原型:

    1 HANDLE  WINAPI  CreateIoCompletionPort(
    2 __in HANDLE FileHandle,
    3 __in_opt HANDLE ExistingCompletionPort,
    4 __in ULONG_PTR CompletionKey,
    5 __in DWORD NumberOfConcurrentThreads);

    参数说明:

    FileHandle是关联的文件句柄。

    ExistingCompletionPort是已经存在的完成端口。如果为NULL,则为新建一个IOCP。

    CompletionKey是传送给处理函数(工作者线程?-->!!)的参数(单句柄数据)。

    NumberOfConcurrentThreads是有多少个线程在访问这个消息队列。当参数ExistingCompletionPort不为0的时候,系统忽略该参数,当该参数为0表示允许同时相等数目于处理器个数的线程访问该消息队列。

    8.投递一个接受数据请求

     1 PPER_IO_DATA  pPerIO = (PPER_IO_DATA)GlobalAlloc(
     2 GPTR,
     3 sizeof(PER_IO_DATA));
     4 WSABUF buf;
     5 pPerIO->nOperationType = OP_READ;//操作类型:读
     6 buf.buf = pPerIO->buf;
     7 buf.len = BUFFER_SIZE;
     8 DWORD dwRecv;
     9 DWORD dwFlags = 0;
    10 WSARecv(pPerHandle->s, &buf, 1, &dwRecv, &dwFlags, &pPerIO-> Overlapped, NULL);
    11 //传递了部分单IO操作数据(buf,pPerIO-> Overlapped)

    函数原型:

    1 int WSARecv(
    2 SOCKET s,
    3 LPWSABUF lpBuffers,
    4 DWORD dwBufferCount,
    5 LPDWORD lpNumberOfBytesRecvd, 
    6 LPDWORD lpFlags, 
    7 LPWSAOVERLAPPED lpOverlapped,
    8 LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine );

    参数说明:

    s:当然是投递这个操作的套接字

    lpBuffers:接收缓冲区,与Recv函数不同, 这里需要一个由WSABUF结构构成的数组

    dwBufferCount: 数组中WSABUF结构的数量

    lpNumberOfBytesRecvd: 如果接收操作立即完成,这里会返回函数调用所接收到的字节数

    lpFlags: 一个指向标志位的指针

    lpOverlapped: “绑定”的重叠结构

    lpCompletionRoutine: 完成例程中将会用到的参数,我们这里设置为 NULL

    五、实现详解—工作者线程

    1.得到线程被创建时传入的参数

    1 //工作者线程的函数声明:
    2 //DWORD WINAPI ServerThread(LPVOID lpParam);
    3 //本例中主线程传入的是完成端口CompletionPort
    4 HANDLE  hCompletion=(HANDLE)lpParam;

    2.查看完成端口上是否有IO完成

     1 DWORD dwTrans;//接受到的数据长度
     2 LPOVERLAPPED lpOverlapped;//重叠结构
     3 PPER_HANDLE_DATA  pPerHandle;//单句柄结构
     4 PPER_IO_DATA  pPerIO;//单IO操作数据
     5 GetQueuedCompletionStatus(
     6 hCompletion,
     7 &dwTrans,
     8 (LPDWORD) &pPerHandle ,
     9 (LPOVERLAPPED*)& lpOverlapped,
    10 WSA_INFINITE);

    函数声明:  

    1 BOOL GetQueuedCompletionStatus(
    2 __in  HANDLE   CompletionPort,
    3 __out  LPDWORD   lpNumberOfBytes,
    4 __out  PULONG_PTR   lpCompletionKey,
    5 __out  LPOVERLAPPED   *lpOverlapped,
    6 __in  DWORD   dwMilliseconds);

    调用参数:  

    CompletionPort:指定的IOCP,该值由CreateIoCompletionPort函数创建。  

    lpnumberofbytes:一次完成后的I/O操作所传送数据的字节数。  

    lpcompletionkey:这个是我们建立完成端口的时候绑定的那个自定义结构体参数

    lpoverlapped:// 这个是我们在连入Socket的时候一起建立的那个重叠结构     

    dwmilliseconds: // 等待完成端口的超时时间,如果线程不需要做其他的事情,那就INFINITE就行了

    3.取得完整的单IO操作数据

    1 pPerIO = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped,
    2 PER_IO_DATA,
    3 Overlapped);

    4.根据请求的操作的类型,选择相应的处理方案

    1 switch(pPerIO->OperationType) // 通过per-I/O数据中的OperationType域查看什么I/O请求完成了
    2  {
    3 case OP_READ: // 完成一个接收请求
    4 {。。。。。。} break;
    5 case OP_WRITE:
    6 case OP_ACCEPT: break;
    7 }

    参考资料:http://blog.csdn.net/piggyxp/article/details/6922277 

  • 相关阅读:
    ComponentOne Studio Enterprise ———C1控件barchat 的统计图格式 设置
    OpenStack开发学习笔记04————
    短信推送API接口实现---------阿里大于
    OpenStack开发学习笔记03————创建一个openstack
    OpenStack开发学习笔记02————环境的安装和部署
    OpenStack开发学习笔记01
    MVC模式在Java Web应用程序中的实例分析
    javascript的setTimeout以及setInterval休眠问题。
    BFC 神奇背后的原理
    jquery checkBox的问题
  • 原文地址:https://www.cnblogs.com/mutou3221/p/3048806.html
Copyright © 2011-2022 走看看