zoukankan      html  css  js  c++  java
  • IOCP模型与网络编程

    【服务端源代码】

    #include "stdafx.h"

    /*
    iocp是windows上通讯模型,把socket的阻塞函数,如recv改成完成端口的来完成.
    基本的思路,创建一个线程池来作为工作者线程,然后线程的处理函数是接收/转发数据.
    server
    */

    /*
    所用到的函数:
    HANDLE WINAPI CreateIoCompletionPort(
    __in HANDLE FileHandle, // 已经打开的文件句柄或者空句柄,一般是客户端的句柄
    __in HANDLE ExistingCompletionPort, // 已经存在的IOCP句柄
    __in ULONG_PTR CompletionKey, // 完成键,包含了指定I/O完成包的指定文件
    __in DWORD NumberOfConcurrentThreads );// 真正并发同时执行最大线程数,一般推介是CPU核心数*2+2


    BOOL GetQueuedCompletionStatus(
    HANDLE CompletionPort, //指定的IOCP
    LPDWORD lpNumberOfBytes, //一次完成后的I/O操作所传送数据的字节数
    PULONG_PTR lpCompletionKey, //当文件I/O操作完成后,用于存放与之关联的CK
    LPOVERLAPPED *lpOverlapped, //为调用IOCP机制所引用的OVERLAPPED结构
    DWORD dwMilliseconds); //用于指定调用者等待CP的时间

    int WSAAPI WSARecv (
    SOCKET s, //socket
    LPWSABUF lpBuffers, //存放数据的数组的指针
    DWORD dwBufferCount, //数组的数量
    LPDWORD lpNumberOfBytesRecvd, //接收字节数的指针
    LPINT lpFlags, //指向标志位的指针
    LPWSAOVERLAPPED lpOverlapped, //指向重叠结构的指针
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine);//调用例程的指针


    */

    #define _CRT_SECURE_CPP_OVERLOAD_STANDARD_NAMES 1

    #include <WinSock2.h>
    #include <Windows.h>
    #include <vector>
    #include <iostream>

    using namespace std;


    #pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库
    #pragma comment(lib, "Kernel32.lib") // IOCP需要用到的动态链接库

    //字符串处理函数
    int myStrLen(const char* str)
    {
    int i = 0;
    while (*str != '')
    {
    i++;
    str++;
    }
    return i;
    }

    //自定义全局变量
    HANDLE CompletionPort = NULL;
    HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
    const int g_DataBuffSize = 2 * 1024; //服务器端口
    typedef struct
    {
    OVERLAPPED overlapped;
    WSABUF databuff;
    char buffer[g_DataBuffSize];
    int BufferLen;
    int operationType;
    }PER_IO_OPERATEION_DATA, *LPPER_IO_OPERATION_DATA, *LPPER_IO_DATA, PER_IO_DATA;
    typedef struct //客户端结构
    {
    SOCKET socket;
    SOCKADDR_STORAGE ClientAddr;
    char *pszClientName;
    }PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
    const int g_DefaultPort = 6000;
    vector < PER_HANDLE_DATA* > g_clientGroup; // 记录客户端的向量组
    SOCKET serverSocket; //服务器的socket

    //自定义函数声明
    BOOL InitNetwork(WORD port); //初始化网络,参数port是服务器的端口
    BOOL InitWorkThread(); //初始化工作者线程
    DWORD WINAPI ServerWorkThread(LPVOID CompletionPortID); //工作者线程处理函数
    DWORD WINAPI ServerSendThread(LPVOID IpParam); //发送消息的函数
    DWORD WINAPI AddClient(LPVOID IpParam); //向服务器添加客户端
    BOOL DelClient(vector<PER_HANDLE_DATA*>&clientGroup, SOCKET& clientSocket);//向服务器减少客户端

    int _tmain(int argc, _TCHAR* argv[])
    {
    if (!InitNetwork(g_DefaultPort))return -1;
    if (!InitWorkThread())return -1;
    HANDLE sendThread = CreateThread(NULL, 0, ServerSendThread, 0, 0, NULL);
    HANDLE addThread = CreateThread(NULL, 0, AddClient, 0, 0, NULL);
    WaitForSingleObject(sendThread, INFINITE);
    WaitForSingleObject(addThread, INFINITE);
    system("pause");

    return 0;
    }

    BOOL InitNetwork(WORD port)
    //初始化网络
    {
    //请求2.2的wsa版本的网络库
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSADATA wsaData;
    DWORD err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0)
    {
    cerr << "请求动态链接库失败 ";
    return FALSE;
    }
    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
    {
    WSACleanup();
    cerr << "请求的版本不是2.2版本 ";
    return FALSE;
    }

    //建立起服务器的socket
    serverSocket = socket(AF_INET, SOCK_STREAM, 0); //流式套接字
    SOCKADDR_IN serverAddr;
    serverAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(port);
    int bindResult = bind(serverSocket, (SOCKADDR*)&serverAddr, sizeof(SOCKADDR));
    if (SOCKET_ERROR == bindResult)
    {
    cerr << "服务器绑定失败:" << GetLastError() << endl;
    return FALSE;
    }

    int listenResult = listen(serverSocket, 10); // 将SOCKET设置为监听模式
    if (listenResult == SOCKET_ERROR)
    {
    cerr << "进入监听模式失败: " << GetLastError() << endl;
    return FALSE;
    }
    cout << " IOCP服务器已准备就绪,正在等待客户端的接入中............ ";
    return TRUE;
    }

    BOOL InitWorkThread()
    //初始化工作者线程
    {

    // 创建IOCP的内核对象
    HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (completionPort == NULL){ // 创建IO内核对象失败
    cerr << "创建IO完成端口发生错误:" << GetLastError() << endl;
    return FALSE;
    }

    // 创建IOCP线程--线程里面创建线程池,并将完成端口传递到该线程
    SYSTEM_INFO mySysInfo;
    GetSystemInfo(&mySysInfo); // 确定处理器的核心数量
    for (DWORD i = 0; i < (mySysInfo.dwNumberOfProcessors * 2 + 2); ++i){// 基于处理器的核心数量创建线程
    HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkThread, completionPort, 0, NULL);
    if (ThreadHandle == NULL){
    cerr << "创建线程失败" << GetLastError() << endl;
    return FALSE;
    }
    CloseHandle(ThreadHandle);
    }
    return TRUE;
    }

    DWORD WINAPI ServerWorkThread(LPVOID IpParam)
    //工作线程的处理函数
    {
    CompletionPort = (HANDLE)IpParam;
    DWORD BytesTransferred;
    LPOVERLAPPED IpOverlapped;
    LPPER_HANDLE_DATA PerHandleData = NULL;
    LPPER_IO_DATA PerIoData = NULL;
    DWORD RecvBytes;
    DWORD Flags = 0;
    BOOL bRet = false;

    while (true){
    bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED*)&IpOverlapped, INFINITE);//
    //GetQueuedCompletionStatus参数(完成端口对象,传输完成的状态码,socket的信息,重叠结构信息,等待时间)
    if (bRet == 0){
    cerr << "获取完成端口状态发生失败: " << GetLastError() << endl;
    DelClient(g_clientGroup, PerHandleData->socket);
    continue;
    }
    PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(IpOverlapped, PER_IO_DATA, overlapped);//有意思,先计算出地址的偏移量,然后根据特定的变量地址找到结构体地址,这样就相当于一个client取一个特定内存区域的址进行读取,当然多个client就有多个区域

    // 检查在套接字上是否有错误发生
    if (BytesTransferred == 0){
    closesocket(PerHandleData->socket);
    GlobalFree(PerHandleData);
    GlobalFree(PerIoData);
    continue;
    }

    // 开始数据处理,接收来自客户端的数据
    WaitForSingleObject(hMutex, INFINITE);
    SOCKET clientsockt = NULL;
    cout << "A Client says: " << PerIoData->databuff.buf << endl;
    ReleaseMutex(hMutex);

    // 为下一个重叠调用建立单I/O操作数据
    ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED)); // 清空内存
    PerIoData->databuff.len = 1024;
    PerIoData->databuff.buf = PerIoData->buffer;
    PerIoData->operationType = 0; // read
    WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
    }

    return 0;
    }

    DWORD WINAPI ServerSendThread(LPVOID IpParam)
    //发送信息的执行函数
    {
    while (1){
    char talk[200];
    gets_s(talk);
    int len;
    for (len = 0; talk[len] != ''; ++len);
    talk[len] = ' ';
    talk[++len] = '';
    printf("I Say:");
    cout << talk << g_clientGroup.size();

    WaitForSingleObject(hMutex, INFINITE);
    for (int i = 0; i < g_clientGroup.size(); ++i){
    send(g_clientGroup[i]->socket, talk, 200, 0); // 发送信息
    }
    ReleaseMutex(hMutex);
    }
    return 0;
    }

    DWORD WINAPI AddClient(LPVOID IpParam)
    //向服务器添加客户端
    {
    while (true){
    PER_HANDLE_DATA * PerHandleData = NULL;
    SOCKADDR_IN saRemote;
    int RemoteLen;
    SOCKET acceptSocket;

    // 接收连接,并分配完成端,这儿可以用AcceptEx()
    RemoteLen = sizeof(saRemote);
    acceptSocket = accept(serverSocket, (SOCKADDR*)&saRemote, &RemoteLen);
    if (SOCKET_ERROR == acceptSocket){ // 接收客户端失败
    cerr << "接收客户端失败:" << GetLastError() << endl;
    return -1;
    }

    // 创建用来和套接字关联的单句柄数据信息结构
    PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA)); // 在堆中为这个PerHandleData申请指定大小的内存
    PerHandleData->socket = acceptSocket;
    memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen);
    g_clientGroup.push_back(PerHandleData); // 将单个客户端数据指针放到客户端组中

    // 将接受套接字和完成端口关联(socket/完成端口/传递给处理函数的参数,多少个线程访问完成端口0是和处理器的数量相同)
    CreateIoCompletionPort((HANDLE)(PerHandleData->socket), CompletionPort, (DWORD)PerHandleData, 0);

    // 开始在接受套接字上处理I/O使用重叠I/O机制
    // 在新建的套接字上投递一个或多个异步
    // WSARecv或WSASend请求,这些I/O请求完成后,工作者线程会为I/O请求提供服务
    // 单I/O操作数据(I/O重叠)
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATEION_DATA));
    ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
    PerIoData->databuff.len = 1024;
    PerIoData->databuff.buf = PerIoData->buffer;
    PerIoData->operationType = 0; // read

    DWORD RecvBytes;
    DWORD Flags = 0;
    WSARecv(PerHandleData->socket, &(PerIoData->databuff), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL);
    //参数:socket,buff,buff的数目,指向所接收字节数的指针,指向标志位的指针,重叠指针,结束调用例程指针
    }
    }

    BOOL DelClient(vector<PER_HANDLE_DATA*>&clientGroup, SOCKET& clientSocket)
    {
    for (int i = 0; i < clientGroup.size(); i++)
    {
    if (clientGroup[i]->socket == clientSocket)
    {
    clientGroup.erase(clientGroup.begin() + i);
    cout << "有客户端发生退出 ";
    return TRUE;
    }
    }
    return FALSE;
    }

     【客户端源代码】

    #include "stdafx.h"

    #include <iostream>
    #include <cstdio>
    #include <string>
    #include <cstring>
    #include <winsock2.h>
    #include <Windows.h>

    using namespace std;

    #pragma comment(lib, "Ws2_32.lib") // Socket编程需用的动态链接库

    SOCKET sockClient; // 连接成功后的套接字
    HANDLE bufferMutex; // 令其能互斥成功正常通信的信号量句柄
    const int DefaultPort = 6000;

    int _tmain(int argc, _TCHAR* argv[])
    {
    // 加载socket动态链接库(dll)
    WORD wVersionRequested;
    WSADATA wsaData; // 这结构是用于接收Wjndows Socket的结构信息的
    wVersionRequested = MAKEWORD(2, 2); // 请求2.2版本的WinSock库
    int err = WSAStartup(wVersionRequested, &wsaData);
    if (err != 0) { // 返回值为零的时候是表示成功申请WSAStartup
    return -1;
    }
    if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { // 检查版本号是否正确
    WSACleanup();
    return -1;
    }

    // 创建socket操作,建立流式套接字,返回套接字号sockClient
    sockClient = socket(AF_INET, SOCK_STREAM, 0);
    if (sockClient == INVALID_SOCKET) {
    printf("Error at socket():%ld ", WSAGetLastError());
    WSACleanup();
    return -1;
    }

    // 将套接字sockClient与远程主机相连
    // int connect( SOCKET s, const struct sockaddr* name, int namelen);
    // 第一个参数:需要进行连接操作的套接字
    // 第二个参数:设定所需要连接的地址信息
    // 第三个参数:地址的长度
    SOCKADDR_IN addrSrv;
    addrSrv.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); // 本地回路地址是127.0.0.1;
    addrSrv.sin_family = AF_INET;
    addrSrv.sin_port = htons(DefaultPort);
    while (SOCKET_ERROR == connect(sockClient, (SOCKADDR*)&addrSrv, sizeof(SOCKADDR))){
    // 如果还没连接上服务器则要求重连
    cout << "服务器连接失败,是否重新连接?(Y/N):";
    char choice;
    while (cin >> choice && (!((choice != 'Y' && choice == 'N') || (choice == 'Y' && choice != 'N')))){
    cout << "输入错误,请重新输入:";
    cin.sync();
    cin.clear();
    }
    if (choice == 'Y'){
    continue;
    }
    else{
    cout << "退出系统中...";
    system("pause");
    return 0;
    }
    }
    cin.sync();
    cout << " 客户端用户已准备完毕,可直接输入向服务器反馈消息. ";

    send(sockClient, " Attention: A Client has enter... ", 200, 0);

    bufferMutex = CreateSemaphore(NULL, 1, 1, NULL);

    DWORD WINAPI SendMessageThread(LPVOID IpParameter);
    DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter);

    HANDLE sendThread = CreateThread(NULL, 0, SendMessageThread, NULL, 0, NULL);
    HANDLE receiveThread = CreateThread(NULL, 0, ReceiveMessageThread, NULL, 0, NULL);


    WaitForSingleObject(sendThread, INFINITE); // 等待线程结束
    closesocket(sockClient);
    CloseHandle(sendThread);
    CloseHandle(receiveThread);
    CloseHandle(bufferMutex);
    WSACleanup(); // 终止对套接字库的使用

    printf("End linking... ");
    printf(" ");
    system("pause");

    return 0;
    }

    DWORD WINAPI SendMessageThread(LPVOID IpParameter)
    {
    while (1){
    string talk;
    getline(cin, talk);
    WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)
    if ("quit" == talk){
    talk.push_back('');
    send(sockClient, talk.c_str(), 200, 0);
    break;
    }
    else{
    talk.append(" ");
    }
    printf(" 客户端数据消息(输入quit)退出程序:");
    cout << talk;
    send(sockClient, talk.c_str(), 200, 0); // 发送信息
    ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
    }
    return 0;
    }


    DWORD WINAPI ReceiveMessageThread(LPVOID IpParameter)
    {
    while (1){
    char recvBuf[300];
    recv(sockClient, recvBuf, 200, 0);
    WaitForSingleObject(bufferMutex, INFINITE); // P(资源未被占用)

    printf("%s Says: %s", "Server", recvBuf); // 接收信息

    ReleaseSemaphore(bufferMutex, 1, NULL); // V(资源占用完毕)
    }
    return 0;
    }

    【运行效果如下】

  • 相关阅读:
    JavaScript 开发进阶:理解 JavaScript 作用域和作用域链
    jquery插件 源码
    webkit浏览器渲染影响因素分析
    JS操作JSON总结
    转:ie6与firefox操作iframe中DOM节点的一点不同
    input:focus
    JS设计模式一:单例模式
    JavaScript 变量、函数与原型链
    让你提前认识软件开发(49):自己主动測试
    Docker 开源管理工具集锦
  • 原文地址:https://www.cnblogs.com/chinasirius/p/12076756.html
Copyright © 2011-2022 走看看