zoukankan      html  css  js  c++  java
  • [转]IOCPSocket IO模型终结篇

    完成端口程序的执行步骤:

     1) 创建一个完成端口。第四个参数保持为0,指定在完成端口上,每个处理器一次只允许执行一个工作者线程。

     2) 判断系统内到底安装了多少个处理器。

     3) 创建工作者线程,根据步骤2 )得到的处理器信息,在完成端口上,为已完成的I / O请求提供服务。在这个简单的例子中,我们为每个处理器都只创建一个工作者线程。这是由于事先已预计到,到时不会有任何线程进入“挂起”状态,造成由于线程数量的不足,而使处理器空闲的局面(没有足够的线程可供执行)。调用C r e a t e T h r e a d函数时,必须同时提供一个工作者例程,由线程在创建好执行。本节稍后还会详细讨论线程的职责。

    4) 准备好一个监听套接字,在端口5 1 5 0上监听进入的连接请求。

    5) 使用a c c e p t函数,接受进入的连接请求。

    6) 创建一个数据结构,用于容纳“单句柄数据”,同时在结构中存入接受的套接字句柄。 

    7) 调用C r e a t e I o C o m p l e t i o n P o r t,将自a c c e p t返回的新套接字句柄同完成端口关联到一起。通过完成键(C o m p l e t i o n K e y)参数,将单句柄数据结构传递给C r e a t e I o C o m p l e t i o n P o r t。

     8) 开始在已接受的连接上进行I / O操作。在此,我们希望通过重叠I / O机制,在新建的套接字上投递一个或多个异步W S A R e c v或W S A S e n d请求。这些I / O请求完成后,一个工作者线程会为I / O请求提供服务,同时继续处理未来的I / O请求,稍后便会在步骤3 )指定的工作者例程中,体验到这一点。

    9) 重复步骤5 ) ~ 8 ),直至服务器中止。 

      

    如果一个应用程序同时要管理众多的socket,那么采用IOCP是比较好的办法。从本质上说,完成端口模型要求我们创建一个Wi n 3 2完成端口对象,通过指定数量的线程,对重叠I / O请求进行管理,以便为已经完成的重叠I / O请求提供服务。

    1) 创建完成端口

    HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    2) 把一个IO句柄和完成端口关联起来,这里的句柄是一个socket 句柄

    CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);

    其中第一个参数是句柄,可以是文件句柄、SOCKET句柄。
    第二个就是我们上面创建出来的完成端口,这里就把两个东西关联在一起了。
    第三个参数很关键,叫做PerHandleData,就是对应于每个句柄的数据块。我们可以使用这个参数在后面取到与这个SOCKET对应的数据。
    最后一个参数给0,意思就是根据CPU的个数,允许尽可能多的线程并发执行。

    3) 从完成端口中取得结果
    GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE)

    第一个参数是完成端口
    第二个参数是表明这次的操作传递了多少个字节的数据
    第三个参数是OUT类型的参数,就是前面CreateIoCompletionPort传进去的单句柄数据,这里就是前面的SOCKET句柄以及与之相对应的数据,这里操作系统给我们返回,让我们不用自己去做列表查询等操作了
    第四个参数就是进行IO操作的结果,是我们在投递 WSARecv / WSASend 等操作时传递进去的,这里操作系统做好准备后,给我们返回了。非常省事!

    个人感觉完成端口就是操作系统为我们包装了很多重叠IO的不爽的地方,让我们可以更方便的去使用,下篇我将会尝试去讲述完成端口的原理。

    复制代码
      1 #include "winerror.h"
      2 #include "Winsock2.h"
      3 #pragma comment(lib, "ws2_32")
      4 
      5 #include "windows.h"
      6 
      7 #include <iostream>
      8 using namespace std;
      9 
     10 /// 宏定义
     11 #define PORT 5050
     12 #define DATA_BUFSIZE 8192
     13 
     14 #define OutErr(a) cout << (a) << endl
     15 << "出错代码:" << WSAGetLastError() << endl
     16 << "出错文件:" << __FILE__ << endl  
     17 << "出错行数:" << __LINE__ << endl
     18 
     19 #define OutMsg(a) cout << (a) << endl;
     20 
     21 
     22 /// 全局函数定义
     23 
     24 ///////////////////////////////////////////////////////////////////////
     25 //
     26 // 函数名       : InitWinsock
     27 // 功能描述     : 初始化WINSOCK
     28 // 返回值       : void
     29 //
     30 ///////////////////////////////////////////////////////////////////////
     31 void InitWinsock()
     32 {
     33     // 初始化WINSOCK
     34     WSADATA wsd;
     35     if( WSAStartup(MAKEWORD(22), &wsd) != 0)
     36 
     37 }
     38 
     39 ///////////////////////////////////////////////////////////////////////
     40 //
     41 // 函数名       : BindServerOverlapped
     42 // 功能描述     : 绑定端口,并返回一个 Overlapped 的Listen Socket
     43 // 参数         : int nPort
     44 // 返回值       : SOCKET
     45 //
     46 ///////////////////////////////////////////////////////////////////////
     47 SOCKET BindServerOverlapped(int nPort)
     48 {
     49     // 创建socket
     50     SOCKET sServer = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
     51 
     52     // 绑定端口
     53     struct sockaddr_in servAddr;
     54     servAddr.sin_family = AF_INET;
     55     servAddr.sin_port = htons(nPort);
     56     servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
     57 
     58     if(bind(sServer, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
     59     {
     60         OutErr("bind Failed!");
     61         return NULL;
     62     }
     63 
     64     // 设置监听队列为200
     65     if(listen(sServer, 200!= 0)
     66     {
     67         OutErr("listen Failed!");
     68         return NULL;
     69     }
     70     return sServer;
     71 }
     72 
     73 
     74 /// 结构体定义
     75 typedef struct
     76 {
     77     OVERLAPPED Overlapped;
     78     WSABUF DataBuf;
     79     CHAR Buffer[DATA_BUFSIZE];
     80 } PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
     81 
     82 
     83 typedef struct
     84 {
     85     SOCKET Socket;
     86 } PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
     87 
     88 
     89 DWORD WINAPI ProcessIO(LPVOID lpParam)
     90 {
     91     HANDLE CompletionPort = (HANDLE)lpParam;
     92     DWORD BytesTransferred;
     93     LPPER_HANDLE_DATA PerHandleData;
     94     LPPER_IO_OPERATION_DATA PerIoData;
     95 
     96     while(true)
     97     {
     98 
     99         if(0 == GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&PerHandleData, (LPOVERLAPPED*)&PerIoData, INFINITE))
    100         {
    101             if( (GetLastError() == WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
    102             {
    103                 cout << "closing socket" << PerHandleData->Socket << endl;
    104 
    105                 closesocket(PerHandleData->Socket);
    106 
    107                 delete PerIoData;
    108                 delete PerHandleData;
    109                 continue;
    110             }
    111             else
    112             {
    113                 OutErr("GetQueuedCompletionStatus failed!");
    114             }
    115             return 0;
    116         }
    117 
    118         // 说明客户端已经退出
    119         if(BytesTransferred == 0)
    120         {
    121             cout << "closing socket" << PerHandleData->Socket << endl;
    122             closesocket(PerHandleData->Socket);
    123             delete PerIoData;
    124             delete PerHandleData;
    125             continue;
    126         }
    127 
    128         // 取得数据并处理
    129         cout << PerHandleData->Socket << "发送过来的消息:" << PerIoData->Buffer << endl;
    130 
    131         // 继续向 socket 投递WSARecv操作
    132         DWORD Flags = 0;
    133         DWORD dwRecv = 0;
    134         ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
    135         PerIoData->DataBuf.buf = PerIoData->Buffer;
    136         PerIoData->DataBuf.len = DATA_BUFSIZE;
    137         WSARecv(PerHandleData->Socket, &PerIoData->DataBuf, 1&dwRecv, &Flags, &PerIoData->Overlapped, NULL); 
    138     }
    139 
    140     return 0;
    141 }
    142 
    143 void main()
    144 {
    145     InitWinsock();
    146 
    147     HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 00);
    148 
    149     // 根据系统的CPU来创建工作者线程
    150     SYSTEM_INFO SystemInfo;
    151     GetSystemInfo(&SystemInfo);
    152 
    153     for(int i = 0; i < SystemInfo.dwNumberOfProcessors * 2; i++)
    154     {
    155         HANDLE hProcessIO = CreateThread(NULL, 0, ProcessIO, CompletionPort, 0, NULL);
    156         //if(hProcessIO)
    157         CloseHandle(hProcessIO);
    158     }
    159 
    160     // 创建侦听SOCKET
    161     SOCKET sListen = BindServerOverlapped(PORT);
    162 
    163 
    164     SOCKET sClient;
    165     LPPER_HANDLE_DATA PerHandleData;
    166     LPPER_IO_OPERATION_DATA PerIoData;
    167     while(true)
    168     {
    169         // 等待客户端接入
    170         //sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);
    171         sClient = accept(sListen, 00);
    172 
    173         cout << "Socket " << sClient << "连接进来" << endl;
    174 
    175         PerHandleData = new PER_HANDLE_DATA();
    176         PerHandleData->Socket = sClient;
    177 
    178         // 将接入的客户端和完成端口联系起来
    179         CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)PerHandleData, 0);
    180 
    181         // 建立一个Overlapped,并使用这个Overlapped结构对socket投递操作
    182         PerIoData = new PER_IO_OPERATION_DATA();
    183 
    184         ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
    185         PerIoData->DataBuf.buf = PerIoData->Buffer;
    186         PerIoData->DataBuf.len = DATA_BUFSIZE;
    187 
    188         // 投递一个WSARecv操作
    189         DWORD Flags = 0;
    190         DWORD dwRecv = 0;
    191         WSARecv(sClient, &PerIoData->DataBuf, 1&dwRecv, &Flags, &PerIoData->Overlapped, NULL);
    192     }
    193 
    194     DWORD dwByteTrans;
    195     PostQueuedCompletionStatus(CompletionPort, dwByteTrans, 00);
    196     closesocket(sListen);
    197 }
    复制代码
    复制代码
      1 #include <WINSOCK2.H>
      2 #include <stdio.h>
      3 
      4 #define PORT             5150
      5 #define MSGSIZE       1024
      6 
      7 #pragma comment(lib, "ws2_32.lib")
      8 
      9 typedef enum
     10 {
     11     RECV_POSTED
     12 }OPERATION_TYPE;
     13 
     14 typedef struct
     15 {
     16     WSAOVERLAPPED overlap;
     17     WSABUF         Buffer;
     18     char           szMessage[MSGSIZE];
     19     DWORD          NumberOfBytesRecvd;
     20     DWORD          Flags;
     21     OPERATION_TYPE OperationType;
     22 }PER_IO_OPERATION_DATA, *LPPER_IO_OPERATION_DATA;
     23 
     24 DWORD WINAPI WorkerThread(LPVOID);
     25 
     26 int main()
     27 {
     28     WSADATA                 wsaData;
     29     SOCKET                  sListen, sClient;
     30     SOCKADDR_IN             local, client;
     31     DWORD                   i, dwThreadId;
     32     int                     iaddrSize = sizeof(SOCKADDR_IN);
     33     HANDLE                  CompletionPort = INVALID_HANDLE_VALUE;
     34     SYSTEM_INFO             systeminfo;
     35     LPPER_IO_OPERATION_DATA lpPerIOData = NULL;
     36     // Initialize Windows Socket library
     37     WSAStartup(0x0202&wsaData);
     38     // Create completion port
     39     CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 00);
     40     // Create worker thread
     41     GetSystemInfo(&systeminfo);
     42     for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)
     43     {
     44         CreateThread(NULL, 0, WorkerThread, CompletionPort, 0&dwThreadId);
     45     }
     46 
     47     // Create listening socket
     48     sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     49     // Bind
     50     local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
     51     local.sin_family = AF_INET;
     52     local.sin_port = htons(PORT);
     53     bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));
     54     // Listen
     55     listen(sListen, 3);
     56     while (TRUE)
     57     {
     58         // Accept a connection
     59         sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
     60         printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
     61         // Associate the newly arrived client socket with completion port
     62         CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)sClient, 0);
     63 
     64         // Launch an asynchronous operation for new arrived connection
     65         lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
     66             GetProcessHeap(),
     67             HEAP_ZERO_MEMORY,
     68             sizeof(PER_IO_OPERATION_DATA));
     69         lpPerIOData->Buffer.len = MSGSIZE;
     70         lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
     71         lpPerIOData->OperationType = RECV_POSTED;
     72         WSARecv(sClient,
     73             &lpPerIOData->Buffer,
     74             1,
     75             &lpPerIOData->NumberOfBytesRecvd,
     76             &lpPerIOData->Flags,
     77             &lpPerIOData->overlap,
     78             NULL);
     79     }
     80     PostQueuedCompletionStatus(CompletionPort, 0xFFFFFFFF0, NULL);
     81     CloseHandle(CompletionPort);
     82     closesocket(sListen);
     83     WSACleanup();
     84     return 0;
     85 }
     86 DWORD WINAPI WorkerThread(LPVOID CompletionPortID)
     87 {
     88     HANDLE                  CompletionPort=(HANDLE)CompletionPortID;
     89     DWORD                   dwBytesTransferred;
     90     SOCKET                  sClient;
     91     LPPER_IO_OPERATION_DATA lpPerIOData = NULL;
     92     while (TRUE)
     93     {
     94         GetQueuedCompletionStatus(
     95             CompletionPort,
     96             &dwBytesTransferred,
     97             &sClient,
     98             (LPOVERLAPPED *)&lpPerIOData,
     99             INFINITE);
    100         if (dwBytesTransferred == 0xFFFFFFFF)
    101         {
    102             return 0;
    103         }
    104 
    105         if (lpPerIOData->OperationType == RECV_POSTED)
    106         {
    107             if (dwBytesTransferred == 0)
    108             {
    109                 // Connection was closed by client
    110                 closesocket(sClient);
    111                 HeapFree(GetProcessHeap(), 0, lpPerIOData);       
    112             }
    113             else
    114             {
    115                 lpPerIOData->szMessage[dwBytesTransferred] = '\0';
    116                 send(sClient, lpPerIOData->szMessage, dwBytesTransferred, 0);
    117 
    118                 // Launch another asynchronous operation for sClient
    119                 memset(lpPerIOData, 0sizeof(PER_IO_OPERATION_DATA));
    120                 lpPerIOData->Buffer.len = MSGSIZE;
    121                 lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
    122                 lpPerIOData->OperationType = RECV_POSTED;
    123                 WSARecv(sClient,
    124                     &lpPerIOData->Buffer,
    125                     1,
    126                     &lpPerIOData->NumberOfBytesRecvd,
    127                     &lpPerIOData->Flags,
    128                     &lpPerIOData->overlap,
    129                     NULL);
    130             }
    131         }
    132     }
    133     return 0;
    134 }
    复制代码

    首先,说说主线程:
    1.创建完成端口对象
    2.创建工作者线程(这里工作者线程的数量是按照CPU的个数来决定的,这样可以达到最佳性能)
    3.创建监听套接字,绑定,监听,然后程序进入循环
    4.在循环中,我做了以下几件事情:
    (1).接受一个客户端连接
    (2).将该客户端套接字与完成端口绑定到一起(还是调用CreateIoCompletionPort,但这次的作用不同),注意,按道理来讲,此时传递
    给CreateIoCompletionPort的第三个参数应该是一个完成键,一般来讲,程序都是传递一个单句柄数据结构的地址,该单句柄数据包含
    了和该客户端连接有关的信息,由于我们只关心套接字句柄,所以直接将套接字句柄作为完成键传递;
    (3).触发一个WSARecv异步调用,这次又用到了“尾随数据”,使接收数据所用的缓冲区紧跟在WSAOVERLAPPED对象之后,此外,还有操作
    类型等重要信息。


    在工作者线程的循环中,我们

    1.调用GetQueuedCompletionStatus取得本次I/O的相关信息(例如套接字句柄、传送的字节数、单I/O数据结构的地址等等)
    2.通过单I/O数据结构找到接收数据缓冲区,然后将数据原封不动的发送到客户端
    3.再次触发一个WSARecv异步操作

  • 相关阅读:
    poj 2411 Mondriaan's Dream 骨牌铺放 状压dp
    zoj 3471 Most Powerful (有向图)最大生成树 状压dp
    poj 2280 Islands and Bridges 哈密尔顿路 状压dp
    hdu 3001 Travelling 经过所有点(最多两次)的最短路径 三进制状压dp
    poj 3311 Hie with the Pie 经过所有点(可重)的最短路径 floyd + 状压dp
    poj 1185 炮兵阵地 状压dp
    poj 3254 Corn Fields 状压dp入门
    loj 6278 6279 数列分块入门 2 3
    VIM记事——大小写转换
    DKIM支持样本上传做检测的网站
  • 原文地址:https://www.cnblogs.com/toosuo/p/2862375.html
Copyright © 2011-2022 走看看