在服务器的网络层中,发送缓冲区是一个不可绕过的课题
目前我遇到了主要有两种处理方式
方式一:队列处理
1、在逻辑线程里面有一个总的发送队列,然后服务器每帧都会处理这个队列
2、每一个cLink里面有一个消息队列,当cLink的状态变化时候会去处理这个消息队列
3、在每一帧处理的时候
1 switch(pLink->m_eStat) 2 { 3 case EF_IO_EAGAIN: 4 pLink->m_send_q.push(msg); 5 continue; 6 break; 7 case EF_IO_NORMAL: 8 { 9 switch(send_message_process(&msg)) 10 { 11 case EF_EVR_TX_EAGAIN: 12 { 13 pLink->m_send_q.push(msg); 14 pLink->m_eStat = EF_IO_EAGAIN; 15 16 uint32 type = EF_EVT_READ | EF_EVT_WRITE; 17 ModIoHandle(pLink->_fd, pLink->_sn, type); 18 19 // cout << "mod_io_handler " << pLink->_fd << ":" << pLink->_sn << " " << type << endl; 20 } 21 break;
如果该链接状态可以发送则直接发送出去,否则该消息从总的消息队列转移到了cLink的消息队列,
4、在epoll中,当cLink的网络状态转换为可发送时候,再针对每一个cLink的发送队列进行处理,发送消息
优点:
处理方式简单,处理流程清晰明了
缺点:
因为每一个队列里面都是一个消息包,发送的时候,每一个包发送一次,调用一次send(),可能会造成太多小包的发送
方式二:总的缓冲区处理
1、不使用队列,而是维护自定义的一个消息缓冲区 cSendMsg
1、每一次都把消息先放进消息缓冲区,自己缓冲内存,缓冲大小可扩展(有最大值限制)
2、每一次发送的时候把缓冲区的内容取出,整块内存发送
缺点:
维护这个缓冲区需要自己处理,容易出错
优点:
可以避免多次发送小包,可以多个消息包作为一个打包一起发送,提高IO效率
static const uint32 DEFAUTL_BUFFER = 1024 * 40; //10k class CEncrypt; static const uint32 c_wMaxSendMsg = 5120000; class cSendMsg { public: ~cSendMsg(); cSendMsg(uint32 sz = DEFAUTL_BUFFER); bool AddMsg(const void* pMsg, uint32 nSize); bool AddQueueMsg(const char* pBuf, uint32 nSize); bool Init(uint32 sz); bool Release(); bool AddBuffer(const void* pMsg, uint32 sz); bool AddQueueBuffer(const char* pBuf, uint32 sz); bool AddQueueBufferFront(const char* pBuf, uint32 sz); bool AddQueueMsgFront(const char* pBuf, uint32 nSize); //bool AddQueueBufferA(const char* pBuf, uint32 nSize); bool DoubleBuf(uint32 dwSize = 0); void ResetBuf(); void ResetBufLen(uint32 nLen, uint32 off); const char* GetGroupBuffer(uint32& nLen, uint32& off) const; const char* GetQueueBuffer(uint32& nLen, uint32& off) const; uint32& GetLength(){return *m_pLength;} bool IsEmpty(){ if (!m_pBuffer) return true; return GetLength() == 0; } void ResetBufLen(uint32 nLen); const char* GetBuffer(uint32& len) const; ////////////////////////////////////////////////////////////////////////// //加密 、 解密 void Encode(const char* buffer, uint32 size); void Decode(const char* buffer, uint32 size); bool SetEncode(int32 id, const char* KeyStr) ; void GenerateRandEncodeID(); void GenerateRandKey(); void InitEncode(); void GetNewEncode(int32& EncodeID, char* KeyStr); bool EncodeData(char* srcBuffer,char* objBuffer, uint32 size); bool DecodeData(char* srcBuffer,char* objBuffer, uint32 size); private: char* m_pBuffer; //带长度 char* m_pBuf; //不带长度 uint32* m_pLength; uint32 m_dwSize; CEncrypt* m_pEncode; //加密类成员 int32 m_EncodeID; //当前的加密算法编号 int32 m_NewEncodeID; //准备更换的加密算法编号 char m_KeyValue[17]; //当前的密钥 char m_NewKeyValue[17]; //准备更换的密钥 };
static const uint32 g_nMaxDoubleSize = 1024 * 1024 * 10; //扩展最大10M cSendMsg::cSendMsg(uint32 sz) { if (sz != 0) Init(sz); else m_pBuffer = NULL; m_pEncode = mempool_NEW(CEncrypt); m_NewEncodeID = 0; m_EncodeID = 0; InitEncode(); } cSendMsg::~cSendMsg() { Release(); m_pBuf = NULL; mempool_DELETE(m_pEncode); } bool cSendMsg::Init(uint32 sz) { m_dwSize = sz; uint32 nLen = sizeof(uint32); m_pBuffer = new char[sz + nLen]; memset(m_pBuffer, 0, sizeof(sz + nLen)); m_pLength = (uint32*)m_pBuffer; m_pBuf = m_pBuffer + nLen; if (m_pBuffer) GetLength() = 0; return true; } bool cSendMsg::DoubleBuf(uint32 dwSize) { m_dwSize = max(m_dwSize * 2, dwSize); uint32 wLen = sizeof(uint32); char* pBuf = new char[m_dwSize + wLen]; memset(pBuf,0, m_dwSize + wLen); memcpy(pBuf, m_pBuffer, GetLength() + wLen); delete[] m_pBuffer; m_pBuffer = pBuf; m_pLength = (uint32*)m_pBuffer; m_pBuf = m_pBuffer + wLen; return true; } bool cSendMsg::Release() { if (m_pBuffer) { *m_pLength = 0; delete m_pBuffer; m_pBuffer = NULL; } return true; } bool cSendMsg::AddBuffer(const void* pMsg, uint32 nSize) { //uint32 dwLen = sizeof(uint32); uint32 dwCurLen = *m_pLength; if (dwCurLen + nSize > m_dwSize) return false; //memcpy(m_pBuf + dwCurLen, &nSize, dwLen); memcpy(m_pBuf + dwCurLen, pMsg, nSize); *m_pLength += nSize; return true; } //bool cSendMsg::AddQueueBufferA(const char* pBuf, uint32 nSize) //{ // uint32 dwCurLen = *m_pLength; // if (dwCurLen + nSize > m_dwSize) // return false; // memcpy(m_pBuf + dwCurLen, pBuf, nSize); // *m_pLength += nSize; // return true; //} bool cSendMsg::AddQueueBufferFront(const char* pBuf, uint32 sz) { uint32 dwCurLen = *m_pLength; if (dwCurLen + sz > m_dwSize) return false; char* pTemp = new char[dwCurLen]; memcpy(pTemp, m_pBuf, dwCurLen); memcpy(m_pBuf, pBuf, sz); memcpy(m_pBuf + sz, pTemp,dwCurLen); *m_pLength = sz + dwCurLen; delete[]pTemp; return true; } bool cSendMsg::AddQueueBuffer(const char* pBuf, uint32 nSize) { while(!AddBuffer(pBuf,nSize)) { std::cout << __FUNCTION__ <<"AllSize: "<<m_dwSize <<" size: " << nSize << std::endl; if(GetLength() >= g_nMaxDoubleSize) return false; DoubleBuf(); } return true; } void cSendMsg::ResetBuf() { if (m_pBuffer) { *m_pLength = 0; } } void cSendMsg::ResetBufLen(uint32 nLen, uint32 off) { if (*m_pLength != (off + nLen)) { std::cout << __FUNCTION__ << "buffer长度出错了"<<std::endl; } if (m_pBuffer) { if (off) { memmove(m_pBuf, m_pBuf + nLen, off); } *m_pLength = off; } } bool cSendMsg::AddMsg(const void* pMsg, uint32 nSize) { int32 i = 0; while(!AddBuffer(pMsg, nSize)) { //std::cout << __FUNCTION__ <<" AllSize: "<<m_dwSize <<" size: " << nSize << std::endl; if(GetLength() >= g_nMaxDoubleSize) { std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl; return false; } DoubleBuf(); } return true; } bool cSendMsg::AddQueueMsg(const char* pBuf, uint32 nSize) { while(!AddQueueBuffer(pBuf, nSize)) { if(GetLength() >= g_nMaxDoubleSize) { std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl; return false; } DoubleBuf(); } return true; } bool cSendMsg::AddQueueMsgFront(const char* pBuf, uint32 nSize) { while(!AddQueueBufferFront(pBuf, nSize)) { if(GetLength() >= g_nMaxDoubleSize) { std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl; return false; } DoubleBuf(); } return true; } const char* cSendMsg::GetQueueBuffer(uint32& nLen, uint32& off) const { //uint32 wLen = *m_pLength; //if (wLen > c_wMaxSendMsg) //{ // static char sChar[c_wMaxSendMsg]; //注意 不是安全的 调用的地方需要枷锁 // memcpy(sChar, m_pBuf, c_wMaxSendMsg); // nLen = c_wMaxSendMsg; // *m_pLength -= nLen; // memmove(m_pBuf,m_pBuf + c_wMaxSendMsg, *m_pLength); // std::cout << __FUNCTION__ <<" still" << wLen - nLen << std::endl; // return sChar; //} //else //{ // nLen = wLen; // *m_pLength = 0; // return m_pBuf; //} uint32 wLen = *m_pLength; if (wLen > c_wMaxSendMsg) { nLen = c_wMaxSendMsg; off = wLen - nLen; std::cout << __FUNCTION__ <<" still size: " << off << std::endl; } else { nLen = wLen; off = 0; } return m_pBuf; } const char* cSendMsg::GetGroupBuffer(uint32& nLen, uint32& off) const { uint32 wLen = *m_pLength; if (wLen > c_wMaxSendMsg) { nLen = c_wMaxSendMsg; off = wLen - nLen; std::cout << __FUNCTION__ <<" all size:"<< wLen <<" still: " << off << std::endl; } else { nLen = wLen; off = 0; } return m_pBuf; } const char* cSendMsg::GetBuffer(uint32& len) const { len = *m_pLength; return m_pBuf; } ///////////////////////////////////////////////////////////////////////// // 以下代码为处理消息加密操作 // //消息加密方法 void cSendMsg::Encode(const char* buffer, uint32 size) { while(size > m_dwSize) DoubleBuf(size); Assert(size <= m_dwSize); //ENCODE_START GetLength() = size; //encode(buffer,size,m_buffer); m_pEncode->Encrypt((char*)buffer,m_pBuf,size); //ENCODE_END } //消息解密方法 void cSendMsg::Decode(const char* buffer, uint32 size) { while(size > m_dwSize) DoubleBuf(size); Assert(size <= m_dwSize); GetLength() = size; //decode(buffer,size,m_buffer); m_pEncode->Decrypt((char*)buffer,m_pBuf,size); } //设置加密算法 bool cSendMsg::SetEncode(int32 id, const char* KeyStr) { m_EncodeID = id; //ZeroMemory(m_KeyValue,17); memcpy(m_KeyValue,KeyStr,17); return m_pEncode->MakeKey((CEncrypt::EncryptEnum)m_EncodeID,m_KeyValue); }; //随机生成加密算法编号 void cSendMsg::GenerateRandEncodeID() { //m_NewEncodeID = nsCommon::RandomAInt()% CEncrypt::Encrypt_MaxNum; m_NewEncodeID = CEncrypt::Encrypt_XOR256; } //随机生成加密密钥字符串(16个字符,128位) void cSendMsg::GenerateRandKey() { for(int32 i=0; i<16; i++) m_NewKeyValue[i] = (char)nsCommon::RandomInt(32, 126); //从space至~字符[32-126] m_NewKeyValue[16] = ' '; } //初始化加密算法编号与密钥,用于首次使用 void cSendMsg::InitEncode() { m_pEncode->MakeKey(CEncrypt::Encrypt_XOR256,"[WWW-7zgame-COM]"); } //获取准备更换的加密算法编号和加密密钥字符串 void cSendMsg::GetNewEncode(int32& EncodeID, char* KeyStr) { EncodeID = m_NewEncodeID; memcpy(KeyStr,m_NewKeyValue,17); } //以下两个加密解密方法,专门为Msg_Net_QueryEncodeLogin和Msg_Net_ChangeEncode特别定制的 bool cSendMsg::EncodeData(char* srcBuffer,char* objBuffer, uint32 size) { return m_pEncode->Encrypt(srcBuffer,objBuffer,size); } bool cSendMsg::DecodeData(char* srcBuffer,char* objBuffer, uint32 size) { return m_pEncode->Decrypt(srcBuffer,objBuffer,size); }