#include "pch.h" #include "framework.h" #include <winsock2.h> #include <MSTcpIP.h> #pragma comment(lib,"ws2_32.lib") #include "CLock.h" #include "CIOCPServer.h" #include "../common/lz4/lz4.h" //传送协议规则 //第一个字节 数据包是否加密 //int 4字节 数据包原大小 //int 4字节 数据包压缩后大小 //int 4字节 数据包最大缓冲区 #define HDR_SIZE 13 //先声明临界区对象 防止找不到对象 CRITICAL_SECTION CIOCPServer::m_cs; /// <summary> /// 初始化类对象成员 /// </summary> CIOCPServer::CIOCPServer() { WSADATA wsaData; WSAStartup(0x202, &wsaData); //初始化套接字2.2 //初始化临界区对象 InitializeCriticalSection(&m_cs); m_hThread = NULL; m_socListen = NULL; m_bTimeToKill = false; m_bDisconnectAll = false; m_hEvent = NULL; m_hCompletionPort = NULL; m_bInit = false; m_nCurrentThreads = 0; m_nBusyThreads = 0; m_nSendKbps = 0; m_nRecvKbps = 0; m_nMaxConnections = 10000; m_nKeepLiveTime = 1000 * 60 * 3; // 三分钟探测一次 } CIOCPServer::~CIOCPServer() { } /// <summary> /// 初始化监听端口 /// </summary> /// <param name="nMaxConnections">最大连接数</param> /// <param name="nPort">端口</param> /// <returns>bool</returns> bool CIOCPServer::Initialize(int nMaxConnections, int nPort) { m_nMaxConnections = nMaxConnections; //创建一个重叠模型socket初始化套接字WSA_FLAG_OVERLAPPED m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //初始化 套接字 if (m_socListen == INVALID_SOCKET) { return false; } //完成端口初始化 InitializeIOCP(); #pragma region 绑定端口&监听 SOCKADDR_IN saServer; //大小端转换 绑定端口 saServer.sin_port = htons(nPort); //绑定空地址 saServer.sin_family = AF_INET; saServer.sin_addr.s_addr = INADDR_ANY; // 绑定端口 套接字 int nRet = bind(m_socListen, (LPSOCKADDR)&saServer, sizeof(struct sockaddr)); if (nRet == SOCKET_ERROR) { closesocket(m_socListen); return false; } // 监听 nRet = listen(m_socListen, SOMAXCONN); if (nRet == SOCKET_ERROR) { closesocket(m_socListen); return false; } #pragma endregion #pragma region 事件模型线程池 // 创建一个事件模型 m_hEvent = WSACreateEvent(); if (m_hEvent == WSA_INVALID_EVENT) { closesocket(m_socListen); return false; } //绑定当前事件和套接字 FD_ACCEPT 应用程序想接收与进入连接有关的通知 nRet = WSAEventSelect(m_socListen, m_hEvent, FD_ACCEPT); if (nRet == SOCKET_ERROR) { closesocket(m_socListen); return false; } //开启监听线程 m_hThread = (HANDLE)_beginthreadex(NULL, 0, ListenThreadProc, (void*)this, 0, NULL); #pragma endregion return false; } /// <summary> /// 客户端连接线程 /// </summary> /// <param name="lParam">this</param> /// <returns></returns> unsigned WINAPI CIOCPServer::ListenThreadProc(LPVOID lParam) { //强转当前处理iocp类 CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam); WSANETWORKEVENTS events; while (1)//循环等待接收连接处理信号 { DWORD dwRet; //是否有客户端连接 1 数量 2 对象事件 dwRet = WSAWaitForMultipleEvents(1, &pThis->m_hEvent, FALSE, 1000,//超时时间 FALSE); //事件对象超时 重新等待 if (dwRet == WSA_WAIT_TIMEOUT) continue; int nRet = WSAEnumNetworkEvents(pThis->m_socListen, pThis->m_hEvent, &events); if (nRet == SOCKET_ERROR) { break; } //是否有客户端连接事件 if (events.lNetworkEvents & FD_ACCEPT) { pThis->OnAccept();//连接处理代码 } } return 0; } /// <summary> /// 连接处理 /// </summary> void CIOCPServer::OnAccept() { SOCKADDR_IN SockAddr; SOCKET clientSocket; int nRet; int nLen; //退出 if (m_bTimeToKill || m_bDisconnectAll) return; //连接 在事件处理过后 代表有人连接时 再去缓冲区拿数据 nLen = sizeof(SOCKADDR_IN); clientSocket = accept(m_socListen, (LPSOCKADDR)&SockAddr, &nLen); if (clientSocket == SOCKET_ERROR) { nRet = WSAGetLastError(); if (nRet != WSAEWOULDBLOCK) { return; } } //初始化 ClientContext* pContext = new ClientContext; if (pContext == NULL) return; pContext->m_Socket = clientSocket; pContext->m_wsaInBuffer.buf = (char*)pContext->m_byInBuffer; pContext->m_wsaInBuffer.len = sizeof(pContext->m_byInBuffer); //完成端口 传递pcontext 初始化 HANDLE h = CreateIoCompletionPort((HANDLE)clientSocket, m_hCompletionPort, (DWORD)pContext, 0); if (!(m_hCompletionPort == h)) { delete pContext; pContext = NULL; closesocket(clientSocket); closesocket(m_socListen); return; } const char chOpt = 1; // Set KeepAlive 开启保活机制 给每个创建的客户端开启保存获取包 setsockopt(pContext->m_Socket, SOL_SOCKET, SO_KEEPALIVE, &chOpt, sizeof(char)); // 设置超时详细信息 保活探测 tcp_keepalive klive; klive.onoff = 1; // 启用保活 klive.keepalivetime = m_nKeepLiveTime; klive.keepaliveinterval = 1000 * 10; // 重试间隔为10秒 unsigned long lgpt=NULL; //设置套接字模式 int t=WSAIoctl ( pContext->m_Socket, SIO_KEEPALIVE_VALS, &klive, sizeof(tcp_keepalive), NULL, 0, &lgpt, 0, NULL ); CLock cs(m_cs);//加锁 //当前客户端连接加入队列 m_listContexts.AddTail(pContext); //完成端口 向端口发送数据告知有客户端成功连接 功能占时没写 //BOOL bSuccess = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD)pContext, NULL); ////客户端是否断开连接 //if ((!bSuccess && GetLastError() != ERROR_IO_PENDING)) //{ // //清除客户端 // RemoveStaleClient(pContext, TRUE); // return; //} PostRecv(pContext); } /// <summary> /// 接收数据 /// </summary> /// <param name="pContext"></param> void CIOCPServer::PostRecv(ClientContext* pContext) { // issue a read request OVERLAPPEDPLUS* pOverlap = new OVERLAPPEDPLUS(IORead); ULONG ulFlags = MSG_PARTIAL; DWORD dwNumberOfBytesRecvd; //代表异步收好了 那边再来展示 UINT nRetVal = WSARecv(pContext->m_Socket, &pContext->m_wsaInBuffer, 1, &dwNumberOfBytesRecvd, &ulFlags, &pOverlap->m_ol, NULL); //WSAGetLastError() == WSAECONNRESET 应该是或者 if (nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { RemoveStaleClient(pContext, FALSE); } } /// <summary> /// 创建线程池绑定端口 /// </summary> /// <param name=""></param> /// <returns></returns> bool CIOCPServer::InitializeIOCP() { SOCKET s; DWORD i; UINT nThreadID; SYSTEM_INFO systemInfo; //完成所有端口绑定 m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (m_hCompletionPort == NULL) { return false; } //获取计算机信息 GetSystemInfo(&systemInfo); m_nThreadPoolMin = systemInfo.dwNumberOfProcessors * 2; //开2*线程 UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * 2; HANDLE hWorker; m_nWorkerCnt = 0; //创建线程池 for (i = 0; i < nWorkerCnt; i++) { hWorker = (HANDLE)_beginthreadex(NULL, 0, ThreadPoolFunc, //轮询收数据 (void*)this, 0, &nThreadID); if (hWorker == NULL) { CloseHandle(m_hCompletionPort); return false; } m_nWorkerCnt++; CloseHandle(hWorker); } return true; } /// <summary> /// 线程池工作线程 /// </summary> /// <param name="thisContext"></param> /// <returns></returns> unsigned CIOCPServer::ThreadPoolFunc(LPVOID thisContext) { ULONG ulFlags = MSG_PARTIAL; CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext); ASSERT(pThis); //完成端口 HANDLE hCompletionPort = pThis->m_hCompletionPort; DWORD dwIoSize; LPOVERLAPPED lpOverlapped; ClientContext* lpClientContext; OVERLAPPEDPLUS* pOverlapPlus; bool bError; bool bEnterRead; InterlockedIncrement(&pThis->m_nCurrentThreads);//i++ InterlockedIncrement(&pThis->m_nBusyThreads); for (BOOL bStayInPool = TRUE; bStayInPool && pThis->m_bTimeToKill == false; ) { pOverlapPlus = NULL; lpClientContext = NULL; bError = false; bEnterRead = false; InterlockedDecrement(&pThis->m_nBusyThreads);//i-- //检查完成端口状态 BOOL bIORet = GetQueuedCompletionStatus( hCompletionPort, &dwIoSize, (LPDWORD)&lpClientContext, &lpOverlapped, INFINITE); DWORD dwIOError = GetLastError(); pOverlapPlus = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol); //忙碌++ int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads); //删除无用客户端 if (!bIORet && dwIOError != WAIT_TIMEOUT) { if (lpClientContext && pThis->m_bTimeToKill == false) { pThis->RemoveStaleClient(lpClientContext, FALSE); } continue; // anyway, this was an error and we should exit bError = true; } if (!bError) { if (bIORet && NULL != pOverlapPlus && NULL != lpClientContext) { try { if (pOverlapPlus->m_ioType == IORead) {//处理客户端发来消息 pThis->OnClientReading(lpClientContext, dwIoSize); } if (pOverlapPlus->m_ioType == IOWrite) {//向客户端发送消息 pThis->OnClientWriting(lpClientContext, dwIoSize); } } catch (...) {} } } if (pOverlapPlus) delete pOverlapPlus; // from previous call } InterlockedDecrement(&pThis->m_nWorkerCnt);//减减 InterlockedDecrement(&pThis->m_nCurrentThreads);//减减 InterlockedDecrement(&pThis->m_nBusyThreads);//减减 return 0; } /// <summary> /// 处理客户端发来消息 /// </summary> /// <param name="pContext">发送来的数据结构</param> /// <param name="dwIoSize">数据包大小</param> /// <returns></returns> bool CIOCPServer::OnClientReading(ClientContext* pContext, DWORD dwIoSize) { //传送协议规则 //第一个字节 数据包是否加密 //int 4字节 数据包原大小 //int 4字节 数据包压缩后大小 CLock cs(CIOCPServer::m_cs); try { //如果为0就删除此客户端 if (dwIoSize == 0) { RemoveStaleClient(pContext, FALSE); return false; } //接收数据 pContext->m_CompressionBuffer.Write(pContext->m_byInBuffer, dwIoSize); //数据包大小是否合法 while (pContext->m_CompressionBuffer.m_csSize > HDR_SIZE) { int PacketLen = 0;//这里是原包大小 int UnZipLen = 0;//这里是压缩后包大小 int MaxPacketLen = 0;//这里是原包缓冲区大小 CopyMemory(&PacketLen, pContext->m_CompressionBuffer.m_pBase + 1, sizeof(int)); CopyMemory(&UnZipLen, pContext->m_CompressionBuffer.m_pBase + 5, sizeof(int)); CopyMemory(&MaxPacketLen, pContext->m_CompressionBuffer.m_pBase + 9, sizeof(int)); if (!PacketLen ||!UnZipLen || !MaxPacketLen) { return false; } //去除协议头 char temp[HDR_SIZE] = {}; pContext->m_CompressionBuffer.Read((PBYTE)temp, HDR_SIZE); PBYTE pDst = new BYTE[MaxPacketLen]; //2 返回解压数据 3原大小 4压缩大小 int nRet = LZ4_decompress_safe((char*)pContext->m_CompressionBuffer.m_pBase, (char*)pDst, UnZipLen, PacketLen); if (nRet < 1) { return false; } pContext->m_DeCompressionBuffer.ClearBuffer(); //赋值给解压缩后的大小 pContext->m_DeCompressionBuffer.Write(pDst, PacketLen); delete[] pDst; } } catch (...) { pContext->m_CompressionBuffer.ClearBuffer(); // 要求重发,就发送0, 内核自动添加数包标志 Send(pContext, NULL, 0); } //接收包 每次接受完就要接收下一个包 PostRecv(pContext); return true; } //向客户端发送消息 bool CIOCPServer::OnClientWriting(ClientContext* pContext, DWORD dwIoSize) { try { ULONG ulFlags = MSG_PARTIAL; pContext->m_WriteBuffer.Delete(dwIoSize); if (pContext->m_WriteBuffer.m_csSize == 0) { pContext->m_WriteBuffer.ClearBuffer(); SetEvent(pContext->m_hWriteComplete); return true; } else { OVERLAPPEDPLUS* pOverlap = new OVERLAPPEDPLUS(IOWrite); pContext->m_wsaOutBuffer.buf = (char*)pContext->m_WriteBuffer.m_pBase; pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.m_csSize; //发送客户端数据 int nRetVal = WSASend(pContext->m_Socket, &pContext->m_wsaOutBuffer, 1, &pContext->m_wsaOutBuffer.len, ulFlags, &pOverlap->m_ol, NULL); //失败就是删除客户端 if (nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) { RemoveStaleClient(pContext, FALSE); } } } catch (...) {} return false; } //发送消息 void CIOCPServer::Send(ClientContext* pContext, LPBYTE lpData, UINT nSize) { if (pContext == NULL) return; try { if (nSize > 0) { //压缩数据 const size_t src_size = strlen((char*)lpData) + 1; const size_t max_dst_size = LZ4_compressBound(src_size); LPBYTE pDest = new BYTE[max_dst_size];//分配压缩数据的空间 int nRet = LZ4_compress_default((char*)lpData, (char*)pDest, src_size, max_dst_size); if (nRet <1) { delete[] pDest; return; } LONG nBufLen = src_size + HDR_SIZE;//数据中加入数据头标识大小 //是否加密 pContext->m_WriteBuffer.Write(0, sizeof(char)); //原数据包大小 pContext->m_WriteBuffer.Write((PBYTE)&src_size, sizeof(src_size)); //压缩后大小 pContext->m_WriteBuffer.Write((PBYTE)&nRet, sizeof(nRet)); //缓冲区大小 pContext->m_WriteBuffer.Write((PBYTE)&max_dst_size, sizeof(max_dst_size)); //数据包 pContext->m_WriteBuffer.Write(pDest, sizeof(nRet)); delete[] pDest; // 发送完后,再备份数据, 因为有可能是m_ResendWriteBuffer本身在发送,所以不直接写入 LPBYTE lpResendWriteBuffer = new BYTE[nSize]; CopyMemory(lpResendWriteBuffer, lpData, nSize); pContext->m_ResendWriteBuffer.ClearBuffer(); pContext->m_ResendWriteBuffer.Write(lpResendWriteBuffer, nSize); // 备份发送的数据 delete[] lpResendWriteBuffer; } else // 要求重发 { // 备份发送的数据 } WaitForSingleObject(pContext->m_hWriteComplete, INFINITE); OVERLAPPEDPLUS* pOverlap = new OVERLAPPEDPLUS(IOWrite); //让完成端口发数据 PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD)pContext, &pOverlap->m_ol); pContext->m_nMsgOut++; } catch (...) {} } // 删除客户端 void CIOCPServer::RemoveStaleClient(ClientContext* pContext, BOOL bGraceful) { CLock cs(m_cs); LINGER lingerStruct; if (!bGraceful) { lingerStruct.l_onoff = 1; lingerStruct.l_linger = 0; //把数据发送完 setsockopt(pContext->m_Socket, SOL_SOCKET, SO_LINGER, (char*)&lingerStruct, sizeof(lingerStruct)); } // //防止当前客户端已经不存在 列表中 if (m_listContexts.Find(pContext)) { // // 发完消息删除socket CancelIo((HANDLE)pContext->m_Socket); closesocket(pContext->m_Socket); pContext->m_Socket = INVALID_SOCKET; //检查是否有没有返回的重叠I/O请求 while (!HasOverlappedIoCompleted((LPOVERLAPPED)pContext)) Sleep(0); MoveToFreePool(pContext); } } void CIOCPServer::MoveToFreePool(ClientContext* pContext) { //清楚列表这个接连结构体 CLock cs(m_cs); POSITION pos = m_listContexts.Find(pContext); if (pos) { pContext->m_CompressionBuffer.ClearBuffer(); pContext->m_WriteBuffer.ClearBuffer(); pContext->m_DeCompressionBuffer.ClearBuffer(); pContext->m_ResendWriteBuffer.ClearBuffer(); m_listFreePool.AddTail(pContext); m_listContexts.RemoveAt(pos); } }