zoukankan      html  css  js  c++  java
  • UDT源码剖析(五)之Buffer

    UDT源码的体系结构中存在两种Buffer,分别是RecvBuffer和SendBuffer。这两种Buffer分别用于UDT套接字的缓冲区,注意了是UDT SOCKET的数据缓冲,不是UDP SOCKET的数据缓冲。UDP SOCKET有自己的SendQueue和RecvQueue。我会挑选一些非常有必要的代码详细的分析,比如说从Send Buffer中取出数据这种,小众操作。详见代码注释:

    CSndBuffer

    • 基础数据结构
    class CSndBuffer
    {
    private:
       udt_pthread_mutex_t m_BufLock;           // used to synchronize buffer operation
    
       struct Block    //为了方便的提交给SendQueue
       {
          char* m_pcData;                   // 指向数据块
          int m_iLength;                       // 数据块的长度
          int32_t m_iMsgNo;                // 数据块的编号
          uint64_t m_OriginTime;         // 原始请求时间
          int m_iTTL;                            // TTL(ms)
    
          Block* m_pNext;                   // next block
       } *m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock;
    
       // m_pBlock:         The head pointer
       // m_pFirstBlock:    The first block
       // m_pCurrBlock:	The current block
       // m_pLastBlock:     The last block (if first == last, buffer is empty)
    
       struct Buffer    //真实的存储Buffer
       {
          char* m_pcData;			// buffer
          int m_iSize;			        // size
          Buffer* m_pNext;			// next buffer
       } *m_pBuffer;			        // physical buffer
    
       int32_t m_iNextMsgNo;                //下一条消息编号
    
       int m_iSize;				        // 32 (number of packets)
       int m_iMSS;                                  // 1500
       int m_iCount;			        // 已经使用的Blocks
    };
    
    • 初始化:CSndBuffer::CSndBuffer(int size, int mss)
    CSndBuffer::CSndBuffer(int size, int mss):
    m_BufLock(),
    m_pBlock(NULL),
    m_pFirstBlock(NULL),
    m_pCurrBlock(NULL),
    m_pLastBlock(NULL),
    m_pBuffer(NULL),
    m_iNextMsgNo(1),
    m_iSize(size),
    m_iMSS(mss),
    m_iCount(0)
    {
       // 一次性创建一个大的Buffer,以后Buffer不够用的时候也是再次创建一个Buffer
       m_pBuffer = new Buffer;
       m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];        //创建实际存储的Buffer,注意空间
       m_pBuffer->m_iSize = m_iSize;
       m_pBuffer->m_pNext = NULL;
    
       // 创建size个Block,循环链表a
       m_pBlock = new Block;    
       Block* pb = m_pBlock;
       for (int i = 1; i < m_iSize; ++ i)
       {
          pb->m_pNext = new Block;
          pb->m_iMsgNo = 0;
          pb = pb->m_pNext;
       }
       pb->m_pNext = m_pBlock;  
    
       //紧接着调整Block中的指针,指向Buffre中相应的位置 
       pb = m_pBlock;
       char* pc = m_pBuffer->m_pcData;
       for (int i = 0; i < m_iSize; ++ i)   
       {
          pb->m_pcData = pc;
          pb = pb->m_pNext;
          pc += m_iMSS;
       }
    
       m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;    //调整Block指针的位置
    
       #ifndef WINDOWS
          pthread_mutex_init(&m_BufLock, NULL);    //初始化互斥锁
       #else
          m_BufLock = CreateMutex(NULL, false, NULL);
       #endif
    }
    
    • 销毁:CSndBuffer::~CSndBuffer()
    CSndBuffer::~CSndBuffer()
    {
       //逐个删除Block,此时还没有销毁Block中指针指向的实际的Buffer 
       Block* pb = m_pBlock->m_pNext;   
       while (pb != m_pBlock)
       {
          Block* temp = pb;
          pb = pb->m_pNext;
          delete temp;
       }
       delete m_pBlock;
    
       //逐个删除Buffer,初始化时只创建了一个Buffer,increase()时会增加多个Buffer 
       while (m_pBuffer != NULL)    
       {
          Buffer* temp = m_pBuffer;
          m_pBuffer = m_pBuffer->m_pNext;
          delete [] temp->m_pcData;
          delete temp;
       }
    
       #ifndef WINDOWS
          pthread_mutex_destroy(&m_BufLock);    //销毁锁
       #else
          CloseHandle(m_BufLock);
       #endif
    }
    
    • 向Buffer添加数据:void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
    void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
    {
       int size = len / m_iMSS;     //根据MSS判断需要多少个Block
       if ((len % m_iMSS) != 0)
          size ++;
    
       //如果已经使用的Blocks和即将要使用的Blocks不能满足需求,增加
       while (size + m_iCount >= m_iSize)  
          increase();
    
       //获取当前的时间 
       uint64_t time = CTimer::getTime();   
       int32_t inorder = order;
       inorder <<= 29;
    
       //获得最后使用的Block
       Block* s = m_pLastBlock; 
       for (int i = 0; i < size; ++ i)
       {
          int pktlen = len - i * m_iMSS;    //在这块判断当前剩余的len
          if (pktlen > m_iMSS)
             pktlen = m_iMSS;    //如果长度不够MSS时,注意填充
    
          memcpy(s->m_pcData, data + i * m_iMSS, pktlen);   //将数据逐步拷贝到Block中
          s->m_iLength = pktlen;    //调整Block的长度
    
          s->m_iMsgNo = m_iNextMsgNo | inorder; //Message Number并判断是否需要交付给用户
          if (i == 0)
             s->m_iMsgNo |= 0x80000000; //如果是这一个数据流的起始,加上起始标志
          if (i == size - 1)    //如果是这个数据流的结尾,加上结尾标志
             s->m_iMsgNo |= 0x40000000;
    
          s->m_OriginTime = time;   //原始请求时间
          s->m_iTTL = ttl;  //更新TTL
    
          s = s->m_pNext;   //调整至下一个Block
       }
       m_pLastBlock = s;    //调整上一个访问的Block
    
       CGuard::enterCS(m_BufLock);
       m_iCount += size;    //更新目前已经使用的Block Count
       CGuard::leaveCS(m_BufLock);
    
       m_iNextMsgNo ++; //更新信息号(如果数据较大,n个数据报共享同一个MsgNo)
       if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) //如果发生回绕,调整MsgNo
          m_iNextMsgNo = 1;
    }
    
    • 确认数据:void CSndBuffer::ackData(int offset):在收到ACK对发送缓冲区中的数据进行确认,类似于TCP的Buffer
    void CSndBuffer::ackData(int offset)
    {
       CGuard bufferguard(m_BufLock);
        
       //根据offset更新收到的数据
       for (int i = 0; i < offset; ++ i)
          m_pFirstBlock = m_pFirstBlock->m_pNext;    //此处First指向的是已发送,但是还没有确认的数据
    
       m_iCount -= offset;      //因为已经对数据确认了一部分数据,所以调整已经用过的Block
    
       CTimer::triggerEvent();
    }
    
    • 获取当前已经使用的数据量:int CSndBuffer::getCurrBufSize() const
    int CSndBuffer::getCurrBufSize() const
    {
       return m_iCount; 
    }
    
    • 增加Buffer:void CSndBuffer::increase()
    void CSndBuffer::increase()
    {
       int unitsize = m_pBuffer->m_iSize;    //获取Buffer的大小,MSS 
    
       Buffer* nbuf = NULL;
       try
       {
          nbuf  = new Buffer;
          nbuf->m_pcData = new char [unitsize * m_iMSS];    //每次新增加和之前一样的Size
       }
       catch (...)
       {
          delete nbuf;
          throw CUDTException(3, 2, 0);
       }
       nbuf->m_iSize = unitsize;    //调整新增加的Buffer
       nbuf->m_pNext = NULL;
    
       // insert the buffer at the end of the buffer list
       Buffer* p = m_pBuffer;   //将新创建的Buffer加入到之前Buffer List的Tail
       while (NULL != p->m_pNext)
          p = p->m_pNext;
       p->m_pNext = nbuf;
    
       // new packet blocks
       Block* nblk = NULL;  //创建Blocks,并调整Blocks中data*的位置
       try
       {
          nblk = new Block;
       }
       catch (...)
       {
          delete nblk;
          throw CUDTException(3, 2, 0);
       }
       Block* pb = nblk;
       for (int i = 1; i < unitsize; ++ i)
       {
          pb->m_pNext = new Block;
          pb = pb->m_pNext;
       }
    
       // insert the new blocks onto the existing one
       pb->m_pNext = m_pLastBlock->m_pNext;
       m_pLastBlock->m_pNext = nblk;
    
       pb = nblk;
       char* pc = nbuf->m_pcData;
       for (int i = 0; i < unitsize; ++ i)
       {
          pb->m_pcData = pc;
          pb = pb->m_pNext;
          pc += m_iMSS;
       }
    
       m_iSize += unitsize; //增加Block整体的数量
    }
    

    CRcvBuffer

    • 数据结构:
    class CRcvBuffer
    {
    private:
       CUnit** m_pUnit;                           // 数据缓冲Buffer
       int m_iSize;                                   // 65536(bytes)
       CUnitQueue* m_pUnitQueue;      // 共享的接收队列
       int m_iStartPos;                            // 开始读取data的位置
       int m_iLastAckPos;                       // 上一次被确认的位置,start~lastpos之间的数据可读
                                                            // EMPTY: m_iStartPos = m_iLastAckPos   FULL: m_iStartPos = m_iLastAckPos + 1
       int m_iMaxPos;			        // 数据存在的最大的位置,还没有被确认
       int m_iNotch;			        // 第一个CUnit的读取点
    };
    
    struct CUnit
    {
       CPacket m_Packet;		// packet
       int m_iFlag;			        // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
    };
    
    class CUnitQueue
    {
    private:
       struct CQEntry
       {
          CUnit* m_pUnit;		// unit queue
          char* m_pBuffer;		// data buffer
          int m_iSize;		// size of each queue
    
          CQEntry* m_pNext;
       }
       *m_pQEntry,			// 指向起始的Entry队列
       *m_pCurrQueue,		// 指向当前的Entry队列
       *m_pLastQueue;		// 指向最后一个Entry队列
    
       CUnit* m_pAvailUnit;         //最近访问的Unit* 
       int m_iSize;			// 总共的Packets数量
       int m_iCount;		// 已经使用的Packets数量
       int m_iMSS;			// unit buffer size
       int m_iIPversion;		// IP version
    };
    
    class CPacket
    {
    public:
       int32_t& m_iSeqNo;                       // sequence number
       int32_t& m_iMsgNo;                       // message number
       int32_t& m_iTimeStamp;                // timestamp
       int32_t& m_iID;			          // socket ID
       char*& m_pcData;                          // data/control information
    
       static const int m_iPktHdrSize;	  // packet header size = 16
    
    protected:
       uint32_t m_nHeader[4];               // The 128-bit header field
       iovec m_PacketVector[2];             // The 2-demension vector of UDT packet [header, data]
    
       int32_t __pad;
    };
    
    • 初始化:CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize)
    CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize):
    m_pUnit(NULL),
    m_iSize(bufsize),
    m_pUnitQueue(queue),
    m_iStartPos(0),
    m_iLastAckPos(0),
    m_iMaxPos(0),
    m_iNotch(0)
    {
       m_pUnit = new CUnit* [m_iSize];  //创建CUnit* Array(65536),一次创建这么多,反正没有实际的数据存在,不揪心
       for (int i = 0; i < m_iSize; ++ i)   //每个CUnit都设置为NULL
          m_pUnit[i] = NULL;
    }
    
    • 销毁:CRcvBuffer::~CRcvBuffer()
    CRcvBuffer::~CRcvBuffer()
    {
       for (int i = 0; i < m_iSize; ++ i)
       {
          if (NULL != m_pUnit[i])
          {
             m_pUnit[i]->m_iFlag = 0;   //逐个将CUnit的标志设置为Free,并减少使用的数量
             -- m_pUnitQueue->m_iCount; //减少Queue中已使用的数量
          }
       }
    
       delete [] m_pUnit;
    }
    
    • 读取数据:int CRcvBuffer::readBuffer(char* data, int len)
    int CRcvBuffer::readBuffer(char* data, int len)
    {
       int p = m_iStartPos;     //先前被确认的数据位置
       int lastack = m_iLastAckPos;     //上一次被确认的数据位置
       int rs = len;    //计划读取的数据量
    
       while ((p != lastack) && (rs > 0))   //有空间可以读取数据并且想要读取的数据>0
       {
          int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;//第一个CUnit中剩多少data
          if (unitsize > rs)    //如果剩余的data>需要读取的data
             unitsize = rs; //多读取一点也无妨哈
    
          //将buffer中的数据copy进user data
          memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
          data += unitsize; //更新用户data的位置
    
          //如果用户还需要读取数据
          if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
          {
             CUnit* tmp = m_pUnit[p];   //将刚刚读取完的CUnit置为NULL,并调整Queue已使用CUnit的数量
             m_pUnit[p] = NULL;    
             tmp->m_iFlag = 0;    //调整为free
             -- m_pUnitQueue->m_iCount;
    
             if (++ p == m_iSize) //循环的嘛,你懂吧?不过要保证小于ACKED
                p = 0;
    
             m_iNotch = 0;  //调整下一个在CUnit中读取的起始位置为0
          }
          else
             m_iNotch += rs;    //否则调整在本格CUnit中的位置
    
          rs -= unitsize;   //调整已经读取的位置
       }
    
       m_iStartPos = p; //更新可以开始读取的位置
       return len - rs;    //返回已经读取的数据量
    }
    
    • 确认数据:void CRcvBuffer::ackData(int len)
    void CRcvBuffer::ackData(int len)
    {
       m_iLastAckPos = (m_iLastAckPos + len) % m_iSize; //更新已经被确认的数据的位置
       m_iMaxPos -= len;    //还没有被确认的数据有这么多
       if (m_iMaxPos < 0)   
          m_iMaxPos = 0;
    
       CTimer::triggerEvent();
    }
    
    • 返回剩余空间(未被确认的数据也被计算在内):int CRcvBuffer::getAvailBufSize() const
    int CRcvBuffer::getAvailBufSize() const
    {
       // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
       return m_iSize - getRcvDataSize() - 1;
    }
    
    • 返回已经确认空间:int CRcvBuffer::getRcvDataSize() const
    int CRcvBuffer::getRcvDataSize() const
    {
       if (m_iLastAckPos >= m_iStartPos)    //防止回绕    
          return m_iLastAckPos - m_iStartPos;
    
       return m_iSize + m_iLastAckPos - m_iStartPos;
    }
    
    • 获得一个数据流中的所有数据:int CRcvBuffer::readMsg(char* data, int len)
    int CRcvBuffer::readMsg(char* data, int len)
    {
       int p, q;
       bool passack;
       if (!scanMsg(p, q, passack)) //如果没有找到一个流的合适位置
          return 0;
    
       int rs = len;
       while (p != (q + 1) % m_iSize)   //逐个将流中的数据copy进user data
       {
          int unitsize = m_pUnit[p]->m_Packet.getLength();  //获取起始CUnit的数据
          if ((rs >= 0) && (unitsize > rs)) //如果获取的数据小于一个CUnit的中的数据
             unitsize = rs; //调整获取数据的大小
    
          if (unitsize > 0)
          {
             memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);//将数据进行copy
             data += unitsize;  //调整数据copy的位置
             rs -= unitsize;    //调整下一步需要获取数据的length
          }
    
          if (!passack) //如果已经被ack确认过了
          {
             CUnit* tmp = m_pUnit[p];   //可以在读取之后调整数据指针的位置
             m_pUnit[p] = NULL;
             tmp->m_iFlag = 0;
             -- m_pUnitQueue->m_iCount;
          }
          else
             m_pUnit[p]->m_iFlag = 2;   //没有的话,标记为一斤被读取,但是没有free
    
          if (++ p == m_iSize)      //防止回绕
             p = 0;
       }
    
       if (!passack)    //如果已经被确认过了,调整起始的位置
          m_iStartPos = (q + 1) % m_iSize;
    
       return len - rs; //返回读取的数据量
    }
    
    • 判断已经确认的空间中是否存在一个完成的数据流:bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
    bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
    {
       //如果为空,或者最大位置<=0,返回错误
       if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
          return false;
    
       while (m_iStartPos != m_iLastAckPos) //从可以读取的位置到确认的位置进行搜索
       {
          if (NULL == m_pUnit[m_iStartPos]) //如果中间出现的NULL之类的,跳过就行,防止回绕哈
          {
             if (++ m_iStartPos == m_iSize)
                m_iStartPos = 0;
             continue;
          }
    
          if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1))    //如果不是独立的报文流,后续还有报文
          {
             bool good = true;
    
             //就往后搜索这一系列的报文 
             for (int i = m_iStartPos; i != m_iLastAckPos;)
             {
                if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag))
                {
                   good = false;
                   break;
                }
    
                if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
                   break;
    
                if (++ i == m_iSize)
                   i = 0;
             }
    
             if (good)  //说明没有啥问题,已经找到一个流的边界,退出循环
                break;
          }
    
          CUnit* tmp = m_pUnit[m_iStartPos];    //如果出现了NULL之类的报文,不做任何保存,直接丢弃
          m_pUnit[m_iStartPos] = NULL;
          tmp->m_iFlag = 0;
          -- m_pUnitQueue->m_iCount;
    
          if (++ m_iStartPos == m_iSize)
             m_iStartPos = 0;
       }
    
       //真的琐碎..反正就是需要判断是否读取了一个完整的数据流
       p = -1;                  // message head
       q = m_iStartPos;         // message tail
       passack = m_iStartPos == m_iLastAckPos;
       bool found = false;
    
       //找到一个流的起始位置和结束为止
       for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i)
       {
          if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag))
          {
             switch (m_pUnit[q]->m_Packet.getMsgBoundary())
             {
             case 3: // 11
                p = q;
                found = true;
                break;
    
             case 2: // 10
                p = q;
                break;
    
             case 1: // 01
                if (p != -1)
                   found = true;
             }
          }
          else
          {
             // a hole in this message, not valid, restart search
             p = -1;
          }
    
          if (found)
          {
             //MSG必须被确认并且之前没有读出来
             if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
                break;
    
             found = false;
          }
    
          if (++ q == m_iSize)
             q = 0;
    
          if (q == m_iLastAckPos)
             passack = true;
       }
    
       if (!found)
       {
          if ((p != -1) && ((q + 1) % m_iSize == p))
             found = true;
       }
    
       return found;
    }
    
  • 相关阅读:
    汉斯·季默:布拉格现场
    天使在美国第二部:重建
    欢迎访问我的独立博客 tracefact.net (2019.1.30)
    Kafka 分布式消息系统
    Webpack入门
    《.NET之美》消息及勘误
    MacBook笔记本微信视频聊天没有声音怎么办?
    libnuma.so.1()(64bit) is needed by mysql-community-server-5.7.9-1.el6.x86_64
    List stream 对象 属性去重
    JS遍历对象的方式
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191054.html
Copyright © 2011-2022 走看看