zoukankan      html  css  js  c++  java
  • UDT源码剖析(十一)之SendQueue And RecvQueue

    SendQueue与RecvQueue在代码中与UDP SOCKET相关。在用户将想要发送的数据提交给Buffer之后,由Buffer将数据打包,根据拥塞控制提供的时间计算,在合适的时间提交给SendQueue进行发送。在接收到数据包之后,通过事件驱动的模式通知用户从RecvQueue中拿去数据包。
    删除了交会连接模式的代码。
    SendQueue与RecvQueue依赖于几个通用的数据结构,先列出来哈,可能会与前面的描述有重复,怕来回查找麻烦,索性全部列出来:

    CMultiplexer:每个UDP端口对应一个此对象,资源的实际持有者

    struct CMultiplexer
    {
       CSndQueue* m_pSndQueue;	// The sending queue
       CRcvQueue* m_pRcvQueue;	// The receiving queue
       CChannel* m_pChannel;	// The UDP channel for sending and receiving
       CTimer* m_pTimer;		// The timer
    
       int m_iPort;			// The UDP port number of this multiplexer
       int m_iIPversion;		// IP version
       int m_iMSS;			// Maximum Segment Size
       int m_iRefCount;		//与此资源复用器相关联的UDT实例的数量
       bool m_bReusable;		//这个资源复用器是否可以被共享
    
       int m_iID;			// multiplexer ID
    };
    

    CUnitQueue

    struct CUnit
    {
       CPacket m_Packet;		// packet
       int m_iFlag;			        // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
    };
    
    class CUnitQueue
    {
    private:
       struct CQEntry
       {
          CUnit* m_pUnit;		// unit queue
          char* m_pBuffer;		// data buffer
          int m_iSize;		// size of each queue
    
          CQEntry* m_pNext;
       }
       *m_pQEntry,			// 指向起始的Entry队列
       *m_pCurrQueue,		// 指向当前的Entry队列
       *m_pLastQueue;		// 指向最后一个Entry队列
    
       CUnit* m_pAvailUnit;         //最近访问的Unit* 
       int m_iSize;			// 总共的Packets数量
       int m_iCount;		// 已经使用的Packets数量
       int m_iMSS;			// unit buffer size
       int m_iIPversion;		// IP version
    };
    
    • 初始化:int CUnitQueue::init(int size, int mss, int version)
    CUnitQueue::CUnitQueue():
    m_pQEntry(NULL),
    m_pCurrQueue(NULL),
    m_pLastQueue(NULL),
    m_iSize(0),
    m_iCount(0),
    m_iMSS(),
    m_iIPversion()
    {
    }
    
    int CUnitQueue::init(int size, int mss, int version)
    {
       CQEntry* tempq = NULL;
       CUnit* tempu = NULL;
       char* tempb = NULL;
    
       try
       {
          tempq = new CQEntry;  //初始化需要管理的内存
          tempu = new CUnit [size];
          tempb = new char [size * mss];
       }
       catch (...)
       {
          delete tempq;     //出现异常后退出
          delete [] tempu;
          delete [] tempb;
    
          return -1;
       }
    
       for (int i = 0; i < size; ++ i)
       {
          tempu[i].m_iFlag = 0; //0表示当前CUint是空闲的
          tempu[i].m_Packet.m_pcData = tempb + i * mss; //初始化packet中的数据字段,只有使用权限
       }
       tempq->m_pUnit = tempu;  //初始化队列
       tempq->m_pBuffer = tempb;    //初始化队列
       tempq->m_iSize = size;   //初始化大小
    
       m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; //初始化整体的队列
       m_pQEntry->m_pNext = m_pQEntry;  //初始化下一个指向自己
    
       m_pAvailUnit = m_pCurrQueue->m_pUnit;    //指向首个可用的CUnit
    
       m_iSize = size;  //整体的packet个数,CUint大小为0
       m_iMSS = mss;    
       m_iIPversion = version;
    
       return 0;
    }
    
    • 销毁:CUnitQueue::~CUnitQueue()
    CUnitQueue::~CUnitQueue()
    {
       CQEntry* p = m_pQEntry;  //有size条队列,获取首条队列
    
       while (p != NULL)    //释放队列等的信息
       {
          delete [] p->m_pUnit;
          delete [] p->m_pBuffer;    //这个Buffer是一个CQEntry中的首部,指向这个CQEntry中所使用Buffer的起始位置,在分配的时候是一整块分配的,删除的时候也是一整块的删除
    
          CQEntry* q = p;
          if (p == m_pLastQueue)
             p = NULL;
          else
             p = p->m_pNext;
          delete q;
       }
    }
    
    • 增加队列长度:int CUnitQueue::increase()
    int CUnitQueue::increase()
    {
       int real_count = 0;
       CQEntry* p = m_pQEntry;  //获取首条队列的指针
       while (p != NULL)
       {
          CUnit* u = p->m_pUnit;    //获取首条队列的首个CUnit
          for (CUnit* end = u + p->m_iSize; u != end; ++ u)
             if (u->m_iFlag != 0)   //如果队列中的Cunit状态不是free
                ++ real_count;  //真实的,使用中的CUnit数量++
    
          if (p == m_pLastQueue)
             p = NULL;
          else
             p = p->m_pNext;
       }
       m_iCount = real_count;   //更新使用状况
       if (double(m_iCount) / m_iSize < 0.9)    //已经使用的数量/总数量 < 0.9
          return -1; //就是说还有剩余的空间,不进行扩张
    
       CQEntry* tempq = NULL;   //创建一条新的队列,链接到末尾
       CUnit* tempu = NULL;
       char* tempb = NULL;
    
       // all queues have the same size
       int size = m_pQEntry->m_iSize;
    
       try
       {
          tempq = new CQEntry;
          tempu = new CUnit [size];
          tempb = new char [size * m_iMSS];
       }
       catch (...)
       {
          delete tempq;
          delete [] tempu;
          delete [] tempb;
    
          return -1;
       }
    
       for (int i = 0; i < size; ++ i)
       {
          tempu[i].m_iFlag = 0;
          tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
       }
       tempq->m_pUnit = tempu;
       tempq->m_pBuffer = tempb;
       tempq->m_iSize = size;
    
       m_pLastQueue->m_pNext = tempq;   //链接到末尾
       m_pLastQueue = tempq;
       m_pLastQueue->m_pNext = m_pQEntry;   //这是一条环形链表
    
       m_iSize += size; //增加总共的数量
    
       return 0;
    }
    
    • 获取下一个可用的CUnit:CUnit* CUnitQueue::getNextAvailUnit()
    CUnit* CUnitQueue::getNextAvailUnit()
    {
       if (m_iCount * 10 > m_iSize * 9) //如果已经使用的比例超过0.9,就增加数量
          increase();
    
       if (m_iCount >= m_iSize) //如果已经使用的数量超过总数量,就返回
          return NULL;
    
       CQEntry* entrance = m_pCurrQueue;    //获取当前队列
    
       do
       {
          for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
             if (m_pAvailUnit->m_iFlag == 0)    //在当前的队列中找到第一个free的CUnit
                return m_pAvailUnit;
    
          if (m_pCurrQueue->m_pUnit->m_iFlag == 0)  //如果当前队列的第一个CUnit为free
          {
             m_pAvailUnit = m_pCurrQueue->m_pUnit;  //更新avail CUnit并返回
             return m_pAvailUnit;
          }
    
          m_pCurrQueue = m_pCurrQueue->m_pNext; //availCUint一直存在于CurrentQuene
          m_pAvailUnit = m_pCurrQueue->m_pUnit;
       } while (m_pCurrQueue != entrance);
    
       increase();  //完了立马判断是否需要增加数量
    
       return NULL;
    }
    

    CSndUList

    struct CSNode
    {
       CUDT* m_pUDT;		        // 指向CUDT*的指针
       uint64_t m_llTimeStamp;     // 堆化时排序的时间戳
    
       int m_iHeapLoc;		        // 堆的层次,-1意味着暂时不存在与当前堆中
    };
    
    class CSndUList
    {
    private:
       CSNode** m_pHeap;			                // 堆化数组
       int m_iArrayLength;			                // 堆数组长度
       int m_iLastEntry;			                        // 最近一次发送的位置
       udt_pthread_mutex_t m_ListLock;               
       udt_pthread_mutex_t* m_pWindowLock;     
       udt_pthread_cond_t* m_pWindowCond;         
       CTimer* m_pTimer;
    };
    
    • 初始化:CSndUList::CSndUList()
    CSndUList::CSndUList():
    m_pHeap(NULL),
    m_iArrayLength(4096),    //将队列长度预设为4096
    m_iLastEntry(-1),
    m_ListLock(),
    m_pWindowLock(NULL),
    m_pWindowCond(NULL),
    m_pTimer(NULL)
    {
       m_pHeap = new CSNode*[m_iArrayLength];   //创建length条指针队列
    
       #ifndef WINDOWS
          pthread_mutex_init(&m_ListLock, NULL);
       #else
          m_ListLock = CreateMutex(NULL, false, NULL);
       #endif
    }
    
    • 销毁:CSndUList::~CSndUList()
    CSndUList::~CSndUList()
    {
       delete [] m_pHeap;   //销毁指针队列
    
       #ifndef WINDOWS
          pthread_mutex_destroy(&m_ListLock);
       #else
          CloseHandle(m_ListLock);
       #endif
    }
    
    • 向堆中添加CUDT实例:void CSndUList::insert(int64_t ts, const CUDT* u)
    void CSndUList::insert(int64_t ts, const CUDT* u)
    {
       CGuard listguard(m_ListLock);    //获取锁的guard
    
       // increase the heap array size if necessary
       if (m_iLastEntry == m_iArrayLength - 1)  //如果上一次使用的是最后一条,增加长度
       {
          CSNode** temp = NULL;
    
          try   //都是指针,调整起来,消耗也不是很大
          {         
             temp = new CSNode*[m_iArrayLength * 2];    //长度*2,然后将以往的都copy
          }
          catch(...)
          {
             return;
          }
    
          memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
          m_iArrayLength *= 2;
          delete [] m_pHeap;    //释放以前的操作
          m_pHeap = temp;
       }
    
       insert_(ts, u);  //真实的insert
    }
    
    • 向堆中添加CUDT实例:void CSndUList::insert_(int64_t ts, const CUDT* u)
    void CSndUList::insert_(int64_t ts, const CUDT* u)  //真实的insert
    {
       CSNode* n = u->m_pSNode; //反向获取CSNode
    
       // do not insert repeated node
       //-1意味着,这个CSNode没有在堆中,可以插入.>=1意味着在堆中,直接返回
       if (n->m_iHeapLoc >= 0)
          return;
    
       m_iLastEntry ++; //修改即将插入的位置
       m_pHeap[m_iLastEntry] = n;   //将这个指针指向的CSNode插入堆中
       n->m_llTimeStamp = ts;   //修改时间为ts
    
       int q = m_iLastEntry;
       int p = q;
       while (p != 0)   //堆化的过程,调整新插入的位置,根据ts的大小调整位置
       {
          p = (q - 1) >> 1;
          if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) //ts越大,越往上
          {
             CSNode* t = m_pHeap[p];
             m_pHeap[p] = m_pHeap[q];
             m_pHeap[q] = t;
             t->m_iHeapLoc = q;
             q = p;
          }
          else
             break;
       }
    
       n->m_iHeapLoc = q;   //这个变量还表示在当前堆中的层数吗? 反正大于0,就是存在于堆中了
    
       // an earlier event has been inserted, wake up sending worker
       if (n->m_iHeapLoc == 0)  //如果当前的CUDT*在最顶层,唤醒发送线程
          m_pTimer->interrupt();
    
       if (0 == m_iLastEntry)   //如果队列为空,唤醒发送队列??
       {
          #ifndef WINDOWS
             pthread_mutex_lock(m_pWindowLock);
             pthread_cond_signal(m_pWindowCond);
             pthread_mutex_unlock(m_pWindowLock);
          #else
             SetEvent(*m_pWindowCond);
          #endif
       }
    }
    
    • 更新CUDT的发送时间戳:void CSndUList::update(const CUDT* u, bool reschedule)
    void CSndUList::update(const CUDT* u, bool reschedule)  //更新CUDT*的发送时间戳
    {
       CGuard listguard(m_ListLock);
    
       CSNode* n = u->m_pSNode; //反向获取指针
    
       if (n->m_iHeapLoc >= 0)  //>0说明存在于堆中,需要调整
       {
          if (!reschedule)  //如果调整参数为false,直接退出
             return;
    
          if (n->m_iHeapLoc == 0)   //如果在堆顶部,唤醒发送队列,那还重新调整吗?
          {
             n->m_llTimeStamp = 1;  
             m_pTimer->interrupt();
             return;
          }
    
          remove_(u);   //删除已经存在的CUDT*
       }
    
       insert_(1, u);   //重新插入CUDT*
    }
    
    • 取得发送地址中下一个packet和addr,并将CUDT重新加入堆中:int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
    int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
    {
       CGuard listguard(m_ListLock);
    
       if (-1 == m_iLastEntry)  //索引有问题,直接返回
          return -1;
    
       // no pop until the next schedulled time
       uint64_t ts;
       CTimer::rdtsc(ts);   //获取当前时间
       if (ts < m_pHeap[0]->m_llTimeStamp)  //如果发送的时间小于当前的时间(在规定的时间内没有发送出去)
          return -1;
    
       CUDT* u = m_pHeap[0]->m_pUDT;    //从堆顶部获取并删除
       remove_(u);  //只是将flag调整为-1,不进行直接的删除
    
       if (!u->m_bConnected || u->m_bBroken)    //和CUDT中的函数挂钩了,回头调整注释
          return -1;
    
       // pack a packet from the socket
       if (u->packData(pkt, ts) <= 0)
          return -1;
    
       addr = u->m_pPeerAddr;
    
       // insert a new entry, ts is the next processing time
       if (ts > 0)
          insert_(ts, u);   //将当前时间设置为下一次的发送时间
    
       return 1;
    }
    
    • 获取下一次发哦少年宫的事件:uint64_t CSndUList::getNextProcTime()
    uint64_t CSndUList::getNextProcTime()
    {
       CGuard listguard(m_ListLock);
    
       if (-1 == m_iLastEntry)
          return 0;
    
       return m_pHeap[0]->m_llTimeStamp;    //获取下一次的处理时间
    }
    
    • 将CUDT从堆中删除:void CSndUList::remove_(const CUDT* u)
    void CSndUList::remove_(const CUDT* u)
    {
       CSNode* n = u->m_pSNode;     //反向获取CUDT*在堆中的表现形式
    
       if (n->m_iHeapLoc >= 0)  //如果存在于堆中,进行删除,堆的删除操作
       {
          // remove the node from heap
          m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
          m_iLastEntry --;
          m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;
    
          int q = n->m_iHeapLoc;
          int p = q * 2 + 1;
          while (p <= m_iLastEntry)
          {
             if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
                p ++;
    
             if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
             {
                CSNode* t = m_pHeap[p];
                m_pHeap[p] = m_pHeap[q];
                m_pHeap[p]->m_iHeapLoc = p;
                m_pHeap[q] = t;
                m_pHeap[q]->m_iHeapLoc = q;
    
                q = p;
                p = q * 2 + 1;
             }
             else
                break;
          }
    
          n->m_iHeapLoc = -1;   //此时,这个CSNode不存在于堆中,只调整指针位置,不进行实际的删除
       }
    
       // the only event has been deleted, wake up immediately
       if (0 == m_iLastEntry)   //如果队列已经empty,唤醒队列
          m_pTimer->interrupt();
    }
    

    CSndQueue:Send Queue

    class CSndQueue
    {
    private:
       static void* worker(void* param);        //发送线程
       udt_pthread_t m_WorkerThread;
    
    private:
       CSndUList* m_pSndUList;		    // 堆化的Send List
       CChannel* m_pChannel;                 // The UDP channel for data sending
       CTimer* m_pTimer;			    // 定时器设施
    
       udt_pthread_mutex_t m_WindowLock;
       udt_pthread_cond_t m_WindowCond;
    
       volatile bool m_bClosing;		// 发送线程是否启动
       udt_pthread_cond_t m_ExitCond;
    };
    
    • 初始化:void CSndQueue::init(CChannel* c, CTimer* t)
    CSndQueue::CSndQueue():
    m_WorkerThread(),
    m_pSndUList(NULL),
    m_pChannel(NULL),
    m_pTimer(NULL),
    m_WindowLock(),
    m_WindowCond(),
    m_bClosing(false),
    m_ExitCond()
    {
       #ifndef WINDOWS
          pthread_cond_init(&m_WindowCond, NULL);
          pthread_mutex_init(&m_WindowLock, NULL);
       #else
          m_WindowLock = CreateMutex(NULL, false, NULL);
          m_WindowCond = CreateEvent(NULL, false, false, NULL);
          m_ExitCond = CreateEvent(NULL, false, false, NULL);
       #endif
    }
    
    void CSndQueue::init(CChannel* c, CTimer* t)    //讲真,SendQueue和RecvQueue共用UDP SOCKET的Timer
    {
       m_pChannel = c;
       m_pTimer = t;
       m_pSndUList = new CSndUList;
       m_pSndUList->m_pWindowLock = &m_WindowLock;  //初始化SendList控制变量
       m_pSndUList->m_pWindowCond = &m_WindowCond;
       m_pSndUList->m_pTimer = m_pTimer;
    
       #ifndef WINDOWS
          if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
          { //启动工作线程
             m_WorkerThread = 0;
             throw CUDTException(3, 1);
          }
       #else
          DWORD threadID;
          m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
          if (NULL == m_WorkerThread)
             throw CUDTException(3, 1);
       #endif
    }
    
    • 工作线程:
    void* CSndQueue::worker(void* param) //工作线程
    {
       CSndQueue* self = (CSndQueue*)param; //获取处理的Queue
    
       while (!self->m_bClosing)    //控制发送线程是否继续发送
       {
          uint64_t ts = self->m_pSndUList->getNextProcTime();   //获得发送队列下一个将要发送的包的具体信息
    
          if (ts > 0)   //如果还没有到发送时间
          {
             // wait until next processing time of the first socket on the list
             uint64_t currtime;
             CTimer::rdtsc(currtime);
             if (currtime < ts) //如果当前的时间小鱼发送时间,小睡一会
                self->m_pTimer->sleepto(ts);
    
             // it is time to send the next pkt
             sockaddr* addr;    //已经到了发送时间
             CPacket pkt;   
             if (self->m_pSndUList->pop(addr, pkt) < 0) //从发送队列中获取包和发送地址
                continue;
    
             self->m_pChannel->sendto(addr, pkt);   //调用Channel的UDP的封装发送
          }
          else
          {
              //如果没有包需要发送的时候,就在这块休眠
             // wait here if there is no sockets with data to be sent
             #ifndef WINDOWS
                pthread_mutex_lock(&self->m_WindowLock);
                if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
                   pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
                pthread_mutex_unlock(&self->m_WindowLock);
             #else
                WaitForSingleObject(self->m_WindowCond, INFINITE);
             #endif
          }
       }
    
       #ifndef WINDOWS
          return NULL;
       #else
          SetEvent(self->m_ExitCond);
          return 0;
       #endif
    }
    
    • 发送数据包:int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
    int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
    {
       m_pChannel->sendto(addr, packet);    //调用Channel发送
       return packet.getLength();
    }
    

    CRcvUList

    struct CRNode
    {
       CUDT* m_pUDT;                // Pointer to CUDT*
       uint64_t m_llTimeStamp;      // Time Stamp
    
       CRNode* m_pPrev;             // previous link
       CRNode* m_pNext;             // next link
    
       bool m_bOnList;              // 当前节点是否在双向链表上
    };
    
    class CRcvUList    //用于接收数据的双向链表
    {
    public:
       CRNode* m_pUList;		// the head node
    private:
       CRNode* m_pLast;		// the last node
    };
    
    • 向双向链表中插入CUDT实例:void CRcvUList::insert(const CUDT* u)
    void CRcvUList::insert(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //反向获取在双向链表中的表现形式
       CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间,gettimeofday()
    
       if (NULL == m_pUList)    //如果双向链表为空
       {
          // empty list, insert as the single node
          n->m_pPrev = n->m_pNext = NULL;
          m_pLast = m_pUList = n;
    
          return;
       }
    
       // always insert at the end for RcvUList
       n->m_pPrev = m_pLast;    //插入双向链表的末尾
       n->m_pNext = NULL;
       m_pLast->m_pNext = n;
       m_pLast = n;
    }
    
    • 向双向链表中移除CUDT实例:void CRcvUList::remove(const CUDT* u)
    void CRcvUList::remove(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //方向获取在链表中的表现形式
    
       if (!n->m_bOnList)   //如果不在链表中,直接返回
          return;
    
       if (NULL == n->m_pPrev)  //如果需要删除的结点是首部结点
       {
          // n is the first node
          m_pUList = n->m_pNext;
          if (NULL == m_pUList)
             m_pLast = NULL;
          else
             m_pUList->m_pPrev = NULL;
       }
       else     //反正不是真正的删除,只是在链表中摘除,然后调整是否在链表中的标志
       {
          n->m_pPrev->m_pNext = n->m_pNext;
          if (NULL == n->m_pNext)
          {
             // n is the last node
             m_pLast = n->m_pPrev;
          }
          else
             n->m_pNext->m_pPrev = n->m_pPrev;
       }
    
       n->m_pNext = n->m_pPrev = NULL;
    }
    
    • 更新CUDT在双向链表中的位置:void CRcvUList::update(const CUDT* u)
    void CRcvUList::update(const CUDT* u)
    {
       CRNode* n = u->m_pRNode; //反向获取在链表中的表现形式
    
       if (!n->m_bOnList)
          return;
    
       CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间
    
       // if n is the last node, do not need to change
       if (NULL == n->m_pNext)  //如果n是末尾的结点,不调整
          return;
    
       if (NULL == n->m_pPrev)  //如果n是头部结点
       {
          m_pUList = n->m_pNext;    //将下一个结点调整为头部结点
          m_pUList->m_pPrev = NULL;
       }
       else
       {
          n->m_pPrev->m_pNext = n->m_pNext; //否则在链表中剔除这个结点
          n->m_pNext->m_pPrev = n->m_pPrev;
       }
    
       n->m_pPrev = m_pLast;    //然后把这个结点添加到末尾
       n->m_pNext = NULL;
       m_pLast->m_pNext = n;
       m_pLast = n;
    }
    

    CHash

    class CHash
    {
    private:
       struct CBucket
       {
          int32_t m_iID;		// Socket ID
          CUDT* m_pUDT;		// Socket instance
    
          CBucket* m_pNext;		// next bucket
       } **m_pBucket;		// list of buckets (the hash table)
    
       int m_iHashSize;		// size of hash table
    };
    
    • 初始化:void CHash::init(int size)
    CHash::CHash():
    m_pBucket(NULL),
    m_iHashSize(0)
    {
    }
    
    void CHash::init(int size)
    {
       m_pBucket = new CBucket* [size]; //创建size个HASH ENTRY
    
       for (int i = 0; i < size; ++ i)
          m_pBucket[i] = NULL;  //每个HASH ENTRY指向空
    
       m_iHashSize = size;  //调整HASH SIZE
    }
    
    • 销毁:CHash::~CHash()
    CHash::~CHash()
    {
       for (int i = 0; i < m_iHashSize; ++ i)   //删除所有的Buckets
       {
          CBucket* b = m_pBucket[i];
          while (NULL != b)
          {
             CBucket* n = b->m_pNext;
             delete b;
             b = n;
          }
       }
    
       delete [] m_pBucket;
    }
    
    • 查找:CUDT* CHash::lookup(int32_t id)
    CUDT* CHash::lookup(int32_t id)
    {
       // simple hash function (% hash table size); suitable for socket descriptors
       CBucket* b = m_pBucket[id % m_iHashSize];
    
       while (NULL != b)    //寻找CUDT*
       {
          if (id == b->m_iID)
             return b->m_pUDT;
          b = b->m_pNext;
       }
    
       return NULL;
    }
    
    • 插入:void CHash::insert(int32_t id, CUDT* u)
    void CHash::insert(int32_t id, CUDT* u) //新加入的Bucket加入到最接近数组的底部
    {
       CBucket* b = m_pBucket[id % m_iHashSize];
    
       CBucket* n = new CBucket;
       n->m_iID = id;
       n->m_pUDT = u;
       n->m_pNext = b;
    
       m_pBucket[id % m_iHashSize] = n;
    }
    
    • 删除:void CHash::remove(int32_t id)
    void CHash::remove(int32_t id)
    {
       CBucket* b = m_pBucket[id % m_iHashSize];    //找到Bucket
       CBucket* p = NULL;   
    
       while (NULL != b)    //在桶中的链表中删除一个结点
       {
          if (id == b->m_iID)
          {
             if (NULL == p)
                m_pBucket[id % m_iHashSize] = b->m_pNext;
             else
                p->m_pNext = b->m_pNext;
    
             delete b;
    
             return;
          }
    
          p = b;
          b = b->m_pNext;
       }
    }
    

    CRcvQueue

    class CRcvQueue
    {
    private:
       static void* worker(void* param);    //接收线程
       udt_pthread_t m_WorkerThread;
    
    private:
       CUnitQueue m_UnitQueue;		// The received packet queue(就是那个类似于Hash的组织)
    
       CRcvUList* m_pRcvUList;		// 这个List中的UDT实例准备从Queue中读取数据
       CHash* m_pHash;			// HASH可以加速在List中寻找UDT实例
       CChannel* m_pChannel;		// UDP channel for receving packets
       CTimer* m_pTimer;			// 与发送队列共享Timer
    
       int m_iPayloadSize;                      // packet中的有效载荷
    
       volatile bool m_bClosing;              // 接收线程是否启动
       udt_pthread_cond_t m_ExitCond;
    
    private:
       udt_pthread_mutex_t m_LSLock;
       CUDT* m_pListener;                                   // pointer to the (unique, if any) listening UDT entity
       CRendezvousQueue* m_pRendezvousQueue;                // 汇合模式中的UDT SOCKET列表
    
       std::vector<CUDT*> m_vNewEntry;                                  // 新添加的条目
       udt_pthread_mutex_t m_IDLock;
    
       std::map<int32_t, std::queue<CPacket*> > m_mBuffer;	// 用于集合连接请求的临时缓冲区
       udt_pthread_mutex_t m_PassLock;
       udt_pthread_cond_t m_PassCond;
    };
    
    • 初始化:void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
    CRcvQueue::CRcvQueue():
    m_WorkerThread(),
    m_UnitQueue(),
    m_pRcvUList(NULL),
    m_pHash(NULL),
    m_pChannel(NULL),
    m_pTimer(NULL),
    m_iPayloadSize(),
    m_bClosing(false),
    m_ExitCond(),
    m_LSLock(),
    m_pListener(NULL),
    m_pRendezvousQueue(NULL),
    m_vNewEntry(),
    m_IDLock(),
    m_mBuffer(),
    m_PassLock(),
    m_PassCond()
    {
       #ifndef WINDOWS
          pthread_mutex_init(&m_PassLock, NULL);
          pthread_cond_init(&m_PassCond, NULL);
          pthread_mutex_init(&m_LSLock, NULL);
          pthread_mutex_init(&m_IDLock, NULL);
       #else
          m_PassLock = CreateMutex(NULL, false, NULL);
          m_PassCond = CreateEvent(NULL, false, false, NULL);
          m_LSLock = CreateMutex(NULL, false, NULL);
          m_IDLock = CreateMutex(NULL, false, NULL);
          m_ExitCond = CreateEvent(NULL, false, false, NULL);
       #endif
    }
    
    void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
    {
       m_iPayloadSize = payload;
    
       m_UnitQueue.init(qsize, payload, version);   
    
       m_pHash = new CHash;
       m_pHash->init(hsize);
    
       m_pChannel = cc;
       m_pTimer = t;
    
       m_pRcvUList = new CRcvUList;
       m_pRendezvousQueue = new CRendezvousQueue;
    
       #ifndef WINDOWS
          if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
          { //启动接收线程
             m_WorkerThread = 0;
             throw CUDTException(3, 1);
          }
       #else
          DWORD threadID;
          m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
          if (NULL == m_WorkerThread)
             throw CUDTException(3, 1);
       #endif
    }
    
    • 销毁:CRcvQueue::~CRcvQueue()
    CRcvQueue::~CRcvQueue()
    {
       m_bClosing = true;
    
       #ifndef WINDOWS
          if (0 != m_WorkerThread)  //终止接受线程
             pthread_join(m_WorkerThread, NULL);
          pthread_mutex_destroy(&m_PassLock);
          pthread_cond_destroy(&m_PassCond);
          pthread_mutex_destroy(&m_LSLock);
          pthread_mutex_destroy(&m_IDLock);
       #else
          if (NULL != m_WorkerThread)
             WaitForSingleObject(m_ExitCond, INFINITE);
          CloseHandle(m_WorkerThread);
          CloseHandle(m_PassLock);
          CloseHandle(m_PassCond);
          CloseHandle(m_LSLock);
          CloseHandle(m_IDLock);
          CloseHandle(m_ExitCond);
       #endif
    
       delete m_pRcvUList;
       delete m_pHash;
       delete m_pRendezvousQueue;
    
       // 删除队列中所有没有处理的信息
       for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
       {
          while (!i->second.empty())
          {
             CPacket* pkt = i->second.front();
             delete [] pkt->m_pcData;
             delete pkt;
             i->second.pop();
          }
       }
    }
    
    • 发送工作线程: void* CRcvQueue::worker(void* param)
    {
       CRcvQueue* self = (CRcvQueue*)param; //反向获取接收队列对象
    
       //准备好ID,地址之类的东西
       sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
       CUDT* u = NULL;
       int32_t id;
    
       while (!self->m_bClosing)
       {
          #ifdef NO_BUSY_WAITING
             self->m_pTimer->tick();
          #endif
    
          // check waiting list, if new socket, insert it to the list
          while (self->ifNewEntry())    //如果有新的CUDT*
          {
             CUDT* ne = self->getNewEntry();    //将新的CUDT*加入到合适的队列中
             if (NULL != ne)
             {
                self->m_pRcvUList->insert(ne);
                self->m_pHash->insert(ne->m_SocketID, ne);
             }
          }
    
          // 为了接收packet,获取一个可用的slot
          CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
          if (NULL == unit) //如果获取失败
          {
             // 没有足够的空间,读取这个packet后直接drop掉
             CPacket temp;
             temp.m_pcData = new char[self->m_iPayloadSize];
             temp.setLength(self->m_iPayloadSize);
             self->m_pChannel->recvfrom(addr, temp);
             delete [] temp.m_pcData;
             goto TIMER_CHECK;
          }
    
          unit->m_Packet.setLength(self->m_iPayloadSize);//设置packet的有效载荷
    
          //recv一个packet.如果返回-1,就相当于什么都没有收到 
          if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
             goto TIMER_CHECK;
    
          id = unit->m_Packet.m_iID;    //获取收到的packet的ID
    
          // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
          if (0 == id)//Connect request
          {
             if (NULL != self->m_pListener)
                self->m_pListener->listen(addr, unit->m_Packet);//通过这个CUDT*处理这个packet
             else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
             {  //同样是处理new connect
                // asynchronous connect: call connect here
                // otherwise wait for the UDT socket to retrieve this packet
                if (!u->m_bSynRecving)
                   u->connect(unit->m_Packet);
                else
                   self->storePkt(id, unit->m_Packet.clone());
             }
          }
          else if (id > 0)  //发送往一个socket的数据包
          {
             if (NULL != (u = self->m_pHash->lookup(id)))//找到UDT对应的CUDT*
             {
                if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
                {//对比地址
                   if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
                   {//如果这个CUDT*的状态正常
                      if (0 == unit->m_Packet.getFlag())//如果这是一个数据包
                         u->processData(unit);  //处理数据
                      else//如果这是一个控制包
                         u->processCtrl(unit->m_Packet);//处理控制信息
    
                      u->checkTimers();//检查定时器
                      self->m_pRcvUList->update(u);//将这个处理过的CUDT*插入List末尾
                   }
                }
             }
             else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
             {//如果是一个新的连接状态
                if (!u->m_bSynRecving)
                   u->connect(unit->m_Packet);//建立稳定的连接状态
                else
                   self->storePkt(id, unit->m_Packet.clone());//连接状态还没有稳定,先存储数据,稍后处理
             }
          }
    
          //当drop packet时或者没有free CUnit时,跳转到这块
    TIMER_CHECK:
          // take care of the timing event for all UDT sockets
    
          uint64_t currtime;
          CTimer::rdtsc(currtime);  //获取当前的时间
    
          CRNode* ul = self->m_pRcvUList->m_pUList;//获取头部的CUDT*
          uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
          while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
          {
             CUDT* udt = ul->m_pUDT;    //获取CUDT*
    
             if (udt->m_bConnected && !udt->m_bBroken && !udt->m_bClosing)
             {//如果这个CUDT*的状态正常,更新到Recv List的后面就行了
                udt->checkTimers(); 
                self->m_pRcvUList->update(udt);
             }
             else
             {//如果这个CUDT*的状态出现差错,直接删除
                // the socket must be removed from Hash table first, then RcvUList
                self->m_pHash->remove(udt->m_SocketID);
                self->m_pRcvUList->remove(udt);
                udt->m_pRNode->m_bOnList = false;
             }
    
             ul = self->m_pRcvUList->m_pUList;
          }
    
          //还没有进入正常的连接状态的CUDT*,发送探测包
          // Check connection requests status for all sockets in the RendezvousQueue.
          self->m_pRendezvousQueue->updateConnStatus();
       }
    
       //收尾工作
       if (AF_INET == self->m_UnitQueue.m_iIPversion)
          delete (sockaddr_in*)addr;
       else
          delete (sockaddr_in6*)addr;
    
       #ifndef WINDOWS
          return NULL;
       #else
          SetEvent(self->m_ExitCond);
          return 0;
       #endif
    }
    
    • 从Queue中获取一个Packet:int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
    int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
    {
       CGuard bufferlock(m_PassLock);
    
       map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
    
       if (i == m_mBuffer.end())    //如果描述这个UDT的Packet Queue为空
       {
          #ifndef WINDOWS
             uint64_t now = CTimer::getTime();
             timespec timeout;
    
             timeout.tv_sec = now / 1000000 + 1;
             timeout.tv_nsec = (now % 1000000) * 1000;
    
             pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);//就睡一会
          #else
             ReleaseMutex(m_PassLock);
             WaitForSingleObject(m_PassCond, 1000);
             WaitForSingleObject(m_PassLock, INFINITE);
          #endif
    
          i = m_mBuffer.find(id);   //被唤醒后还没有packet可读的话,设置packet,并返回-1
          if (i == m_mBuffer.end())
          {
             packet.setLength(-1);
             return -1;
          }
       }
    
       // retrieve the earliest packet
       CPacket* newpkt = i->second.front(); //获取一个包
    
       if (packet.getLength() < newpkt->getLength())//如果两个包的数据量有差错,返回-1
       {
          packet.setLength(-1);
          return -1;
       }
    
       // copy packet content
       // 将首部的packet拷贝出来
       memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
       memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
       packet.setLength(newpkt->getLength());
    
       delete [] newpkt->m_pcData;
       delete newpkt;
    
       // remove this message from queue,
       // if no more messages left for this socket, release its data structure
       i->second.pop();
       if (i->second.empty())   //如果队列为空的话,删除这个队列。对应上文,如果在m_buffer中没有找到Queue,就意味着没有Packet可读
          m_mBuffer.erase(i);
    
       return packet.getLength();
    }
    
    • 设置与取消Listener:int CRcvQueue::setListener(CUDT* u)void CRcvQueue::removeListener(const CUDT* u)
    int CRcvQueue::setListener(CUDT* u)
    {
       CGuard lslock(m_LSLock);
    
       if (NULL != m_pListener)
          return -1;
    
       m_pListener = u;
       return 0;
    }
    
    void CRcvQueue::removeListener(const CUDT* u)
    {
       CGuard lslock(m_LSLock);
    
       if (u == m_pListener)
          m_pListener = NULL;
    }
    
    • 从队列中删除UDT SOCKET以及为其维护的信息:void CRcvQueue::removeConnector(const UDTSOCKET& id)
    void CRcvQueue::removeConnector(const UDTSOCKET& id)    //删除和控制包有关的所有信息,这个队列中维护着和连接相关的信息
    {
       m_pRendezvousQueue->remove(id);
    
       CGuard bufferlock(m_PassLock);
    
       map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
       if (i != m_mBuffer.end())
       {
          while (!i->second.empty())
          {
             delete [] i->second.front()->m_pcData;
             delete i->second.front();
             i->second.pop();
          }
          m_mBuffer.erase(i);
       }
    }
    
    • 处理连接尚未完全建立的情况:void CRcvQueue::setNewEntry(CUDT* u) bool CRcvQueue::ifNewEntry() CUDT* CRcvQueue::getNewEntry()
    void CRcvQueue::setNewEntry(CUDT* u)        //加入新的CUDT*
    {
       CGuard listguard(m_IDLock);
       m_vNewEntry.push_back(u);
    }
    
    bool CRcvQueue::ifNewEntry()        //判断是否有新的entry
    {
       return !(m_vNewEntry.empty());
    }
    
    CUDT* CRcvQueue::getNewEntry()        //获取新的Entry
    {
       CGuard listguard(m_IDLock);
    
       if (m_vNewEntry.empty())
          return NULL;
    
       CUDT* u = (CUDT*)*(m_vNewEntry.begin());
       m_vNewEntry.erase(m_vNewEntry.begin());
    
       return u;
    }
    
  • 相关阅读:
    Educational Codeforces Round 20 D. Magazine Ad
    Educational Codeforces Round 20 C. Maximal GCD
    紫书第三章训练2 暴力集
    Educational Codeforces Round 20 B. Distances to Zero
    Educational Codeforces Round 20 A. Maximal Binary Matrix
    紫书第三章训练1 D
    紫书第一章训练1 D -Message Decoding
    HAZU校赛 Problem K: Deadline
    Mutual Training for Wannafly Union #8 D
    紫书第三章训练1 E
  • 原文地址:https://www.cnblogs.com/ukernel/p/9191068.html
Copyright © 2011-2022 走看看