设计概述
服务端通信组件的设计是一项非常严谨的工作,其中性能、伸缩性和稳定性是必须考虑的硬性质量指标,若要把组件设计为通用组件提供给多种已知或未知的上层应用使用,则设计的难度更会大大增加,通用性、可用性和灵活性必须考虑在内。
现以一个基于 IOCP 的通用异步 Windows Socket TCP 服务端组件为例子,讲述其设计与实现相关的问题,希望能引发大家的思考,对大家日后开展相关类似工作时有所帮助。关于通用性、可用性、Socket 模型选型以及接口模型的设计等问题已经在本座前段时间发表的《通用异步 Windows Socket TCP 客户端组件的设计与实现》中进行过阐述,此处就不再重复了。现在主要针对服务端通信组件的特点阐述设计其设计和实现相关的问题。
一、线程结构
与组件相关的线程有 3 种:使用者线程、Accept 线程和工作线程,其中后 2 种由组件实现。
- 使用者线程:通过调用 Start/Stop/Send 等组件方法操作组件的一个或多个线程,通常是程序的主线程或其它业务逻辑线程。
- Accept 线程:使用 AcceptEx() 接收客户端连接请求并创建 Client Socket 的线程,将其独立出来,实现为单独的线程将使组件的模块划分更清晰,更重要的是避免与业务逻辑和通信处理的相互影响。
- 工作线程:使用 GetQueuedCompletionStatus() 监听网络事件并处理网络交互的多个线程,工作线程处理完网络事件后会向上层应用发送 OnAccept/OnSend/OnReceive 等组件通知。工作线程的数量可以根据实际情况之行设置(通常建议为:CPU Core Number * 2 + 2)。
注意:如果上层应用在接收到 OnAccept/OnSend/OnReceive 这些组件通知时直接进行业务逻辑处理并在其中操作组件,则工作线程也成为了使用者线程。另外,如果要处理的业务逻辑比较耗时,上层应用应该在接收到组件通知后交由其他线程处理。
二、性能
组件采用 Windows 平台效率最高的 IOCP Socket 通信模型,因此在通信接口的性能方面是有保证的,这里就不多说了。现在从组件的设计与实现的角度来来阐述性能的优化。组件在代码级别做了很多优化,一些看似多余或繁琐的代码其实都是为了性能服务;组件在设计方面主要采用了 2 中优化策略:缓存池和私有堆。
- 缓存池:在通信的过程中,通常需要频繁的申请和释放内存缓冲区(TBufferObj)和 Socket 相关的结构体(TSocketObj),这会大大影响组件的性能,因此,组件为 TBufferObj 和 TSocketObj 建立了动态缓存池, 只有当缓存池中没有可用对象时才创建新对象,而当缓存对象过多时则会压缩缓存池。
- 私有堆(Private Heap):在操作系统中,new / malloc 等操作是串行化的,虽然一般的应用程序不用太在乎这个问题,但是在一个高并发的服务器中则是个不可忽略的问题,另外 TBufferObj 和 TSocketObj 均为大小固定的结构体,因此非常适合在私有堆中分配内存,避免与 new / malloc 竞争同时又减少内存空洞。(关于私有堆的使用方法请参考这里 ^_^)
三、通用性与可用性
与《通用异步 Windows Socket TCP 客户端组件的设计与实现》描述的客户端接口一样,服务端组件也提供了两组接口:ISocketServer 接口提供组件操作方法,由上层应用直接调用;IServerSocketListener 接口提供组件通知方法,由上层应用实现,这两个接口设计得非常简单,主要方法均不超过 5 个。由于组件自身功能完备(不需要附带其它库或代码)并且职责单一(只管通信,不参与业务逻辑),因此可以十分方便第整合到任何类型的应用程序中。
四、伸缩性
可以根据实际的使用环境要求设置工作线程的数量、 TBufferObj 和 TSocketObj 缓存池的大小、TBufferObj 缓冲区的大小、Socket 监听队列的大小、AccepEx 派发的数目以及心跳检查的间隔等。
五、连接标识
组件完全封装了所有的底层 Socket 通信,上层应用看不到任何通信细节,不必也不能干预任何通信操作。另外,组件在 IServerSocketListener 通知接口的所有方法中都有一个 Connection ID 参数,该参数作为连接标识提供给上层应用识别不同的连接。
下面我们来看看组件的主要实现逻辑。
组件实现
- ISocketServer 和 IServerSocketListener 接口
// 操作类型
enum EnSocketOperation
{
SO_UNKNOWN = 0,
SO_ACCEPT = 1,
SO_CONNECT = 2,
SO_SEND = 3,
SO_RECEIVE = 4,
};
// 监听器基接口
class ISocketListener
{
public:
// 返回值类型
enum EnHandleResult
{
HR_OK = 0,
HR_IGNORE = 1,
HR_ERROR = 2,
};
public:
// 已发出数据通知
virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 已接收数据通知
virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 关闭连接通知
virtual EnHandleResult OnClose(DWORD dwConnectionID) = 0;
// 通信错误通知
virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode) = 0;
public:
virtual ~ISocketListener() {}
};
// 监听器接口
class IServerSocketListener : public ISocketListener
{
public:
// 接收连接通知
virtual EnHandleResult OnAccept(DWORD dwConnectionID) = 0;
// 服务关闭通知
virtual EnHandleResult OnServerShutdown() = 0;
};
// 操作接口
class ISocketServer
{
public:
// 错误码
enum En_ISS_Error
{
ISS_OK = 0,
ISS_SOCKET_CREATE = 1,
ISS_SOCKET_BIND = 2,
ISS_SOCKET_LISTEN = 3,
ISS_CP_CREATE = 4,
ISS_WORKER_THREAD_CREATE = 5,
ISS_SOCKE_ATTACH_TO_CP = 6,
ISS_ACCEPT_THREAD_CREATE = 7,
};
public:
// 启动通行
virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount) = 0;
// 关闭通信
virtual BOOL Stop () = 0;
// 发送数据
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;
// 是否已启动
virtual BOOL HasStarted () = 0;
// 获取错误码
virtual En_ISS_Error GetLastError () = 0;
// 获取错误描述
virtual LPCTSTR GetLastErrorDesc() = 0;
// 获取客户端的地址信息
virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort) = 0;
public:
virtual ~ISocketServer() {}
};
// ISocketServer 接口只能指针
typedef auto_ptr<ISocketServer> ISocketServerPtr;
从上面的接口定义可以看出,ISocketServer 和 IServerSocketListener 接口非常简单,上层应用只需调用 ISocketServer 接口的 Start()、Stop() 和 Send() 方法操作组件,并实现 IServerSocketListener 接口的几个 On***() 方法接收组件通知,底层通信过程并不需要上层应用参与。
- TBufferObj 和 TSocketObj 结构体
struct TBufferObjBase
{
OVERLAPPED ov; // 异步 Overlapped
WSABUF buff; // 数据缓冲 buffer
EnSocketOperation operation; // 操作类型
};
struct TBufferObj : public TBufferObjBase
{
SOCKET client; // Client Socket
};
typedef list<TBufferObj*> TBufferObjPtrList;
struct TSocketObjBase
{
SOCKET socket; // Client Socket
};
struct TSocketObj : public TSocketObjBase
{
SOCKADDR_IN clientAddr; // Socket 地址
DWORD connID; // Connection ID
CCriSec2 crisec; // Critical Session
};
typedef list<TSocketObj*> TSocketObjPtrList;
typedef hash_map<DWORD, TSocketObj*> TSocketObjPtrMap;
typedef TSocketObjPtrMap::iterator TSocketObjPtrMapI;
typedef TSocketObjPtrMap::const_iterator TSocketObjPtrMapCI;
TBufferObj 和 TSocketObj 是负责通信数据交换的载体,并由对应的缓冲池负责管理它们的实例对象。
- CIocpServer 类
// 组件实现类
class CIocpServer : public ISocketServer
{
public:
/* 如果需要,可以提供 getter & setter 方法设置下列工作参数 */
static const long DEFAULT_IOCP_THREAD_COUNT; // 默认工作线程数
static const long DEFAULT_ACCEPT_SOCKET_COUNT; // 默认并发 AcceptEx 调用次数
static const long DEFAULT_IOCP_BUFFER_SIZE; // 默认 TBufferObj 数据缓冲区大小
static const long DEFAULT_SOCKET_LISTEN_QUEUE; // 默认 Socket 等候队列数目
static const long DEFAULT_FREE_SOCKETOBJ_POOL; // TSocketObj 缓冲池大小
static const long DEFAULT_FREE_BUFFEROBJ_POOL; // TBufferObj 缓冲池大小
static const long DEFAULT_FREE_SOCKETOBJ_HOLD; // TSocketObj 缓冲池压缩阀值
static const long DEFAULT_FREE_BUFFEROBJ_HOLD; // TBufferObj 缓冲池压缩阀值
static const long DEFALUT_KEEPALIVE_TIMES; // 心跳检测次数
static const long DEFALUT_KEEPALIVE_INTERVAL; // 心跳检测间隔
public:
CIocpServer(IServerSocketListener* pListener) // IServerSocketListener 监听器
: m_psoListener(pListener)
, m_hAcceptThread(NULL)
, m_hCompletePort(NULL)
, m_soListen(INVALID_SOCKET)
, m_pfnAcceptEx(NULL)
, m_pfnGetAcceptExSockaddrs(NULL)
, m_enLastError(ISS_OK)
, m_bStarted(FALSE)
, m_semAccept(DEFAULT_IOCP_THREAD_COUNT, DEFAULT_IOCP_THREAD_COUNT)
{
ASSERT(m_wsSocket.IsValid());
ASSERT(m_psoListener);
Reset();
}
virtual ~CIocpServer()
{
if(HasStarted())
Stop();
}
public:
/* ISocketServer 接口方法实现 */
virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount = DEFAULT_IOCP_THREAD_COUNT);
virtual BOOL Stop ();
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen);
virtual BOOL HasStarted () {return m_bStarted;}
virtual En_ISS_Error GetLastError () {return m_enLastError;}
virtual LPCTSTR GetLastErrorDesc();
virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort);
private:
void SetLastError(En_ISS_Error code, LPCTSTR func, int ec);
void Reset();
private:
BOOL CreateListenSocket(LPCTSTR pszBindAddress, USHORT usPort);
BOOL CreateCompletePort();
BOOL CreateWorkerThreads(long lThreadCount);
BOOL StartAcceptThread();
void CloseListenSocket();
void WaitForAcceptThreadEnd();
void CloseClientSocket();
void ReleaseFreeSocket();
void CompressFreeSocket(size_t size);
void ReleaseFreeBuffer();
void CompressFreeBuffer(size_t size);
void WaitForWorkerThreadEnd();
void TerminateWorkerThread();
void CloseCompletePort();
/* TBufferObj 和 TSocketObj 缓冲池系列方法 */
TBufferObj* GetFreeBufferObj(int iLen = DEFAULT_IOCP_BUFFER_SIZE);
TSocketObj* GetFreeSocketObj();
void AddFreeBufferObj(TBufferObj* pBufferObj);
void AddFreeSocketObj(DWORD dwConnID, BOOL bClose = TRUE, BOOL bGraceful = TRUE, BOOL bReuseAddress = FALSE);
TBufferObj* CreateBufferObj();
TSocketObj* CreateSocketObj();
void DeleteBufferObj(TBufferObj* pBufferObj);
void DeleteSocketObj(TSocketObj* pSocketObj);
void AddClientSocketObj(DWORD dwConnID, TSocketObj* pSocketObj);
TSocketObj* FindSocketObj(DWORD dwConnID);
private:
static UINT WINAPI AcceptThreadProc(LPVOID pv); // Accept 线程函数
static UINT WINAPI WorkerThreadProc(LPVOID pv); // 工作线程函数
void HandleIo (TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwBytes, DWORD dwErrorCode);
void HandleAccept (SOCKET soListen, TBufferObj* pBufferObj);
void HandleSend (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
void HandleReceive (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
int DoSend (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
int DoReceive (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
private:
SOCKET GetAcceptSocket();
BOOL DeleteAcceptSocket(SOCKET socket, BOOL bCloseSocket = FALSE);
void ReleaseAcceptSockets();
private:
// 这个属性是否似曾相识 ^_^ (参考讲述客户端组件的那篇文章)
CInitSocket m_wsSocket;
// AcceptEx() 函数指针
LPFN_ACCEPTEX m_pfnAcceptEx;
// GetAcceptExSockAddrs() 函数指针
LPFN_GETACCEPTEXSOCKADDRS m_pfnGetAcceptExSockaddrs;
private:
IServerSocketListener* m_psoListener; // 监听器指针
volatile BOOL m_bStarted; // 启动标识
volatile DWORD m_dwConnID; // Connection ID 当前值
En_ISS_Error m_enLastError;
SOCKET m_soListen; // 监听 Socket
HANDLE m_hCompletePort; // 完成端口
HANDLE m_hAcceptThread; // Accept 线程句柄
vector<HANDLE> m_vtWorkerThreads; // 工作线程句柄集合
TBufferObjPtrList m_lsFreeBuffer; // TBufferObj 缓冲池队列
TSocketObjPtrList m_lsFreeSocket; // TSocketObj 缓冲池队列
TSocketObjPtrMap m_mpClientSocket; // Connection ID 映射
CCriSec m_csFreeBuffer;
CCriSec m_csFreeSocket;
CCriSec m_csClientSocket;
CEvt m_evtAccept;
CSEM m_semAccept;
CCriSec m_csAccept;
ulong_set m_setAccept;
CPrivateHeap m_hpPrivate; // 缓冲池私有堆
};
这个类定义文件看上去有点复杂,但我们只需关注被注释的那些方法和属性就可以了。从上面的类定义可以看出,CIocpServer 实现了 ISocketServer 接口,而它本身并没有增加任何 public 方法,因此它的使用方式十分简单。另外,CIocpServer 的构造函数接收一个 ISocketServerListener 指针,CIocpServer 就是通过该指针把组件通知发送给上层应用的。
- CIocpServer 的主要方法
// 事件触发宏
#define FireAccept(id) (m_bStarted ? m_psoListener->OnAccept(id) : ISocketListener::HR_IGNORE)
#define FireSend(id, buff, len) (m_bStarted ? m_psoListener->OnSend(id, buff, len) : ISocketListener::HR_IGNORE)
#define FireReceive(id, buff, len) (m_bStarted ? m_psoListener->OnReceive(id, buff, len) : ISocketListener::HR_IGNORE)
#define FireClose(id) (m_bStarted ? m_psoListener->OnClose(id) : ISocketListener::HR_IGNORE)
#define FireError(id, op, code) (m_bStarted ? m_psoListener->OnError(id, op, code) : ISocketListener::HR_IGNORE)
#define FireServerShutdown() m_psoListener->OnServerShutdown()
// 成员常量定义
const long CIocpServer::DEFAULT_IOCP_THREAD_COUNT = ::GetCpuCount() * 2 + 2;
const long CIocpServer::DEFAULT_ACCEPT_SOCKET_COUNT = DEFAULT_IOCP_THREAD_COUNT;
const long CIocpServer::DEFAULT_IOCP_BUFFER_SIZE = 4 * 1024;
const long CIocpServer::DEFAULT_SOCKET_LISTEN_QUEUE = 30;
const long CIocpServer::DEFAULT_FREE_SOCKETOBJ_POOL = 50;
const long CIocpServer::DEFAULT_FREE_BUFFEROBJ_POOL = 150;
const long CIocpServer::DEFAULT_FREE_SOCKETOBJ_HOLD = 150;
const long CIocpServer::DEFAULT_FREE_BUFFEROBJ_HOLD = 450;
const long CIocpServer::DEFALUT_KEEPALIVE_TIMES = 3;
const long CIocpServer::DEFALUT_KEEPALIVE_INTERVAL = 10 * 1000;
BOOL CIocpServer::Start(LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount)
{
if(CreateListenSocket(pszBindAddress, usPort)) // 创建监听 Socket
if(CreateCompletePort()) // 创建完成端口
if(CreateWorkerThreads(lThreadCount)) // 启动工作线程
if(StartAcceptThread()) // 启动 Accept 线程
return (m_bStarted = TRUE);
Stop();
return (m_bStarted = FALSE);
}
BOOL CIocpServer::Stop()
{
BOOL bStarted = m_bStarted;
m_bStarted = FALSE;
CloseListenSocket();
if(bStarted)
{
WaitForAcceptThreadEnd(); // 停止 Accept 线程
FireServerShutdown(); // 发送关闭通知
CloseClientSocket(); // 关闭所有连接
WaitForWorkerThreadEnd(); // 停止工作线程
ReleaseFreeSocket(); // 释放 TSocketObj 缓冲池
ReleaseFreeBuffer(); // 释放 TBufferObj 缓冲池
}
else
TerminateWorkerThread(); // 终止工作线程
CloseCompletePort(); // 关闭完成端口
Reset(); // 重设组件属性
return TRUE;
}
BOOL CIocpServer::Send(DWORD dwConnID, const BYTE* pBuffer, int iLen)
{
ASSERT(iLen > 0);
TSocketObj* pSocketObj = NULL;
{
CCriSecLock locallock1(m_csClientSocket);
// 根据 Connection ID 查找对应 TSocketObj 对象
TSocketObjPtrMapCI it = m_mpClientSocket.find(dwConnID);
if(it != m_mpClientSocket.end())
pSocketObj = it->second;
}
if(pSocketObj == NULL)
return FALSE;
CCriSecLock2 locallock2(pSocketObj->crisec);
int iRemain = iLen;
while(iRemain > 0)
{
// 填充 TBufferObj 缓冲区
int iBufferSize = min(iRemain, DEFAULT_IOCP_BUFFER_SIZE);
TBufferObj* pBufferObj = GetFreeBufferObj(iBufferSize);
memcpy(pBufferObj->buff.buf, pBuffer, iBufferSize);
// 发送数据
if(DoSend(pSocketObj, pBufferObj) != NO_ERROR)
return FALSE;
iRemain -= iBufferSize;
pBuffer += iBufferSize;
}
return TRUE;
}
// Accept 线程函数
UINT WINAPI CIocpServer::AcceptThreadProc(LPVOID pv)
{
CIocpServer* pServer = (CIocpServer*)pv;
ASSERT(pServer->m_soListen != INVALID_SOCKET);
TRACE("-----> 启动监听线程 <----- ");
while(TRUE)
{
HANDLE handles[] = {pServer->m_semAccept, pServer->m_evtAccept};
DWORD dwResult = ::WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE);
if(dwResult == WAIT_OBJECT_0)
{
TBufferObj* pBufferObj = pServer->GetFreeBufferObj();
SOCKET soClient = pServer->GetAcceptSocket();
// 调用 AcceptEx() 异步接收连接请求
if(::PostAccept(pServer->m_pfnAcceptEx, pServer->m_soListen, soClient, pBufferObj) != NO_ERROR)
{
pServer->DeleteBufferObj(pBufferObj);
pServer->DeleteAcceptSocket(soClient);
::ManualCloseSocket(soClient);
TRACE1("-----> 监听线程异常终止 (EC: %d) <----- ", ::WSAGetLastError());
break;
}
}
else if(dwResult == WAIT_OBJECT_0 + 1)
{
pServer->ReleaseAcceptSockets();
TRACE("-----> 停止监听线程 <----- ");
break;
}
else
VERIFY(FALSE);
}
return 0;
}
// 工作线程函数
UINT WINAPI CIocpServer::WorkerThreadProc(LPVOID pv)
{
CIocpServer* pServer = (CIocpServer*)pv;
while (TRUE)
{
DWORD dwErrorCode = NO_ERROR;
DWORD dwBytes;
OVERLAPPED* pOverlapped;
TSocketObj* pSocketObj;
TBufferObj* pBufferObj;
// 等待完成事件
BOOL result = ::GetQueuedCompletionStatus
(
pServer->m_hCompletePort,
&dwBytes,
(PULONG_PTR)&pSocketObj,
&pOverlapped,
INFINITE
);
if(dwBytes == 0 && pSocketObj == NULL && pOverlapped == NULL)
return 0;
pBufferObj = CONTAINING_RECORD(pOverlapped, TBufferObj, ov);
if (!result)
{
DWORD dwFlag = 0;
DWORD dwSysCode = ::GetLastError();
if(pServer->m_bStarted)
{
SOCKET sock = pBufferObj->operation != SO_ACCEPT ? pSocketObj->socket : (SOCKET)pSocketObj;
result = ::WSAGetOverlappedResult(pSocketObj->socket, &pBufferObj->ov, &dwBytes, FALSE, &dwFlag);
if (!result)
{
dwErrorCode = ::WSAGetLastError();
TRACE3("GetQueuedCompletionStatus failed (SYS: %d, SOCK: %d, FLAG: %d) ", dwSysCode, dwErrorCode, dwFlag);
}
}
else
dwErrorCode = dwSysCode;
}
// 处理 IO 事件
pServer->HandleIo(pSocketObj, pBufferObj, dwBytes, dwErrorCode);
}
return 0;
}
// 处理 IO 事件
void CIocpServer::HandleIo(TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwBytes, DWORD dwErrorCode)
{
ASSERT(pBufferObj != NULL);
ASSERT(pSocketObj != NULL);
if(dwErrorCode != NO_ERROR)
{
if(pBufferObj->operation != SO_ACCEPT)
{
FireError(pSocketObj->connID, pBufferObj->operation, dwErrorCode);
AddFreeSocketObj(pSocketObj->connID);
}
else
{
DeleteAcceptSocket(pBufferObj->client);
::ManualCloseSocket(pBufferObj->client);
}
AddFreeBufferObj(pBufferObj);
return;
}
if(dwBytes == 0 && pBufferObj->operation != SO_ACCEPT)
{
FireClose(pSocketObj->connID);
AddFreeSocketObj(pSocketObj->connID);
AddFreeBufferObj(pBufferObj);
return;
}
pBufferObj->buff.len = dwBytes;
switch(pBufferObj->operation)
{
case SO_ACCEPT: // 处理 Accept 事件(内部调用 FireAccept())
HandleAccept((SOCKET)pSocketObj, pBufferObj);
break;
case SO_SEND: // 处理 Send 事件(内部调用 FireSend())
HandleSend(pSocketObj, pBufferObj);
break;
case SO_RECEIVE: // 处理 Receive 事件(内部调用 FireReceive())
HandleReceive(pSocketObj, pBufferObj);
break;
default:
ASSERT(FALSE);
}
}
上面的代码就不多作解析了,有兴趣的朋友可以下载完整的代码烟酒烟酒 ^_^ 下面一起来看一个组件的使用示例。
使用示例
好了,码了一个晚上的字,累啊!到此为止吧,感谢收看~ 晚安 ^_^