zoukankan      html  css  js  c++  java
  • IOCP模型源代码学习

    首先是头文件:

    IOCPServer.h
      1 // IOCPServer.h: interface for the CIOCPServer class.
    2
    3 //我们要做的就是建立一个IOCP,把远程连接的socket句柄绑定到刚才创建的IOCP上,最后创建n个线程,并告诉这n个线程到这个IOCP上去访问数据就可以了。
    4 //
    5 //////////////////////////////////////////////////////////////////////
    6
    7 #if !defined(AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_)
    8 #define AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_
    9
    10 #if _MSC_VER > 1000
    11 #pragma once
    12 #endif // _MSC_VER > 1000
    13
    14 //以上预编译命令代表什么意思呢?
    15
    16 #include <winsock2.h>
    17 #include <MSTcpIP.h>
    18 #pragma comment(lib,"ws2_32.lib") //加载winSock2库文件
    19 #include "Buffer.h" //加载头文件
    20 #include "CpuUsage.h"
    21
    22
    23 #include <process.h>
    24
    25 #include <afxtempl.h>
    26
    27
    28
    29 ////////////////////////////////////////////////////////////////////
    30 //通知代码宏定义
    31 //NotifyCode
    32 #define NC_CLIENT_CONNECT 0x0001
    33 #define NC_CLIENT_DISCONNECT 0x0002
    34 #define NC_TRANSMIT 0x0003
    35 #define NC_RECEIVE 0x0004
    36 #define NC_RECEIVE_COMPLETE 0x0005 // 完整接收
    37
    38 //##ModelId=4DF8B1EC039B
    39 //互斥锁对象
    40 class CLock
    41 {
    42 protected:
    43 //##ModelId=4DF8B1EC03BB
    44 CRITICAL_SECTION* m_pcs;
    45 //##ModelId=4DF8B1EC03BC
    46 CString m_strFunc;
    47 public:
    48 //##ModelId=4DF8B1EC03AB
    49 CLock(CRITICAL_SECTION& cs, const CString& strFunc)
    50 {
    51 m_strFunc = strFunc;
    52 m_pcs = &cs;
    53 Lock();
    54 }
    55 //##ModelId=4DF8B1EC03AE
    56 ~CLock()
    57 {
    58 Unlock();
    59
    60 }
    61 //##ModelId=4DF8B1EC03B0
    62
    63 void Lock()
    64 {
    65 TRACE(_T("EC %d %s\n") , GetCurrentThreadId(), m_strFunc);
    66 EnterCriticalSection(m_pcs);
    67 }
    68 //##ModelId=4DF8B1EC03AF
    69 void Unlock()
    70 {
    71 LeaveCriticalSection(m_pcs);
    72 TRACE(_T("LC %d %s\n") , GetCurrentThreadId() , m_strFunc);
    73 }
    74 };
    75
    76 //##ModelId=4DF8B1EC03CA
    77 //枚举类型,IO初始化,IO读,IO写,IO空闲
    78 //定义了消息类型
    // Kind of operation carried by an OVERLAPPEDPLUS; the worker thread passes
    // it to ProcessIOMessage when the completion is dequeued.
    79 enum IOType
    80 {
    81 //##ModelId=4DF8B1EC03DA
    82 IOInitialize, // posted by OnAccept for a newly accepted client
    83 //##ModelId=4DF8B1EC03DB
    84 IORead, // used by PostRecv for an overlapped WSARecv
    85 //##ModelId=4DF8B1EC03DC
    86 IOWrite, // mapped to OnClientWriting in the IO message map
    87 //##ModelId=4DF8B1ED0001
    88 IOIdle // not referenced in the code shown here
    89 };
    90
    91
    92 //##ModelId=4DF8B1ED0030
    93 //缓冲区,由server负责接收数据,接收的数据存在缓存区,等接收完毕通知应用程序(目前的理解)
    94 //在原有OVERLAPPED的基础上扩展,加上IO类型,封装为OVERLAPPEDPLUS
    95 class OVERLAPPEDPLUS
    96 {
    97 public:
    98 //##ModelId=4DF8B1ED0032
    99 OVERLAPPED m_ol;
    100 //##ModelId=4DF8B1ED0041
    101 IOType m_ioType;
    102
    103 //##ModelId=4DF8B1ED0045
    104 OVERLAPPEDPLUS(IOType ioType) {
    105 ZeroMemory(this, sizeof(OVERLAPPEDPLUS));
    106 m_ioType = ioType;
    107 }
    108 };
    109
    110
    111
    112
    113 //##ModelId=4DF8B1ED005F
    114 //客户端上下文
    // Per-connection state. OnAccept fills in the socket and the inbound
    // WSABUF, and the context pointer is used as the completion key for the
    // connection.
    115 struct ClientContext
    116 {
    117 //##ModelId=4DF8B1ED0062
    118 SOCKET m_Socket;
    119 // Store buffers
    120 //##ModelId=4DF8B1ED0070
    121 CBuffer m_WriteBuffer;
    122 //##ModelId=4DF8B1ED0075
    123 CBuffer m_CompressionBuffer; // compressed data as received
    124 //##ModelId=4DF8B1ED007A
    125 CBuffer m_DeCompressionBuffer; // data after decompression
    126 //##ModelId=4DF8B1ED0081
    127 CBuffer m_ResendWriteBuffer; // last packet sent; kept so it can be resent if the peer failed to receive it
    128
    129 //##ModelId=4DF8B1ED0085
    130 int m_Dialog[2]; // dialog slot: [0] is the type (dialog id), [1] is the CDialog address. NOTE(review): an int cannot hold a pointer on 64-bit builds - confirm the target
    131 //##ModelId=4DF8B1ED008E
    132 int m_nTransferProgress;
    133 // Input Elements for Winsock
    134 //##ModelId=4DF8B1ED0090
    135 WSABUF m_wsaInBuffer; // OnAccept points this at m_byInBuffer
    136 /*
    137 typedef struct _WSABUF {
    138 u_long len; // the length of the buffer
    139 char FAR * buf; // the pointer to the buffer
    140 } WSABUF, FAR * LPWSABUF;
    141 */
    142 //##ModelId=4DF8B1ED0094
    143 BYTE m_byInBuffer[8192];
    144
    145 // Output elements for Winsock
    146 //##ModelId=4DF8B1ED009E
    147 WSABUF m_wsaOutBuffer;
    148 //##ModelId=4DF8B1ED00A2
    149 HANDLE m_hWriteComplete;
    150
    151 // Message counts... purely for example purposes
    152 //##ModelId=4DF8B1ED00AD
    153 LONG m_nMsgIn;
    154 //##ModelId=4DF8B1ED00AE
    155 LONG m_nMsgOut;
    156
    157 //##ModelId=4DF8B1ED00AF
    158 BOOL m_bIsMainSocket; // whether this is the main socket
    159
    160 //##ModelId=4DF8B1ED00C0
    161 ClientContext* m_pWriteContext;
    162 //##ModelId=4DF8B1ED00C5
    163 ClientContext* m_pReadContext;
    164 };
    165
    166 //返回右移4位的hash值
    167 //Windows Data Types
    // HashKey specialization for CString keys: forwards to the HashKey
    // overload that takes an LPCTSTR.
    168 template<>
    169 inline UINT AFXAPI HashKey(CString & strGuid)
    170 {
    171 return HashKey( (LPCTSTR) strGuid);
    172 }
    173
    174 //Mapper.h到底用来干啥呢?(貌似用于消息循环)
    175 #include "Mapper.h"
    176
    177 //以下typedef表示啥呢?
    178 //NOTIFYPROC指向函数的指针
    179 //##ModelId=4DF8B1ED00CC
    // Notification callback. OnAccept invokes it as
    // (m_pFrame, pContext, NC_CLIENT_CONNECT); nCode is one of the NC_* codes.
    180 typedef void (CALLBACK* NOTIFYPROC)(LPVOID, ClientContext*, UINT nCode);
    181
    182 //##ModelId=4DF8B1ED00DD
    183 typedef CList<ClientContext*, ClientContext* > ContextList; // list of ClientContext pointers
    184
    185
    186 // Forward declaration: this header only uses pointers to CMainFrame.
    187 class CMainFrame;
    188
    189 //##ModelId=4DF8B1ED0149
    // IOCP-based TCP server: a listener thread accepts clients, binds each
    // client socket to one completion port, and a pool of worker threads
    // services the completions.
    190 class CIOCPServer
    191 {
    192 public:
    193 //##ModelId=4DF8B1ED0159
    194 void DisconnectAll();
    195 //##ModelId=4DF8B1ED015A
    196 CIOCPServer();
    197 //##ModelId=4DF8B1ED0168
    198 virtual ~CIOCPServer();
    199
    200 //##ModelId=4DF8B1ED016B
    201 // Notification callback; stored by Initialize
    202 NOTIFYPROC m_pNotifyProc;
    203 //##ModelId=4DF8B1ED0179
    204 CMainFrame* m_pFrame;
    205
    206 //##ModelId=4DF8B1ED017D
    207 // Initialize:
    208 // stores the notification callback in m_pNotifyProc,
    209 // creates the listening socket and the listener thread (ListenThreadProc),
    210 // then initializes the IOCP side of the server
    211 bool Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort);
    212
    213
    214 // __stdcall is the calling convention _beginthreadex requires of a thread entry point
    215 //##ModelId=4DF8B1ED0197
    216
    217 // Entry point of the listener thread that Initialize starts for the listening socket
    218 static unsigned __stdcall ListenThreadProc(LPVOID lpVoid);
    219 //##ModelId=4DF8B1ED019A
    220
    221 // Entry point of each completion-port worker thread
    222 static unsigned __stdcall ThreadPoolFunc(LPVOID WorkContext);
    223 //##ModelId=4DF8B1ED01A9
    224 // Critical section; static, so it is shared by every CIOCPServer instance
    225 static CRITICAL_SECTION m_cs;
    226
    227 //##ModelId=4DF8B1ED01B6
    228 // Send data to a client
    229 void Send(ClientContext* pContext, LPBYTE lpData, UINT nSize);
    230 //##ModelId=4DF8B1ED01C7
    231 // Issue an overlapped receive for a client
    232 void PostRecv(ClientContext* pContext);
    233
    234 //##ModelId=4DF8B1ED01C9
    235 bool IsRunning();
    236 //##ModelId=4DF8B1ED01D5
    237 void Shutdown();
    238 //##ModelId=4DF8B1ED01D6
    239 // Reset a connection
    240 void ResetConnection(ClientContext* pContext);
    241
    242 //##ModelId=4DF8B1ED01D8
    243 // Number of worker threads currently running ThreadPoolFunc
    244 LONG m_nCurrentThreads;
    245 //##ModelId=4DF8B1ED01E5
    246 // Number of worker threads not currently blocked waiting for a completion
    247 LONG m_nBusyThreads;
    248
    249
    250 //##ModelId=4DF8B1ED01E6
    251 UINT m_nSendKbps; // current send rate
    252 //##ModelId=4DF8B1ED01F5
    253 UINT m_nRecvKbps; // current receive rate
    254 //##ModelId=4DF8B1ED01F6
    255 UINT m_nMaxConnections; // maximum number of connections
    256 protected:
    257 //##ModelId=4DF8B1ED01F7
    258 void InitializeClientRead(ClientContext* pContext);
    259 //##ModelId=4DF8B1ED0204
    260 BOOL AssociateSocketWithCompletionPort(SOCKET device, HANDLE hCompletionPort, DWORD dwCompletionKey);
    261 //##ModelId=4DF8B1ED0208
    262 void RemoveStaleClient(ClientContext* pContext, BOOL bGraceful);
    263 //##ModelId=4DF8B1ED0216
    264 void MoveToFreePool(ClientContext *pContext);
    265 //##ModelId=4DF8B1ED0223
    266 ClientContext* AllocateContext();
    267
    268 //##ModelId=4DF8B1ED0224
    269 LONG m_nWorkerCnt;
    270
    271 //##ModelId=4DF8B1ED0225
    272 // Set to true by Initialize once the server has been initialized
    273 bool m_bInit;
    274 //##ModelId=4DF8B1ED0233
    275 bool m_bDisconnectAll;
    276 //##ModelId=4DF8B1ED0234
    277 BYTE m_bPacketFlag[5];
    278 //##ModelId=4DF8B1ED0243
    279 void CloseCompletionPort();
    280 //##ModelId=4DF8B1ED0244
    281 void OnAccept();
    282 //##ModelId=4DF8B1ED0252
    283 bool InitializeIOCP(void);
    284 //##ModelId=4DF8B1ED0254
    285 void Stop();
    286
    287 //##ModelId=4DF8B1ED0263
    288 ContextList m_listContexts;
    289 //##ModelId=4DF8B1ED0272
    290 ContextList m_listFreePool;
    291 //##ModelId=4DF8B1ED0277
    292 WSAEVENT m_hEvent;
    293 //##ModelId=4DF8B1ED0282
    294 SOCKET m_socListen;
    295 //##ModelId=4DF8B1ED0286
    296 HANDLE m_hKillEvent;
    297 //##ModelId=4DF8B1ED0287
    298 // Listener thread handle
    299 HANDLE m_hThread;
    300 //##ModelId=4DF8B1ED0291
    301 // I/O completion port handle
    302 HANDLE m_hCompletionPort;
    303 //##ModelId=4DF8B1ED0292
    304 bool m_bTimeToKill;
    305 //##ModelId=4DF8B1ED02A1
    306 CCpuUsage m_cpu;
    307
    308 //##ModelId=4DF8B1ED02A5
    309 LONG m_nKeepLiveTime; // keep-alive time; OnAccept passes it as tcp_keepalive::keepalivetime
    310
    311 // Thread Pool Tunables
    312 //##ModelId=4DF8B1ED02B0
    313 // Minimum number of threads in the pool
    314 LONG m_nThreadPoolMin;
    315 //##ModelId=4DF8B1ED02B1
    316 // Maximum number of threads in the pool
    317 LONG m_nThreadPoolMax;
    318 //##ModelId=4DF8B1ED02BF
    319 LONG m_nCPULoThreshold;
    320 //##ModelId=4DF8B1ED02C0
    321 LONG m_nCPUHiThreshold;
    322
    323
    324 //##ModelId=4DF8B1ED02C1
    325 CString GetHostName(SOCKET socket);
    326
    327 //##ModelId=4DF8B1ED02D0
    328 void CreateStream(ClientContext* pContext);
    329
    330 // The message-map macros below are defined in Mapper.h
    331 BEGIN_IO_MSG_MAP()
    332 IO_MESSAGE_HANDLER(IORead, OnClientReading)
    333 IO_MESSAGE_HANDLER(IOWrite, OnClientWriting)
    334 IO_MESSAGE_HANDLER(IOInitialize, OnClientInitializing)
    335 END_IO_MSG_MAP()
    336
    337 /*
    338 The author's attempt at expanding the macros above, based on Mapper.h
    339 (NOTE(review): Mapper.h is not shown here - confirm against it):
    340 BEGIN_IO_MSG_MAP()
    341 IO_MESSAGE_HANDLER(IORead, OnClientReading)
    342 IO_MESSAGE_HANDLER(IOWrite, OnClientWriting)
    343 IO_MESSAGE_HANDLER(IOInitialize, OnClientInitializing)
    344 END_IO_MSG_MAP()
    345
    346 public:
    347 bool ProcessIOMessage(IOType clientIO,ClientContext * pContext,DWORD dwSize = 0)
    348 {
    349 bool bRet = false;
    350
    351 if(IORead == clientIO)
    352 bRet = OnClientReading(pContext,dwSize);
    353 if(IOWrite == clientIO)
    354 bRet = OnClientWriting(pContext,dwSize);
    355 if(IOInitialize == clientIO)
    356 bRet = OnClientInitializing(pContext,dwSize);
    357
    358 return bRet;
    359
    360 }
    361 */
    362
    363 //##ModelId=4DF8B1ED02DF
    364 bool OnClientInitializing (ClientContext* pContext, DWORD dwSize = 0);
    365 //##ModelId=4DF8B1ED02E2
    366 bool OnClientReading (ClientContext* pContext, DWORD dwSize = 0);
    367 //##ModelId=4DF8B1ED02F0
    368 bool OnClientWriting (ClientContext* pContext, DWORD dwSize = 0);
    369
    370 };
    371
    372 #endif // !defined(AFX_IOCPSERVER_H__75B80E90_FD25_4FFB_B273_0090AA43BBDF__INCLUDED_)

    其次是cpp文件:

    IOCPServer.cpp1
    // IOCPServer.cpp: implementation of the CIOCPServer class.
    //
    //////////////////////////////////////////////////////////////////////

    #include "stdafx.h"
    #include "IOCPServer.h"
    #include "../MainFrm.h"

    //提供文件压缩的库
    #include "zlib/zlib.h"

    #ifdef _DEBUG
    #undef THIS_FILE
    static char THIS_FILE[]=__FILE__;
    #define new DEBUG_NEW
    #endif

    // Change at your Own Peril

    // Packet header layout: 'G' 'h' '0' 's' 't' | PacketLen | UnZipLen
    // NOTE(review): 13 presumably is the 5 flag bytes plus two 4-byte lengths - confirm.
    #define HDR_SIZE 13
    // Same length as the m_bPacketFlag array the constructor fills in.
    #define FLAG_SIZE 5
    // Worker threads created per processor by InitializeIOCP.
    #define HUERISTIC_VALUE 2
    //##ModelId=4DF8B1ED01A9
    CRITICAL_SECTION CIOCPServer::m_cs; // definition of the static member


    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////


    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: CIOCPServer::CIOCPServer
    //
    // DESCRIPTION: C'tor initializes Winsock2 and miscellaneous events etc.
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED015A

    //构造函数,一些初始化工作
    // Loads Winsock 2.2, initializes the shared critical section, creates the
    // kill event and puts every member into a known state. No socket, thread or
    // completion port is created here; that happens in Initialize.
    CIOCPServer::CIOCPServer()
    {
        // Debug aid: trace the address of this instance.
        TRACE(_T("CIOCPServer=%p\n"), this);

        // MAKEWORD(2,2) requests Winsock 2.2; wsaData receives details about
        // the implementation that was loaded.
        WSADATA wsaData;
        const int nStartupErr = WSAStartup(MAKEWORD(2,2), &wsaData);
        if (nStartupErr != 0)
        {
            // A constructor has no way to report failure; the socket calls
            // made by Initialize will fail and report it instead.
            TRACE(_T("WSAStartup() error %d\n"), nStartupErr);
        }

        // NOTE(review): m_cs is a static member, so every instance
        // re-initializes the same critical section - confirm that only one
        // CIOCPServer is ever created.
        InitializeCriticalSection(&m_cs);

        // Set by Initialize; cleared here so they are never read uninitialized.
        m_pNotifyProc = NULL;
        m_pFrame = NULL;

        m_hThread = NULL;
        // Unnamed, manual-reset event, initially non-signalled.
        m_hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        m_socListen = NULL;

        m_bTimeToKill = false;
        m_bDisconnectAll = false;

        m_hEvent = NULL;
        m_hCompletionPort = NULL;

        m_bInit = false;
        m_nCurrentThreads = 0;
        m_nBusyThreads = 0;
        m_nWorkerCnt = 0;

        // Real values are assigned by InitializeIOCP.
        m_nThreadPoolMin = 0;
        m_nThreadPoolMax = 0;
        m_nCPULoThreshold = 0;
        m_nCPUHiThreshold = 0;

        m_nSendKbps = 0;
        m_nRecvKbps = 0;

        m_nMaxConnections = 10000;
        m_nKeepLiveTime = 1000 * 60 * 3; // keep-alive probe after three minutes

        // Packet flag
        BYTE bPacketFlag[] = {'G', 'h', '0', 's', 't'};
        memcpy(m_bPacketFlag, bPacketFlag, sizeof(bPacketFlag));
    }


    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: CIOCPServer::~CIOCPServer
    //
    // DESCRIPTION: Tidy up
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED0168

    //析构函数
    // Shuts the server down and releases the Winsock reference taken by the
    // constructor's WSAStartup call.
    CIOCPServer::~CIOCPServer()
    {
    try
    {
    Shutdown();// stop the server
    WSACleanup();// balances the WSAStartup call made in the constructor
    }catch(...){} // any exception is swallowed so it cannot escape the destructor
    }

    ////////////////////////////////////////////////////////////////////////////////
    //
    // FUNCTION: CIOCPServer::Initialize
    //
    // DESCRIPTION: Starts listener into motion
    //
    // INPUTS:
    //
    // NOTES:
    //
    // MODIFICATIONS:
    //
    // Name Date Version Comments
    // N T ALMOND 06042001 1.0 Origin
    //
    ////////////////////////////////////////////////////////////////////////////////
    //##ModelId=4DF8B1ED017D

    //IOCPServer初始化函数

    /*
    Initialize 注册了一个回调函数 m_pNotifyProc ,
    创建了一个监听套接字,
    一个监听线程 ListenThreadProc ,
    然后初始化 IOCP 服务端
    */

    bool CIOCPServer::Initialize(NOTIFYPROC pNotifyProc, CMainFrame* pFrame, int nMaxConnections, int nPort)
    {
    m_pNotifyProc = pNotifyProc; //回调函数赋值
    m_pFrame = pFrame; //指向CMainFrame指针,获取主界面框架指针
    m_nMaxConnections = nMaxConnections;

    //创建套接字
    m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    //AF_INET -> IPV4
    //SOCK_STREAM -> 流式套接字
    //0 -> 自动选择协议


    if (m_socListen == INVALID_SOCKET)
    {
    TRACE(_T("Could not create listen socket %ld\n"),WSAGetLastError());
    return false;
    }

    ////////////////////////////
    //////////////////
    /////////
    // Event for handling Network IO
    //m_hEvent WSAEVENT
    //创建一个新的事件对象
    m_hEvent = WSACreateEvent();

    if (m_hEvent == WSA_INVALID_EVENT)
    {
    TRACE(_T("WSACreateEvent() error %ld\n"),WSAGetLastError());
    closesocket(m_socListen);
    return false;
    }

    // The listener is ONLY interested in FD_ACCEPT
    // That is when a client connects to or IP/Port
    // Request async notification

    //事件与监听套接字绑定
    //这个代码规定了事件句柄m_hEvent响应FD_ACCEPT事件。
    //也就是说当m_socListen套接字accept时,m_hEvent有信号。
    int nRet = WSAEventSelect(m_socListen,
    m_hEvent,
    FD_ACCEPT);

    if (nRet == SOCKET_ERROR)
    {
    TRACE(_T("WSAAsyncSelect() error %ld\n"),WSAGetLastError());
    closesocket(m_socListen);
    return false;
    }
    /////////////////////////////////////
    ///////////////
    //////
    SOCKADDR_IN saServer;
    // Listen on our designated Port#
    saServer.sin_port = htons(nPort);

    // Fill in the rest of the address structure
    saServer.sin_family = AF_INET;
    saServer.sin_addr.s_addr = htonl(INADDR_ANY);

    // 将端口绑定到套接字上
    nRet = bind(m_socListen,
    (LPSOCKADDR)&saServer,
    sizeof(struct sockaddr));

    if (nRet == SOCKET_ERROR)
    {
    TRACE(_T("bind() error %ld\n"),WSAGetLastError());
    closesocket(m_socListen);
    return false;
    }

    // Set the socket to listen
    nRet = listen(m_socListen, SOMAXCONN);//最大合理值
    if (nRet == SOCKET_ERROR)
    {
    TRACE(_T("listen() error %ld\n"),WSAGetLastError());
    closesocket(m_socListen);
    return false;
    }


    ////////////////////////////////////////////////////////////////////////////////////////
    ////////////////////////////////////////////////////////////////////////////////////////
    //监听端口后,启用一个监听线程 ListenThreadProc
    UINT dwThreadId = 0;

    m_hThread =
    (HANDLE)_beginthreadex(NULL, // Security
    0, // Stack size - use default
    ListenThreadProc, // Thread fn entry point
    (void*) this, //将IOCPServer对象指针传给监听线程函数
    0, // Init flag
    &dwThreadId); // Thread address用于接收线程id

    if (m_hThread != INVALID_HANDLE_VALUE)
    {
    InitializeIOCP();
    m_bInit = true;
    return true;
    }

    return false;
    }
    IOCPServer.cpp2
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::ListenThreadProc
    4 //
    5 // DESCRIPTION: Listens for incoming clients
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 //
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED0197
    18
    19 //一个监听线程
    20 unsigned CIOCPServer::ListenThreadProc(LPVOID lParam)
    21 {
    22 CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam);
    23
    24 //This structure is used to store a socket's internal information about network events.
    25 //lNetworkEvents
    26 //Indicates which of the FD_XXX network events have occurred.
    27 //用于存储网络事件信息,其中events.lNetworkEvents记录了具体事件(FD_XXX的形式)
    28 WSANETWORKEVENTS events;
    29
    30 while(1)
    31 {
    32 //
    33 // Wait for something to happen
    34 //
    35 if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0) //(初始无信号)若有信号就跳出循环,结束监听
    36 break;
    37
    38 DWORD dwRet;
    39 dwRet = WSAWaitForMultipleEvents(1,
    40 &pThis->m_hEvent,
    41 FALSE,
    42 100,
    43 FALSE);
    44
    45 if (dwRet == WSA_WAIT_TIMEOUT)
    46 continue;
    47 //以上代码表示一直等待直到与m_hEvent绑定的事件发生
    48
    49 //
    50 // Figure out what happened
    51 //
    52 //事件发生了,到底是神马事件呢?
    53 int nRet = WSAEnumNetworkEvents(pThis->m_socListen,
    54 pThis->m_hEvent,
    55 &events);
    56
    57 if (nRet == SOCKET_ERROR)
    58 {
    59 TRACE(_T("WSAEnumNetworkEvents error %ld\n"),WSAGetLastError());
    60 break;
    61 }
    62
    63 // Handle Network events //
    64 // ACCEPT
    65 //if (events.lNetworkEvents == FD_ACCEPT 相比而言下面代码更高效
    66 if (events.lNetworkEvents & FD_ACCEPT)
    67 {
    68 if (events.iErrorCode[FD_ACCEPT_BIT] == 0) //如果木有发生错误
    69 pThis->OnAccept(); //异步IO模型,当accept时具体做啥事情呢,请看OnAccept函数
    70 else
    71 {
    72 TRACE(_T("Unknown network event error %ld\n"),WSAGetLastError());
    73 break;
    74 }
    75
    76 }
    77
    78 } // while....
    79
    80 return 0; // Normal Thread Exit Code...
    81 }
    82
    83 ////////////////////////////////////////////////////////////////////////////////
    84 //
    85 // FUNCTION: CIOCPServer::OnAccept
    86 //
    87 // DESCRIPTION: Accepts a pending client and registers it with the completion port
    88 //
    89 // INPUTS:
    90 //
    91 // NOTES:
    92 //
    93 // MODIFICATIONS:
    94 //
    95 // Name Date Version Comments
    96 // N T ALMOND 06042001 1.0 Origin
    97 // Ulf Hedlund 09072001 Changes for OVERLAPPEDPLUS
    98 ////////////////////////////////////////////////////////////////////////////////
    99 //##ModelId=4DF8B1ED0244
    100 void CIOCPServer::OnAccept()
    101 {
    102
    103 SOCKADDR_IN SockAddr;
    104 SOCKET clientSocket;
    105
    106 int nRet;
    107 int nLen;
    108
    109 if (m_bTimeToKill || m_bDisconnectAll)
    110 return;
    111
    112 //
    113 // accept the new socket descriptor
    114 //
    115 nLen = sizeof(SOCKADDR_IN);
    116 clientSocket = accept(m_socListen,
    117 (LPSOCKADDR)&SockAddr,
    118 &nLen);
    119
    120 if (clientSocket == SOCKET_ERROR)
    121 {
    122 nRet = WSAGetLastError();
    123 if (nRet != WSAEWOULDBLOCK)
    124 {
    125 //
    126 // Just log the error and return
    127 //
    128 TRACE(_T("accept() error\n"),WSAGetLastError());
    129 return;
    130 }
    131 }
    132
    133 // Create the Client context to be associted with the completion port
    134 ClientContext* pContext = AllocateContext();
    135 // AllocateContext fail
    136 if (pContext == NULL)
    137 return;
    138
    139 pContext->m_Socket = clientSocket;
    140
    141 // Fix up In Buffer
    142 pContext->m_wsaInBuffer.buf = (char*)pContext->m_byInBuffer;
    143 pContext->m_wsaInBuffer.len = sizeof(pContext->m_byInBuffer);
    144
    145 // Associate the new socket with a completion port.
    146 if (!AssociateSocketWithCompletionPort(clientSocket, m_hCompletionPort, (DWORD) pContext))
    147 {
    148 delete pContext;
    149 pContext = NULL;
    150
    151 closesocket( clientSocket );
    152 closesocket( m_socListen );
    153 return;
    154 }
    155
    156 // 关闭nagle算法,以免影响性能,因为控制时控制端要发送很多数据量很小的数据包,要求马上发送
    157 // 暂不关闭,实验得知能网络整体性能有很大影响
    158 const char chOpt = 1;
    159
    160 // int nErr = setsockopt(pContext->m_Socket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
    161 // if (nErr == -1)
    162 // {
    163 // TRACE(_T("setsockopt() error\n"),WSAGetLastError());
    164 // return;
    165 // }
    166
    167 // Set KeepAlive 开启保活机制
    168 if (setsockopt(pContext->m_Socket, SOL_SOCKET, SO_KEEPALIVE, (char *)&chOpt, sizeof(chOpt)) != 0)
    169 {
    170 TRACE(_T("setsockopt() error\n"), WSAGetLastError());
    171 }
    172
    173 // 设置超时详细信息
    174 tcp_keepalive klive;
    175 klive.onoff = 1; // 启用保活
    176 klive.keepalivetime = m_nKeepLiveTime;
    177 klive.keepaliveinterval = 1000 * 10; // 重试间隔为10秒 Resend if No-Reply
    178 WSAIoctl
    179 (
    180 pContext->m_Socket,
    181 SIO_KEEPALIVE_VALS,
    182 &klive,
    183 sizeof(tcp_keepalive),
    184 NULL,
    185 0,
    186 (unsigned long *)&chOpt,
    187 0,
    188 NULL
    189 );
    190
    191 CLock cs(m_cs, "OnAccept" );
    192 // Hold a reference to the context
    193 m_listContexts.AddTail(pContext);
    194
    195
    196 // Trigger first IO Completion Request
    197 // Otherwise the Worker thread will remain blocked waiting for GetQueuedCompletionStatus...
    198 // The first message that gets queued up is ClientIoInitializing - see ThreadPoolFunc and
    199 // IO_MESSAGE_HANDLER
    200
    201
    202 OVERLAPPEDPLUS *pOverlap = new OVERLAPPEDPLUS(IOInitialize);
    203
    204 BOOL bSuccess = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
    205
    206 if ( (!bSuccess && GetLastError( ) != ERROR_IO_PENDING))
    207 {
    208 RemoveStaleClient(pContext,TRUE);
    209 return;
    210 }
    211
    212 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_CLIENT_CONNECT);
    213
    214 // Post to WSARecv Next
    215 PostRecv(pContext);
    216 }
    217
    218
    219 ////////////////////////////////////////////////////////////////////////////////
    220 //
    221 // FUNCTION: CIOCPServer::InitializeIOCP
    222 //
    223 // DESCRIPTION: Creates the I/O completion port and starts the worker
    224 // threads that service it.
    225 //
    226 // INPUTS:
    227 //
    228 // NOTES:
    229 //
    230 // MODIFICATIONS:
    231 //
    232 // Name Date Version Comments
    233 // N T ALMOND 06042001 1.0 Origin
    234 //
    235 ////////////////////////////////////////////////////////////////////////////////
    236 //##ModelId=4DF8B1ED0252
    237 bool CIOCPServer::InitializeIOCP(void)
    238 {
    239
    240 SOCKET s;
    241 DWORD i;
    242 UINT nThreadID;
    243 SYSTEM_INFO systemInfo;
    244
    245 //
    246 // First open a temporary socket that we will use to create the
    247 // completion port. In NT 3.51 it will not be necessary to specify
    248 // the FileHandle parameter of CreateIoCompletionPort()--it will
    249 // be legal to specify FileHandle as NULL. However, for NT 3.5
    250 // we need an overlapped file handle.
    251 //
    252
    253 s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
    254 if ( s == INVALID_SOCKET )
    255 return false;
    256
    257 // Create the completion port that will be used by all the worker
    258 // threads.
    259 m_hCompletionPort = CreateIoCompletionPort( (HANDLE)s, NULL, 0, 0 );
    260 if ( m_hCompletionPort == NULL )
    261 {
    262 closesocket( s );
    263 return false;
    264 }
    265
    266 // Close the socket, we don't need it any longer.
    267 closesocket( s );
    268
    269 // Determine how many processors are on the system.
    270 GetSystemInfo( &systemInfo );
    271
    272 m_nThreadPoolMin = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
    273 m_nThreadPoolMax = m_nThreadPoolMin;
    274 m_nCPULoThreshold = 10;
    275 m_nCPUHiThreshold = 75;
    276
    277 //CCpuUsage对象
    278 m_cpu.Init();
    279
    280
    281 // We use two worker threads for eachprocessor on the system--this is choosen as a good balance
    282 // that ensures that there are a sufficient number of threads available to get useful work done
    283 // but not too many that context switches consume significant overhead.
    284 UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
    285
    286 // We need to save the Handles for Later Termination...
    287 HANDLE hWorker;
    288 m_nWorkerCnt = 0;
    289
    290 //创建nWorkerCnt个线程
    291 for ( i = 0; i < nWorkerCnt; i++ )
    292 {
    293 hWorker = (HANDLE)_beginthreadex(NULL, // Security
    294 0, // Stack size - use default
    295 ThreadPoolFunc, // Thread fun entry point
    296 (void*) this, // Param for thread
    297 0, // Init flag
    298 &nThreadID); // Thread address
    299
    300
    301 if (hWorker == NULL )
    302 {
    303 CloseHandle( m_hCompletionPort );
    304 return false;
    305 }
    306
    307 m_nWorkerCnt++;
    308
    309 CloseHandle(hWorker);
    310 }
    311
    312 return true;
    313 }
    314
    315 ////////////////////////////////////////////////////////////////////////////////
    316 //
    317 // FUNCTION: CIOCPServer::ThreadPoolFunc
    318 //
    319 // DESCRIPTION: This is the main worker routine for the worker threads.
    320 // Worker threads wait on a completion port for I/O to complete.
    321 // When it completes, the worker thread processes the I/O, then either pends
    322 // new I/O or closes the client's connection. When the service shuts
    323 // down, other code closes the completion port which causes
    324 // GetQueuedCompletionStatus() to wake up and the worker thread then
    325 // exits.
    326 //
    327 // INPUTS:
    328 //
    329 // NOTES:
    330 //
    331 // MODIFICATIONS:
    332 //
    333 // Name Date Version Comments
    334 // N T ALMOND 06042001 1.0 Origin
    335 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    336 ////////////////////////////////////////////////////////////////////////////////
    337 //##ModelId=4DF8B1ED019A
    338 //完成端口线程入口函数
    339 unsigned CIOCPServer::ThreadPoolFunc (LPVOID thisContext)
    340 {
    341 // Get back our pointer to the class
    342 ULONG ulFlags = MSG_PARTIAL;
    343 CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext);
    344 ASSERT(pThis);
    345
    346 HANDLE hCompletionPort = pThis->m_hCompletionPort;
    347
    348 DWORD dwIoSize;
    349 LPOVERLAPPED lpOverlapped;
    350 ClientContext* lpClientContext;
    351 OVERLAPPEDPLUS* pOverlapPlus;
    352 bool bError;
    353 bool bEnterRead;
    354
    355 InterlockedIncrement(&pThis->m_nCurrentThreads);
    356 InterlockedIncrement(&pThis->m_nBusyThreads);
    357
    358 //
    359 // Loop round and round servicing I/O completions.
    360 //
    361
    362 for (BOOL bStayInPool = TRUE; bStayInPool && pThis->m_bTimeToKill == false; )
    363 {
    364 pOverlapPlus = NULL;
    365 lpClientContext = NULL;
    366 bError = false;
    367 bEnterRead = false;
    368 // Thread is Block waiting for IO completion
    369 InterlockedDecrement(&pThis->m_nBusyThreads);
    370
    371
    372 // Get a completed IO request.
    373 BOOL bIORet = GetQueuedCompletionStatus(
    374 hCompletionPort,
    375 &dwIoSize,
    376 (LPDWORD) &lpClientContext,
    377 &lpOverlapped, INFINITE);
    378
    379 DWORD dwIOError = GetLastError();
    380 pOverlapPlus = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol);
    381
    382
    383 int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads);
    384
    385 if (!bIORet && dwIOError != WAIT_TIMEOUT )
    386 {
    387 if (lpClientContext && pThis->m_bTimeToKill == false)
    388 {
    389 pThis->RemoveStaleClient(lpClientContext, FALSE);
    390 }
    391 continue;
    392
    393 // anyway, this was an error and we should exit
    394 bError = true;
    395 }
    396
    397 if (!bError)
    398 {
    399
    400 // Allocate another thread to the thread Pool?
    401 if (nBusyThreads == pThis->m_nCurrentThreads)
    402 {
    403 if (nBusyThreads < pThis->m_nThreadPoolMax)
    404 {
    405 if (pThis->m_cpu.GetUsage() > pThis->m_nCPUHiThreshold)
    406 {
    407 UINT nThreadID = -1;
    408
    409 // HANDLE hThread = (HANDLE)_beginthreadex(NULL, // Security
    410 // 0, // Stack size - use default
    411 // ThreadPoolFunc, // Thread fn entry point
    412 /// (void*) pThis,
    413 // 0, // Init flag
    414 // &nThreadID); // Thread address
    415
    416 // CloseHandle(hThread);
    417 }
    418 }
    419 }
    420
    421
    422 // Thread timed out - IDLE?
    423 if (!bIORet && dwIOError == WAIT_TIMEOUT)
    424 {
    425 if (lpClientContext == NULL)
    426 {
    427 if (pThis->m_cpu.GetUsage() < pThis->m_nCPULoThreshold)
    428 {
    429 // Thread has no outstanding IO - Server hasn't much to do so die
    430 if (pThis->m_nCurrentThreads > pThis->m_nThreadPoolMin)
    431 bStayInPool = FALSE;
    432 }
    433
    434 bError = true;
    435 }
    436 }
    437 }
    438 //////////////////////////////////////////////////////////////////////////////////////////
    439 //////////////////////////////////////////////////////////////////////////////////////////
    440 if (!bError)
    441 {
    442 if(bIORet && NULL != pOverlapPlus && NULL != lpClientContext)
    443 {
    444 try
    445 {
    446 pThis->ProcessIOMessage(pOverlapPlus->m_ioType, lpClientContext, dwIoSize);
    447 }
    448 catch (...) {}
    449 }
    450 }
    451
    452 if(pOverlapPlus)
    453 delete pOverlapPlus; // from previous call
    454 }
    455
    456 InterlockedDecrement(&pThis->m_nWorkerCnt);
    457
    458 InterlockedDecrement(&pThis->m_nCurrentThreads);
    459 InterlockedDecrement(&pThis->m_nBusyThreads);
    460 return 0;
    461 }
    462
    463 ////////////////////////////////////////////////////////////////////////////////
    464 //
    465 // FUNCTION: CIOCPServer::Stop
    466 //
    467 // DESCRIPTION: Signal the listener to quit his thread
    468 //
    469 // INPUTS:
    470 //
    471 // NOTES:
    472 //
    473 // MODIFICATIONS:
    474 //
    475 // Name Date Version Comments
    476 // N T ALMOND 06042001 1.0 Origin
    477 //
    478 ////////////////////////////////////////////////////////////////////////////////
    479 //##ModelId=4DF8B1ED0254
    480 void CIOCPServer::Stop()
    481 {
    482 ::SetEvent(m_hKillEvent);
    483 WaitForSingleObject(m_hThread, INFINITE);
    484 CloseHandle(m_hThread);
    485 CloseHandle(m_hKillEvent);
    486 }
    IOCPServer.cpp3
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::GetHostName
    4 //
    5 // DESCRIPTION: Get the IP address (dotted-decimal string) of the connected client
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 //
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED02C1
    18 CString CIOCPServer::GetHostName(SOCKET socket)
    19 {
    20 sockaddr_in sockAddr;
    21 memset(&sockAddr, 0, sizeof(sockAddr));
    22
    23 int nSockAddrLen = sizeof(sockAddr);
    24
    25 BOOL bResult = getpeername(socket,(SOCKADDR*)&sockAddr, &nSockAddrLen);
    26
    27 return bResult != INVALID_SOCKET ? inet_ntoa(sockAddr.sin_addr) : "";
    28 }
    29
    30
    31 //##ModelId=4DF8B1ED01C7
    32 void CIOCPServer::PostRecv(ClientContext* pContext)
    33 {
    34 // issue a read request
    35 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IORead);
    36 ULONG ulFlags = MSG_PARTIAL;
    37 DWORD dwNumberOfBytesRecvd;
    38 UINT nRetVal = WSARecv(pContext->m_Socket,
    39 &pContext->m_wsaInBuffer,
    40 1,
    41 &dwNumberOfBytesRecvd,
    42 &ulFlags,
    43 &pOverlap->m_ol,
    44 NULL);
    45
    46 if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    47 {
    48 RemoveStaleClient(pContext, FALSE);
    49 }
    50 }
    51 ////////////////////////////////////////////////////////////////////////////////
    52 //
    53 // FUNCTION: CIOCPServer::Send
    54 //
    55 // DESCRIPTION: Posts a Write + Data to IO CompletionPort for transfer
    56 //
    57 // INPUTS:
    58 //
    59 // NOTES:
    60 //
    61 // MODIFICATIONS:
    62 //
    63 // Name Date Version Comments
    64 // N T ALMOND 06042001 1.0 Origin
    65 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    66 ////////////////////////////////////////////////////////////////////////////////
    67 //##ModelId=4DF8B1ED01B6
    68 void CIOCPServer::Send(ClientContext* pContext, LPBYTE lpData, UINT nSize)
    69 {
    70 if (pContext == NULL)
    71 return;
    72
    73 try
    74 {
    75 if (nSize > 0)
    76 {
    77 // Compress data
    78 unsigned long destLen = (double)nSize * 1.001 + 12;
    79 LPBYTE pDest = new BYTE[destLen];
    80 int nRet = compress(pDest, &destLen, lpData, nSize);
    81
    82 if (nRet != Z_OK)
    83 {
    84 delete [] pDest;
    85 return;
    86 }
    87
    88 //////////////////////////////////////////////////////////////////////////
    89 LONG nBufLen = destLen + HDR_SIZE;
    90 // 5 bytes packet flag
    91 pContext->m_WriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag));
    92 // 4 byte header [Size of Entire Packet]
    93 pContext->m_WriteBuffer.Write((PBYTE) &nBufLen, sizeof(nBufLen));
    94 // 4 byte header [Size of UnCompress Entire Packet]
    95 pContext->m_WriteBuffer.Write((PBYTE) &nSize, sizeof(nSize));
    96 // Write Data
    97 pContext->m_WriteBuffer.Write(pDest, destLen);
    98 delete [] pDest;
    99
    100 // 发送完后,再备份数据, 因为有可能是m_ResendWriteBuffer本身在发送,所以不直接写入
    101 LPBYTE lpResendWriteBuffer = new BYTE[nSize];
    102 CopyMemory(lpResendWriteBuffer, lpData, nSize);
    103 pContext->m_ResendWriteBuffer.ClearBuffer();
    104 pContext->m_ResendWriteBuffer.Write(lpResendWriteBuffer, nSize); // 备份发送的数据
    105 delete [] lpResendWriteBuffer;
    106 }
    107 else // 要求重发
    108 {
    109 pContext->m_WriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag));
    110 pContext->m_ResendWriteBuffer.ClearBuffer();
    111 pContext->m_ResendWriteBuffer.Write(m_bPacketFlag, sizeof(m_bPacketFlag)); // 备份发送的数据
    112 }
    113 // Wait for Data Ready signal to become available
    114 WaitForSingleObject(pContext->m_hWriteComplete, INFINITE);
    115
    116 // Prepare Packet
    117 // pContext->m_wsaOutBuffer.buf = (CHAR*) new BYTE[nSize];
    118 // pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();
    119
    120 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
    121 PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
    122
    123 pContext->m_nMsgOut++;
    124 }catch(...){}
    125 }
    126
    127
    128 ////////////////////////////////////////////////////////////////////////////////
    129 //
    130 // FUNCTION: CIOCPServer::OnClientInitializing
    131 //
    132 // DESCRIPTION: Called when client is initializing
    133 //
    134 // INPUTS:
    135 //
    136 // NOTES:
    137 //
    138 // MODIFICATIONS:
    139 //
    140 // Name Date Version Comments
    141 // N T ALMOND 06042001 1.0 Origin
    142 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    143 ////////////////////////////////////////////////////////////////////////////////
    144 //##ModelId=4DF8B1ED02DF
    145 bool CIOCPServer::OnClientInitializing(ClientContext* pContext, DWORD dwIoSize)
    146 {
    147 // We are not actually doing anything here, but we could for instance make
    148 // a call to Send() to send a greeting message or something
    149
    150 return true; // make sure to issue a read after this
    151 }
    152
    153 ////////////////////////////////////////////////////////////////////////////////
    154 //
    155 // FUNCTION: CIOCPServer::OnClientReading
    156 //
    157 // DESCRIPTION: Called when client is reading
    158 //
    159 // INPUTS:
    160 //
    161 // NOTES:
    162 //
    163 // MODIFICATIONS:
    164 //
    165 // Name Date Version Comments
    166 // N T ALMOND 06042001 1.0 Origin
    167 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    168 ////////////////////////////////////////////////////////////////////////////////
    169 //##ModelId=4DF8B1ED02E2
    170 bool CIOCPServer::OnClientReading(ClientContext* pContext, DWORD dwIoSize)
    171 {
    172 CLock cs(CIOCPServer::m_cs, "OnClientReading");
    173 try
    174 {
    175 //////////////////////////////////////////////////////////////////////////
    176 static DWORD nLastTick = GetTickCount();
    177 static DWORD nBytes = 0;
    178 nBytes += dwIoSize;
    179
    180 if (GetTickCount() - nLastTick >= 1000)
    181 {
    182 nLastTick = GetTickCount();
    183 InterlockedExchange((LPLONG)&(m_nRecvKbps), nBytes);
    184 nBytes = 0;
    185 }
    186
    187 //////////////////////////////////////////////////////////////////////////
    188
    189 if (dwIoSize == 0)
    190 {
    191 RemoveStaleClient(pContext, FALSE);
    192 return false;
    193 }
    194
    195 if (dwIoSize == FLAG_SIZE && memcmp(pContext->m_byInBuffer, m_bPacketFlag, FLAG_SIZE) == 0)
    196 {
    197 // 重新发送
    198 Send(pContext, pContext->m_ResendWriteBuffer.GetBuffer(), pContext->m_ResendWriteBuffer.GetBufferLen());
    199 // 必须再投递一个接收请求
    200 PostRecv(pContext);
    201 return true;
    202 }
    203
    204 // Add the message to out message
    205 // Dont forget there could be a partial, 1, 1 or more + partial mesages
    206 pContext->m_CompressionBuffer.Write(pContext->m_byInBuffer,dwIoSize);
    207
    208 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_RECEIVE);
    209
    210
    211 // Check real Data
    212 while (pContext->m_CompressionBuffer.GetBufferLen() > HDR_SIZE)
    213 {
    214 BYTE bPacketFlag[FLAG_SIZE];
    215 CopyMemory(bPacketFlag, pContext->m_CompressionBuffer.GetBuffer(), sizeof(bPacketFlag));
    216
    217 if (memcmp(m_bPacketFlag, bPacketFlag, sizeof(m_bPacketFlag)) != 0)
    218 throw "bad buffer";
    219
    220 int nSize = 0;
    221 CopyMemory(&nSize, pContext->m_CompressionBuffer.GetBuffer(FLAG_SIZE), sizeof(int));
    222
    223 // Update Process Variable
    224 pContext->m_nTransferProgress = pContext->m_CompressionBuffer.GetBufferLen() * 100 / nSize;
    225
    226 if (nSize && (pContext->m_CompressionBuffer.GetBufferLen()) >= nSize)
    227 {
    228 int nUnCompressLength = 0;
    229 // Read off header
    230 pContext->m_CompressionBuffer.Read((PBYTE) bPacketFlag, sizeof(bPacketFlag));
    231
    232 pContext->m_CompressionBuffer.Read((PBYTE) &nSize, sizeof(int));
    233 pContext->m_CompressionBuffer.Read((PBYTE) &nUnCompressLength, sizeof(int));
    234
    235 ////////////////////////////////////////////////////////
    236 ////////////////////////////////////////////////////////
    237 // SO you would process your data here
    238 //
    239 // I'm just going to post message so we can see the data
    240 int nCompressLength = nSize - HDR_SIZE;
    241 PBYTE pData = new BYTE[nCompressLength];
    242 PBYTE pDeCompressionData = new BYTE[nUnCompressLength];
    243
    244 if (pData == NULL || pDeCompressionData == NULL)
    245 throw "bad Allocate";
    246
    247 pContext->m_CompressionBuffer.Read(pData, nCompressLength);
    248
    249 //////////////////////////////////////////////////////////////////////////
    250 unsigned long destLen = nUnCompressLength;
    251 int nRet = uncompress(pDeCompressionData, &destLen, pData, nCompressLength);
    252 //////////////////////////////////////////////////////////////////////////
    253 if (nRet == Z_OK)
    254 {
    255 pContext->m_DeCompressionBuffer.ClearBuffer();
    256 pContext->m_DeCompressionBuffer.Write(pDeCompressionData, destLen);
    257 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_RECEIVE_COMPLETE);
    258 }
    259 else
    260 {
    261 throw "bad buffer";
    262 }
    263
    264 delete [] pData;
    265 delete [] pDeCompressionData;
    266 pContext->m_nMsgIn++;
    267 }
    268 else
    269 break;
    270 }
    271 // Post to WSARecv Next
    272 PostRecv(pContext);
    273 }catch(...)
    274 {
    275 pContext->m_CompressionBuffer.ClearBuffer();
    276 // 要求重发,就发送0, 内核自动添加数包标志
    277 Send(pContext, NULL, 0);
    278 PostRecv(pContext);
    279 }
    280
    281 return true;
    282 }
    IOCPServer.cpp4
      1 ////////////////////////////////////////////////////////////////////////////////
    2 //
    3 // FUNCTION: CIOCPServer::OnClientWriting
    4 //
    5 // DESCRIPTION: Called when client is writing
    6 //
    7 // INPUTS:
    8 //
    9 // NOTES:
    10 //
    11 // MODIFICATIONS:
    12 //
    13 // Name Date Version Comments
    14 // N T ALMOND 06042001 1.0 Origin
    15 // Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
    16 ////////////////////////////////////////////////////////////////////////////////
    17 //##ModelId=4DF8B1ED02F0
    18 bool CIOCPServer::OnClientWriting(ClientContext* pContext, DWORD dwIoSize)
    19 {
    20 try
    21 {
    22 //////////////////////////////////////////////////////////////////////////
    23 static DWORD nLastTick = GetTickCount();
    24 static DWORD nBytes = 0;
    25
    26 nBytes += dwIoSize;
    27
    28 if (GetTickCount() - nLastTick >= 1000)
    29 {
    30 nLastTick = GetTickCount();
    31 InterlockedExchange((LPLONG)&(m_nSendKbps), nBytes);
    32 nBytes = 0;
    33 }
    34 //////////////////////////////////////////////////////////////////////////
    35
    36 ULONG ulFlags = MSG_PARTIAL;
    37
    38 // Finished writing - tidy up
    39 pContext->m_WriteBuffer.Delete(dwIoSize);
    40 if (pContext->m_WriteBuffer.GetBufferLen() == 0)
    41 {
    42 pContext->m_WriteBuffer.ClearBuffer();
    43 // Write complete
    44 SetEvent(pContext->m_hWriteComplete);
    45 return true; // issue new read after this one
    46 }
    47 else
    48 {
    49 OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
    50
    51 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_TRANSMIT);
    52
    53
    54 pContext->m_wsaOutBuffer.buf = (char*) pContext->m_WriteBuffer.GetBuffer();
    55 pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();
    56
    57 int nRetVal = WSASend(pContext->m_Socket,
    58 &pContext->m_wsaOutBuffer,
    59 1,
    60 &pContext->m_wsaOutBuffer.len,
    61 ulFlags,
    62 &pOverlap->m_ol,
    63 NULL);
    64
    65
    66 if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING )
    67 {
    68 RemoveStaleClient( pContext, FALSE );
    69 }
    70
    71 }
    72 }catch(...){}
    73 return false; // issue new read after this one
    74 }
    75
    76 ////////////////////////////////////////////////////////////////////////////////
    77 //
    78 // FUNCTION: CIOCPServer::CloseCompletionPort
    79 //
    80 // DESCRIPTION: Close down the IO Complete Port, queue and associated client context structs
    81 // which in turn will close the sockets...
    82 //
    83 //
    84 // INPUTS:
    85 //
    86 // NOTES:
    87 //
    88 // MODIFICATIONS:
    89 //
    90 // Name Date Version Comments
    91 // N T ALMOND 06042001 1.0 Origin
    92 //
    93 ////////////////////////////////////////////////////////////////////////////////
    94 //##ModelId=4DF8B1ED0243
    95 void CIOCPServer::CloseCompletionPort()
    96 {
    97
    98 while (m_nWorkerCnt)
    99 {
    100 PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) NULL, NULL);
    101 Sleep(100);
    102 }
    103
    104 // Close the CompletionPort and stop any more requests
    105 CloseHandle(m_hCompletionPort);
    106
    107 ClientContext* pContext = NULL;
    108
    109 do
    110 {
    111 POSITION pos = m_listContexts.GetHeadPosition();
    112 if (pos)
    113 {
    114 pContext = m_listContexts.GetNext(pos);
    115 RemoveStaleClient(pContext, FALSE);
    116 }
    117 }
    118 while (!m_listContexts.IsEmpty());
    119
    120 m_listContexts.RemoveAll();
    121
    122 }
    123
    124
    125 //##ModelId=4DF8B1ED0204
    126 BOOL CIOCPServer::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
    127 {
    128 HANDLE h = CreateIoCompletionPort((HANDLE) socket, hCompletionPort, dwCompletionKey, 0);
    129 return h == hCompletionPort;
    130 }
    131
    132 ////////////////////////////////////////////////////////////////////////////////
    133 //
    134 // FUNCTION: CIOCPServer::RemoveStaleClient
    135 //
    136 // DESCRIPTION: Client has died on us, close socket and remove context from our list
    137 //
    138 // INPUTS:
    139 //
    140 // NOTES:
    141 //
    142 // MODIFICATIONS:
    143 //
    144 // Name Date Version Comments
    145 // N T ALMOND 06042001 1.0 Origin
    146 //
    147 ////////////////////////////////////////////////////////////////////////////////
    148 //##ModelId=4DF8B1ED0208
    149 void CIOCPServer::RemoveStaleClient(ClientContext* pContext, BOOL bGraceful)
    150 {
    151 CLock cs(m_cs, "RemoveStaleClient");
    152
    153 TRACE("CIOCPServer::RemoveStaleClient\n");
    154
    155 LINGER lingerStruct;
    156
    157
    158 //
    159 // If we're supposed to abort the connection, set the linger value
    160 // on the socket to 0.
    161 //
    162
    163 if ( !bGraceful )
    164 {
    165
    166 lingerStruct.l_onoff = 1;
    167 lingerStruct.l_linger = 0;
    168 setsockopt( pContext->m_Socket, SOL_SOCKET, SO_LINGER,
    169 (char *)&lingerStruct, sizeof(lingerStruct) );
    170 }
    171
    172
    173
    174 //
    175 // Free context structures
    176 if (m_listContexts.Find(pContext))
    177 {
    178
    179 //
    180 // Now close the socket handle. This will do an abortive or graceful close, as requested.
    181 CancelIo((HANDLE) pContext->m_Socket);
    182
    183 closesocket( pContext->m_Socket );
    184 pContext->m_Socket = INVALID_SOCKET;
    185
    186 while (!HasOverlappedIoCompleted((LPOVERLAPPED)pContext))
    187 Sleep(0);
    188
    189 m_pNotifyProc((LPVOID) m_pFrame, pContext, NC_CLIENT_DISCONNECT);
    190
    191 MoveToFreePool(pContext);
    192
    193 }
    194 }
    195
    196
    197 //##ModelId=4DF8B1ED01D5
    198 void CIOCPServer::Shutdown()
    199 {
    200 if (m_bInit == false)
    201 return;
    202
    203 m_bInit = false;
    204 m_bTimeToKill = true;
    205
    206 // Stop the listener
    207 Stop();
    208
    209
    210 closesocket(m_socListen);
    211 WSACloseEvent(m_hEvent);
    212
    213
    214 CloseCompletionPort();
    215
    216 DeleteCriticalSection(&m_cs);
    217
    218 while (!m_listFreePool.IsEmpty())
    219 delete m_listFreePool.RemoveTail();
    220
    221 }
    222
    223
    224 ////////////////////////////////////////////////////////////////////////////////
    225 //
    226 // FUNCTION: CIOCPServer::MoveToFreePool
    227 //
    228 // DESCRIPTION: Moves a used/stale context to the free pool for reuse
    229 //
    230 // INPUTS:
    231 //
    232 // NOTES:
    233 //
    234 // MODIFICATIONS:
    235 //
    236 // Name Date Version Comments
    237 // N T ALMOND 06042001 1.0 Origin
    238 //
    239 ////////////////////////////////////////////////////////////////////////////////
    240 //##ModelId=4DF8B1ED0216
    241 void CIOCPServer::MoveToFreePool(ClientContext *pContext)
    242 {
    243 CLock cs(m_cs, "MoveToFreePool");
    244 // Free context structures
    245 POSITION pos = m_listContexts.Find(pContext);
    246 if (pos)
    247 {
    248 pContext->m_CompressionBuffer.ClearBuffer();
    249 pContext->m_WriteBuffer.ClearBuffer();
    250 pContext->m_DeCompressionBuffer.ClearBuffer();
    251 pContext->m_ResendWriteBuffer.ClearBuffer();
    252 m_listFreePool.AddTail(pContext);
    253 m_listContexts.RemoveAt(pos);
    254 }
    255 }
    256
    257
    258
    259 ////////////////////////////////////////////////////////////////////////////////
    260 //
    261 // FUNCTION: CIOCPServer::AllocateContext
    262 //
    263 // DESCRIPTION: Checks free pool otherwise allocates a context
    264 //
    265 // INPUTS:
    266 //
    267 // NOTES:
    268 //
    269 // MODIFICATIONS:
    270 //
    271 // Name Date Version Comments
    272 // N T ALMOND 06042001 1.0 Origin
    273 //
    274 ////////////////////////////////////////////////////////////////////////////////
    275 //##ModelId=4DF8B1ED0223
    276 ClientContext* CIOCPServer::AllocateContext()
    277 {
    278 ClientContext* pContext = NULL;
    279
    280 CLock cs(CIOCPServer::m_cs, "AllocateContext");
    281
    282 if (!m_listFreePool.IsEmpty())
    283 {
    284 pContext = m_listFreePool.RemoveHead();
    285 }
    286 else
    287 {
    288 pContext = new ClientContext;
    289 }
    290
    291 ASSERT(pContext);
    292
    293 if (pContext != NULL)
    294 {
    295
    296 ZeroMemory(pContext, sizeof(ClientContext));
    297 pContext->m_bIsMainSocket = false;
    298 memset(pContext->m_Dialog, 0, sizeof(pContext->m_Dialog));
    299 }
    300 return pContext;
    301 }
    302
    303
    304 //##ModelId=4DF8B1ED01D6
    305 void CIOCPServer::ResetConnection(ClientContext* pContext)
    306 {
    307
    308 CString strHost;
    309 ClientContext* pCompContext = NULL;
    310
    311 CLock cs(CIOCPServer::m_cs, "ResetConnection");
    312
    313 POSITION pos = m_listContexts.GetHeadPosition();
    314 while (pos)
    315 {
    316 pCompContext = m_listContexts.GetNext(pos);
    317 if (pCompContext == pContext)
    318 {
    319 RemoveStaleClient(pContext, TRUE);
    320 break;
    321 }
    322 }
    323 }
    324
    325 //##ModelId=4DF8B1ED0159
    326 void CIOCPServer::DisconnectAll()
    327 {
    328 m_bDisconnectAll = true;
    329 CString strHost;
    330 ClientContext* pContext = NULL;
    331
    332 CLock cs(CIOCPServer::m_cs, "DisconnectAll");
    333
    334 POSITION pos = m_listContexts.GetHeadPosition();
    335 while (pos)
    336 {
    337 pContext = m_listContexts.GetNext(pos);
    338 RemoveStaleClient(pContext, TRUE);
    339 }
    340 m_bDisconnectAll = false;
    341
    342 }
    343
    344 //##ModelId=4DF8B1ED01C9
    345 bool CIOCPServer::IsRunning()
    346 {
    347 return m_bInit;
    348 }






     

  • 相关阅读:
    1082 射击比赛 (20 分)
    1091 N-自守数 (15 分)
    1064 朋友数 (20 分)
    1031 查验身份证 (15 分)
    1028 人口普查 (20 分)
    1059 C语言竞赛 (20 分)
    1083 是否存在相等的差 (20 分)
    1077 互评成绩计算 (20 分)
    792. 高精度减法
    791. 高精度加法
  • 原文地址:https://www.cnblogs.com/lovememo/p/2296415.html
Copyright © 2011-2022 走看看