zoukankan      html  css  js  c++  java
  • 简单谈谈消息发送缓冲区

    在服务器的网络层中,发送缓冲区是一个不可绕过的课题

    目前我遇到了主要有两种处理方式

    方式一:队列处理

    1、在逻辑线程里面有一个总的发送队列,然后服务器每帧都会处理这个队列

    2、每一个cLink里面有一个消息队列,当cLink的状态变化时候会去处理这个消息队列

    3、在每一帧处理的时候

                  

     1     switch(pLink->m_eStat)
     2         {
     3         case EF_IO_EAGAIN:
     4             pLink->m_send_q.push(msg);
     5             continue;
     6             break;
     7         case EF_IO_NORMAL:
     8         {
     9             switch(send_message_process(&msg))
    10             {
    11                 case EF_EVR_TX_EAGAIN:
    12                 {
    13                     pLink->m_send_q.push(msg);
    14                     pLink->m_eStat = EF_IO_EAGAIN;
    15 
    16                     uint32 type = EF_EVT_READ | EF_EVT_WRITE;
    17                     ModIoHandle(pLink->_fd, pLink->_sn, type);
    18 
    19                 //    cout << "mod_io_handler " << pLink->_fd << ":" << pLink->_sn << " " << type << endl;
    20                 }
    21                     break;

        如果该链接状态可以发送则直接发送出去,否则该消息从总的消息队列转移到了cLink的消息队列,

    4、在epoll中,当cLink的网络状态转换为可发送时候,再针对每一个cLink的发送队列进行处理,发送消息

    优点:

      处理方式简单,处理流程清晰明了

    缺点:

      因为每一个队列里面都是一个消息包,发送的时候,每一个包发送一次,调用一次send(),可能会造成太多小包的发送

    方式二:总的缓冲区处理

             1、不使用队列,而是维护自定义的一个消息缓冲区 cSendMsg

    1、每一次都把消息先放进消息缓冲区,自己缓冲内存,缓冲大小可扩展(有最大值限制)

    2、每一次发送的时候把缓冲区的内容取出,整块内存发送

    缺点:

         维护这个缓冲区需要自己处理,容易出错

    优点:

         可以避免多次发送小包,可以多个消息包作为一个打包一起发送,提高IO效率

    static const uint32 DEFAUTL_BUFFER = 1024 * 40; //10k
    
    class CEncrypt;
    static const uint32  c_wMaxSendMsg = 5120000;
    class cSendMsg
    {
    public:
    	~cSendMsg();
    	cSendMsg(uint32 sz = DEFAUTL_BUFFER);
    	bool AddMsg(const void* pMsg, uint32 nSize);
    	bool AddQueueMsg(const char* pBuf, uint32 nSize);
    	bool Init(uint32 sz);
    	bool Release();
    	bool AddBuffer(const void* pMsg, uint32 sz);
    	bool AddQueueBuffer(const char* pBuf, uint32 sz);
    	bool AddQueueBufferFront(const char* pBuf, uint32 sz);
    	bool AddQueueMsgFront(const char* pBuf, uint32 nSize);
    	//bool AddQueueBufferA(const char* pBuf, uint32 nSize);
    	bool DoubleBuf(uint32 dwSize = 0);
    	void ResetBuf();
    	void ResetBufLen(uint32 nLen, uint32 off);
    	const char* GetGroupBuffer(uint32& nLen, uint32& off) const;
    	const char* GetQueueBuffer(uint32& nLen, uint32& off) const;
    	uint32& GetLength(){return *m_pLength;}
    	bool IsEmpty(){
    		if (!m_pBuffer)
    			return true;
    		return GetLength() == 0;
    	}
    	void ResetBufLen(uint32 nLen);
    
    	const char* GetBuffer(uint32& len) const;
    
    	//////////////////////////////////////////////////////////////////////////
    	//加密 、 解密
    	void Encode(const char* buffer, uint32 size);
    	void Decode(const char* buffer, uint32 size);
    	bool SetEncode(int32 id, const char* KeyStr) ;
    	void GenerateRandEncodeID();
    	void GenerateRandKey();
    	void InitEncode();
    	void GetNewEncode(int32& EncodeID, char* KeyStr);
    	bool EncodeData(char* srcBuffer,char* objBuffer, uint32 size);
    	bool DecodeData(char* srcBuffer,char* objBuffer, uint32 size);
    private:
    	char* m_pBuffer;                      //带长度
    	char* m_pBuf;                         //不带长度
    	uint32* m_pLength;
    	uint32 m_dwSize;
    
    	CEncrypt* m_pEncode;			//加密类成员
    	int32 m_EncodeID;					//当前的加密算法编号
    	int32 m_NewEncodeID;				//准备更换的加密算法编号
    	char m_KeyValue[17];			//当前的密钥
    	char m_NewKeyValue[17];			//准备更换的密钥
    };
    

      

    static const uint32  g_nMaxDoubleSize = 1024 * 1024 * 10;   //扩展最大10M
    
    cSendMsg::cSendMsg(uint32 sz)
    {
    	if (sz != 0)
    		Init(sz);
    	else
    		m_pBuffer = NULL;
    	m_pEncode = mempool_NEW(CEncrypt);
    	m_NewEncodeID = 0;
    	m_EncodeID = 0;
    	InitEncode();
    
    }
    
    cSendMsg::~cSendMsg()
    {
    	Release();
    	m_pBuf = NULL;
    	mempool_DELETE(m_pEncode);
    }
    
    bool cSendMsg::Init(uint32 sz)
    {
    	m_dwSize = sz;
    	uint32 nLen = sizeof(uint32);
    	m_pBuffer = new char[sz + nLen];
    	memset(m_pBuffer, 0, sizeof(sz + nLen));
    	m_pLength = (uint32*)m_pBuffer;
    	m_pBuf = m_pBuffer + nLen;
    	if (m_pBuffer)
    		GetLength() = 0;
    	return true;
    }
    
    bool cSendMsg::DoubleBuf(uint32 dwSize)
    {
    	m_dwSize = max(m_dwSize * 2, dwSize);
    	uint32 wLen = sizeof(uint32);
    	char* pBuf = new char[m_dwSize + wLen];
    	memset(pBuf,0, m_dwSize + wLen);
    	memcpy(pBuf, m_pBuffer, GetLength() + wLen);
    	delete[] m_pBuffer;
    	m_pBuffer = pBuf;
    	m_pLength = (uint32*)m_pBuffer;
    	m_pBuf = m_pBuffer + wLen;
    	return true;
    }
    
    bool cSendMsg::Release()
    {
    	if (m_pBuffer)
    	{
    		*m_pLength = 0;
    		delete m_pBuffer;
    		m_pBuffer = NULL;
    	}
    	return true;
    }
    
    bool cSendMsg::AddBuffer(const void* pMsg, uint32 nSize)
    {
    	//uint32 dwLen = sizeof(uint32);
    	uint32 dwCurLen = *m_pLength;
    	if (dwCurLen  + nSize > m_dwSize)
    		return false;
    	//memcpy(m_pBuf + dwCurLen, &nSize, dwLen);
    	memcpy(m_pBuf + dwCurLen, pMsg, nSize);
    	*m_pLength += nSize;
    	return true;
    }
    
    //bool cSendMsg::AddQueueBufferA(const char* pBuf, uint32 nSize)
    //{
    //	uint32 dwCurLen = *m_pLength;
    //	if (dwCurLen + nSize > m_dwSize)
    //		return false;
    //	memcpy(m_pBuf + dwCurLen, pBuf, nSize);
    //	*m_pLength += nSize;
    //	return true;
    //}
    
    bool cSendMsg::AddQueueBufferFront(const char* pBuf, uint32 sz)
    {
    	uint32 dwCurLen = *m_pLength;
    	if (dwCurLen + sz > m_dwSize)
    		return false;
    	char* pTemp = new char[dwCurLen];
    	memcpy(pTemp, m_pBuf, dwCurLen);
    	memcpy(m_pBuf, pBuf, sz);
    	memcpy(m_pBuf + sz, pTemp,dwCurLen);
    	*m_pLength = sz + dwCurLen;
    	delete[]pTemp;
    	return true;
    }
    
    bool cSendMsg::AddQueueBuffer(const char* pBuf, uint32 nSize)
    {
    	while(!AddBuffer(pBuf,nSize))
    	{
    		std::cout << __FUNCTION__ <<"AllSize: "<<m_dwSize <<"  size: " << nSize << std::endl;
    		if(GetLength() >= g_nMaxDoubleSize)
    			return false;
    		DoubleBuf();
    	}
    	return true;
    }
    
    
    void cSendMsg::ResetBuf()
    {
    	if (m_pBuffer)
    	{
    		*m_pLength = 0;
    	}
    }
    
    void cSendMsg::ResetBufLen(uint32 nLen, uint32 off)
    {
    	if (*m_pLength != (off + nLen))
    	{
    		std::cout << __FUNCTION__ << "buffer长度出错了"<<std::endl;
    	}
    	if (m_pBuffer)
    	{
    		if (off)
    		{
    			memmove(m_pBuf, m_pBuf + nLen, off);
    		}
    		*m_pLength = off;
    	}
    }
    
    
    bool cSendMsg::AddMsg(const void* pMsg, uint32 nSize)
    {
    	int32 i = 0;
    	while(!AddBuffer(pMsg, nSize))
    	{
    		//std::cout << __FUNCTION__ <<" AllSize: "<<m_dwSize <<"  size: " << nSize << std::endl;
    		if(GetLength() >= g_nMaxDoubleSize)
    		{
    			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
    			return false;
    		}
    		DoubleBuf();
    	}
    	return true;
    }
    
    bool cSendMsg::AddQueueMsg(const char* pBuf, uint32 nSize)
    {
    	while(!AddQueueBuffer(pBuf, nSize))
    	{
    		if(GetLength() >= g_nMaxDoubleSize)
    		{
    			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
    			return false;
    		}
    		DoubleBuf();
    	}
    	return true;
    }
    
    
    
    bool cSendMsg::AddQueueMsgFront(const char* pBuf, uint32 nSize)
    {
    	while(!AddQueueBufferFront(pBuf, nSize))
    	{
    		if(GetLength() >= g_nMaxDoubleSize)
    		{
    			std::cout << __FUNCTION__ <<" 内存超过最大值"<< std::endl;
    			return false;
    		}
    		DoubleBuf();
    	}
    	return true;
    }
    
    const char* cSendMsg::GetQueueBuffer(uint32& nLen, uint32& off) const
    {
    	//uint32 wLen = *m_pLength;
    	//if (wLen > c_wMaxSendMsg)
    	//{
    	//	static char sChar[c_wMaxSendMsg];                   //注意 不是安全的 调用的地方需要枷锁
    	//	memcpy(sChar, m_pBuf, c_wMaxSendMsg);
    	//	nLen = c_wMaxSendMsg;
    	//	*m_pLength -= nLen;
    	//	memmove(m_pBuf,m_pBuf + c_wMaxSendMsg, *m_pLength);
    	//	std::cout << __FUNCTION__ <<" still" << wLen - nLen << std::endl; 
    	//	return sChar;
    	//}
    	//else
    	//{
    	//	nLen = wLen;
    	//	*m_pLength = 0;
    	//	return m_pBuf;
    	//}
    	uint32 wLen = *m_pLength;
    	if (wLen > c_wMaxSendMsg)
    	{
    		nLen = c_wMaxSendMsg;
    		off = wLen - nLen;
    		std::cout << __FUNCTION__ <<" still size: " << off << std::endl; 
    	}
    	else
    	{
    		nLen = wLen;
    		off = 0;
    	}
    	return m_pBuf;
    }
    
    const char* cSendMsg::GetGroupBuffer(uint32& nLen, uint32& off) const
    {
    	uint32 wLen = *m_pLength;
    	if (wLen > c_wMaxSendMsg)
    	{
    		nLen = c_wMaxSendMsg;
    		off = wLen - nLen;
    		std::cout << __FUNCTION__ <<" all size:"<< wLen <<" still: " << off << std::endl; 
    	}
    	else
    	{
    		nLen = wLen;
    		off = 0;
    	}
    	return m_pBuf;
    }
    
    const char* cSendMsg::GetBuffer(uint32& len) const
    {
    	len = *m_pLength;
    	return m_pBuf;	
    }
    
    /////////////////////////////////////////////////////////////////////////
    //   以下代码为处理消息加密操作
    //
    
    //消息加密方法
    void cSendMsg::Encode(const char* buffer, uint32 size)
    {
    	while(size > m_dwSize)
    		DoubleBuf(size);
    	Assert(size <= m_dwSize);
    	//ENCODE_START 
    	GetLength() = size;
    	//encode(buffer,size,m_buffer);
    	m_pEncode->Encrypt((char*)buffer,m_pBuf,size);
    	//ENCODE_END
    }
    
    //消息解密方法
    void cSendMsg::Decode(const char* buffer, uint32 size)
    {
    	while(size > m_dwSize)
    		DoubleBuf(size);
    	Assert(size <= m_dwSize);
    	GetLength() = size;
    	//decode(buffer,size,m_buffer);
    	m_pEncode->Decrypt((char*)buffer,m_pBuf,size);
    }
    
    //设置加密算法
    bool cSendMsg::SetEncode(int32 id, const char* KeyStr) 
    {
    	m_EncodeID = id;
    	//ZeroMemory(m_KeyValue,17);
    	memcpy(m_KeyValue,KeyStr,17);
    	return m_pEncode->MakeKey((CEncrypt::EncryptEnum)m_EncodeID,m_KeyValue);
    };
    
    //随机生成加密算法编号
    void cSendMsg::GenerateRandEncodeID()
    {
    	//m_NewEncodeID = nsCommon::RandomAInt()% CEncrypt::Encrypt_MaxNum;
    	m_NewEncodeID = CEncrypt::Encrypt_XOR256;
    }
    
    //随机生成加密密钥字符串(16个字符,128位)
    void cSendMsg::GenerateRandKey()
    {
    	for(int32 i=0; i<16; i++)
    		m_NewKeyValue[i] = (char)nsCommon::RandomInt(32, 126);		//从space至~字符[32-126]
    	m_NewKeyValue[16] = '';
    }
    
    //初始化加密算法编号与密钥,用于首次使用
    void cSendMsg::InitEncode()
    {
    	m_pEncode->MakeKey(CEncrypt::Encrypt_XOR256,"[WWW-7zgame-COM]");
    }
    
    //获取准备更换的加密算法编号和加密密钥字符串
    void cSendMsg::GetNewEncode(int32& EncodeID, char* KeyStr)
    {
    	EncodeID = m_NewEncodeID;
    	memcpy(KeyStr,m_NewKeyValue,17);
    }
    
    //以下两个加密解密方法,专门为Msg_Net_QueryEncodeLogin和Msg_Net_ChangeEncode特别定制的
    bool cSendMsg::EncodeData(char* srcBuffer,char* objBuffer, uint32 size)
    {
    	return m_pEncode->Encrypt(srcBuffer,objBuffer,size);
    }
    
    bool cSendMsg::DecodeData(char* srcBuffer,char* objBuffer, uint32 size)
    {
    	return m_pEncode->Decrypt(srcBuffer,objBuffer,size);
    }
    

      

          

  • 相关阅读:
    Linux(CentOS 7)搭建DHCP服务器实验
    Linux(CentOS 7)发布静态网站
    Linux(CentOS 7)搭建VSFTP服务器
    Linux(CentOS 7)搭建samba服务器搭建实验
    Linux(CentOS 7)软件的安装
    Linux(CentOS 7)服务于进程
    Linux磁盘配额
    Linux LVM逻辑卷管理
    Linux磁盘挂载
    Linux磁盘分区
  • 原文地址:https://www.cnblogs.com/Jimmy104/p/5310405.html
Copyright © 2011-2022 走看看