zoukankan      html  css  js  c++  java
  • 完成端口之二:服务器代码

    一:服务器代码:

    1.ServerDlg.cpp中启动或停止服务器

    CIOCPServer *iocp;
    void CServerDlg::OnBnClickedOk2()
    {
        iocp->StartServer(10000, AfxGetMainWnd());
    }

    2.   IOCPServer.h和IOCPServer.cpp

    IOCPServer.h:

    #pragma once
    #include<Mswsock.h>
    #include<windows.h>
    
    // Size in bytes of the data buffer carried by every overlapped request.
    #define MAX_BUF_SIZE 4096
    // Private window message used to hand received data to the UI window.
    // Parenthesized so the expansion stays a single value inside larger
    // expressions (e.g. `2 * WM_SHOW_MSG` or `x == WM_SHOW_MSG`).
    #define WM_SHOW_MSG (WM_USER + 100)
    
    // Kind of I/O request an overlapped record was posted for.
    enum IO_TYPE
    {
        IO_TYPE_ACCEPT  = 0,    // pending accept on the listening socket
        IO_TYPE_READ    = 1,    // pending receive on a client socket
        IO_TYPE_WRITE   = 2,    // pending send on a client socket
        IO_TYPE_UNKNOWN = 3
    };
    
    // Extended overlapped structure: one record per posted I/O request,
    // bundling the OVERLAPPED with the request type and its data buffer.
    class COverLappedEX
    {
    public:
        // Kept as the first member: ThreadPoolProc casts the LPOVERLAPPED it
        // dequeues straight back to a COverLappedEX pointer.
        OVERLAPPED m_OLap;
        IO_TYPE m_IOType;               // request type this record was created for
        char m_szBuf[MAX_BUF_SIZE];     // data buffer handed to the socket call

        // Creates a record of the given type with a zeroed OVERLAPPED and buffer.
        COverLappedEX(IO_TYPE ioType)
            : m_IOType(ioType)
        {
            ZeroMemory(&m_OLap, sizeof(m_OLap));
            ZeroMemory(m_szBuf, sizeof(m_szBuf));
        }
    };
    
    //单句柄数据, 包含监听Socket和接收的客户端信息
    class CPerSocketData
    {
    public:
        SOCKET m_Socket;
        SOCKET m_AccSocket;
    
        CPerSocketData()
        {
            m_Socket = INVALID_SOCKET;
            m_AccSocket = INVALID_SOCKET;
        }
        ~CPerSocketData()
        {}
    
    private:
    
    };
    
    // I/O completion port based TCP server: accepts clients with AcceptEx,
    // receives their data on worker threads and forwards it to a window via
    // WM_SHOW_MSG.
    // NOTE(review): the two lists below are modified both by the caller's
    // thread and by the worker threads with no lock visible in this file -
    // confirm whether synchronization is needed.
    class CIOCPServer
    {
    public:
        CIOCPServer(void);
        // Calls StopServer().
        ~CIOCPServer(void);
    
    public:
        // Stores / returns the text describing the most recent failure.
        void SetLastErrorMsg(CString strErrorMsg);
        CString GetLastErrorMsg();
        // Starts Winsock and requires version 2.2; TRUE on success.
        BOOL WinSockInit();
        // Remembers the window that receives WM_SHOW_MSG.
        void AssociateWnd(CWnd *pWnd);
        // Creates the listening socket on uListenPort, the completion port and
        // the first worker, then posts the first accept.
        BOOL StartServer(UINT uListenPort, CWnd *pWnd);
        // Posts an asynchronous accept request on the listening socket
        BOOL PostAccept(CPerSocketData *pSockData);
        // Posts an asynchronous receive request on a client socket
        BOOL PostRecv(CPerSocketData *pSockData);
        // Posts an asynchronous send of the given bytes to every registered
        // socket except the listening one.
        BOOL PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes);
        // Thread function of the pool workers; lpParam is the CIOCPServer
        static DWORD WINAPI ThreadPoolProc(LPVOID lpParam);
        // Associates a socket with the I/O completion port queue and returns
        // the per-socket record used as its completion key
        CPerSocketData *AssignSockToCompletionPort(SOCKET tSocket);
        // Closes the sockets, frees the records, stops the workers and closes
        // the completion port.
        void StopServer();
    
    private:
        LONG m_ThreadNums;        // number of running pool worker threads
        HWND m_pWnd;                // window handle (an HWND despite the name)
        CString m_strErrorMsg;        // last error text
        SOCKET m_ListenSocket;        // listening socket
        HANDLE m_hCompletionPort;    // completion port object handle
        // Every per-socket record created by AssignSockToCompletionPort.
        CList<CPerSocketData *, CPerSocketData *> m_ArrSocketData;
        // Every overlapped record of a posted request.
        CList<COverLappedEX *, COverLappedEX *> m_ArrOverLapEx;
    };

    IOCPServer.cpp:

    #include "stdafx.h"
    #include "IOCPServer.h"
    
    // Puts the server into a well-defined "not started" state.
    // Every handle member is initialized here because the destructor calls
    // StopServer(), which tests m_hCompletionPort against NULL before closing
    // it; leaving it uninitialized would close an arbitrary handle value when
    // the server was never started.
    CIOCPServer::CIOCPServer()
        : m_ThreadNums(0)
        , m_pWnd(NULL)
        , m_ListenSocket(INVALID_SOCKET)
        , m_hCompletionPort(NULL)
    {
    }
    
    // Tears the server down when the object goes away; all of the work is
    // delegated to StopServer().
    CIOCPServer::~CIOCPServer()
    {
        this->StopServer();
    }
    
    // Remembers the window that receives WM_SHOW_MSG notifications.
    // pWnd may be NULL, in which case the stored handle is cleared instead of
    // dereferencing a null pointer.
    void CIOCPServer::AssociateWnd(CWnd *pWnd)
    {
        if (pWnd != NULL)
            m_pWnd = *pWnd;     // CWnd converts to its HWND
        else
            m_pWnd = NULL;
    }
    
    // Records the description of the most recent failure so that it can be
    // read back through GetLastErrorMsg().
    void CIOCPServer::SetLastErrorMsg(CString strErrorMsg)
    {
        this->m_strErrorMsg = strErrorMsg;
    }
    
    // Returns a copy of the text stored by the last SetLastErrorMsg() call.
    CString CIOCPServer::GetLastErrorMsg()
    {
        CString strLastError(m_strErrorMsg);
        return strLastError;
    }
    
    // Starts Winsock and insists on version 2.2.
    // Returns TRUE when the library was started with exactly that version;
    // otherwise returns FALSE, undoing the start-up if it had succeeded.
    BOOL CIOCPServer::WinSockInit()
    {
        WSADATA wsaData = { 0 };
        const int nStartupResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
        if (nStartupResult != 0)
            return FALSE;

        const bool bIsVersion22 = (LOBYTE(wsaData.wVersion) == 2) && (HIBYTE(wsaData.wVersion) == 2);
        if (bIsVersion22)
            return TRUE;

        // Started, but with a version we cannot use: release it again.
        WSACleanup();
        return FALSE;
    }
    
    // Creates the per-socket record for tSocket, registers it in
    // m_ArrSocketData and associates the socket with the completion port,
    // using the record's address as the completion key.
    // Returns the new record.
    // NOTE(review): the result of CreateIoCompletionPort is not checked, so a
    // failed association goes unnoticed - callers currently assume a usable
    // record is always returned.
    CPerSocketData * CIOCPServer::AssignSockToCompletionPort(SOCKET tSocket)
    {
        ASSERT(tSocket != INVALID_SOCKET);

        CPerSocketData *pEntry = new CPerSocketData();
        pEntry->m_Socket = tSocket;
        m_ArrSocketData.AddTail(pEntry);

        HANDLE hSocket = (HANDLE)tSocket;
        ULONG_PTR uCompletionKey = (ULONG_PTR)pEntry;
        CreateIoCompletionPort(hSocket, m_hCompletionPort, uCompletionKey, 0);

        return pEntry;
    }
    
    // Posts an asynchronous accept on the listening socket in pSockData.
    //
    // A fresh overlapped socket is created for the future client and stored in
    // pSockData->m_AccSocket; the request itself is tracked by a new
    // IO_TYPE_ACCEPT record in m_ArrOverLapEx until its completion is handled.
    //
    // Returns TRUE when the accept completed or is pending, FALSE when it could
    // not be posted. On FALSE nothing is left behind: the record is unregistered
    // and freed, the accept socket is closed and m_AccSocket is reset to
    // INVALID_SOCKET.
    BOOL CIOCPServer::PostAccept(CPerSocketData *pSockData)
    {
        ASSERT(pSockData != NULL);
        if (pSockData == NULL)
            return FALSE;

        // Create the accept socket first so that a failure here costs nothing.
        SOCKET sAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
        pSockData->m_AccSocket = sAccept;
        if (sAccept == INVALID_SOCKET)
            return FALSE;

        COverLappedEX *pOverLap = new COverLappedEX(IO_TYPE_ACCEPT);
        POSITION posOverLap = m_ArrOverLapEx.AddTail(pOverLap);

        // No receive data is requested (length 0); the buffer only has to hold
        // the local and remote address blocks.
        DWORD dwBytesRecv = 0;
        const DWORD dwAddrLen = sizeof(sockaddr_in) + 16;
        BOOL bRet = AcceptEx(pSockData->m_Socket, sAccept, pOverLap->m_szBuf, 0, dwAddrLen, dwAddrLen, &dwBytesRecv, &pOverLap->m_OLap);
        if (!bRet && WSAGetLastError() != WSA_IO_PENDING)
        {
            // The request was not started, so no completion will ever arrive
            // for this record: release everything acquired above.
            m_ArrOverLapEx.RemoveAt(posOverLap);
            delete pOverLap;
            closesocket(sAccept);
            pSockData->m_AccSocket = INVALID_SOCKET;
            return FALSE;
        }
        // On success the record now belongs to the completion handler and must
        // not be touched here any more.
        return TRUE;
    }
    
    // Posts an asynchronous receive on the client socket in pSockData.
    //
    // The request is tracked by a new IO_TYPE_READ record in m_ArrOverLapEx
    // whose buffer receives the data.
    //
    // Returns TRUE when the receive completed or is pending, FALSE when it
    // could not be posted (the record is then unregistered and freed).
    BOOL CIOCPServer::PostRecv(CPerSocketData *pSockData)
    {
        ASSERT(pSockData != NULL);
        if (pSockData == NULL)
            return FALSE;

        COverLappedEX *pOverLap = new COverLappedEX(IO_TYPE_READ);
        POSITION posOverLap = m_ArrOverLapEx.AddTail(pOverLap);

        WSABUF wsaBuf = { 0 };
        wsaBuf.buf = pOverLap->m_szBuf;
        // The buffer is zero-filled and the receiver treats it as a C string,
        // so leave the last byte untouched as a guaranteed terminator.
        wsaBuf.len = MAX_BUF_SIZE - 1;

        DWORD dwBytesRecv = 0, dwFlags = 0;
        int iRet = WSARecv(pSockData->m_Socket, &wsaBuf, 1, &dwBytesRecv, &dwFlags, &(pOverLap->m_OLap), NULL);
        // WSARecv returns 0 on success and SOCKET_ERROR otherwise; a pending
        // overlapped operation is reported as SOCKET_ERROR + WSA_IO_PENDING and
        // is not a failure.
        if (iRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
        {
            // Nothing was started, so no completion will arrive for this record.
            m_ArrOverLapEx.RemoveAt(posOverLap);
            delete pOverLap;
            return FALSE;
        }
        return TRUE;
    }
    
    // Broadcasts dwSizeInBytes bytes starting at lpszText to every registered
    // socket except the listening one, using one asynchronous send per socket.
    //
    // Each send gets its own IO_TYPE_WRITE record holding a private copy of
    // the data, so the caller's buffer may be released as soon as this returns.
    //
    // Returns FALSE when the arguments are unusable (NULL text, or more than
    // MAX_BUF_SIZE bytes) or when at least one send could not be posted;
    // TRUE otherwise. An empty message is a no-op that returns TRUE.
    BOOL CIOCPServer::PostSend(LPCTSTR lpszText, DWORD dwSizeInBytes)
    {
        // A zero-byte send would complete with 0 bytes transferred, which the
        // worker thread interprets as "client disconnected".
        if (dwSizeInBytes == 0)
            return TRUE;
        // The payload is copied into a MAX_BUF_SIZE buffer; anything larger
        // would overrun it.
        if (lpszText == NULL || dwSizeInBytes > MAX_BUF_SIZE)
            return FALSE;

        BOOL bAllPosted = TRUE;
        POSITION pos = m_ArrSocketData.GetHeadPosition();
        while (pos != NULL)
        {
            CPerSocketData *pSocketData = m_ArrSocketData.GetNext(pos);
            if (pSocketData->m_Socket == m_ListenSocket)
                continue;   // the listening socket is not a client

            COverLappedEX *pOverLapEx = new COverLappedEX(IO_TYPE_WRITE);
            memcpy_s(pOverLapEx->m_szBuf, MAX_BUF_SIZE, lpszText, dwSizeInBytes);

            WSABUF wsaBuf = { 0 };
            wsaBuf.buf = pOverLapEx->m_szBuf;
            wsaBuf.len = dwSizeInBytes;

            POSITION posOverLap = m_ArrOverLapEx.AddTail(pOverLapEx);
            DWORD dwSend = 0;
            int iRet = WSASend(pSocketData->m_Socket, &wsaBuf, 1, &dwSend, 0, &(pOverLapEx->m_OLap), NULL);
            if (iRet == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
            {
                // The send was not started: no completion will free the record,
                // so free it here and keep going with the remaining clients.
                m_ArrOverLapEx.RemoveAt(posOverLap);
                delete pOverLapEx;
                bAllPosted = FALSE;
            }
        }
        return bAllPosted;
    }
    
    // Worker thread of the pool: dequeues completion packets from the
    // completion port and handles them until told to stop.
    //
    // lpParam is the owning CIOCPServer. The thread counts itself in
    // m_ThreadNums while it runs. It leaves the loop when a packet without an
    // OVERLAPPED is dequeued (the shutdown packet posted by StopServer) or when
    // the port itself fails. Always returns TRUE.
    //
    // Per packet:
    //  - read/write that failed or transferred 0 bytes: the client is gone, its
    //    socket is closed and its record freed;
    //  - completed read: the data is sent to the window as WM_SHOW_MSG and the
    //    next receive is posted;
    //  - completed accept: the new client is registered, a receive is posted on
    //    it and the next accept is posted on the listening socket;
    //  - in every case the overlapped record of the packet is freed.
    //
    // NOTE(review): m_ArrSocketData / m_ArrOverLapEx are used from several
    // threads without a lock in this file; the Find() checks below reduce, but
    // do not eliminate, the resulting races.
    DWORD WINAPI CIOCPServer::ThreadPoolProc(LPVOID lpParam)
    {
        CIOCPServer *pThis = (CIOCPServer *)lpParam;
        ASSERT(pThis != NULL);

        // One more worker thread is running.
        InterlockedIncrement(&pThis->m_ThreadNums);

        while (TRUE)
        {
            // Reset per iteration so stale values from the previous packet can
            // never be mistaken for the current one.
            COverLappedEX *pOverLaps = NULL;
            CPerSocketData *pPerSockData = NULL;
            DWORD dwTrans = 0;

            BOOL bIORet = GetQueuedCompletionStatus(pThis->m_hCompletionPort, &dwTrans, (PULONG_PTR)&pPerSockData, (LPOVERLAPPED *)&pOverLaps, INFINITE);

            // No OVERLAPPED means no I/O packet was dequeued: this is either the
            // shutdown packet or a failure of the port itself. This must be
            // tested before pOverLaps is dereferenced.
            if (pOverLaps == NULL)
                break;

            // Only records that are still registered may be touched.
            POSITION posOverLap = pThis->m_ArrOverLapEx.Find(pOverLaps);
            if (posOverLap == NULL)
                continue;

            // The connection may already have been torn down by an earlier
            // completion on the same socket; then only the record is released.
            POSITION posSockData = (pPerSockData != NULL) ? pThis->m_ArrSocketData.Find(pPerSockData) : NULL;
            if (posSockData != NULL)
            {
                const IO_TYPE ioType = pOverLaps->m_IOType;
                const BOOL bIsTransfer = (ioType == IO_TYPE_READ || ioType == IO_TYPE_WRITE);

                if (bIsTransfer && (!bIORet || dwTrans == 0))
                {
                    // Client disconnected, or the transfer failed.
                    closesocket(pPerSockData->m_Socket);
                    pThis->m_ArrSocketData.RemoveAt(posSockData);
                    delete pPerSockData;
                }
                else if (!bIORet)
                {
                    if (ioType == IO_TYPE_ACCEPT)
                    {
                        // The accept failed: discard its socket and keep listening.
                        if (pPerSockData->m_AccSocket != INVALID_SOCKET)
                        {
                            closesocket(pPerSockData->m_AccSocket);
                            pPerSockData->m_AccSocket = INVALID_SOCKET;
                        }
                        pThis->PostAccept(pPerSockData);
                    }
                }
                else
                {
                    switch (ioType)
                    {
                    case IO_TYPE_READ:
                    {
                        // The receiver gets a raw char pointer, so make sure the
                        // data is terminated even if the buffer was filled.
                        const DWORD dwEnd = (dwTrans < MAX_BUF_SIZE) ? dwTrans : (MAX_BUF_SIZE - 1);
                        pOverLaps->m_szBuf[dwEnd] = '\0';
                        // SendMessage returns only after the window handled the
                        // message, so the buffer stays valid for the receiver.
                        ::SendMessage(pThis->m_pWnd, WM_SHOW_MSG, (WPARAM)pOverLaps->m_szBuf, (LPARAM)pOverLaps->m_szBuf);
                        pThis->PostRecv(pPerSockData);
                        break;
                    }
                    case IO_TYPE_WRITE:
                        // Nothing to do besides releasing the record below.
                        break;
                    case IO_TYPE_ACCEPT:
                    {
                        if (pPerSockData->m_AccSocket == INVALID_SOCKET)
                            break;
                        // Let the accepted socket inherit the listening socket's
                        // properties, as required after AcceptEx.
                        setsockopt(pPerSockData->m_AccSocket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (const char *)&pPerSockData->m_Socket, sizeof(SOCKET));
                        // Associate the newly accepted client with the completion port.
                        CPerSocketData *pData = pThis->AssignSockToCompletionPort(pPerSockData->m_AccSocket);
                        // A new client activates another worker thread, but never
                        // more than the number of CPUs.
                        SYSTEM_INFO sysInfo = { 0 };
                        GetSystemInfo(&sysInfo);
                        if (pThis->m_ThreadNums < (LONG)sysInfo.dwNumberOfProcessors)
                            QueueUserWorkItem(ThreadPoolProc, pThis, WT_EXECUTELONGFUNCTION);
                        if (pData != NULL)
                            pThis->PostRecv(pData);
                        pThis->PostAccept(pPerSockData);
                        break;
                    }
                    default:
                        break;
                    }
                }
            }

            // Every handled packet releases its overlapped record exactly once.
            pThis->m_ArrOverLapEx.RemoveAt(posOverLap);
            delete pOverLaps;
        }

        InterlockedDecrement(&pThis->m_ThreadNums);
        return TRUE;
    }
    
    // Starts the server: initializes Winsock, creates, binds and listens on
    // the listening socket, creates the completion port, starts the first
    // worker thread and posts the first accept.
    //
    // uListenPort - TCP port to listen on (all local addresses).
    // pWnd        - window that receives WM_SHOW_MSG; may be NULL.
    //
    // Returns TRUE on success. On failure returns FALSE and the reason is
    // available through GetLastErrorMsg(). Failures up to and including the
    // start of the first worker release the listening socket and the port
    // here; Winsock itself and anything registered after that point are
    // released by StopServer(), which the destructor always calls.
    BOOL CIOCPServer::StartServer(UINT uListenPort, CWnd *pWnd)
    {
        if (pWnd != NULL)
            m_pWnd = *pWnd;     // CWnd converts to its HWND
        else
            m_pWnd = NULL;

        if (!WinSockInit())
        {
            SetLastErrorMsg(TEXT("Socket库初始化失败!"));
            return FALSE;
        }

        m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
        if (m_ListenSocket == INVALID_SOCKET)
        {
            SetLastErrorMsg(TEXT("新建Socket失败!"));
            return FALSE;
        }

        sockaddr_in service;
        ZeroMemory(&service, sizeof(service));  // also clears the padding field
        service.sin_family = AF_INET;
        service.sin_addr.s_addr = htonl(INADDR_ANY);
        service.sin_port = htons((u_short)uListenPort);

        BOOL bListening = TRUE;
        if (bind(m_ListenSocket, (sockaddr *)&service, sizeof(sockaddr_in)) == SOCKET_ERROR)
        {
            SetLastErrorMsg(TEXT("绑定端口失败!"));
            bListening = FALSE;
        }
        else if (listen(m_ListenSocket, SOMAXCONN) == SOCKET_ERROR)
        {
            SetLastErrorMsg(TEXT("监听失败!"));
            bListening = FALSE;
        }

        if (bListening)
        {
            m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
            if (m_hCompletionPort == NULL)
            {
                SetLastErrorMsg(TEXT("完成端口创建失败!"));
                bListening = FALSE;
            }
            else if (!QueueUserWorkItem(ThreadPoolProc, this, WT_EXECUTELONGFUNCTION))
            {
                SetLastErrorMsg(TEXT("工作线程启动失败!"));
                CloseHandle(m_hCompletionPort);
                m_hCompletionPort = NULL;
                bListening = FALSE;
            }
        }

        if (!bListening)
        {
            // Nothing refers to the listening socket yet, so it is released here.
            closesocket(m_ListenSocket);
            m_ListenSocket = INVALID_SOCKET;
            return FALSE;
        }

        // From here on the listening socket is registered in m_ArrSocketData
        // and a worker is running; StopServer() releases both.
        CPerSocketData *pSockData = AssignSockToCompletionPort(m_ListenSocket);
        if (pSockData == NULL || !PostAccept(pSockData))
        {
            SetLastErrorMsg(TEXT("投递接受连接请求失败!"));
            return FALSE;
        }

        return TRUE;
    }
    
    void CIOCPServer::StopServer()
    {
        CPerSocketData *pSocketData = NULL;
        POSITION pos = m_ArrSocketData.GetHeadPosition();
    
        while (pos != NULL)
        {
            pSocketData = m_ArrSocketData.GetNext(pos);
            closesocket(pSocketData->m_Socket);
            delete pSocketData;
        }
        m_ArrSocketData.RemoveAll();
    
        COverLappedEX *pOverLap = NULL;
        pos = m_ArrOverLapEx.GetHeadPosition();
    
        while (pos != NULL)
        {
            pOverLap = m_ArrOverLapEx.GetNext(pos);
            delete pOverLap;
        }
        m_ArrSocketData.RemoveAll();
    
        while (m_ThreadNums>0)
        {
            PostQueuedCompletionStatus(m_hCompletionPort, 0, 0, NULL);
            Sleep(100);
        }
    
        if (m_hCompletionPort != NULL)
            CloseHandle(m_hCompletionPort);
    
        WSACleanup();
    }

    二   运行结果:

    111
  • 相关阅读:
    react typescript 子组件调用父组件
    Mongodb query查询
    CentOS虚拟机不能联网状况下yum方式从本地安装软件包(转载的)
    CentOS6.5 mini开启网络
    MySQL的InnoDB表如何设计主键索引-转自淘宝MySQL经典案例
    linux下安装mysql-community后起不来
    Eclipse 快捷键
    maven下载jta失败,自己本地安装jta库
    spring配置文件中id与name
    @autowired和@resource的区别
  • 原文地址:https://www.cnblogs.com/zwj-199306231519/p/13946630.html
Copyright © 2011-2022 走看看