zoukankan      html  css  js  c++  java
  • IOCP模型源代码学习

    首先是头文件:

    IOCPServer.h
      1 // IOCPServer.h: interface for the CIOCPServer class.
    2
    3 //我们要做的就是建立一个IOCP,把远程连接的socket句柄绑定到刚才创建的IOCP上,最后创建n个线程,并告诉这n个线程到这个IOCP上去访问数据就可以了。
    4 //
    5 //////////////////////////////////////////////////////////////////////
    6
    7 #if !defined(AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_)
    8 #define AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_
    9
    10 #if _MSC_VER > 1000
    11 #pragma once
    12 #endif // _MSC_VER > 1000
    13
    14 //以上预编译命令代表什么意思呢?
    15
    16 #include <winsock2.h>
    17 #include <MSTcpIP.h>
    18 #pragma comment(lib,"ws2_32.lib") //加载winSock2库文件
    19 #include "Buffer.h" //加载头文件
    20 #include "CpuUsage.h"
    21
    22
    23 #include <process.h>
    24
    25 #include <afxtempl.h>
    26
    27
    28
    29 ////////////////////////////////////////////////////////////////////
    30 //通知代码宏定义
    31 //NotifyCode
    32 #define NC_CLIENT_CONNECT 0x0001
    33 #define NC_CLIENT_DISCONNECT 0x0002
    34 #define NC_TRANSMIT 0x0003
    35 #define NC_RECEIVE 0x0004
    36 #define NC_RECEIVE_COMPLETE 0x0005 // 完整接收
    37
    38 //##ModelId=4DF8B1EC039B
    39 //互斥锁对象
    40 class CLock
    41 {
    42 protected:
    43 //##ModelId=4DF8B1EC03BB
    44 CRITICAL_SECTION* m_pcs;
    45 //##ModelId=4DF8B1EC03BC
    46 CString m_strFunc;
    47 public:
    48 //##ModelId=4DF8B1EC03AB
    49 CLock(CRITICAL_SECTION& cs, const CString& strFunc)
    50 {
    51 m_strFunc = strFunc;
    52 m_pcs = &cs;
    53 Lock();
    54 }
    55 //##ModelId=4DF8B1EC03AE
    56 ~CLock()
    57 {
    58 Unlock();
    59
    60 }
    61 //##ModelId=4DF8B1EC03B0
    62
    63 void Lock()
    64 {
    65 TRACE(_T("EC %d %s\n") , GetCurrentThreadId(), m_strFunc);
    66 EnterCriticalSection(m_pcs);
    67 }
    68 //##ModelId=4DF8B1EC03AF
    69 void Unlock()
    70 {
    71 LeaveCriticalSection(m_pcs);
    72 TRACE(_T("LC %d %s\n") , GetCurrentThreadId() , m_strFunc);
    73 }
    74 };
    75
    76 //##ModelId=4DF8B1EC03CA
    77 //枚举类型,IO初始化,IO读,IO写,IO空闲
    78 //定义了消息类型
    // Kind of operation carried by an overlapped I/O packet (see OVERLAPPEDPLUS).
    enum IOType
    {
        //##ModelId=4DF8B1EC03DA
        IOInitialize = 0,   // first packet queued for a newly accepted client
        //##ModelId=4DF8B1EC03DB
        IORead       = 1,   // receive request
        //##ModelId=4DF8B1EC03DC
        IOWrite      = 2,   // send request
        //##ModelId=4DF8B1ED0001
        IOIdle       = 3    // idle
    };
    90
    91
    92 //##ModelId=4DF8B1ED0030
    93 //缓冲区,由server负责接收数据,接收的数据存在缓存区,等接收完毕通知应用程序(目前的理解)
    94 //在原有OVERLAPPED的基础上扩展,加上IO类型,封装为OVERLAPPEDPLUS
    95 class OVERLAPPEDPLUS
    96 {
    97 public:
    98 //##ModelId=4DF8B1ED0032
    99 OVERLAPPED m_ol;
    100 //##ModelId=4DF8B1ED0041
    101 IOType m_ioType;
    102
    103 //##ModelId=4DF8B1ED0045
    104 OVERLAPPEDPLUS(IOType ioType) {
    105 ZeroMemory(this, sizeof(OVERLAPPEDPLUS));
    106 m_ioType = ioType;
    107 }
    108 };
    109
    110
    111
    112
    113 //##ModelId=4DF8B1ED005F
    114 // Per-client connection context: the socket plus the buffers and bookkeeping kept for one connection
    115 struct ClientContext
    116 {
    117 //##ModelId=4DF8B1ED0062
    118 SOCKET m_Socket;
    119 // Store buffers
    120 //##ModelId=4DF8B1ED0070
    121 CBuffer m_WriteBuffer;
    122 //##ModelId=4DF8B1ED0075
    123 CBuffer m_CompressionBuffer; // compressed data as received
    124 //##ModelId=4DF8B1ED007A
    125 CBuffer m_DeCompressionBuffer; // data after decompression
    126 //##ModelId=4DF8B1ED0081
    127 CBuffer m_ResendWriteBuffer; // last packet sent; kept so it can be resent when the peer failed to receive it
    128
    129 //##ModelId=4DF8B1ED0085
    130 int m_Dialog[2]; // dialog slot: [0] is the type (dialog id), [1] is the address of the CDialog. NOTE(review): an int cannot hold a pointer on 64-bit builds
    131 //##ModelId=4DF8B1ED008E
    132 int m_nTransferProgress;
    133 // Input Elements for Winsock
    134 //##ModelId=4DF8B1ED0090
    135 WSABUF m_wsaInBuffer;
    136 /*
    137 typedef struct _WSABUF {
    138 u_long len; // the length of the buffer
    139 char FAR * buf; // the pointer to the buffer
    140 } WSABUF, FAR * LPWSABUF;
    141 */
    142 //##ModelId=4DF8B1ED0094
    143 BYTE m_byInBuffer[8192]; // receive storage; OnAccept points m_wsaInBuffer at this array
    144
    145 // Output elements for Winsock
    146 //##ModelId=4DF8B1ED009E
    147 WSABUF m_wsaOutBuffer;
    148 //##ModelId=4DF8B1ED00A2
    149 HANDLE m_hWriteComplete;
    150
    151 // Message counts... purely for example purposes
    152 //##ModelId=4DF8B1ED00AD
    153 LONG m_nMsgIn;
    154 //##ModelId=4DF8B1ED00AE
    155 LONG m_nMsgOut;
    156
    157 //##ModelId=4DF8B1ED00AF
    158 BOOL m_bIsMainSocket; // whether this is the main socket
    159
    160 //##ModelId=4DF8B1ED00C0
    161 ClientContext* m_pWriteContext;
    162 //##ModelId=4DF8B1ED00C5
    163 ClientContext* m_pReadContext;
    164 };
    165
    166 //返回右移4位的hash值
    167 //Windows Data Types
    168 template<>
    169 inline UINT AFXAPI HashKey(CString & strGuid)
    170 {
    171 return HashKey( (LPCTSTR) strGuid);
    172 }
    173
    174 //Mapper.h到底用来干啥呢?(貌似用于消息循环)
    175 #include "Mapper.h"
    176
    177 //以下typedef表示啥呢?
    178 //NOTIFYPROC指向函数的指针
    179 //##ModelId=4DF8B1ED00CC
    180 typedef void (CALLBACK* NOTIFYPROC)(LPVOID, ClientContext*, UINT nCode);
    181
    182 //##ModelId=4DF8B1ED00DD
    183 typedef CList<ClientContext*, ClientContext* > ContextList; //这个是ClientContext列表
    184
    185
    186 //这是什么意思呢?
    187 class CMainFrame;
    188
    189 //##ModelId=4DF8B1ED0149
    190 class CIOCPServer // IOCP-based TCP server: a listener thread accepts clients, worker threads service completion packets
    191 {
    192 public:
    193 //##ModelId=4DF8B1ED0159
    194 void DisconnectAll();
    195 //##ModelId=4DF8B1ED015A
    196 CIOCPServer();
    197 //##ModelId=4DF8B1ED0168
    198 virtual ~CIOCPServer();
    199
    200 //##ModelId=4DF8B1ED016B
    201 // notification callback
    202 NOTIFYPROC m_pNotifyProc;
    203 //##ModelId=4DF8B1ED0179
    204 CMainFrame* m_pFrame;
    205
    206 //##ModelId=4DF8B1ED017D
    207 // Initialize:
    208 // stores the notification callback in m_pNotifyProc,
    209 // creates the listening socket and the listener thread (ListenThreadProc),
    210 // then initializes the IOCP side of the server
    211 bool Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort);
    212
    213
    214 // __stdcall is the calling convention _beginthreadex requires of a thread entry point
    215 //##ModelId=4DF8B1ED0197
    216
    217 // entry point of the listener thread that Initialize starts for the listening socket
    218 static unsigned __stdcall ListenThreadProc(LPVOID lpVoid);
    219 //##ModelId=4DF8B1ED019A
    220
    221 // entry point of the completion-port worker threads
    222 static unsigned __stdcall ThreadPoolFunc(LPVOID WorkContext);
    223 //##ModelId=4DF8B1ED01A9
    224 // critical section; static, so it is shared by every CIOCPServer instance
    225 static CRITICAL_SECTION m_cs;
    226
    227 //##ModelId=4DF8B1ED01B6
    228 // send data
    229 void Send(ClientContext* pContext, LPBYTE lpData, UINT nSize);
    230 //##ModelId=4DF8B1ED01C7
    231 // post an overlapped receive for the client
    232 void PostRecv(ClientContext* pContext);
    233
    234 //##ModelId=4DF8B1ED01C9
    235 bool IsRunning();
    236 //##ModelId=4DF8B1ED01D5
    237 void Shutdown();
    238 //##ModelId=4DF8B1ED01D6
    239 // reset a connection
    240 void ResetConnection(ClientContext* pContext);
    241
    242 //##ModelId=4DF8B1ED01D8
    243 // number of worker threads currently running ThreadPoolFunc
    244 LONG m_nCurrentThreads;
    245 //##ModelId=4DF8B1ED01E5
    246 // number of worker threads currently busy (not blocked waiting on the completion port)
    247 LONG m_nBusyThreads;
    248
    249
    250 //##ModelId=4DF8B1ED01E6
    251 UINT m_nSendKbps; // current send rate
    252 //##ModelId=4DF8B1ED01F5
    253 UINT m_nRecvKbps; // current receive rate
    254 //##ModelId=4DF8B1ED01F6
    255 UINT m_nMaxConnections; // maximum number of connections
    256 protected:
    257 //##ModelId=4DF8B1ED01F7
    258 void InitializeClientRead(ClientContext* pContext);
    259 //##ModelId=4DF8B1ED0204
    260 BOOL AssociateSocketWithCompletionPort(SOCKET device, HANDLE hCompletionPort, DWORD dwCompletionKey);
    261 //##ModelId=4DF8B1ED0208
    262 void RemoveStaleClient(ClientContext* pContext, BOOL bGraceful);
    263 //##ModelId=4DF8B1ED0216
    264 void MoveToFreePool(ClientContext *pContext);
    265 //##ModelId=4DF8B1ED0223
    266 ClientContext* AllocateContext();
    267
    268 //##ModelId=4DF8B1ED0224
    269 LONG m_nWorkerCnt;
    270
    271 //##ModelId=4DF8B1ED0225
    272 // set to true by Initialize once the server has been set up
    273 bool m_bInit;
    274 //##ModelId=4DF8B1ED0233
    275 bool m_bDisconnectAll;
    276 //##ModelId=4DF8B1ED0234
    277 BYTE m_bPacketFlag[5];
    278 //##ModelId=4DF8B1ED0243
    279 void CloseCompletionPort();
    280 //##ModelId=4DF8B1ED0244
    281 void OnAccept();
    282 //##ModelId=4DF8B1ED0252
    283 bool InitializeIOCP(void);
    284 //##ModelId=4DF8B1ED0254
    285 void Stop();
    286
    287 //##ModelId=4DF8B1ED0263
    288 ContextList m_listContexts;
    289 //##ModelId=4DF8B1ED0272
    290 ContextList m_listFreePool;
    291 //##ModelId=4DF8B1ED0277
    292 WSAEVENT m_hEvent;
    293 //##ModelId=4DF8B1ED0282
    294 SOCKET m_socListen;
    295 //##ModelId=4DF8B1ED0286
    296 HANDLE m_hKillEvent;
    297 //##ModelId=4DF8B1ED0287
    298 // listener thread handle
    299 HANDLE m_hThread;
    300 //##ModelId=4DF8B1ED0291
    301 // I/O completion port handle
    302 HANDLE m_hCompletionPort;
    303 //##ModelId=4DF8B1ED0292
    304 bool m_bTimeToKill;
    305 //##ModelId=4DF8B1ED02A1
    306 CCpuUsage m_cpu;
    307
    308 //##ModelId=4DF8B1ED02A5
    309 LONG m_nKeepLiveTime; // keep-alive time in milliseconds, passed to SIO_KEEPALIVE_VALS in OnAccept
    310
    311 // Thread Pool Tunables
    312 //##ModelId=4DF8B1ED02B0
    313 // minimum number of threads in the pool
    314 LONG m_nThreadPoolMin;
    315 //##ModelId=4DF8B1ED02B1
    316 // maximum number of threads in the pool
    317 LONG m_nThreadPoolMax;
    318 //##ModelId=4DF8B1ED02BF
    319 LONG m_nCPULoThreshold;
    320 //##ModelId=4DF8B1ED02C0
    321 LONG m_nCPUHiThreshold;
    322
    323
    324 //##ModelId=4DF8B1ED02C1
    325 CString GetHostName(SOCKET socket);
    326
    327 //##ModelId=4DF8B1ED02D0
    328 void CreateStream(ClientContext* pContext);
    329
    330 // the I/O message map macros used below are defined in Mapper.h
    331 BEGIN_IO_MSG_MAP()
    332 IO_MESSAGE_HANDLER(IORead, OnClientReading)
    333 IO_MESSAGE_HANDLER(IOWrite, OnClientWriting)
    334 IO_MESSAGE_HANDLER(IOInitialize, OnClientInitializing)
    335 END_IO_MSG_MAP()
    336
    337 /*
    338 Rough hand expansion of the map above (based on Mapper.h; to be confirmed against that header):
    339 BEGIN_IO_MSG_MAP()
    340 IO_MESSAGE_HANDLER(IORead, OnClientReading)
    341 IO_MESSAGE_HANDLER(IOWrite, OnClientWriting)
    342 IO_MESSAGE_HANDLER(IOInitialize, OnClientInitializing)
    343 END_IO_MSG_MAP()
    344
    345 public:
    346 bool ProcessIOMessage(IOType clientIO,ClientContext * pContext,DWORD dwSize = 0)
    347 {
    348 bool bRet = false;
    349
    350 if(IORead == clientIO)
    351 bRet = OnClientReading(pContext,dwSize);
    352 if(IOWrite == clientIO)
    353 bRet = OnClientWriting(pContext,dwSize);
    354 if(IOInitialize == clientIO)
    355 bRet = OnClientInitializing(pContext,dwSize);
    356
    357 return bRet;
    358
    359 }
    360
    361 */
    362
    363 //##ModelId=4DF8B1ED02DF
    364 bool OnClientInitializing (ClientContext* pContext, DWORD dwSize = 0);
    365 //##ModelId=4DF8B1ED02E2
    366 bool OnClientReading (ClientContext* pContext, DWORD dwSize = 0);
    367 //##ModelId=4DF8B1ED02F0
    368 bool OnClientWriting (ClientContext* pContext, DWORD dwSize = 0);
    369
    370 };
    371
    372 #endif // !defined(AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_)

    其次是cpp文件:

    IOCPServer.cpp1
    // IOCPServer.cpp: implementation of the CIOCPServer class.
    //
    //////////////////////////////////////////////////////////////////////

    #include "stdafx.h"
    #include "IOCPServer.h"
    #include "../MainFrm.h"

    //提供文件压缩的库
    #include "zlib/zlib.h"

    #ifdef _DEBUG
    #undef THIS_FILE
    static char THIS_FILE[]=__FILE__;
    #define new DEBUG_NEW
    #endif

    // Change at your Own Peril

    // 'G' 'h' '0' 's' 't' | PacketLen | UnZipLen
    #define HDR_SIZE 13
    #define FLAG_SIZE 5
    #define HUERISTIC_VALUE 2
    //##ModelId=4DF8B1ED01A9
    CRITICAL_SECTION CIOCPServer::m_cs; //静态成员变量


    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////


    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: CIOCPServer::CIOCPServer
    //
    // DESCRIPTION: C'tor initializes Winsock2 and miscellaneous events etc.
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED015A

    //构造函数,一些初始化工作
    CIOCPServer::CIOCPServer()
    {
        // Debug aid: print the address of this instance.
        TRACE(_T("CIOCPServer=%p\n"), this);

        // Load Winsock 2.2. The matching WSACleanup() is issued by the destructor.
        WSADATA wsaData;
        const int nStartup = WSAStartup(MAKEWORD(2,2), &wsaData);
        if (nStartup != 0)
        {
            TRACE(_T("WSAStartup() error %d\n"), nStartup);
        }

        // m_cs is a static member, so this (re)initializes the one critical
        // section shared by every instance.
        InitializeCriticalSection(&m_cs);

        // Callback and owner window are supplied later by Initialize(); give
        // them a defined value so they are never read uninitialized.
        m_pNotifyProc = NULL;
        m_pFrame = NULL;

        m_hThread = NULL;
        // Unnamed, manual-reset event, initially non-signalled; signalling it
        // asks the listener thread to exit.
        m_hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        // NOTE(review): NULL rather than INVALID_SOCKET is kept from the
        // original; confirm what Shutdown() expects before changing it.
        m_socListen = NULL;

        m_bTimeToKill = false;
        m_bDisconnectAll = false;

        m_hEvent = NULL;
        m_hCompletionPort = NULL;

        m_bInit = false;
        m_nCurrentThreads = 0;
        m_nBusyThreads = 0;
        m_nWorkerCnt = 0;

        // Thread-pool tunables get their real values in InitializeIOCP().
        m_nThreadPoolMin = 0;
        m_nThreadPoolMax = 0;
        m_nCPULoThreshold = 0;
        m_nCPUHiThreshold = 0;

        m_nSendKbps = 0;
        m_nRecvKbps = 0;

        m_nMaxConnections = 10000;
        m_nKeepLiveTime = 1000 * 60 * 3; // probe idle connections after three minutes

        // Packet flag: the five bytes 'G' 'h' '0' 's' 't'.
        BYTE bPacketFlag[] = {'G', 'h', '0', 's', 't'};
        memcpy(m_bPacketFlag, bPacketFlag, sizeof(bPacketFlag));
    }


    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: CIOCPServer::~CIOCPServer
    //
    // DESCRIPTION: Tidy up
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED0168

    //析构函数
    CIOCPServer::~CIOCPServer()
    {
        // Best-effort teardown: shut the server down, then release Winsock.
        // Nothing may escape a destructor, so any exception is swallowed; if
        // Shutdown() throws, WSACleanup() is skipped.
        try
        {
            Shutdown();
            WSACleanup();
        }
        catch (...)
        {
        }
    }

    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: Init
    //
    // DESCRIPTION: Starts listener into motion
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED017D

    //IOCPServer初始化函数

    /*
    Initialize 注册了一个回调函数 m_pNotifyProc ,
    创建了一个监听套接字,
    一个监听线程 ListenThreadProc ,
    然后初始化 IOCP 服务端
    */

    // Starts the server.
    //
    // pNotifyProc     - callback invoked on client events (stored in m_pNotifyProc).
    // pFrame          - main frame pointer handed back to the callback.
    // nMaxConnections - stored in m_nMaxConnections.
    // nPort           - TCP port to listen on (all local IPv4 addresses).
    //
    // Returns true when the listening socket, the listener thread and the
    // completion port with its worker threads are all up; on any failure the
    // socket and event created here are released and false is returned.
    bool CIOCPServer::Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort)
    {
        m_pNotifyProc = pNotifyProc;
        m_pFrame = pFrame;
        m_nMaxConnections = nMaxConnections;

        // IPv4 stream socket, protocol chosen automatically, overlapped I/O enabled.
        m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
        if (m_socListen == INVALID_SOCKET)
        {
            TRACE(_T("Could not create listen socket %ld\n"),WSAGetLastError());
            return false;
        }

        bool bReady = false;
        do
        {
            // Event signalled by Winsock when a connection is waiting to be accepted.
            m_hEvent = WSACreateEvent();
            if (m_hEvent == WSA_INVALID_EVENT)
            {
                TRACE(_T("WSACreateEvent() error %ld\n"),WSAGetLastError());
                break;
            }

            // The listener is only interested in FD_ACCEPT.
            if (WSAEventSelect(m_socListen, m_hEvent, FD_ACCEPT) == SOCKET_ERROR)
            {
                TRACE(_T("WSAEventSelect() error %ld\n"),WSAGetLastError());
                break;
            }

            // Zero the whole address first so sin_zero is not left uninitialized.
            SOCKADDR_IN saServer;
            ZeroMemory(&saServer, sizeof(saServer));
            saServer.sin_family = AF_INET;
            saServer.sin_port = htons((u_short)nPort);
            saServer.sin_addr.s_addr = htonl(INADDR_ANY);

            if (bind(m_socListen, (LPSOCKADDR)&saServer, sizeof(saServer)) == SOCKET_ERROR)
            {
                TRACE(_T("bind() error %ld\n"),WSAGetLastError());
                break;
            }

            if (listen(m_socListen, SOMAXCONN) == SOCKET_ERROR)
            {
                TRACE(_T("listen() error %ld\n"),WSAGetLastError());
                break;
            }

            // Create the listener suspended so that it cannot call OnAccept()
            // before the completion port exists.
            UINT dwThreadId = 0;
            m_hThread = (HANDLE)_beginthreadex(NULL,               // Security
                                               0,                  // Stack size - use default
                                               ListenThreadProc,   // Thread fn entry point
                                               (void*) this,       // this server, for the thread fn
                                               CREATE_SUSPENDED,   // Init flag
                                               &dwThreadId);       // receives the thread id

            // _beginthreadex reports failure with 0, not INVALID_HANDLE_VALUE.
            if (m_hThread == NULL)
            {
                TRACE(_T("Could not create listener thread\n"));
                break;
            }

            if (!InitializeIOCP())
            {
                TRACE(_T("InitializeIOCP() failed\n"));

                // Let the suspended listener run into its kill-event check and
                // exit, then restore the event for a later Initialize().
                ::SetEvent(m_hKillEvent);
                ::ResumeThread(m_hThread);
                ::WaitForSingleObject(m_hThread, INFINITE);
                ::ResetEvent(m_hKillEvent);
                ::CloseHandle(m_hThread);
                m_hThread = NULL;
                break;
            }

            ::ResumeThread(m_hThread);
            bReady = true;
        } while (false);

        if (!bReady)
        {
            if (m_hEvent != NULL)
            {
                WSACloseEvent(m_hEvent);
                m_hEvent = NULL;
            }
            closesocket(m_socListen);
            m_socListen = INVALID_SOCKET;
            return false;
        }

        m_bInit = true;
        return true;
    }
    IOCPServer.cpp2
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::ListenThreadProc
    4 //
    5 // DESCRIPTION: Listens for incoming clients
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 //
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED0197
    18
    19 //一个监听线程
    20 unsigned CIOCPServer::ListenThreadProc(LPVOID lParam)
    21 {
    22 CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam);
    23
    24 //This structure is used to store a socket's internal information about network events.
    25 //lNetworkEvents
    26 //Indicates which of the FD_XXX network events have occurred.
    27 //用于存储网络事件信息,其中events.lNetworkEvents记录了具体事件(FD_XXX的形式)
    28 WSANETWORKEVENTS events;
    29
    30 while(1)
    31 {
    32 //
    33 // Wait for something to happen
    34 //
    35 if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0) //(初始无信号)若有信号就跳出循环,结束监听
    36 break;
    37
    38 DWORD dwRet;
    39 dwRet = WSAWaitForMultipleEvents(1,
    40 &pThis->m_hEvent,
    41 FALSE,
    42 100,
    43 FALSE);
    44
    45 if (dwRet == WSA_WAIT_TIMEOUT)
    46 continue;
    47 //以上代码表示一直等待直到与m_hEvent绑定的事件发生
    48
    49 //
    50 // Figure out what happened
    51 //
    52 //事件发生了,到底是神马事件呢?
    53 int nRet = WSAEnumNetworkEvents(pThis->m_socListen,
    54 pThis->m_hEvent,
    55 &events);
    56
    57 if (nRet == SOCKET_ERROR)
    58 {
    59 TRACE(_T("WSAEnumNetworkEvents error %ld\n"),WSAGetLastError());
    60 break;
    61 }
    62
    63 // Handle Network events //
    64 // ACCEPT
    65 //if (events.lNetworkEvents == FD_ACCEPT 相比而言下面代码更高效
    66 if (events.lNetworkEvents & FD_ACCEPT)
    67 {
    68 if (events.iErrorCode[FD_ACCEPT_BIT] == 0) //如果木有发生错误
    69 pThis->OnAccept(); //异步IO模型,当accept时具体做啥事情呢,请看OnAccept函数
    70 else
    71 {
    72 TRACE(_T("Unknown network event error %ld\n"),WSAGetLastError());
    73 break;
    74 }
    75
    76 }
    77
    78 } // while....
    79
    80 return 0; // Normal Thread Exit Code...
    81 }
    82
    83 ////////////////////////////////////////////////////////////////////////////////
    84 //
    85 // FUNCTION: CIOCPServer::OnAccept
    86 //
    87 // DESCRIPTION: Accepts a pending client connection and queues its first completion packet
    88 //
    89 // INPUTS:
    90 //
    91 // NOTES:
    92 //
    93 // MODIFICATIONS:
    94 //
    95 // Name Date Version Comments
    96 // N T ALMOND 06042001 1.0 Origin
    97 // Ulf Hedlund 09072001 Changes for OVERLAPPEDPLUS
    98 ////////////////////////////////////////////////////////////////////////////////
    99 //##ModelId=4DF8B1ED0244
    100 void CIOCPServer::OnAccept()
    101 {
    102
    103 SOCKADDR_IN SockAddr;
    104 SOCKET clientSocket;
    105
    106 int nRet;
    107 int nLen;
    108
    109 if (m_bTimeToKill || m_bDisconnectAll)
    110 return;
    111
    112 //
    113 // accept the new socket descriptor
    114 //
    115 nLen = sizeof(SOCKADDR_IN);
    116 clientSocket = accept(m_socListen,
    117 (LPSOCKADDR)&SockAddr,
    118 &nLen);
    119
    120 if (clientSocket == SOCKET_ERROR)
    121 {
    122 nRet = WSAGetLastError();
    123 if (nRet != WSAEWOULDBLOCK)
    124 {
    125 //
    126 // Just log the error and return
    127 //
    128 TRACE(_T("accept() error\n"),WSAGetLastError());
    129 return;
    130 }
    131 }
    132
    133 // Create the Client context to be associted with the completion port
    134 ClientContext* pContext = AllocateContext();
    135 // AllocateContext fail
    136 if (pContext == NULL)
    137 return;
    138
    139 pContext->m_Socket = clientSocket;
    140
    141 // Fix up In Buffer
    142 pContext->m_wsaInBuffer.buf = (char*)pContext->m_byInBuffer;
    143 pContext->m_wsaInBuffer.len = sizeof(pContext->m_byInBuffer);
    144
    145 // Associate the new socket with a completion port.
    146 if (!AssociateSocketWithCompletionPort(clientSocket, m_hCompletionPort, (DWORD) pContext))
    147 {
    148 delete pContext;
    149 pContext = NULL;
    150
    151 closesocket( clientSocket );
    152 closesocket( m_socListen );
    153 return;
    154 }
    155
    156 // 关闭nagle算法,以免影响性能,因为控制时控制端要发送很多数据量很小的数据包,要求马上发送
    157 // 暂不关闭,实验得知能网络整体性能有很大影响
    158 const char chOpt = 1;
    159
    160 // int nErr = setsockopt(pContext->m_Socket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
    161 // if (nErr == -1)
    162 // {
    163 // TRACE(_T("setsockopt() error\n"),WSAGetLastError());
    164 // return;
    165 // }
    166
    167 // Set KeepAlive 开启保活机制
    168 if (setsockopt(pContext->m_Socket, SOL_SOCKET, SO_KEEPALIVE, (char *)&chOpt, sizeof(chOpt)) != 0)
    169 {
    170 TRACE(_T("setsockopt() error\n"), WSAGetLastError());
    171 }
    172
    173 // 设置超时详细信息
    174 tcp_keepalive klive;
    175 klive.onoff = 1; // 启用保活
    176 klive.keepalivetime = m_nKeepLiveTime;
    177 klive.keepaliveinterval = 1000 * 10; // 重试间隔为10秒 Resend if No-Reply
    178 WSAIoctl
    179 (
    180 pContext->m_Socket,
    181 SIO_KEEPALIVE_VALS,
    182 &klive,
    183 sizeof(tcp_keepalive),
    184 NULL,
    185 0,
    186 (unsigned long *)&chOpt,
    187 0,
    188 NULL
    189 );
    190
    191 CLock cs(m_cs, "OnAccept" );
    192 // Hold a reference to the context
    193 m_listContexts.AddTail(pContext);
    194
    195
    196 // Trigger first IO Completion Request
    197 // Otherwise the Worker thread will remain blocked waiting for GetQueuedCompletionStatus...
    198 // The first message that gets queued up is ClientIoInitializing - see ThreadPoolFunc and
    199 // IO_MESSAGE_HANDLER
    200
    201
    202 OVERLAPPEDPLUS *pOverlap = new OVERLAPPEDPLUS(IOInitialize);
    203
    204 BOOL bSuccess = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
    205
    206 if ( (!bSuccess && GetLastError( ) != ERROR_IO_PENDING))
    207 {
    208 RemoveStaleClient(pContext,TRUE);
    209 return;
    210 }
    211
    212 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_CLIENT_CONNECT);
    213
    214 // Post to WSARecv Next
    215 PostRecv(pContext);
    216 }
    217
    218
    219 ////////////////////////////////////////////////////////////////////////////////
    220 //
    221 // FUNCTION: CIOCPServer::InitializeIOCP
    222 //
    223 // DESCRIPTION: Creates the I/O completion port shared by all worker threads
    224 // and starts the worker thread pool that services it.
    225 //
    226 // INPUTS:
    227 //
    228 // NOTES:
    229 //
    230 // MODIFICATIONS:
    231 //
    232 // Name Date Version Comments
    233 // N T ALMOND 06042001 1.0 Origin
    234 //
    235 ////////////////////////////////////////////////////////////////////////////////
    236 //##ModelId=4DF8B1ED0252
    237 bool CIOCPServer::InitializeIOCP(void)
    238 {
    239
    240 SOCKET s;
    241 DWORD i;
    242 UINT nThreadID;
    243 SYSTEM_INFO systemInfo;
    244
    245 //
    246 // First open a temporary socket that we will use to create the
    247 // completion port. In NT 3.51 it will not be necessary to specify
    248 // the FileHandle parameter of CreateIoCompletionPort()--it will
    249 // be legal to specify FileHandle as NULL. However, for NT 3.5
    250 // we need an overlapped file handle.
    251 //
    252
    253 s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
    254 if ( s == INVALID_SOCKET )
    255 return false;
    256
    257 // Create the completion port that will be used by all the worker
    258 // threads.
    259 m_hCompletionPort = CreateIoCompletionPort( (HANDLE)s, NULL, 0, 0 );
    260 if ( m_hCompletionPort == NULL )
    261 {
    262 closesocket( s );
    263 return false;
    264 }
    265
    266 // Close the socket, we don't need it any longer.
    267 closesocket( s );
    268
    269 // Determine how many processors are on the system.
    270 GetSystemInfo( &systemInfo );
    271
    272 m_nThreadPoolMin = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
    273 m_nThreadPoolMax = m_nThreadPoolMin;
    274 m_nCPULoThreshold = 10;
    275 m_nCPUHiThreshold = 75;
    276
    277 //CCpuUsage对象
    278 m_cpu.Init();
    279
    280
    281 // We use two worker threads for eachprocessor on the system--this is choosen as a good balance
    282 // that ensures that there are a sufficient number of threads available to get useful work done
    283 // but not too many that context switches consume significant overhead.
    284 UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
    285
    286 // We need to save the Handles for Later Termination...
    287 HANDLE hWorker;
    288 m_nWorkerCnt = 0;
    289
    290 //创建nWorkerCnt个线程
    291 for ( i = 0; i < nWorkerCnt; i++ )
    292 {
    293 hWorker = (HANDLE)_beginthreadex(NULL, // Security
    294 0, // Stack size - use default
    295 ThreadPoolFunc, // Thread fun entry point
    296 (void*) this, // Param for thread
    297 0, // Init flag
    298 &nThreadID); // Thread address
    299
    300
    301 if (hWorker == NULL )
    302 {
    303 CloseHandle( m_hCompletionPort );
    304 return false;
    305 }
    306
    307 m_nWorkerCnt++;
    308
    309 CloseHandle(hWorker);
    310 }
    311
    312 return true;
    313 }
    314
    315 ////////////////////////////////////////////////////////////////////////////////
    316 //
    317 // FUNCTION: CIOCPServer::ThreadPoolFunc
    318 //
    319 // DESCRIPTION: This is the main worker routine for the worker threads.
    320 // Worker threads wait on a completion port for I/O to complete.
    321 // When it completes, the worker thread processes the I/O, then either pends
    322 // new I/O or closes the client's connection. When the service shuts
    323 // down, other code closes the completion port which causes
    324 // GetQueuedCompletionStatus() to wake up and the worker thread then
    325 // exits.
    326 //
    327 // INPUTS:
    328 //
    329 // NOTES:
    330 //
    331 // MODIFICATIONS:
    332 //
    333 // Name Date Version Comments
    334 // N T ALMOND 06042001 1.0 Origin
    335 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    336 ////////////////////////////////////////////////////////////////////////////////
    337 //##ModelId=4DF8B1ED019A
    338 //完成端口线程入口函数
    339 unsigned CIOCPServer::ThreadPoolFunc (LPVOID thisContext)
    340 {
    341 // Get back our pointer to the class
    342 ULONG ulFlags = MSG_PARTIAL;
    343 CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext);
    344 ASSERT(pThis);
    345
    346 HANDLE hCompletionPort = pThis->m_hCompletionPort;
    347
    348 DWORD dwIoSize;
    349 LPOVERLAPPED lpOverlapped;
    350 ClientContext* lpClientContext;
    351 OVERLAPPEDPLUS* pOverlapPlus;
    352 bool bError;
    353 bool bEnterRead;
    354
    355 InterlockedIncrement(&pThis->m_nCurrentThreads);
    356 InterlockedIncrement(&pThis->m_nBusyThreads);
    357
    358 //
    359 // Loop round and round servicing I/O completions.
    360 //
    361
    362 for (BOOL bStayInPool = TRUE; bStayInPool && pThis->m_bTimeToKill == false; )
    363 {
    364 pOverlapPlus = NULL;
    365 lpClientContext = NULL;
    366 bError = false;
    367 bEnterRead = false;
    368 // Thread is Block waiting for IO completion
    369 InterlockedDecrement(&pThis->m_nBusyThreads);
    370
    371
    372 // Get a completed IO request.
    373 BOOL bIORet = GetQueuedCompletionStatus(
    374 hCompletionPort,
    375 &dwIoSize,
    376 (LPDWORD) &lpClientContext,
    377 &lpOverlapped, INFINITE);
    378
    379 DWORD dwIOError = GetLastError();
    380 pOverlapPlus = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol);
    381
    382
    383 int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads);
    384
    385 if (!bIORet && dwIOError != WAIT_TIMEOUT )
    386 {
    387 if (lpClientContext && pThis->m_bTimeToKill == false)
    388 {
    389 pThis->RemoveStaleClient(lpClientContext, FALSE);
    390 }
    391 continue;
    392
    393 // anyway, this was an error and we should exit
    394 bError = true;
    395 }
    396
    397 if (!bError)
    398 {
    399
    400 // Allocate another thread to the thread Pool?
    401 if (nBusyThreads == pThis->m_nCurrentThreads)
    402 {
    403 if (nBusyThreads < pThis->m_nThreadPoolMax)
    404 {
    405 if (pThis->m_cpu.GetUsage() > pThis->m_nCPUHiThreshold)
    406 {
    407 UINT nThreadID = -1;
    408
    409 // HANDLE hThread = (HANDLE)_beginthreadex(NULL, // Security
    410 // 0, // Stack size - use default
    411 // ThreadPoolFunc, // Thread fn entry point
    412 /// (void*) pThis,
    413 // 0, // Init flag
    414 // &nThreadID); // Thread address
    415
    416 // CloseHandle(hThread);
    417 }
    418 }
    419 }
    420
    421
    422 // Thread timed out - IDLE?
    423 if (!bIORet && dwIOError == WAIT_TIMEOUT)
    424 {
    425 if (lpClientContext == NULL)
    426 {
    427 if (pThis->m_cpu.GetUsage() < pThis->m_nCPULoThreshold)
    428 {
    429 // Thread has no outstanding IO - Server hasn't much to do so die
    430 if (pThis->m_nCurrentThreads > pThis->m_nThreadPoolMin)
    431 bStayInPool = FALSE;
    432 }
    433
    434 bError = true;
    435 }
    436 }
    437 }
    438 //////////////////////////////////////////////////////////////////////////////////////////
    439 //////////////////////////////////////////////////////////////////////////////////////////
    440 if (!bError)
    441 {
    442 if(bIORet && NULL != pOverlapPlus && NULL != lpClientContext)
    443 {
    444 try
    445 {
    446 pThis->ProcessIOMessage(pOverlapPlus->m_ioType, lpClientContext, dwIoSize);
    447 }
    448 catch (...) {}
    449 }
    450 }
    451
    452 if(pOverlapPlus)
    453 delete pOverlapPlus; // from previous call
    454 }
    455
    456 InterlockedDecrement(&pThis->m_nWorkerCnt);
    457
    458 InterlockedDecrement(&pThis->m_nCurrentThreads);
    459 InterlockedDecrement(&pThis->m_nBusyThreads);
    460 return 0;
    461 }
    462
    463 ////////////////////////////////////////////////////////////////////////////////
    464 //
    465 // FUNCTION: CIOCPServer::Stop
    466 //
    467 // DESCRIPTION: Signal the listener to quit his thread
    468 //
    469 // INPUTS:
    470 //
    471 // NOTES:
    472 //
    473 // MODIFICATIONS:
    474 //
    475 // Name Date Version Comments
    476 // N T ALMOND 06042001 1.0 Origin
    477 //
    478 ////////////////////////////////////////////////////////////////////////////////
    479 //##ModelId=4DF8B1ED0254
    480 void CIOCPServer::Stop()
    481 {
    482 ::SetEvent(m_hKillEvent);
    483 WaitForSingleObject(m_hThread, INFINITE);
    484 CloseHandle(m_hThread);
    485 CloseHandle(m_hKillEvent);
    486 }
    IOCPServer.cpp3
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::GetHostName
    4 //
    5 // DESCRIPTION: Get the host name of the connect client
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 //
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED02C1
    18 CString CIOCPServer::GetHostName(SOCKET socket)
    19 {
    20 sockaddr_in sockAddr;
    21 memset(&sockAddr, 0, sizeof(sockAddr));
    22
    23 int nSockAddrLen = sizeof(sockAddr);
    24
    25 BOOL bResult = getpeername(socket,(SOCKADDR*)&sockAddr, &nSockAddrLen);
    26
    27 return bResult != INVALID_SOCKET ? inet_ntoa(sockAddr.sin_addr) : "";
    28 }
    29
    30
    31 //##ModelId=4DF8B1ED01C7
    32 void CIOCPServer::PostRecv(ClientContext* pContext)
    33 {
    34 // issue a read request
    35 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IORead);
    36 ULONG ulFlags = MSG_PARTIAL;
    37 DWORD dwNumberOfBytesRecvd;
    38 UINT nRetVal = WSARecv(pContext->m_Socket,
    39 &pContext->m_wsaInBuffer,
    40 1,
    41 &dwNumberOfBytesRecvd,
    42 &ulFlags,
    43 &pOverlap->m_ol,
    44 NULL);
    45
    46 if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    47 {
    48 RemoveStaleClient(pContext, FALSE);
    49 }
    50 }
    51 ////////////////////////////////////////////////////////////////////////////////
    52 //
    53 // FUNCTION: CIOCPServer::Send
    54 //
    55 // DESCRIPTION: Posts a Write + Data to IO CompletionPort for transfer
    56 //
    57 // INPUTS:
    58 //
    59 // NOTES:
    60 //
    61 // MODIFICATIONS:
    62 //
    63 // Name Date Version Comments
    64 // N T ALMOND 06042001 1.0 Origin
    65 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    66 ////////////////////////////////////////////////////////////////////////////////
    67 //##ModelId=4DF8B1ED01B6
    68 void CIOCPServer::Send(ClientContext* pContext, LPBYTE lpData, UINT nSize)
    69 {
    70 if (pContext == NULL)
    71 return;
    72
    73 try
    74 {
    75 if (nSize > 0)
    76 {
    77 // Compress data
    78 unsigned long destLen = (double)nSize * 1.001 + 12;
    79 LPBYTE pDest = new BYTE[destLen];
    80 int nRet = compress(pDest, &destLen, lpData, nSize);
    81
    82 if (nRet != Z_OK)
    83 {
    84 delete [] pDest;
    85 return;
    86 }
    87
    88 //////////////////////////////////////////////////////////////////////////
    89 LONG nBufLen = destLen + HDR_SIZE;
    90 // 5 bytes packet flag
    91 pContext->m_WriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag));
    92 // 4 byte header [Size of Entire Packet]
    93 pContext->m_WriteBuffer.Write((PBYTE) &nBufLen, sizeof(nBufLen));
    94 // 4 byte header [Size of UnCompress Entire Packet]
    95 pContext->m_WriteBuffer.Write((PBYTE) &nSize, sizeof(nSize));
    96 // Write Data
    97 pContext->m_WriteBuffer.Write(pDest, destLen);
    98 delete [] pDest;
    99
    100 // 发送完后,再备份数据, 因为有可能是m_ResendWriteBuffer本身在发送,所以不直接写入
    101 LPBYTE lpResendWriteBuffer = new BYTE[nSize];
    102 CopyMemory(lpResendWriteBuffer, lpData, nSize);
    103 pContext->m_ResendWriteBuffer.ClearBuffer();
    104 pContext->m_ResendWriteBuffer.Write(lpResendWriteBuffer, nSize); // 备份发送的数据
    105 delete [] lpResendWriteBuffer;
    106 }
    107 else // 要求重发
    108 {
    109 pContext->m_WriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag));
    110 pContext->m_ResendWriteBuffer.ClearBuffer();
    111 pContext->m_ResendWriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag)); // 备份发送的数据
    112 }
    113 // Wait for Data Ready signal to become available
    114 WaitForSingleObject(pContext->m_hWriteComplete, INFINITE);
    115
    116 // Prepare Packet
    117 // pContext->m_wsaOutBuffer.buf = (CHAR*) new BYTE[nSize];
    118 // pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();
    119
    120 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
    121 PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
    122
    123 pContext->m_nMsgOut++;
    124 }catch(...){}
    125 }
    126
    127
    128 ////////////////////////////////////////////////////////////////////////////////
    129 //
    130 // FUNCTION: CIOCPServer::OnClientInitializing
    131 //
    132 // DESCRIPTION: Called when client is initializing
    133 //
    134 // INPUTS:
    135 //
    136 // NOTES:
    137 //
    138 // MODIFICATIONS:
    139 //
    140 // Name Date Version Comments
    141 // N T ALMOND 06042001 1.0 Origin
    142 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    143 ////////////////////////////////////////////////////////////////////////////////
    144 //##ModelId=4DF8B1ED02DF
    145 bool CIOCPServer::OnClientInitializing(ClientContext* pContext, DWORD dwIoSize)
    146 {
    147 // We are not actually doing anything here, but we could for instance make
    148 // a call to Send() to send a greeting message or something
    149
    150 return true; // make sure to issue a read after this
    151 }
    152
    153 ////////////////////////////////////////////////////////////////////////////////
    154 //
    155 // FUNCTION: CIOCPServer::OnClientReading
    156 //
    157 // DESCRIPTION: Called when client is reading
    158 //
    159 // INPUTS:
    160 //
    161 // NOTES:
    162 //
    163 // MODIFICATIONS:
    164 //
    165 // Name Date Version Comments
    166 // N T ALMOND 06042001 1.0 Origin
    167 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    168 ////////////////////////////////////////////////////////////////////////////////
    169 //##ModelId=4DF8B1ED02E2
    170 bool CIOCPServer::OnClientReading(ClientContext* pContext, DWORD dwIoSize)
    171 {
    172 CLock cs(CIOCPServer::m_cs, "OnClientReading");
    173 try
    174 {
    175 //////////////////////////////////////////////////////////////////////////
    176 static DWORD nLastTick = GetTickCount();
    177 static DWORD nBytes = 0;
    178 nBytes += dwIoSize;
    179
    180 if (GetTickCount() - nLastTick >= 1000)
    181 {
    182 nLastTick = GetTickCount();
    183 InterlockedExchange((LPLONG)&(m_nRecvKbps), nBytes);
    184 nBytes = 0;
    185 }
    186
    187 //////////////////////////////////////////////////////////////////////////
    188
    189 if (dwIoSize == 0)
    190 {
    191 RemoveStaleClient(pContext, FALSE);
    192 return false;
    193 }
    194
    195 if (dwIoSize == FLAG_SIZE && memcmp(pContext->m_byInBuffer, m_bPacketFlag, FLAG_SIZE) == 0)
    196 {
    197 // 重新发送
    198 Send(pContext, pContext->m_ResendWriteBuffer.GetBuffer(), pContext->m_ResendWriteBuffer.GetBufferLen());
    199 // 必须再投递一个接收请求
    200 PostRecv(pContext);
    201 return true;
    202 }
    203
    204 // Add the message to out message
    205 // Dont forget there could be a partial, 1, 1 or more + partial mesages
    206 pContext->m_CompressionBuffer.Write(pContext->m_byInBuffer,dwIoSize);
    207
    208 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_RECEIVE);
    209
    210
    211 // Check real Data
    212 while (pContext->m_CompressionBuffer.GetBufferLen() > HDR_SIZE)
    213 {
    214 BYTE bPacketFlag[FLAG_SIZE];
    215 CopyMemory(bPacketFlag, pContext->m_CompressionBuffer.GetBuffer(), sizeof(bPacketFlag));
    216
    217 if (memcmp(m_bPacketFlag, bPacketFlag, sizeof(m_bPacketFlag)) != 0)
    218 throw "bad buffer";
    219
    220 int nSize = 0;
    221 CopyMemory(&nSize, pContext->m_CompressionBuffer.GetBuffer(FLAG_SIZE), sizeof(int));
    222
    223 // Update Process Variable
    224 pContext->m_nTransferProgress = pContext->m_CompressionBuffer.GetBufferLen() * 100 / nSize;
    225
    226 if (nSize && (pContext->m_CompressionBuffer.GetBufferLen()) >= nSize)
    227 {
    228 int nUnCompressLength = 0;
    229 // Read off header
    230 pContext->m_CompressionBuffer.Read((PBYTE) bPacketFlag, sizeof(bPacketFlag));
    231
    232 pContext->m_CompressionBuffer.Read((PBYTE) &nSize, sizeof(int));
    233 pContext->m_CompressionBuffer.Read((PBYTE) &nUnCompressLength, sizeof(int));
    234
    235 ////////////////////////////////////////////////////////
    236 ////////////////////////////////////////////////////////
    237 // SO you would process your data here
    238 //
    239 // I'm just going to post message so we can see the data
    240 int nCompressLength = nSize - HDR_SIZE;
    241 PBYTE pData = new BYTE[nCompressLength];
    242 PBYTE pDeCompressionData = new BYTE[nUnCompressLength];
    243
    244 if (pData == NULL || pDeCompressionData == NULL)
    245 throw "bad Allocate";
    246
    247 pContext->m_CompressionBuffer.Read(pData, nCompressLength);
    248
    249 //////////////////////////////////////////////////////////////////////////
    250 unsigned long destLen = nUnCompressLength;
    251 int nRet = uncompress(pDeCompressionData, &destLen, pData, nCompressLength);
    252 //////////////////////////////////////////////////////////////////////////
    253 if (nRet == Z_OK)
    254 {
    255 pContext->m_DeCompressionBuffer.ClearBuffer();
    256 pContext->m_DeCompressionBuffer.Write(pDeCompressionData, destLen);
    257 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_RECEIVE_COMPLETE);
    258 }
    259 else
    260 {
    261 throw "bad buffer";
    262 }
    263
    264 delete [] pData;
    265 delete [] pDeCompressionData;
    266 pContext->m_nMsgIn++;
    267 }
    268 else
    269 break;
    270 }
    271 // Post to WSARecv Next
    272 PostRecv(pContext);
    273 }catch(...)
    274 {
    275 pContext->m_CompressionBuffer.ClearBuffer();
    276 // 要求重发,就发送0, 内核自动添加数包标志
    277 Send(pContext, NULL, 0);
    278 PostRecv(pContext);
    279 }
    280
    281 return true;
    282 }
    IOCPServer.cpp4
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::OnClientWriting
    4 //
    5 // DESCRIPTION: Called when client is writing
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED02F0
    18 bool CIOCPServer::OnClientWriting(ClientContext* pContext, DWORD dwIoSize)
    19 {
    20 try
    21 {
    22 //////////////////////////////////////////////////////////////////////////
    23 static DWORD nLastTick = GetTickCount();
    24 static DWORD nBytes = 0;
    25
    26 nBytes += dwIoSize;
    27
    28 if (GetTickCount() - nLastTick >= 1000)
    29 {
    30 nLastTick = GetTickCount();
    31 InterlockedExchange((LPLONG)&(m_nSendKbps), nBytes);
    32 nBytes = 0;
    33 }
    34 //////////////////////////////////////////////////////////////////////////
    35
    36 ULONG ulFlags = MSG_PARTIAL;
    37
    38 // Finished writing - tidy up
    39 pContext->m_WriteBuffer.Delete(dwIoSize);
    40 if (pContext->m_WriteBuffer.GetBufferLen() == 0)
    41 {
    42 pContext->m_WriteBuffer.ClearBuffer();
    43 // Write complete
    44 SetEvent(pContext->m_hWriteComplete);
    45 return true; // issue new read after this one
    46 }
    47 else
    48 {
    49 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
    50
    51 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_TRANSMIT);
    52
    53
    54 pContext->m_wsaOutBuffer.buf = (char*) pContext->m_WriteBuffer.GetBuffer();
    55 pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();
    56
    57 int nRetVal = WSASend(pContext->m_Socket,
    58 &pContext->m_wsaOutBuffer,
    59 1,
    60 &pContext->m_wsaOutBuffer.len,
    61 ulFlags,
    62 &pOverlap->m_ol,
    63 NULL);
    64
    65
    66 if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING )
    67 {
    68 RemoveStaleClient( pContext, FALSE );
    69 }
    70
    71 }
    72 }catch(...){}
    73 return false; // issue new read after this one
    74 }
    75
    76 ////////////////////////////////////////////////////////////////////////////////
    77 //
    78 // FUNCTION: CIOCPServer::CloseCompletionPort
    79 //
    80 // DESCRIPTION: Close down the IO Complete Port, queue and associated client context structs
    81 // which in turn will close the sockets...
    82 //
    83 //
    84 // INPUTS:
    85 //
    86 // NOTES:
    87 //
    88 // MODIFICATIONS:
    89 //
    90 // Name Date Version Comments
    91 // N T ALMOND 06042001 1.0 Origin
    92 //
    93 ////////////////////////////////////////////////////////////////////////////////
    94 //##ModelId=4DF8B1ED0243
    95 void CIOCPServer::CloseCompletionPort()
    96 {
    97
    98 while (m_nWorkerCnt)
    99 {
    100 PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) NULL, NULL);
    101 Sleep(100);
    102 }
    103
    104 // Close the CompletionPort and stop any more requests
    105 CloseHandle(m_hCompletionPort);
    106
    107 ClientContext* pContext = NULL;
    108
    109 do
    110 {
    111 POSITION pos = m_listContexts.GetHeadPosition();
    112 if (pos)
    113 {
    114 pContext = m_listContexts.GetNext(pos);
    115 RemoveStaleClient(pContext, FALSE);
    116 }
    117 }
    118 while (!m_listContexts.IsEmpty());
    119
    120 m_listContexts.RemoveAll();
    121
    122 }
    123
    124
    125 //##ModelId=4DF8B1ED0204
    126 BOOL CIOCPServer::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
    127 {
    128 HANDLE h = CreateIoCompletionPort((HANDLE) socket, hCompletionPort, dwCompletionKey, 0);
    129 return h == hCompletionPort;
    130 }
    131
    132 ////////////////////////////////////////////////////////////////////////////////
    133 //
    134 // FUNCTION: CIOCPServer::RemoveStaleClient
    135 //
    136 // DESCRIPTION: Client has died on us, close socket and remove context from our list
    137 //
    138 // INPUTS:
    139 //
    140 // NOTES:
    141 //
    142 // MODIFICATIONS:
    143 //
    144 // Name Date Version Comments
    145 // N T ALMOND 06042001 1.0 Origin
    146 //
    147 ////////////////////////////////////////////////////////////////////////////////
    148 //##ModelId=4DF8B1ED0208
    149 void CIOCPServer::RemoveStaleClient(ClientContext* pContext, BOOL bGraceful)
    150 {
    151 CLock cs(m_cs, "RemoveStaleClient");
    152
    153 TRACE("CIOCPServer::RemoveStaleClient\n");
    154
    155 LINGER lingerStruct;
    156
    157
    158 //
    159 // If we're supposed to abort the connection, set the linger value
    160 // on the socket to 0.
    161 //
    162
    163 if ( !bGraceful )
    164 {
    165
    166 lingerStruct.l_onoff = 1;
    167 lingerStruct.l_linger = 0;
    168 setsockopt( pContext->m_Socket, SOL_SOCKET, SO_LINGER,
    169 (char *)&lingerStruct, sizeof(lingerStruct) );
    170 }
    171
    172
    173
    174 //
    175 // Free context structures
    176 if (m_listContexts.Find(pContext))
    177 {
    178
    179 //
    180 // Now close the socket handle. This will do an abortive or graceful close, as requested.
    181 CancelIo((HANDLE) pContext->m_Socket);
    182
    183 closesocket( pContext->m_Socket );
    184 pContext->m_Socket = INVALID_SOCKET;
    185
    186 while (!HasOverlappedIoCompleted((LPOVERLAPPED)pContext))
    187 Sleep(0);
    188
    189 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_CLIENT_DISCONNECT);
    190
    191 MoveToFreePool(pContext);
    192
    193 }
    194 }
    195
    196
    197 //##ModelId=4DF8B1ED01D5
    198 void CIOCPServer::Shutdown()
    199 {
    200 if (m_bInit == false)
    201 return;
    202
    203 m_bInit = false;
    204 m_bTimeToKill = true;
    205
    206 // Stop the listener
    207 Stop();
    208
    209
    210 closesocket(m_socListen);
    211 WSACloseEvent(m_hEvent);
    212
    213
    214 CloseCompletionPort();
    215
    216 DeleteCriticalSection(&m_cs);
    217
    218 while (!m_listFreePool.IsEmpty())
    219 delete m_listFreePool.RemoveTail();
    220
    221 }
    222
    223
    224 ////////////////////////////////////////////////////////////////////////////////
    225 //
    226 // FUNCTION: CIOCPServer::MoveToFreePool
    227 //
    228 // DESCRIPTION: Moves a used/stale context to the free pool for reuse
    229 //
    230 // INPUTS:
    231 //
    232 // NOTES:
    233 //
    234 // MODIFICATIONS:
    235 //
    236 // Name Date Version Comments
    237 // N T ALMOND 06042001 1.0 Origin
    238 //
    239 ////////////////////////////////////////////////////////////////////////////////
    240 //##ModelId=4DF8B1ED0216
    241 void CIOCPServer::MoveToFreePool(ClientContext *pContext)
    242 {
    243 CLock cs(m_cs, "MoveToFreePool");
    244 // Free context structures
    245 POSITION pos = m_listContexts.Find(pContext);
    246 if (pos)
    247 {
    248 pContext->m_CompressionBuffer.ClearBuffer();
    249 pContext->m_WriteBuffer.ClearBuffer();
    250 pContext->m_DeCompressionBuffer.ClearBuffer();
    251 pContext->m_ResendWriteBuffer.ClearBuffer();
    252 m_listFreePool.AddTail(pContext);
    253 m_listContexts.RemoveAt(pos);
    254 }
    255 }
    256
    257
    258
    259 ////////////////////////////////////////////////////////////////////////////////
    260 //
    261 // FUNCTION: CIOCPServer::AllocateContext
    262 //
    263 // DESCRIPTION: Checks free pool otherwise allocates a context
    264 //
    265 // INPUTS:
    266 //
    267 // NOTES:
    268 //
    269 // MODIFICATIONS:
    270 //
    271 // Name Date Version Comments
    272 // N T ALMOND 06042001 1.0 Origin
    273 //
    274 ////////////////////////////////////////////////////////////////////////////////
    275 //##ModelId=4DF8B1ED0223
    276 ClientContext* CIOCPServer::AllocateContext()
    277 {
    278 ClientContext* pContext = NULL;
    279
    280 CLock cs(CIOCPServer::m_cs, "AllocateContext");
    281
    282 if (!m_listFreePool.IsEmpty())
    283 {
    284 pContext = m_listFreePool.RemoveHead();
    285 }
    286 else
    287 {
    288 pContext = new ClientContext;
    289 }
    290
    291 ASSERT(pContext);
    292
    293 if (pContext != NULL)
    294 {
    295
    296 ZeroMemory(pContext, sizeof(ClientContext));
    297 pContext->m_bIsMainSocket = false;
    298 memset(pContext->m_Dialog, 0, sizeof(pContext->m_Dialog));
    299 }
    300 return pContext;
    301 }
    302
    303
    304 //##ModelId=4DF8B1ED01D6
    305 void CIOCPServer::ResetConnection(ClientContext* pContext)
    306 {
    307
    308 CString strHost;
    309 ClientContext* pCompContext = NULL;
    310
    311 CLock cs(CIOCPServer::m_cs, "ResetConnection");
    312
    313 POSITION pos = m_listContexts.GetHeadPosition();
    314 while (pos)
    315 {
    316 pCompContext = m_listContexts.GetNext(pos);
    317 if (pCompContext == pContext)
    318 {
    319 RemoveStaleClient(pContext, TRUE);
    320 break;
    321 }
    322 }
    323 }
    324
    325 //##ModelId=4DF8B1ED0159
    326 void CIOCPServer::DisconnectAll()
    327 {
    328 m_bDisconnectAll = true;
    329 CString strHost;
    330 ClientContext* pContext = NULL;
    331
    332 CLock cs(CIOCPServer::m_cs, "DisconnectAll");
    333
    334 POSITION pos = m_listContexts.GetHeadPosition();
    335 while (pos)
    336 {
    337 pContext = m_listContexts.GetNext(pos);
    338 RemoveStaleClient(pContext, TRUE);
    339 }
    340 m_bDisconnectAll = false;
    341
    342 }
    343
    344 //##ModelId=4DF8B1ED01C9
    345 bool CIOCPServer::IsRunning()
    346 {
    347 return m_bInit;
    348 }






     

  • 相关阅读:
    浮于文字上方的图片如何设置居中
    grub4dos_BIOS和grub4dos_UEFI编译环境搭建(ubuntu14.04)
    微信小程序开发——开发小技巧集锦
    vue+Better-scroll实现滚动位置保持并对页面切换效果进行优化
    谈谈fork/join实现原理
    ES系列(七):多节点任务的高效分发与收集实现
    JVM系列(五):gc实现概要01
    谈谈stream的运行原理
    ES系列(六):search处理过程实现1框架
    ES系列(五):获取单条数据get处理过程实现
  • 原文地址:https://www.cnblogs.com/lovememo/p/2296415.html
Copyright © 2011-2022 走看看